
Event Sourcing Step-by-step

This is a draft made publicly available to gather feedback while I'm writing it. It will change day by day over the coming weeks.

In this guide I will go through an E-Commerce scenario to show how to combine Publish-Subscribe, Event Sourcing and CQRS to build loosely coupled Microservices.

Our business requirement is to receive and process orders from a Web UI and present data and reports back to the user.

Event Sourcing, as I see it, works in the following way.
An initial message or event knocks on the input door from the external world…

  • it is converted to a Command
  • the Command is passed to a function to be processed
  • one or more Domain Events containing the results are saved, in order, inside streams

If another component or system is interested in this data, it must be able to subscribe to the events and synchronise/denormalise them, in order, into another cache/db.
If the other component needs to react as soon as the above Domain Events are raised, some logic needs to be in place to decide whether to start again from the first step or not, depending on whether that has already happened (idempotency).
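To make these steps concrete, here is a minimal in-memory sketch in JavaScript. It is not tied to Event Store: InMemoryStreams, toCommand, handle and Subscriber are hypothetical names used only for illustration, and having the subscriber remember the last event number it processed is just one possible way to get idempotency.

```javascript
// Hypothetical in-memory stand-in for an event store: named, append-only streams.
class InMemoryStreams {
  constructor() {
    this.streams = new Map();
  }
  // Append events at the end of a stream, numbering them 0, 1, 2, ...
  append(streamName, events) {
    const stream = this.streams.get(streamName) || [];
    for (const event of events) {
      stream.push({ ...event, number: stream.length });
    }
    this.streams.set(streamName, stream);
  }
  read(streamName) {
    return this.streams.get(streamName) || [];
  }
}

// Step 1: convert the raw incoming message into a Command.
function toCommand(message) {
  return { type: "PlaceOrder", orderId: message.id, amount: message.total };
}

// Step 2: a function that processes the Command and returns Domain Events.
function handle(command) {
  return [{ type: "OrderPlaced", orderId: command.orderId, amount: command.amount }];
}

// A subscriber that denormalises events into its own cache and remembers the
// last event number it processed, so a redelivered event is ignored.
class Subscriber {
  constructor() {
    this.lastProcessed = -1;
    this.cache = new Map();
  }
  onEvent(event) {
    if (event.number <= this.lastProcessed) return false; // already seen
    this.cache.set(event.orderId, event.amount);
    this.lastProcessed = event.number;
    return true;
  }
}

const store = new InMemoryStreams();
const events = handle(toCommand({ id: "order-1", total: 42 }));
// Step 3: save the resulting Domain Events, in order, in a stream.
store.append("order-order-1", events);

const subscriber = new Subscriber();
for (const e of store.read("order-order-1")) subscriber.onEvent(e);
// Delivering the same event a second time has no further effect.
const secondDelivery = subscriber.onEvent(store.read("order-order-1")[0]);
console.log(subscriber.cache.get("order-1"), secondDelivery); // 42 false
```

In a real setup the subscriber would persist its checkpoint together with its cache, so that a restart does not reapply events it has already handled.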

The confusion I see around the basics is that many people create a chain of processing between components just by sending events around through buses, like the initial message above.

We will build our components using a mix of languages and platforms: Node.js + Express, C# and Java. This approach shows how we can stay platform-independent by having Event Store at the core of our architecture.

A key point of this article is to define an approach that can be reused for most requirements. What changes from requirement to requirement are the behaviours inside the Aggregate and the handling functions, which may use one application service or another. The surrounding 'mechanics', which define the flow of messages and the different ways we handle them through the 3 steps, remain the same and are reusable. On each new requirement you will be able to copy and paste most of the repetitive code and, like a sculptor, you'll get it better and better with each round.

Lesson 1: Getting Started

We are building a solution to process and store orders, payments and shipping. These slices of data will together form our solution, like the layers of a cake. In DDD jargon, each part is an Aggregate and the whole solution could be a Bounded Context. We will implement the data ingestion as a gateway that sends the data to our processing units. On the other side there will be our downstream components, each protecting an Aggregate. Each component can be represented as a bubble. If we look at one of these bubbles in the middle of our architecture, we can see different layers that, like an onion, protect the inner Aggregate. There is the host process, responsible for keeping the service running and injecting the concrete dependencies. There is the Application Service, where the external messages will eventually be mapped to well-known Commands to be sent to the Aggregates. And there is the inner Aggregate, exposing all the behaviors that represent our business features.

In other words, our Processing Components handle Commands and raise Domain Events. These events are stored in Event Store streams, following the Event Sourcing pattern.
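A minimal sketch of these layers in JavaScript, assuming a hypothetical Order domain: the Aggregate exposes behaviours that raise Domain Events and rebuilds its state from past events, the Application Service loads it from its stream and sends it the Command, and the host wires in a concrete repository (here an in-memory Map standing in for Event Store). OrderAggregate, OrderApplicationService and the event names are illustrative only.

```javascript
// Aggregate: behaviours check the current state, then raise Domain Events.
// State is only ever changed by applying events, old or new.
class OrderAggregate {
  constructor(history = []) {
    this.status = "new";
    this.changes = []; // events raised but not yet saved
    for (const event of history) this.apply(event);
  }
  apply(event) {
    if (event.type === "OrderPlaced") this.status = "placed";
    if (event.type === "OrderCancelled") this.status = "cancelled";
  }
  raise(event) {
    this.apply(event);
    this.changes.push(event);
  }
  place(command) {
    if (this.status !== "new") throw new Error("Order already placed");
    this.raise({ type: "OrderPlaced", orderId: command.orderId });
  }
  cancel(command) {
    if (this.status !== "placed") throw new Error("Only placed orders can be cancelled");
    this.raise({ type: "OrderCancelled", orderId: command.orderId });
  }
}

// Application Service: loads the Aggregate from its stream, sends the Command
// to the matching behaviour and saves the new events via the injected repository.
class OrderApplicationService {
  constructor(repository) {
    this.repository = repository;
  }
  handle(command) {
    const stream = `order-${command.orderId}`;
    const aggregate = new OrderAggregate(this.repository.load(stream));
    if (command.type === "PlaceOrder") aggregate.place(command);
    else if (command.type === "CancelOrder") aggregate.cancel(command);
    else throw new Error(`Unknown command ${command.type}`);
    this.repository.save(stream, aggregate.changes);
    return aggregate.changes;
  }
}

// Host: chooses the concrete repository (in-memory here) and injects it.
const streams = new Map();
const repository = {
  load: (name) => streams.get(name) || [],
  save: (name, events) => streams.set(name, [...(streams.get(name) || []), ...events]),
};
const service = new OrderApplicationService(repository);
service.handle({ type: "PlaceOrder", orderId: "42" });
service.handle({ type: "CancelOrder", orderId: "42" });
console.log(streams.get("order-42").map((e) => e.type)); // [ 'OrderPlaced', 'OrderCancelled' ]
```

Because the Aggregate never talks to storage itself, swapping the in-memory repository for one backed by Event Store streams only touches the host.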

When I try to visualize the flow of data through this architecture, I can see 3 basic steps:

3 Steps Architecture

This pattern is highly reusable. It can be applied to all kinds of requirements, and you can reuse 90% of it each time. Each time you reuse it for a different requirement, you can copy and paste almost all the generic code that forms the mechanics, such as the API, the adapter and the synchroniser, and just focus on the very specific behaviours of your new use case. Each time you will improve it slightly, adapting it to the new requirement or simplifying parts of it. These 3 components working together for a specific requirement remind me of the elementary particles that work within an atom.

From a physical point of view, each step can be implemented with different Microservices. These Microservices receive and produce messages, which can be described as Commands and Events.

Event Store can be used at each step like a Swiss Army knife. For the data input, the API publishes messages to Event Store streams with a simple Pub/Sub pattern. For the processing, the Downstream Processing Components subscribe to the input streams and then save the resulting Domain Events in durable Event Store streams. For the data output, the Output Adapters or Synchronizers are notified of any relevant Domain Event so that they can keep a Read Model up to date.

The same architecture, expanded with examples of data sources and read models

3 Steps Architecture with elements

Lesson 2: Data Input

This section of our solution exposes one or more endpoints to the external world. It plays the role of a gateway, allowing external actors (Web or Mobile Applications, any message sender or ETL processes) to send data to our Processing Components. At this stage we don't want to run any particular logic: this part only publishes raw data messages to input streams in Event Store. The pattern here is a simple Publish-Subscribe. On the other side there will be one or more groups of consumers subscribing to these input streams. This step is important because it lets us scale our processing units horizontally, using the Persistent Subscription model provided by Event Store, which implements the Competing Consumers pattern.
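The Competing Consumers idea can be illustrated with a small in-memory sketch. This is not the Event Store Persistent Subscription API: ConsumerGroup is a hypothetical class, loosely modelled on a round-robin strategy in which a message that a consumer fails to handle is retried on another consumer and finally parked.

```javascript
// Hypothetical consumer group: each message is delivered to exactly one
// consumer, chosen round-robin. A failure means "not acknowledged", so the
// message is retried on the next consumer, up to maxRetries attempts.
class ConsumerGroup {
  constructor(maxRetries = 3) {
    this.consumers = [];
    this.next = 0; // index of the consumer that gets the next delivery
    this.maxRetries = maxRetries;
    this.parked = []; // messages that failed on every attempt
  }

  join(name, handler) {
    this.consumers.push({ name, handler });
  }

  // Returns the name of the consumer that handled the message,
  // or null if it was parked after maxRetries failed attempts.
  dispatch(message) {
    if (this.consumers.length === 0) throw new Error("No consumers in the group");
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      const consumer = this.consumers[this.next];
      this.next = (this.next + 1) % this.consumers.length;
      try {
        consumer.handler(message);
        return consumer.name; // handled without error: treat as acknowledged
      } catch (err) {
        // not acknowledged: retry on the next consumer in the group
      }
    }
    this.parked.push(message);
    return null;
  }
}

const handledBy = { a: [], b: [] };
const group = new ConsumerGroup();
group.join("a", (m) => handledBy.a.push(m.id));
group.join("b", (m) => handledBy.b.push(m.id));
for (let id = 1; id <= 4; id++) group.dispatch({ id });
console.log(handledBy); // { a: [ 1, 3 ], b: [ 2, 4 ] }
```

Adding a consumer to the group spreads the same input stream over more processes, which is the horizontal scaling the paragraph above refers to; the price is that ordering across consumers is no longer guaranteed.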

One of the key points here is to let external actors push data in any format or shape. The conversion to something that we can understand happens later, on the other side, in one of the Adapters that play the role of Anti-Corruption Layers.

Let's create this service using Node.js + Express. We will use Yarn as the package manager and 'node-eventstore-client' as the JavaScript Event Store client.

  1. Create a folder ‘shop-api’
  2. From a command line, cd into the project folder, initialise it and add the following dependencies:
    1. yarn init
    2. yarn add express
    3. yarn add node-eventstore-client
    4. yarn add body-parser
    5. yarn add uuid
  3. Add the following JavaScript files

index.js

const express = require('express')
const bodyParser = require('body-parser')
const esClient = require('node-eventstore-client')
// Current versions of uuid export v4 from the package root ('uuid/v4' deep requires are gone)
const { v4: uuidv4 } = require('uuid')
const ConnectionFactory = require('./connectionFactory')
const Sender = require('./sender')
const _validation = require('./validation')

// Event Store connection (default admin credentials) and a sender publishing to "shop-input"
const _connFactory = new ConnectionFactory(esClient, 'admin', 'changeit', 'tcp://eventstore:1113', 'shop-api')
const _sender = new Sender(_connFactory, 'shop-input', uuidv4)

const app = express()
app.use(bodyParser.json())
const port = 3000

// Simple health check
app.get('/', (req, res) => res.send('ok'))

app.post('/shop/api/v1', function (req, res) {
  if (_validation.requestNotValid(req)) {
    return res.status(400).send('Request body not valid')
  }
  // Publish the raw payload as-is: no business logic at this stage
  _sender.send(req.body, 'ShopDataReceived').then(function () {
    res.send()
  }).catch(error => {
    console.error(error.message || error)
    res
      .status(500)
      .send('There is a technical problem and the data has not been stored')
  })
})

app.listen(port, () => console.log(`EventStore Shop-Api service listening on port ${port}!`))

sender.js

// esClient was referenced below but never imported in this module
const esClient = require('node-eventstore-client')

module.exports = function (connFactory, publishTo, uuidv4) {
  var _connFactory = connFactory
  var _conn = connFactory.createEsConnection()
  var _publishTo = publishTo || 'data-input'

  // Append a message as a JSON event to the given stream (default: _publishTo)
  var send = function (message, eventType, publishTo) {
    if (!publishTo) {
      publishTo = _publishTo
    }
    var eventData = toEventData(message)
    var event = _connFactory.createJsonEventData(eventData.eventId, eventData.data, eventData.metadata, eventType)
    // expectedVersion.any: no optimistic concurrency check on the input stream
    return _conn.appendToStream(publishTo, esClient.expectedVersion.any, event)
  }

  // 'applies' and 'source' go into the metadata, 'profile' is dropped,
  // everything else becomes the event data
  var toEventData = function (msg) {
    var applies = msg.applies
    var source = msg.source
    delete msg.profile
    delete msg.applies
    delete msg.source
    return {
      eventId: uuidv4(),
      data: msg,
      metadata: { 'Applies': applies, 'Source': source }
    }
  }

  return { send: send }
}

connectionFactory.js

module.exports = function (esClient, user, password, link, name) {
  var createEsConnection = () => {
    // The original `{ user: password }` did not pass valid credentials;
    // use them as the default credentials for every operation
    var settings = { defaultUserCredentials: new esClient.UserCredentials(user, password) }
    var esConnection = esClient.createConnection(settings, link, name)
    esConnection.on('error', err =>
      console.error(`Error occurred on connection: ${err}`)
    )
    // Fail fast: if the connection is closed, stop the whole process
    esConnection.on('closed', (reason) => {
      console.error(`Connection closed, reason: ${reason}`)
      console.error('dying...')
      process.exit(1)
    })
    esConnection.once('connected', (tcpEndPoint) => {
      console.info('Connected to eventstore at ' + tcpEndPoint.host + ':' + tcpEndPoint.port)
    })
    esConnection.connect().catch(err => console.error(err))
    return esConnection
  }

  return {
    createEsConnection: createEsConnection,
    createJsonEventData: esClient.createJsonEventData
  }
}

validation.js

module.exports = {
  // A request is valid only if its body carries 'applies', 'source' and 'profile'
  requestNotValid: function (req) {
    if (!req.body || isEmptyObject(req.body)) { return true }
    if (!req.body.applies) { return true }
    if (!req.body.source) { return true }
    if (!req.body.profile) { return true }
    return false
  }
}

function isEmptyObject(obj) {
  return !Object.keys(obj).length
}

Start Event Store locally. You can just use a local Event Store instance with the --mem-db option, so that no data is persisted to disk yet:

<your-eventstore-folder>EventStore.ClusterNode.exe --mem-db

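Since yarn init does not create a start script, add one to package.json ("scripts": { "start": "node index.js" }), then start the Shop-Api with the following command: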

<api-root-folder>yarn start

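Your Shop-Api service will connect to the Event Store instance and listen on port 3000. Note that index.js points to the host eventstore (tcp://eventstore:1113); when Event Store runs on your own machine, use tcp://localhost:1113 instead. If Event Store goes down, this API service goes down too, because the connection's 'closed' handler calls process.exit(1). In a production-ready environment this will eventually trigger some alerts.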

Lesson 3: Data Processing

Here we are in the middle of our architecture: the Domain Model. The Processing Components are listening to input streams in Event Store. When there is a match with anything interesting, the component handles the raw data messages, trying to find Adapters able to map them to proper Commands. If it finds one, the Application Service sends the Command to the internal Aggregate, that is, it calls a method passing the Command. That method then runs some logic and raises one or more Domain Events containing the results of the processing.
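A possible shape for these Adapters, as a JavaScript sketch. It assumes the input events look like the ones produced by sender.js (Applies and Source in the metadata, the rest of the payload as data); the source names 'web-ui' and 'legacy-etl', the field names and the PlaceOrder Command are hypothetical.

```javascript
// Each adapter recognises one shape of raw input and returns a Command, or null.
const adapters = [
  // Hypothetical Web UI order payload.
  (data, metadata) =>
    metadata.Source === "web-ui" && metadata.Applies === "order"
      ? { type: "PlaceOrder", orderId: data.orderRef, lines: data.items || [] }
      : null,
  // Hypothetical legacy ETL feed using different field names for the same concept.
  (data, metadata) =>
    metadata.Source === "legacy-etl" && data.ORDER_ID !== undefined
      ? { type: "PlaceOrder", orderId: String(data.ORDER_ID), lines: data.LINES || [] }
      : null,
];

// The first adapter that understands the message wins.
function toCommand(data, metadata) {
  for (const adapt of adapters) {
    const command = adapt(data, metadata);
    if (command) return command;
  }
  return null;
}

// Application Service step: only messages mapped to a Command reach the Aggregate.
function onInputEvent(data, metadata, sendToAggregate) {
  const command = toCommand(data, metadata);
  if (!command) return false; // not for this component: skip (or log) it
  sendToAggregate(command);
  return true;
}

const received = [];
const send = (command) => received.push(command);
onInputEvent({ orderRef: "A1", items: [] }, { Source: "web-ui", Applies: "order" }, send);
onInputEvent({ ORDER_ID: 7 }, { Source: "legacy-etl", Applies: "order" }, send);
onInputEvent({ foo: "bar" }, { Source: "unknown" }, send);
console.log(received.map((c) => c.orderId)); // [ 'A1', '7' ]
```

Supporting a new external format then means adding one adapter, while the Aggregate keeps seeing the same well-known Command.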

Event Sourcing pattern

We need to pay particular attention here to the coupling with other components. If we need data in order to complete our processing, we don't call another Processing Component directly. If we do, we create an unwanted coupling between the two components and we end up with a monolith, but… distributed 🙂 If our Processing Component needs data, we fill up a local cache instead. A different approach is needed if we have to interact with an external service, for example a payment provider. In that case our Billing service can call the external service directly and then raise a Domain Event when the payment is done. Can you see the point? We don't couple our own Processing Components directly with each other.
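As a sketch of the local cache idea, assume a hypothetical Shipping component that needs customer addresses. It keeps its own copy, updated from the Customer Domain Events it subscribes to, and never calls the Customer component directly. CustomerAddressCache, planShipment and all event names are illustrative.

```javascript
// Local cache owned by the Shipping component, fed by Customer Domain Events.
class CustomerAddressCache {
  constructor() {
    this.addresses = new Map(); // customerId -> address
  }
  // Called by the subscription for each Customer Domain Event, in order.
  when(event) {
    switch (event.type) {
      case "CustomerRegistered":
      case "CustomerAddressChanged":
        this.addresses.set(event.customerId, event.address);
        break;
      case "CustomerDeleted":
        this.addresses.delete(event.customerId);
        break;
      default:
        break; // other events are irrelevant to this cache
    }
  }
  get(customerId) {
    return this.addresses.get(customerId);
  }
}

// Shipping behaviour: reads only the local cache, with no remote call.
// A missing address becomes a Domain Event rather than an exception.
function planShipment(orderPlaced, cache) {
  const address = cache.get(orderPlaced.customerId);
  if (!address) {
    return { type: "ShipmentOnHold", orderId: orderPlaced.orderId, reason: "address unknown" };
  }
  return { type: "ShipmentPlanned", orderId: orderPlaced.orderId, address };
}

const cache = new CustomerAddressCache();
cache.when({ type: "CustomerRegistered", customerId: "c1", address: "1 High St" });
cache.when({ type: "CustomerAddressChanged", customerId: "c1", address: "2 Low Rd" });
console.log(planShipment({ orderId: "o1", customerId: "c1" }, cache).address); // 2 Low Rd
console.log(planShipment({ orderId: "o2", customerId: "c2" }, cache).type); // ShipmentOnHold
```

The cache may lag slightly behind the Customer component, which is the trade-off accepted in exchange for not coupling the two components at runtime.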

Our processing component needs to connect to the input stream on one side and to the streams where the Domain Events will be saved on the other. This can be one single Event Store, or you can split the responsibilities and have one Event Store for data ingestion and a separate one for the streams with Domain Events. You subscribe to the raw data input and you append to the Domain Events' streams: two separate Event Store connections. For data ingestion you could even use a different queuing system, especially if you need massive horizontal scalability. Using the same system both for data ingestion and for storing the Domain Events is a big benefit in terms of operational maintenance, monitoring and error handling. Based on your forecast performance requirements, you need to choose the trade-off between data ingestion scalability and simplified operations. In this guide we assume that we want to simplify operations and sacrifice the possibility to scale the data ingestion. After all, Event Store performs very well and can cover most performance needs.

Let's first sort out the subscription to the data ingestion streams. What we need is an Event Store connection:

private IEventStoreConnection BuildConnection(string connName, Uri esLink)
{
    return EventStoreConnection.Create(ConnectionSettings.Default, esLink, connName);
}

We can use this method to build an Event Store connection. As you can see, it wraps a simple method exposed by the .NET Client API. To avoid tying my handling logic to this API, I usually use an abstraction that I call IConnectionBuilder. This interface exposes only the credentials and a Build method, which lets my handling logic create and re-create a connection whenever needed.

public interface IConnectionBuilder
{
    UserCredentials Credentials { get; }
    IEventStoreConnection Build();
}
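Since the Shop-Api is written in JavaScript, here is a rough JavaScript counterpart of IConnectionBuilder, under the assumption that the handling logic only needs to append events: the handler receives an object exposing credentials and build(), and rebuilds the connection when it finds it closed. ResilientHandler and createFakeBuilder are hypothetical names, and the fake builder stands in for a real one that would wrap the Event Store client.

```javascript
// Handling logic that depends only on a builder, never on the client library.
class ResilientHandler {
  constructor(connectionBuilder) {
    this.builder = connectionBuilder;
    this.connection = null;
  }
  // Build a connection on first use, and rebuild it if it has been closed.
  ensureConnection() {
    if (!this.connection || this.connection.closed) {
      this.connection = this.builder.build();
    }
    return this.connection;
  }
  append(stream, event) {
    return this.ensureConnection().append(stream, event);
  }
}

// Fake builder for tests: counts how many connections it has built and
// records what each connection was asked to write.
function createFakeBuilder() {
  let built = 0;
  return {
    credentials: { username: "admin", password: "changeit" },
    get built() {
      return built;
    },
    build() {
      built += 1;
      const written = [];
      return { closed: false, written, append(stream, event) { written.push({ stream, event }); } };
    },
  };
}

const builder = createFakeBuilder();
const handler = new ResilientHandler(builder);
handler.append("order-1", { type: "OrderPlaced" });
handler.connection.closed = true; // simulate a dropped connection
handler.append("order-1", { type: "OrderShipped" });
console.log(builder.built); // 2
```

The same seam makes the handling logic testable without a running Event Store, which is a second reason to keep the client API behind the builder.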

Lesson 4: Build Read Models

This is the step where we let anyone interested know what is happening within our Domain Model. What we build here are one or more Output Adapters, or Synchronizers, that listen to Domain Events and keep Read Models up to date. There will be one Output Adapter per Read Model. The pattern here is a simple Publish-Subscribe: any time a Domain Event is stored in a stream, Event Store notifies the Subscribers here. The Output Adapter needs to match the Domain Event with a particular procedure that takes the data out of the Event and saves it in the external Read Model. A Read Model is a database where we can represent the current state of our entities. To get the current state of an Order, a Policy or whatever your domain is about, we can replay the Domain Events from the start and keep applying new ones as soon as they happen.
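A minimal sketch of an Output Adapter in JavaScript, assuming a hypothetical 'orders summary' Read Model kept in a Map: each Domain Event type maps to a procedure, events are applied in order with a checkpoint so duplicates are skipped, and replay() rebuilds the model from the start. OrdersSummarySynchronizer, the event types and the positions are illustrative, not the Event Store subscription API.

```javascript
// Output Adapter (Synchronizer) for one Read Model.
class OrdersSummarySynchronizer {
  constructor() {
    this.readModel = new Map(); // orderId -> current state of the order
    this.checkpoint = -1; // position of the last event applied
    // Match each relevant Domain Event type with a procedure.
    this.procedures = {
      OrderPlaced: (e) =>
        this.readModel.set(e.orderId, { orderId: e.orderId, total: e.total, status: "placed" }),
      PaymentReceived: (e) => this.update(e.orderId, { status: "paid" }),
      OrderShipped: (e) => this.update(e.orderId, { status: "shipped" }),
    };
  }
  update(orderId, changes) {
    const current = this.readModel.get(orderId);
    if (current) this.readModel.set(orderId, { ...current, ...changes });
  }
  // Called for every event delivered by the subscription, in order.
  handle(event, position) {
    if (position <= this.checkpoint) return; // already applied
    const procedure = this.procedures[event.type];
    if (procedure) procedure(event); // unknown event types are ignored
    this.checkpoint = position;
  }
  // Rebuild the Read Model from scratch by replaying the whole history.
  replay(allEvents) {
    this.readModel.clear();
    this.checkpoint = -1;
    allEvents.forEach((event, position) => this.handle(event, position));
  }
}

const history = [
  { type: "OrderPlaced", orderId: "o1", total: 30 },
  { type: "OrderPlaced", orderId: "o2", total: 12 },
  { type: "PaymentReceived", orderId: "o1" },
  { type: "CustomerRegistered", customerId: "c9" }, // not relevant to this model
];
const sync = new OrdersSummarySynchronizer();
sync.replay(history);
// New events keep being applied as soon as they arrive.
sync.handle({ type: "OrderShipped", orderId: "o1" }, 4);
console.log(sync.readModel.get("o1").status, sync.readModel.get("o2").status); // shipped placed
```

Because the Read Model can always be rebuilt from the events, it can be dropped and reshaped freely; a new Read Model is just another Synchronizer replaying the same history.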