CQRS and Event Sourcing in a Simple Shop Example in C#

In the following article, I present CQRS and Event Sourcing, where we have two separate databases – a write database (Event Store) and a read database. The write database In the write database (Event Store), there are events, e.g., ProductCreated, ProductChanged, ProductDeleted. We do not store the current state of the object, but the history of changes. In my project, I used EventStoreDB for this purpose. The read database The read database stores read models for what we want to display to the user, and they are created based on events from the Event Store. In my project, I chose MongoDB as the read database. This gives us the following advantages: Optimization of write and read operations We can create different read models for different use cases Access to the change history There are also some disadvantages: Eventual consistency - the data in the read database is delayed in relation to the write database because it needs to be processed based on events. For example, a user might add an order but not see it immediately in their order history. Increased complexity of the system The entire process looks as follows: The system receives a command (e.g., CreateProduct) In the command handler, we call the aggregate's method, where we create an event (e.g., ProductCreated) and save it in the Event Store In the event handler, we update the read database The user sees data from the read database The class that facilitates communication with EventStoreDB handles event storage and event reading to reconstruct the aggregate’s state: public class EventStoreRepository : IEventStoreRepository { private readonly EventStoreClient _eventStoreClient; public EventStoreRepository(EventStoreClient eventStoreClient) { _eventStoreClient = eventStoreClient; } public async Task LoadAsync(Guid aggregateId) where T : AggregateRoot, new() { if (aggregateId == Guid.Empty) throw new ArgumentException(nameof(aggregateId)); var streamName = GetStreamName(aggregateId); var readStreamResult = _eventStoreClient.ReadStreamAsync( Direction.Forwards, streamName, StreamPosition.Start); if (await readStreamResult.ReadState == ReadState.StreamNotFound) return null; var events = new List(); await foreach (var @event in readStreamResult) { var json = Encoding.UTF8.GetString(@event.Event.Data.ToArray()); var type = Type.GetType(Encoding.UTF8.GetString(@event.Event.Metadata.ToArray())); var @object = JsonSerializer.Deserialize(json, type); var domainEvent = (IDomainEvent)@object; events.Add(domainEvent); } var aggregate = new T(); aggregate.LoadFromHistory(events); return aggregate; } public async Task SaveAsync(T aggregate) where T : AggregateRoot, new() { var events = aggregate.GetDomainEvents(); if (!events.Any()) return; var streamName = GetStreamName(aggregate.Id); var eventsToSave = new List(); foreach (var @event in events) { var eventData = new EventData( Uuid.NewUuid(), @event.GetType().Name, JsonSerializer.SerializeToUtf8Bytes((object)@event), Encoding.UTF8.GetBytes(@event.GetType().AssemblyQualifiedName)); eventsToSave.Add(eventData); } await _eventStoreClient.AppendToStreamAsync(streamName, StreamState.Any, eventsToSave); } private string GetStreamName(Guid aggregateId) { var streamName = $"{typeof(T).Name}-{aggregateId}"; return streamName; } } The whole project is available on GitHub: https://github.com/misiukanis/simple-shop

Mar 29, 2025 - 21:03
 0
CQRS and Event Sourcing in a Simple Shop Example in C#

In the following article, I present CQRS and Event Sourcing, where we have two separate databases – a write database (Event Store) and a read database.
The write database
In the write database (Event Store), there are events, e.g., ProductCreated, ProductChanged, ProductDeleted. We do not store the current state of the object, but the history of changes. In my project, I used EventStoreDB for this purpose.
The read database
The read database stores read models for what we want to display to the user, and they are created based on events from the Event Store. In my project, I chose MongoDB as the read database.

This gives us the following advantages:

  • Optimization of write and read operations
  • We can create different read models for different use cases
  • Access to the change history

There are also some disadvantages:

  • Eventual consistency - the data in the read database is delayed in relation to the write database because it needs to be processed based on events. For example, a user might add an order but not see it immediately in their order history.
  • Increased complexity of the system

The entire process looks as follows:

  • The system receives a command (e.g., CreateProduct)
  • In the command handler, we call the aggregate's method, where we create an event (e.g., ProductCreated) and save it in the Event Store
  • In the event handler, we update the read database
  • The user sees data from the read database

Event Sourcing

The class that facilitates communication with EventStoreDB handles event storage and event reading to reconstruct the aggregate’s state:

public class EventStoreRepository : IEventStoreRepository
{
    private readonly EventStoreClient _eventStoreClient;

    public EventStoreRepository(EventStoreClient eventStoreClient)
    {
        _eventStoreClient = eventStoreClient;
    }

    public async Task<T> LoadAsync<T>(Guid aggregateId) where T : AggregateRoot, new()
    {
        if (aggregateId == Guid.Empty)
            throw new ArgumentException(nameof(aggregateId));

        var streamName = GetStreamName<T>(aggregateId);            

        var readStreamResult = _eventStoreClient.ReadStreamAsync(
                Direction.Forwards,
                streamName,
                StreamPosition.Start);

        if (await readStreamResult.ReadState == ReadState.StreamNotFound)
            return null;

        var events = new List<IDomainEvent>();
        await foreach (var @event in readStreamResult)
        {
            var json = Encoding.UTF8.GetString(@event.Event.Data.ToArray());
            var type = Type.GetType(Encoding.UTF8.GetString(@event.Event.Metadata.ToArray()));
            var @object = JsonSerializer.Deserialize(json, type);
            var domainEvent = (IDomainEvent)@object;

            events.Add(domainEvent);
        }

        var aggregate = new T();
        aggregate.LoadFromHistory(events);
        return aggregate;
    }

    public async Task SaveAsync<T>(T aggregate) where T : AggregateRoot, new()
    {
        var events = aggregate.GetDomainEvents();
        if (!events.Any())
            return;

        var streamName = GetStreamName<T>(aggregate.Id);

        var eventsToSave = new List<EventData>();
        foreach (var @event in events)
        {
            var eventData = new EventData(
                Uuid.NewUuid(),
                @event.GetType().Name,
                JsonSerializer.SerializeToUtf8Bytes((object)@event),
                Encoding.UTF8.GetBytes(@event.GetType().AssemblyQualifiedName));

            eventsToSave.Add(eventData);
        }

        await _eventStoreClient.AppendToStreamAsync(streamName, StreamState.Any, eventsToSave);
    }

    private string GetStreamName<T>(Guid aggregateId)
    {
        var streamName = $"{typeof(T).Name}-{aggregateId}";
        return streamName;
    }
}

The whole project is available on GitHub: https://github.com/misiukanis/simple-shop