Domain Driven Design · Event Sourcing · Software Architecture

Distributed Architecture 01: The Ingestion

neurons are cells that are specialised to pass signals to individual target cells, and synapses are the means by which they do so

Introduction

A Distributed Software Architecture is formed by several parts that communicate with each other through messages. The pipes and protocols used to exchange those messages are like the synapses in our nervous system. Another analogy is the atom: a nucleus surrounded by electrons orbiting around it, shielding the inner particles.

In my opinion, this Ingestion step is important because it detaches your inner nucleus, where the business logic lives, from the external world. You could skip it and let any client pass messages directly to the Aggregates, your nucleus. That works, but it removes a junction that is necessary to keep your architecture flexible and scalable.

One of the points I’m trying to make here is that when we receive a business requirement, we start to build the shell around it. The ingestion is the outer part: the door through which the external world communicates, passing signals (messages) just like synapses do.

Ingestion phase in an Event Driven Architecture

You can have any data source on the outside publishing messages to the Channel Adapter. This adapter decouples the outside sources from the inside message channel and avoids forcing external sources to conform to your Messaging Channel. I usually implement a Channel Adapter in the form of a lightweight Api. The protocols here should be something common like HTTP or even gRPC. If your sources can’t push messages to the Api, you can create a reader that pulls the messages and sends them to the Api; there are also several of these tools already built, like Filebeat or Telegraf. The pattern between clients and the Channel Adapter is Request/Response. The pattern between your internal message channel and the internal processing components is Publish/Subscribe.
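As a rough illustration of the pull variant, here is a minimal sketch of a reader that loads a file and forwards each line to the adapter’s HTTP endpoint. The endpoint path and file name are hypothetical; in practice a ready-made tool like Filebeat or Telegraf already covers this.

```typescript
import { readFileSync } from "fs";

// Hypothetical adapter endpoint; any common protocol (HTTP here) will do.
const ADAPTER_URL = "http://localhost:3000/ingest/orders";

async function pushFile(path: string): Promise<void> {
  // Each line is treated as one raw message; no schema is imposed here.
  const lines = readFileSync(path, "utf-8").split("\n").filter(Boolean);
  for (const line of lines) {
    const res = await fetch(ADAPTER_URL, {
      method: "POST",
      headers: { "Content-Type": "text/plain" },
      body: line,
    });
    if (!res.ok) {
      console.error(`Adapter rejected message: ${res.status}`);
    }
  }
}

pushFile("./orders.csv").catch(console.error);
```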

From a tech point of view, there is plenty of software out there that can be used for the message channel. Examples are: RabbitMQ, Kafka, Azure Event Hub, Kinesis, ZeroMQ, NServiceBus. I usually use Event Store for this, as it is also the database I use internally for Event Sourcing to store the Domain Events during the processing phase. Having one tool allows me to reduce the overhead in terms of infrastructure complexity (disclaimer: I’m currently working for Event Store). How to build the Channel Adapter really depends, but I usually prefer a very light and standard (HTTP/JSON) Api built using Node.js+Express.

Note: if you are working in a corporate environment or anything that seems to be… Enterprise, I can imagine you are surrounded by big “Enterprise integration systems”, right? 🙂 I know… someone else did that and now you must deal with it. In that case it’s not much more difficult. You can use the outer system as a proxy connecting any client to your internal lightweight endpoint. That way, your internal endpoint can use a certificate to establish a secure connection with the external proxy, and the proxy can handle the authentication and authorisation bit with the external clients. That part is probably one of the few that these big “enterprise service buses” do well.

I usually don’t put any logic in the Channel Adapter. A side benefit of keeping it simple is that it can easily be reused, from time to time, in other projects that need to stay separate. It just exposes one endpoint for each domain (sketched after the list below) and it…

  • receives the raw external message
  • wraps it in a semi-structured message with just the source name and the date
  • publishes it in the queue
  • returns an acknowledgement to the client
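Here is a minimal sketch of such an endpoint, assuming Express and the EventStoreDB Node client; the port, endpoint path, stream name and header are illustrative, not prescriptive:

```typescript
import express from "express";
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";

const app = express();
// Accept any raw payload: csv, xml, json, plain text… no schema is imposed here.
app.use(express.text({ type: "*/*" }));

const eventStore = EventStoreDBClient.connectionString(
  "esdb://localhost:2113?tls=false"
);

// One endpoint per domain; "orders" is just an illustrative example.
app.post("/ingest/orders", async (req, res) => {
  // Wrap the raw message in a semi-structured envelope: source name and date only.
  const envelope = jsonEvent({
    type: "RawMessageReceived",
    data: {
      source: req.get("X-Source-Name") ?? "unknown",
      receivedAt: new Date().toISOString(),
      payload: req.body,
    },
  });

  // Publish into the input stream and acknowledge the client.
  await eventStore.appendToStream("input-orders", envelope);
  res.status(202).json({ status: "accepted" });
});

app.listen(3000);
```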

This approach leaves your clients free from any complicated schema to conform to. That is another big benefit. Your Channel Adapter basically takes everything, in any shape (csv, xml, avro, plain text, even stuff that is wrong), and lets other parts deal with the mapping complexity later, eventually publishing exceptions to error channels if something is not right, or just ignoring the message. The mapping from the raw external message to a well-known Domain Command is the responsibility of the Anti-Corruption Layer protecting each Domain Component. Weak Schema mapping is my preferred choice.
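To make the Weak Schema idea concrete, here is a rough sketch of an Anti-Corruption Layer function mapping a raw payload into a hypothetical RegisterOrder command: it reads only the fields it knows about, tolerates anything extra, and reports a mapping error instead of throwing. All names here are illustrative.

```typescript
// Hypothetical Domain Command produced by the Anti-Corruption Layer.
interface RegisterOrder {
  orderId: string;
  customer: string;
  total: number;
}

type MappingResult =
  | { ok: true; command: RegisterOrder }
  | { ok: false; error: string };

// Weak Schema mapping: pick the fields we understand, ignore unknown ones,
// and report an error when something essential is missing or malformed.
function mapToRegisterOrder(raw: string): MappingResult {
  let parsed: Record<string, unknown>;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return { ok: false, error: "payload is not valid JSON" };
  }

  const orderId = parsed["orderId"] ?? parsed["order_id"];
  const customer = parsed["customer"];
  const total = Number(parsed["total"]);

  if (typeof orderId !== "string" || typeof customer !== "string" || Number.isNaN(total)) {
    return { ok: false, error: "missing or malformed required fields" };
  }
  return { ok: true, command: { orderId, customer, total } };
}
```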

We can now see how this fits perfectly with having another component responsible for reacting to mapping-error events and sending a notification back to clients, or whatever your business requirement is.

Clients need results… synchronously

Whenever, during an architectural discussion, someone raises the problem that the asynchronous nature of a distributed architecture creates for clients that want to be notified synchronously, you can reply: it’s up to the client to wait synchronously or asynchronously for a reply, which can be a message notifying an error, an acknowledgement, or a file with a report for each input message.

The Id is not always a problem, as you can use a combination of other fields to route the right message back to the right client. In case you need an Id other than the source Id, you can let the Channel Adapter assign a deterministic Id to the message before publishing it, and return it to the client. This requires a bit of understanding of the input message, so it’s a trade-off if you want to keep the Api light, fast and easy to reuse.
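One simple way to assign such a deterministic Id is to hash the parts that identify the message; this is a sketch assuming the source name plus the raw payload are enough:

```typescript
import { createHash } from "crypto";

// Deterministic Id: the same source and payload always produce the same Id,
// so the client can recompute it and correlate the eventual reply.
function deterministicId(source: string, rawPayload: string): string {
  return createHash("sha256").update(`${source}:${rawPayload}`).digest("hex");
}

// Example: the adapter returns this Id in the acknowledgement.
const id = deterministicId("billing-feed", '{"orderId":"42"}');
console.log(id);
```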

Another technique is to ask clients to set a specific field as an Id, or anything else that can be used to reconnect the message with the waiting client. That information can then be preserved by all the input and processing components in the message metadata. Your notifier component can later use it to route the message to the right client.

It’s a trade-off between asking your clients to follow some minimal rules before sending messages, or dealing with the problem yourself and making your logic more complex.

Scale the ingestion channel

Data producers publish messages into an ingestion channel. In Kafka terminology this is called a topic. A topic can be further split into partitions, and each partition can be hosted on a separate broker node. This technique can also be described as sharding, where a partition is in fact a shard of the topic. The data producer, the client, then sends messages to one specific partition of that topic.
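As a rough sketch using the kafkajs client (broker address and topic name are illustrative), a producer can either let a message key drive the partitioning or target a partition explicitly:

```typescript
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "ingestion-demo", brokers: ["localhost:9092"] });
const producer = kafka.producer();

async function publish(): Promise<void> {
  await producer.connect();
  await producer.send({
    topic: "ingestion",
    messages: [
      // The key determines the partition (same key, same shard)…
      { key: "billing-feed", value: '{"orderId":"42"}' },
      // …or the partition can be set explicitly.
      { key: "billing-feed", value: '{"orderId":"43"}', partition: 0 },
    ],
  });
  await producer.disconnect();
}

publish().catch(console.error);
```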

Using Event Store as the message broker, the ingestion channel is called a stream. With the current version it is not possible to horizontally scale the ingestion as you can with Kafka, Kinesis, Azure Event Hub or RabbitMQ. Sharding is not there yet, but it’s not difficult to implement if you consider each Event Store instance a shard and have your clients send data picking a connection from a list of available shards. It’s a trade-off between using one tool and reducing the infrastructure complexity, or being more scalable at the price of more infrastructure burden.
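A client-side sharding sketch, assuming a fixed list of EventStoreDB instances and a hash of the source name to pick the shard (the connection strings and stream names are placeholders):

```typescript
import { createHash } from "crypto";
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";

// Each instance is treated as one shard; connection strings are placeholders.
const shards = [
  EventStoreDBClient.connectionString("esdb://esdb-0:2113?tls=false"),
  EventStoreDBClient.connectionString("esdb://esdb-1:2113?tls=false"),
  EventStoreDBClient.connectionString("esdb://esdb-2:2113?tls=false"),
];

// Pick a shard deterministically from the source name, so the same source
// always lands on the same instance.
function shardFor(source: string): EventStoreDBClient {
  const hash = createHash("sha1").update(source).digest();
  return shards[hash[0] % shards.length];
}

async function publish(source: string, payload: string): Promise<void> {
  const event = jsonEvent({
    type: "RawMessageReceived",
    data: { source, receivedAt: new Date().toISOString(), payload },
  });
  await shardFor(source).appendToStream(`input-${source}`, event);
}
```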

Using Single or Multiple Input Streams

Having this ingestion phase in our architecture opens interesting possibilities in terms of data management.

When external clients start sending data to our Api, no matter whether the data comes from users interacting with a Web UI or from streams of messages produced by other processes, these messages are not yet ready to be processed. The Channel Adapter is like a joint that introduces flexibility into our system.

The processing components will eventually save the original message in a Domain Event depending on requirements.

But the original message flowing through the input stream can now be discarded. If you use a messaging system like Kafka, you can let the messages expire. If you use Event Store, your Api can publish messages to a single input stream and set $maxAge, or you can use the category feature and let a separate process delete or archive these streams based on requirements.
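For instance, a rough sketch of setting $maxAge with the EventStoreDB Node client could look like this; the stream name and the 24-hour retention period are illustrative assumptions:

```typescript
import { EventStoreDBClient } from "@eventstore/db-client";

const client = EventStoreDBClient.connectionString("esdb://localhost:2113?tls=false");

async function setRetention(): Promise<void> {
  // Keep events in the input stream for 24 hours only; after that they become
  // eligible for scavenging, since the processing components have already
  // turned the interesting ones into Domain Events.
  await client.setStreamMetadata("input-orders", { maxAge: 24 * 60 * 60 });
}

setRetention().catch(console.error);
```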

Streams in Event Store can be prefixed with a category for better partitioning

Discover unstructured data

It can happen that your data producers send data that you don’t know. This is common, for example, when you are building integration and analysis systems dealing with disparate data sources that need to be onboarded and discovered.

The Channel Adapter can still enrich the messages with the data source name and/or Uri, but when the messages start flowing through your Message Channel there are no mappings available for any Command in your Domain Components. You can then push Missed Mapping errors into your error channel and direct the original messages to fill up a data lake of unstructured data.
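Building on the mapping sketch above, here is a rough illustration of that routing; the stream names and the error event shape are assumptions:

```typescript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";

const client = EventStoreDBClient.connectionString("esdb://localhost:2113?tls=false");

// Route a message whose mapping failed: record the error in the error channel
// and park the raw payload in an "unstructured" stream feeding the data lake.
async function routeMissedMapping(source: string, rawPayload: string, reason: string) {
  await client.appendToStream(
    "errors-mapping",
    jsonEvent({ type: "MissedMapping", data: { source, reason } })
  );
  await client.appendToStream(
    `unstructured-${source}`,
    jsonEvent({ type: "UnstructuredMessageReceived", data: { source, payload: rawPayload } })
  );
}
```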

For this discovery task I usually use Elasticsearch, as it is a fast and easy schemaless NoSQL database that lets business analysts inspect the new data using Kibana. Once the data is known, you can create a mapping in one or more of the Domain Components and replay the original messages.
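As a sketch, using the official Elasticsearch JavaScript client (the index name and document shape are illustrative, and the `document` option assumes a v8 client), the unknown messages can be indexed so analysts can explore them in Kibana:

```typescript
import { Client } from "@elastic/elasticsearch";

const es = new Client({ node: "http://localhost:9200" });

// Index an unknown raw message for discovery; no mapping is defined up front,
// Elasticsearch infers the fields from the document itself.
async function indexForDiscovery(source: string, rawPayload: unknown): Promise<void> {
  await es.index({
    index: `discovery-${source}`,
    document: {
      source,
      receivedAt: new Date().toISOString(),
      payload: rawPayload,
    },
  });
}
```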