
Event Store Step-by-step

This is a public draft, available so I can get feedback while I'm writing it. It will change day by day over the coming weeks.

In this guide I will walk through an e-commerce scenario to show how to combine Publish-Subscribe, Event Sourcing and CQRS to build loosely coupled Microservices.

Our business requirement is to receive and process orders from a Web UI and present data and reports back to the user.

We will build our components using a mix of languages and platforms: Node.js + Express, C# and Java. This approach shows how we can remain platform independent with Event Store at the core of our architecture.

A key point of this article is to define an approach you can reuse for most requirements. The part that changes from requirement to requirement is the behaviour inside the Aggregate and the handling functions, which may make use of one application service or another. But the surrounding 'mechanics' that define the flow of messages and the different ways we handle them through the 3 steps remain the same and are reusable. On each new requirement you will be able to copy and paste most of the repetitive code and, like a sculptor, make it better and better on each round.

Lesson 1: Getting Started

We are building a solution to process and store orders, payments and shipping. Together these slices of data form our solution like the layers of a cake. In DDD jargon each part is an Aggregate and the whole solution could be a Bounded Context. We will implement the data ingestion as a gateway that sends the data to our processing units. On the other side there will be our downstream components, each protecting its Aggregate.

Each component can be represented as a bubble. If we look at one of these bubbles in the middle of our architecture, we can see different layers that, like an onion, protect the inner Aggregate. There is the host process, responsible for keeping the service running and injecting the concrete dependencies. There is the Application Service, where external messages are eventually mapped to well-known Commands to be sent to the Aggregate. And there is the inner Aggregate, exposing all the behaviours that represent our business feature.

In other words, our Processing Components handle Commands and raise Domain Events. These events are stored in Event Store streams following the Event Sourcing pattern.
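To make this concrete, here is a minimal sketch of such an Aggregate in JavaScript. The OrderAggregate name, the PlaceOrder command and the OrderPlaced event are hypothetical examples, not part of the solution we build below.

module.exports = function OrderAggregate() {
  // Domain Events raised by behaviours, not yet saved to a stream
  var _uncommittedEvents = []

  // Behaviour: validate the PlaceOrder command and raise a Domain Event
  var placeOrder = function (command) {
    if (!command.orderId || !command.items || !command.items.length) {
      throw new Error('PlaceOrder command not valid')
    }
    raise('OrderPlaced', { orderId: command.orderId, items: command.items })
  }

  var raise = function (eventType, data) {
    _uncommittedEvents.push({ eventType: eventType, data: data })
  }

  // The Application Service reads these and appends them to an Event Store stream
  var getUncommittedEvents = function () { return _uncommittedEvents }

  return { placeOrder: placeOrder, getUncommittedEvents: getUncommittedEvents }
}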

When I try to visualize the flow of data through this architecture, I can see 3 basic steps:

3 Steps Architecture

This pattern is highly reusable. It can be applied to many different requirements and you can re-use 90% of it each time. Each time you re-use it for a different requirement, you can copy and paste almost all of the generic code that forms the mechanics, like the API, the adapter and the synchroniser, and just focus on the very specific behaviours of your new use case. Each time you will improve it slightly, adapting it to the new requirement or simplifying parts of it. These 3 components working together for a specific requirement remind me of the elementary particles working within an atom.

From a physical point of view, each step can be implemented with different Microservices. These Microservices receive and produce messages, which can be described as Commands and Events.
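For example (the shapes and names here are purely illustrative), a Command expresses an intention that can still be rejected, while an Event records a fact that has already happened:

// A Command: an instruction, named in the imperative
var placeOrder = { type: 'PlaceOrder', orderId: '123', items: [{ sku: 'abc', quantity: 2 }] }

// An Event: an immutable fact, named in the past tense
var orderPlaced = { type: 'OrderPlaced', orderId: '123', items: [{ sku: 'abc', quantity: 2 }] }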

Event Store can be used at each step like a Swiss Army knife. For the data input, the API publishes messages to Event Store streams with a simple Pub/Sub pattern. For the processing, the Downstream Processing Components subscribe to the input streams and then save the resulting Domain Events in durable Event Store streams. For the data output, the Output Adapters or Synchronizers are notified of any relevant Domain Event and keep a Read Model up to date.

The same architecture, exploded with an example of data sources and read models

3 Steps Architecture with elements

Lesson 2: Data Input

This section of our solution exposes one or more endpoints to the external world. It plays the role of a gateway, allowing external actors (Web or Mobile Applications, any message sender, or ETL processes) to send data to our Processing Components. At this stage we don't want to run any particular logic. This part only publishes raw data messages to input streams in Event Store. The pattern here is a simple Publish-Subscribe. On the other side there will be one or more groups of consumers subscribing to these input streams. That separation is important so that we can horizontally scale our processing units using the Persistent Subscription model provided by Event Store, which implements the Competing Consumers pattern; a sketch of such a consumer is shown below.
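This is a minimal sketch of a competing consumer, assuming the persistent subscription API of node-eventstore-client; the stream and group names are illustrative, and the subscription group has to be created once beforehand (for example via the Event Store Admin UI or createPersistentSubscription).

var esClient = require('node-eventstore-client')
var credentials = new esClient.UserCredentials('admin', 'changeit')
var connection = esClient.createConnection({}, 'tcp://eventstore:1113', 'shop-processor')

connection.connect().catch(function (err) { console.error(err) })
connection.once('connected', function () {
  connection.connectToPersistentSubscription(
    'shop-input',        // the input stream the gateway publishes to
    'shop-processors',   // the consumer group competing for messages
    function (subscription, resolvedEvent) {
      // Each message is delivered to only one consumer in the group
      console.log('processing', resolvedEvent.originalEvent.eventType)
    },
    function (subscription, reason, error) {
      console.error('subscription dropped:', reason, error)
    },
    credentials)
})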

One of the key points here is to let external actors push data in any format or shape. The conversion into something that we can understand happens later, on the other side, in one of the Adapters that play the role of Anti Corruption Layers.
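As a sketch, such an Adapter could look like this (the external field names and the PlaceOrder command are hypothetical); it translates whatever shape the external actor sent into a Command our component understands, or returns null when it cannot:

module.exports = {
  // Anti Corruption Layer: raw external data in, well-known Command out
  tryMap: function (rawMessage) {
    if (!rawMessage || rawMessage.source !== 'web-shop') { return null }
    return {
      type: 'PlaceOrder',
      orderId: rawMessage.order_ref,  // the external naming stays outside
      items: rawMessage.lines || []
    }
  }
}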

Let’s create this service using Node.js + Express. We will use Yarn as the package manager and ‘node-eventstore-client’ as the JavaScript Event Store client.

  1. Create a folder ‘shop-api’
  2. From a command line, cd into the project folder, init it and add the following dependencies:
    1. yarn init
    2. yarn add express
    3. yarn add node-eventstore-client
    4. yarn add body-parser
    5. yarn add uuid
  3. Add the following JavaScript files

index.js

const express = require('express')
var bodyParser = require('body-parser')
var esClient = require('node-eventstore-client')
var ConnectionFactory = require('./connectionFactory')
var Sender = require('./sender')
var _validation = require('./validation')
const uuidv4 = require('uuid/v4')

// Wire up the Event Store connection and a sender publishing to the 'shop-input' stream
var _connFactory = new ConnectionFactory(esClient, "admin", "changeit", "tcp://eventstore:1113", "shop-api")
var _sender = new Sender(_connFactory, "shop-input", uuidv4)

const app = express()
app.use(bodyParser.json())
const port = 3000

app.get('/', (req, res) => res.send('ok'))

app.post("/shop/api/v1", function (req, res) {
  if (_validation.requestNotValid(req)) {
    return res.status(400).send("Request body not valid")
  }
  // Publish the raw request body to the input stream; no business logic here
  _sender.send(req.body, "ShopDataReceived").then(function (result) {
    res.send()
  }).catch(error => {
    console.error(error.message || error)
    res
      .status(500)
      .send("There is a technical problem and the data has not been stored")
  })
})

app.listen(port, () => console.log(`EventStore Shop-Api service listening on port ${port}!`))

sender.js

var esClient = require('node-eventstore-client') // needed for expectedVersion below

module.exports = function (connFactory, publishTo, uuidv4) {
  var _connFactory = connFactory
  var _conn = connFactory.createEsConnection()
  var _publishTo = publishTo || 'data-input'

  // Append a raw message as a JSON event to the given (or default) stream
  var send = function (message, eventType, publishTo) {
    if (!publishTo) {
      publishTo = _publishTo
    }
    var eventData = toEventData(message)
    var event = _connFactory.createJsonEventData(eventData.eventId, eventData.data, eventData.metadata, eventType)
    return _conn.appendToStream(publishTo, esClient.expectedVersion.any, event)
  }

  // Move the routing fields out of the body and into the event metadata
  var toEventData = function (msg) {
    var applies = msg.applies
    var source = msg.source
    delete msg.profile
    delete msg.applies
    delete msg.source
    var eventData = {
      eventId: uuidv4(),
      data: msg,
      metadata: { 'Applies': applies, 'Source': source }
    }
    return eventData
  }

  return { send: send }
}

connectionFactory.js

module.exports = function (esClient, user, password, link, name) {
  var createEsConnection = () => {
    // Pass the user and password as the default credentials for the connection
    var settings = { defaultUserCredentials: new esClient.UserCredentials(user, password) }
    var esConnection = esClient.createConnection(settings, link, name)
    esConnection.on('error', err =>
      console.error(`Error occurred on connection: ${err}`)
    )
    esConnection.on('closed', (reason) => {
      // Fail fast and let the host environment restart the service
      console.error(`Connection closed, reason: ${reason}`)
      console.error('dying...')
      process.exit(1)
    })
    esConnection.once('connected', (tcpEndPoint) => {
      console.info('Connected to eventstore at ' + tcpEndPoint.host + ':' + tcpEndPoint.port)
    })
    esConnection.connect().catch(err => console.error(err))
    return esConnection
  }

  return {
    createEsConnection: createEsConnection,
    createJsonEventData: esClient.createJsonEventData
  }
}

validation.js

module.exports = {
  requestNotValid: function (req) {
    if (isEmptyObject(req.body)) { return true }
    if (!req.body.applies) { return true }
    if (!req.body.source) { return true }
    if (!req.body.profile) { return true }
    return false
  }
}

function isEmptyObject(obj) {
  return !Object.keys(obj).length
}

Start Event Store locally. For now you can run a local Event Store instance with the --mem-db option so that no data is persisted to disk yet

<your-eventstore-folder>EventStore.ClusterNode.exe --mem-db

Start the Shop-Api with the following command (this assumes your package.json defines a start script, for example "start": "node index.js")

<api-root-folder>yarn start

Your Shop-Api service will start, connect to the EventStore instance and listen on port 3000. If Event Store goes down, this API service will go down as well (the connection factory exits the process when the connection closes), which would trigger alerts in a production-ready environment.
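You can then test the endpoint, for example with curl. The body must contain the applies, source and profile fields checked by validation.js; the remaining fields are just an example payload:

curl -X POST http://localhost:3000/shop/api/v1 \
  -H "Content-Type: application/json" \
  -d '{"applies":"2019-01-01","source":"web-shop","profile":"order","orderId":"123","items":[{"sku":"abc","quantity":2}]}'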

Lesson 3: Data Processing

Here we are in the middle of our architecture: the Domain Model. The Processing Components listen to input streams in Event Store. When a message of interest arrives, the component handles the raw data, trying to find an Adapter able to map it to a proper Command. If this succeeds, the Application Service sends the Command to the internal Aggregate; that is, it calls a method passing the Command. That method then runs some logic and raises one or more Domain Events containing the results of the processing. A sketch of this flow follows the figure below.

Event Sourcing pattern
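This is a minimal sketch of the handling flow, reusing the hypothetical adapter and OrderAggregate sketched earlier; the stream name is illustrative too.

var esClient = require('node-eventstore-client')
var uuidv4 = require('uuid/v4')

module.exports = function (connection, adapters, OrderAggregate) {
  // Handle one raw input message end to end
  var handle = function (rawMessage) {
    var command = null
    for (var i = 0; i < adapters.length && !command; i++) {
      command = adapters[i].tryMap(rawMessage)
    }
    if (!command) { return Promise.resolve() } // nothing we understand, skip it

    var aggregate = new OrderAggregate()
    aggregate.placeOrder(command) // the behaviour runs the logic and raises events

    var events = aggregate.getUncommittedEvents().map(function (e) {
      return esClient.createJsonEventData(uuidv4(), e.data, null, e.eventType)
    })
    // A real service would track the expected version for optimistic concurrency
    return connection.appendToStream('order-' + command.orderId, esClient.expectedVersion.any, events)
  }
  return { handle: handle }
}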

We need to pay particular attention here to the coupling with other components. If we need data to complete our processing, we don't call another Processing Component directly. If we do, we create the wrong kind of coupling between the two components and we will end up with a monolith, but… distributed šŸ™‚ If our Processing Component needs data, we fill up a local cache instead. A different approach applies when we need to interact with an external service, for example a payment provider service. In that case our Billing service can call the external service directly and then raise a Domain Event when the payment is done, as in the sketch below. Can you see the point? We don't directly couple our own Processing Components with each other.
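For example, a Billing behaviour could be sketched like this (the payment provider client and the event name are hypothetical):

// Calling an external service is fine; calling another internal
// Processing Component directly is not
var payOrder = function (command, paymentProvider) {
  return paymentProvider.charge(command.orderId, command.amount)
    .then(function (receipt) {
      // Publish the result as a fact for anyone interested
      return { eventType: 'PaymentCompleted', data: { orderId: command.orderId, receiptId: receipt.id } }
    })
}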

Lesson 4: Build Read Models

This is the step where we let anyone interested know what is happening within our Domain Model. What we build here are one or more Output Adapters or Synchronizers that listen to the Domain Events and keep Read Models up to date; there will be one Output Adapter per Read Model. The pattern here is a simple Publish-Subscribe: any time a Domain Event is stored in a stream, Event Store notifies the Subscribers. The Output Adapter matches the Domain Event against a particular procedure that takes the data out of the Event and saves it in the external Read Model. A Read Model is a database where we represent the current state of our entities. To get the current state of an Order, a Policy, or whatever your domain is about, we can replay the Domain Events from the start and keep applying them as they happen, as in the sketch below.
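Here is a minimal sketch of such a Synchronizer, assuming the catch-up subscription API of node-eventstore-client; the 'order-events' stream and the in-memory read model stand in for a real stream and an external database.

var esClient = require('node-eventstore-client')
var connection = esClient.createConnection({}, 'tcp://eventstore:1113', 'order-synchronizer')
var readModel = {} // a real Synchronizer would write to an external database

connection.connect().catch(function (err) { console.error(err) })
connection.once('connected', function () {
  // A null checkpoint replays the stream from the start, then goes live
  connection.subscribeToStreamFrom('order-events', null, false,
    function (subscription, resolvedEvent) {
      var event = resolvedEvent.originalEvent
      if (event.eventType === 'OrderPlaced') {
        var data = JSON.parse(event.data.toString())
        readModel[data.orderId] = { status: 'placed', items: data.items }
      }
    },
    function () { console.log('read model up to date, now live') },
    null,
    new esClient.UserCredentials('admin', 'changeit'))
})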
