In the ingestion phase, data came through the front door and entered our domain. Inside the domain we want to process the data if they mean something to our components, ignore them if they are not relevant, or raise an exception if they are relevant but something is not right.
In the previous phase, ingestion, the primary patterns are Publish-Subscribe and Competing Consumers, which make that step scalable. In this phase, processing, we can use Event Sourcing or CRUD depending on the requirements.
Deciding which pattern is right for the job is tricky. From a technical point of view, as a software architect or a developer required to… “just do it”, the temptation to take the quick and dirty path and the opposite temptation to go for the cool and innovative solution are both there, like devil and angel. Which one is which for you can depend on what you are used to doing. An important aspect here is to be driven by the business requirement. At the end of the day, we earn our salary only if we give back business value, right? Still, just listening to business stakeholders and their wish to have the “thing” up and running quickly is not always right.
A good question to ask the business during the initial discovery phase is: is there value for you in keeping the history of the data? That way you can see the changes over time and use it like a time machine, for example to compare the state of a Policy today with its state last month.
If they react with joy, then Event Sourcing is a good candidate. If they don’t care, then maybe you can use CRUD to manage the current state and just use Pub/Sub messaging to communicate with other components. A message broker like Kafka, Kinesis or Azure Event Hub, or the good old NServiceBus and RabbitMQ, can all do the job.
If you have decided to do Event Sourcing, then what you need is a little toolbox and a solid event store. I usually use Event Store, as it is a stream database built for Event Sourcing and the features you need are all there. The toolbox that helps you build Event Sourced components is NOT a framework. All it needs is a good Repository and a couple of types to mark messages as Commands or Events.
Years ago I implemented a Repository for my personal use and called it Evento. You can find it here: https://github.com/riccardone/Evento . It is usable as a NuGet package, but my suggestion is not to take a dependency on it from your components. Just copy and paste the code into your solution, from one Domain component to another. It’s lightweight and you can tailor the features to what you need from time to time.
Here I follow what Greg Young and other inspiring people have said: the Don’t Repeat Yourself (DRY) principle is valid within a component. Don’t share code across components; just copy and paste.
Before we start looking at the tech stuff, we need to scratch the surface of the business requirement to model the basic flow of commands and events in our Event Sourced component. Alberto Brandolini has defined a technique called Event Storming. To apply it, in my very simple way, set up a meeting in a room with a large whiteboard and bring sticky notes of different colours.
Commands and Events are usually the important bits in it. They are messages with different meanings. A Command expresses in its name the intent to make a change, while an Event is a declaration that something has changed and carries the relevant changed data.
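To make the distinction concrete, here is a minimal sketch of the "couple of types" used to mark messages. The `CreatePolicy` and `PolicyCreated` names and their fields are hypothetical, chosen only for illustration; the naming convention (imperative for Commands, past tense for Events) is the part that matters.

```python
from dataclasses import dataclass, field
from typing import Dict


class Command:
    """Marker type: the intent to make a change, named in the imperative."""


class Event:
    """Marker type: a declaration that something has changed, named in the past tense."""


@dataclass(frozen=True)
class CreatePolicy(Command):
    # Hypothetical command: asks the domain to create a policy.
    policy_id: str
    holder: str
    metadata: Dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
class PolicyCreated(Event):
    # Hypothetical event: carries the data that changed.
    policy_id: str
    holder: str
    metadata: Dict[str, str] = field(default_factory=dict)


command = CreatePolicy("p-1", "Alice")
event = PolicyCreated(command.policy_id, command.holder)
```

The marker base classes carry no behaviour; they only let the rest of the component tell intents apart from facts.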
Moving down from the top-level domain modelling, when I start implementing a Domain Component I usually shape it using the same parts, no matter what the business requirement is. Once the mechanics between the parts are in place, I start implementing the core behaviours inside one or more Domain Aggregates.
The parts composing an Event Sourced component, at least in the way that I like, are the following 3:
- The Host. This is the physical process that runs the specific Domain Component. Another term for it is a Domain Microservice. It is a Domain one because you can also have Microservices in the ingestion phase and in the output phase. There is no logic here. It’s just a shell that references the other parts (adapter, core domain, other services). It is responsible for one simple thing: injecting any required dependencies into all the other parts and starting the service. It could be a command line program ready to be wrapped in a Docker container.
- The Adapter. This is the layer of the onion where you subscribe to the input ingestion streams. When a message comes through the ingestion, it ends up in your handling function. Here you look for a mapping to adapt/convert the external message to a well-known internal Command. A weak schema technique is my favourite here. With that command, you use a DomainRepository to Get your Aggregate from the Event Store using an Id. Your DomainRepository must be able to look into the Event Store and, if it finds the Aggregate, reapply all the related events to rebuild its current state. Then you send the command to the related function exposed by your Aggregate. At the end of the execution the Aggregate function will possibly raise one or more Domain Events. At this point the Adapter uses the DomainRepository to save the Events by calling a simple Set(aggregate) or Set(uncommitted-events).
- The Core Domain. This is the inner part of the onion. Here you usually have one class or module exposing the behaviours required to fulfil the business requirement. Following the Domain Driven Design jargon we can call that class or module… the Aggregate. If your logic needs to read external data in order to calculate something or take a decision, then your functions must take those data as input in addition to the command.
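The three parts above can be sketched roughly as follows. This is not the Evento code: it is a simplified illustration with an in-memory dictionary standing in for the Event Store, and every name in it (`Policy`, `CreatePolicy`, `PolicyCreated`, `InMemoryDomainRepository`, the `policy-requested` message type) is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class CreatePolicy:              # hypothetical Command
    policy_id: str
    holder: str


@dataclass(frozen=True)
class PolicyCreated:             # hypothetical Event
    policy_id: str
    holder: str


class Policy:
    """Core Domain: the Aggregate. Behaviour only, no infrastructure."""

    def __init__(self) -> None:
        self.id: Optional[str] = None
        self.holder: Optional[str] = None
        self.uncommitted: List[object] = []

    def apply(self, event: object) -> None:
        # Change state from one event, whether replayed or newly raised.
        if isinstance(event, PolicyCreated):
            self.id, self.holder = event.policy_id, event.holder

    def create(self, cmd: CreatePolicy) -> None:
        if self.id is not None:
            raise ValueError("policy already exists")
        event = PolicyCreated(cmd.policy_id, cmd.holder)
        self.apply(event)
        self.uncommitted.append(event)


class InMemoryDomainRepository:
    """Stand-in for a repository backed by a real Event Store."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[object]] = {}

    def get(self, aggregate_id: str) -> Policy:
        policy = Policy()
        for event in self._streams.get(aggregate_id, []):
            policy.apply(event)  # reapply history to rebuild current state
        return policy

    def set(self, policy: Policy) -> None:
        self._streams.setdefault(policy.id, []).extend(policy.uncommitted)
        policy.uncommitted = []


class Adapter:
    """Maps external messages to internal Commands and drives the Aggregate."""

    def __init__(self, repository: InMemoryDomainRepository) -> None:
        self._repository = repository

    def handle(self, message: dict) -> None:
        # Weak schema: read only the fields we need and ignore the rest.
        if message.get("type") != "policy-requested":
            return               # not relevant for this component
        cmd = CreatePolicy(message["id"], message["holder"])
        policy = self._repository.get(cmd.policy_id)
        policy.create(cmd)
        self._repository.set(policy)


# The Host: no logic, just wiring of dependencies and start of the service.
repository = InMemoryDomainRepository()
adapter = Adapter(repository)
adapter.handle({"type": "policy-requested", "id": "p-1", "holder": "Alice", "extra": 1})
```

Note that the Aggregate never touches the repository: the Adapter loads it, calls one behaviour, and saves whatever events were raised.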
All the modern messaging buses offer the possibility to horizontally scale your processing units along with your ingestion channels.
Scale the ingestion channel
Data producers publish messages into an ingestion channel. In Kafka terminology this is called a topic. A topic can be further split into partitions, and each partition can be hosted on a separate broker node. This technique is also known as Sharding, where a partition is in fact a shard of the topic. The Data Producer, the client, then sends each message to one specific partition of that topic.
Using Event Store as a message broker, the ingestion channel is called a stream. With the current version it is not possible to horizontally scale the ingestion like you do with Kafka, Kinesis, Azure Event Hub or RabbitMQ. Sharding is not there yet, but it’s not difficult to implement if you consider each Event Store instance a shard and have your clients send data by picking a connection from the available shards.
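One possible way to pick a shard on the client side is to hash a message key, so that all messages for the same key land on the same instance. The sketch below assumes a fixed list of shards; the addresses and the `pick_shard` helper are hypothetical, and it does not open any real connection.

```python
import hashlib
from typing import List


def pick_shard(key: str, shards: List[str]) -> str:
    """Return the shard that owns `key`, using a hash that is stable across processes."""
    # Python's built-in hash() is randomised per process, so use hashlib instead.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(shards)
    return shards[index]


# Hypothetical addresses, one per Event Store instance acting as a shard.
shards = ["eventstore-0:1113", "eventstore-1:1113", "eventstore-2:1113"]

target = pick_shard("policy-42", shards)
```

With a plain modulo, changing the number of shards remaps most keys, so this simple form fits best when the shard list is stable.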
Scale processing with Competing Consumers
Once the data are in the ingestion channel, on the other side you have groups of consumers. Each group receives each message at least once. This part is similar in all the messaging systems above, Event Store included. In Event Store, if you want to scale your processing with Competing Consumers, you can use the Persistent Subscription model.
The characteristic of the Competing Consumers pattern is that an offset is recorded on the server for each group of consumers, keeping the position of the last processed message. You need to deal with the possibility of handling the same message twice, as the guarantee is not At Most Once but At Least Once.
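A common way to cope with At Least Once delivery is to make the handler idempotent by remembering which message ids were already processed. The sketch below assumes every message carries a unique `id` field, which is an assumption of this example, and keeps the seen ids in memory only; a real consumer would need a durable store.

```python
from typing import Callable, List, Set


class IdempotentHandler:
    """Wraps a handler so that a redelivered message is processed only once."""

    def __init__(self, handle: Callable[[dict], None]) -> None:
        self._handle = handle
        self._seen: Set[str] = set()   # in-memory only; assumed durable in production

    def __call__(self, message: dict) -> bool:
        message_id = message["id"]
        if message_id in self._seen:
            return False               # duplicate delivery: skip it
        self._handle(message)
        self._seen.add(message_id)     # record only after the handler succeeded
        return True


processed: List[str] = []
handler = IdempotentHandler(lambda message: processed.append(message["id"]))

first = handler({"id": "m-1"})
second = handler({"id": "m-1"})        # same message delivered again
third = handler({"id": "m-2"})
```

Recording the id after the handler runs means a crash in between leads to a retry rather than a lost message, which matches the At Least Once guarantee.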
If you need to scale your processing then having multiple consumers competing in the same group can pose an interesting challenge called Temporal Decoupling.
The problem is caused by the fact that you don’t know how long each consumer will take to process a message, and therefore message A can end up being processed after message B because the 2 consumers are handling these messages in parallel.
To deal with this situation whenever the order is important for the business process, you can decouple the time of the processing from the time of the business event. With Event Store you can set in the metadata an “applies” key that represents the business time of the Event, to be used whenever anyone rebuilds the state by reapplying the past events.
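As an illustration, rebuilding state by business time instead of arrival order could look like the sketch below. It assumes each event is a dictionary with a `data` payload and a `metadata` section whose "applies" value is an ISO 8601 timestamp; that shape, the `premium` field, and the `rebuild_state` helper are assumptions of this example, not an Event Store API.

```python
from datetime import datetime
from typing import List


def business_time(event: dict) -> datetime:
    """Read the business time of the event from its "applies" metadata key."""
    return datetime.fromisoformat(event["metadata"]["applies"])


def rebuild_state(events: List[dict]) -> dict:
    """Fold events into a state in business-time order, not in stored order."""
    state: dict = {}
    for event in sorted(events, key=business_time):
        state.update(event["data"])
    return state


# Stored out of order: the February change was processed before the January one.
events = [
    {"data": {"premium": 120}, "metadata": {"applies": "2024-02-01T00:00:00"}},
    {"data": {"premium": 100}, "metadata": {"applies": "2024-01-01T00:00:00"}},
]

state = rebuild_state(events)
```

Here the February value wins even though it was written first, because the fold follows the "applies" time.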
If that is not enough and you must process message A before message B, you can reduce the occurrence of this by using a special consumer strategy called Pinned. This will not guarantee that subsequent messages are handled by the same consumer. For that reason I usually leave the default Competing Consumers strategy (round robin), and my processing logic will “park” message B if it is not yet time to process it, for example if it is received before message A, and then replay the parked messages later. You can also set a configurable interval here to replay parked messages.
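The parking logic can be sketched as follows. How you decide that it is "not yet time" depends on your domain; this example assumes each message carries a consecutive `sequence` number starting at 1, which is purely an assumption for illustration, as is the `SequencedProcessor` class.

```python
from typing import Dict, List


class SequencedProcessor:
    """Processes messages in sequence order, parking the ones that arrive early."""

    def __init__(self) -> None:
        self.processed: List[int] = []
        self._parked: Dict[int, dict] = {}
        self._next = 1                 # sequence number expected next

    def handle(self, message: dict) -> None:
        sequence = message["sequence"]
        if sequence < self._next:
            return                     # already processed: duplicate delivery
        if sequence > self._next:
            self._parked[sequence] = message   # too early: park it
            return
        self._process(message)
        self.replay_parked()

    def replay_parked(self) -> None:
        # Could also be called from a timer on a configurable interval.
        while self._next in self._parked:
            self._process(self._parked.pop(self._next))

    def _process(self, message: dict) -> None:
        self.processed.append(message["sequence"])
        self._next += 1


processor = SequencedProcessor()
processor.handle({"sequence": 2})      # message B arrives first and is parked
processor.handle({"sequence": 1})      # message A arrives, then B is replayed
```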