
Data ingestion with Logstash and EventStore

Event Store is a database that stores data as events in data streams.

It lets you implement CQRS and Event Sourcing. It also lets components exchange messages using a simple Pub/Sub pattern.

It is open source: there is a free community edition, as well as a commercial license with support.

This article is the written version of a webinar about data ingestion.

The challenge

The goal is to ingest data from any CSV file: load its content and send it to the Event Store HTTP API.

To be precise, the challenge is to convert each line of a CSV file into a request like this one (an example HTTP POST to the Event Store HTTP API):

POST http://localhost:2113/streams/inbound-data
Content-Type: application/vnd.eventstore.events+json

[
  {
    "eventId": "fbf4b1a1-b4a3-4dfe-a01f-ec52c34e16e4",
    "eventType": "InboundDataReceived",
    "data": {
      "message": "Europe,Italy,Clothes,Online,M,12/17/2013,278155219,1/10/2014,1165,109.28,35.84,127311.20,41753.60,85557.60"
    },
    "metadata": { "host": "box-1", "path": "/usr/data/sales.csv" }
  }
]

As you can see, we need to set several parts of the HTTP POST: a unique eventId for each message, plus some extra metadata such as the host and the file path.

This metadata will be valuable later, both to find the specific mapping for that particular data source and to keep track of where the data came from.
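As a rough illustration of what has to be assembled for each CSV line, here is a minimal Python sketch that builds the same kind of request by hand. The function names build_event and build_request are hypothetical and only there for illustration; in the setup described in this article Logstash does this work, not Python.

```python
import json
import uuid
import urllib.request

# Stream URL taken from the example request above.
EVENT_STORE_URL = "http://localhost:2113/streams/inbound-data"


def build_event(line: str, host: str, path: str) -> dict:
    """Wrap one raw CSV line in an event envelope with a fresh unique id."""
    return {
        "eventId": str(uuid.uuid4()),
        "eventType": "InboundDataReceived",
        "data": {"message": line},
        "metadata": {"host": host, "path": path},
    }


def build_request(events: list, url: str = EVENT_STORE_URL) -> urllib.request.Request:
    """Build (but do not send) the HTTP POST carrying a JSON array of events."""
    body = json.dumps(events).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.eventstore.events+json"},
    )
```

To actually send the request against a running Event Store you would pass the result of build_request to urllib.request.urlopen. Depending on how your instance is configured, you may also need to add credentials.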

Data Ingestion from CSV to Microservices

The API Gateway is our internal data collection endpoint. In my opinion it must be a simple HTTP endpoint. There is no need for a heavyweight framework that can add complexity and little value.

In this case we are using the existing Event Store HTTP API as the API Gateway, and Logstash to monitor folders and convert the CSV lines into JSON messages.

Logstash is a powerful tool that keeps track of the last ingested position in each file, so it can resume from the right place if something goes wrong or when a file changes.
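To make the idea of a "last ingested position" concrete, here is a minimal Python sketch of offset-based checkpointing. It is not how Logstash is implemented, only an illustration of the concept; the function read_new_lines and the JSON checkpoint file are assumptions made for this example.

```python
import json
import os


def read_new_lines(data_path: str, checkpoint_path: str) -> list:
    """Return the complete lines added since the last call and save the new offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, "r", encoding="utf-8") as f:
            offset = json.load(f)["offset"]

    # If the file is now shorter than the saved offset, it was truncated
    # or replaced, so start again from the beginning.
    if os.path.getsize(data_path) < offset:
        offset = 0

    lines = []
    with open(data_path, "rb") as f:
        f.seek(offset)
        for raw in f:
            if not raw.endswith(b"\n"):
                break  # incomplete last line: leave it for the next call
            lines.append(raw.decode("utf-8").rstrip("\r\n"))
            offset += len(raw)

    # Persist the position so a restart resumes where we stopped.
    with open(checkpoint_path, "w", encoding="utf-8") as f:
        json.dump({"offset": offset}, f)
    return lines
```

Calling read_new_lines repeatedly on the same file returns each complete line only once, which is the behaviour you want from an ingestion tool that may be restarted at any time.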

There are other tools available for this task, such as Telegraf, but I have more experience with Logstash.

Once the messages are flowing through the API Gateway, we can implement Domain Components or Microservices that subscribe to these messages and provide validation, mapping and domain business logic. They then save the results in their own data streams as Domain Events. If you do not want to keep the data in the inbound stream forever, you can optionally set a retention period after which the messages are automatically removed from the system.
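As an illustration of the validation and mapping step, here is a minimal Python sketch of what such a component might do with one inbound message. The column names, the map_inbound function and the SaleRecorded event type are all assumptions made for this example: the article does not define them, and a real mapper would be chosen per data source using the metadata.

```python
import csv
from datetime import datetime

# Assumed column names for the sample line; the source file does not name them.
COLUMNS = [
    "region", "country", "item_type", "sales_channel", "order_priority",
    "order_date", "order_id", "ship_date", "units_sold", "unit_price",
    "unit_cost", "total_revenue", "total_cost", "total_profit",
]


def map_inbound(event: dict) -> dict:
    """Validate one InboundDataReceived event and map it to a hypothetical domain event."""
    if event.get("eventType") != "InboundDataReceived":
        raise ValueError("unexpected event type")
    values = next(csv.reader([event["data"]["message"]]))
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    row = dict(zip(COLUMNS, values))
    order_date = datetime.strptime(row["order_date"], "%m/%d/%Y").date()
    return {
        "eventType": "SaleRecorded",  # hypothetical domain event name
        "data": {
            "country": row["country"],
            "orderId": row["order_id"],
            "orderDate": order_date.isoformat(),
            "unitsSold": int(row["units_sold"]),
            "totalRevenue": row["total_revenue"],  # kept as text to avoid float rounding
        },
        # Keep the origin so the domain event can be traced back to its source file.
        "metadata": {"source": event["metadata"]},
    }
```

The resulting event would then be appended to the component's own stream, leaving the inbound stream as a raw, unvalidated record of what was received.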

  1. Download Logstash and Event Store.
  2. Create a folder “inbound” and paste the content of my gist into a new logstash.yml file inside it: https://gist.github.com/riccardone/18c176dab737631c9a216e89b79acdb7
  3. If you are on Windows, create a logstash.bat file to run Logstash easily. The content depends on your paths, but in my case it is:
    C:\services\logstash-6.2.4\bin\logstash -f C:\inbound\logstash.yml
  4. From a command line, run Logstash.
  5. Create a subdirectory inbound/<yourclientname>. Example: c:\inbound\RdnSoftware
  6. Copy a CSV file into that folder and let the ingestion start.

With this Logstash input configuration, when you have a new client you can just add another folder and start ingesting the data. In a production environment you can combine Logstash with Filebeat to distribute part of the ingestion workload across separate boxes.
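Because each client has its own folder, the file path stored in the event metadata is enough to tell which client a message belongs to. Here is a minimal Python sketch of that lookup; the function client_from_path and the default root c:\inbound are assumptions based on the example layout above.

```python
from pathlib import PureWindowsPath


def client_from_path(file_path: str, inbound_root: str = r"c:\inbound") -> str:
    """Return the name of the first folder under inbound_root, which identifies the client."""
    # relative_to raises ValueError when file_path is not under inbound_root.
    relative = PureWindowsPath(file_path).relative_to(PureWindowsPath(inbound_root))
    if len(relative.parts) < 2:
        raise ValueError(f"{file_path} is not inside a client folder")
    return relative.parts[0]
```

A downstream component could use the returned name to pick the right mapper for that client's files.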

In some scenarios, especially when the data has not been explored yet and you don’t have a mapper for it, it is easy to direct the data straight into a semi-structured data lake for further analysis.

Here is the webinar

I hope you enjoyed my little data ingestion example.