Reinventando a Roda: Criando seu próprio MediatR - Parte 3

No caminho o que importa é o caminho... ...e não o porto e ponto final! Chegamos ao fim da série. Mas a Ave Sangria nos ensina que o mais importante é a jornada. E de fato é isso. Foram três posts destrinchando trechos de código e explicando decisões técnicas, porém o mais importante não é o resultado, já que temos inúmeras bibliotecas que fazem a mesma coisa, e sim entender que é possível resolver problemas de forma efetiva. E a gente só consegue fazer isso codando! Pensando e codando! É isso que eu gostaria que você meu caro leitor, minha cara leitora entendesse. Por mais que tenhamos um monte de ferramentas com inteligência artificial nos ajudando, precisamos saber bem como resolver problemas. Precisamos entender bem como as coisas funcionam. E, acima de tudo, precisamos usar as ferramentas que temos em mãos, mas com a certeza do seu entendimento, da sua aplicabilidade. Bom, a única feature faltante em nossa biblioteca é a de notificações. A principal diferença entre um handler e uma notification é que o request/command aciona apenas um request handler. Já a notification pode ser "ouvida" por inúmeros notification handlers. As notificações servem para propagar eventos de forma totalmente desacoplada. Quando uma parte da aplicação quer informar que algo aconteceu, ela pode publicar essa mensagem e quem estiver ouvindo pode executar uma ação. Vamos imaginar que um usuário acabe de efetuar uma compra no seu sistema de e-commerce. Nesse momento pode-se emitir uma mensagem "Pedido efetuado com sucesso". Essa é a notificação. Podemos ter três partes distintas do próprio sistema que ouvem essa mensagem: A primeira vai enviar um e-mail com todas as informações da compra. A segunda interage com um sistema externo para obter informações sobre rastreio. A terceira registra o evento em um sistema interno de auditoria para manter o histórico das operações realizadas pelos usuários. Se você trabalha com sistemas de mensageria como RabbitMQ por exemplo, pode achar que esse processo poderia enviar a mensagem para um tópico ou fila e ter background services efetuando o processamento etc. Você está certo! Mas esse foi o exemplo que achei ser mais didático para explicar a parte de notificações... Se você acompanhou essa trilogia (parte 1 e parte 2), já deve imaginar como implementar isso. Se não consegue imaginar, não tem problema, eu explico. A ideia é seguir o mesmo conceito do RequestHandler, a diferença é que não podemos ter mais de um RequestHandler para um Request/Response e no caso do NotificationHandler podemos ter isso. Antes de avançar, pegue aquela Fanta Uva misturada com Yakult e vem comigo. Bora relembrar a estrutura do nosso projeto? TheMediator.Core ├── DependencyInjection/ │ └── DependencyInjection.cs ├── Executors/ │ ├── FilterExecutor.cs │ └── HandlerExecutor.cs ├── Inspectors/ │ └── ServiceInspector.cs ├── Models/ │ ├── ServiceCategory.cs │ ├── ServiceDescriptor.cs │ └── Void.cs ├── Registries/ │ ├── FilterRegistry.cs │ └── HandlerRegistry.cs ├── IRequestFilter.cs ├── IRequestHandler.cs ├── ISender.cs └── Usings.cs Posto isso, precisamos criar nossa interface principal, um registrador e um executor de Notification Handlers além de ajustar nossa classe de injeção de dependência para suportar o registro de notificações. A interface é simples e objetiva: public interface IRequestHandler where TRequest : notnull { Task HandleAsync(TRequest request, CancellationToken cancellationToken); } public interface IRequestHandler where TRequest : notnull { Task HandleAsync(TRequest request, CancellationToken cancellationToken); } Basicamente temos duas interfaces, uma para handlers com TRequest e outra para Handlers com TRequest e TResponse. A essa altura do campeonato, com dois posts falando exaustivamente disso, acredito que não preciso explicar o motivo, certo? ;p Agora vamos implementar nosso querido NotifierRegistry... Arquivo Registries/NotifierRegistry.cs: public class NotifierRegistry(IServiceCollection services) { private readonly List _notifications = new(); [ExcludeFromCodeCoverage] public NotificationConfiguration Configurations { get; set; } = NotificationConfiguration.None; public void Add() where TService : notnull { var mainType = typeof(TService); var service = ServiceInspector.CreateService(mainType, ServiceCategory.Notification); Add(service); } internal void Add(Models.ServiceDescriptor serviceDescriptor) { if (serviceDescriptor.Category != ServiceCategory.Notification) throw new InvalidOperationException($"{serviceDescriptor.MainType} is not a notification type!"); var current = _notifications.FirstOrDefault(s => s.MainType == serviceDescriptor.MainType && s.RequestType == serviceDescriptor.RequestType && s.ResponseType == Models.Void.Type && s.Category == serviceDe

Apr 30, 2025 - 01:58
 0
Reinventando a Roda: Criando seu próprio MediatR - Parte 3

No caminho o que importa é o caminho...
...e não o porto e ponto final!

Chegamos ao fim da série. Mas a Ave Sangria nos ensina que o mais importante é a jornada. E de fato é isso. Foram três posts destrinchando trechos de código e explicando decisões técnicas, porém o mais importante não é o resultado, já que temos inúmeras bibliotecas que fazem a mesma coisa, e sim entender que é possível resolver problemas de forma efetiva. E a gente só consegue fazer isso codando! Pensando e codando!

É isso que eu gostaria que você meu caro leitor, minha cara leitora entendesse. Por mais que tenhamos um monte de ferramentas com inteligência artificial nos ajudando, precisamos saber bem como resolver problemas. Precisamos entender bem como as coisas funcionam. E, acima de tudo, precisamos usar as ferramentas que temos em mãos, mas com a certeza do seu entendimento, da sua aplicabilidade.

Bom, a única feature faltante em nossa biblioteca é a de notificações. A principal diferença entre um handler e uma notification é que o request/command aciona apenas um request handler. Já a notification pode ser "ouvida" por inúmeros notification handlers.

As notificações servem para propagar eventos de forma totalmente desacoplada. Quando uma parte da aplicação quer informar que algo aconteceu, ela pode publicar essa mensagem e quem estiver ouvindo pode executar uma ação.

Vamos imaginar que um usuário acabe de efetuar uma compra no seu sistema de e-commerce.
Nesse momento pode-se emitir uma mensagem "Pedido efetuado com sucesso". Essa é a notificação.
Podemos ter três partes distintas do próprio sistema que ouvem essa mensagem:

  • A primeira vai enviar um e-mail com todas as informações da compra.
  • A segunda interage com um sistema externo para obter informações sobre rastreio.
  • A terceira registra o evento em um sistema interno de auditoria para manter o histórico das operações realizadas pelos usuários.

Se você trabalha com sistemas de mensageria como RabbitMQ por exemplo, pode achar que esse processo poderia enviar a mensagem para um tópico ou fila e ter background services efetuando o processamento etc. Você está certo! Mas esse foi o exemplo que achei ser mais didático para explicar a parte de notificações...

Se você acompanhou essa trilogia (parte 1 e parte 2), já deve imaginar como implementar isso. Se não consegue imaginar, não tem problema, eu explico. A ideia é seguir o mesmo conceito do RequestHandler, a diferença é que não podemos ter mais de um RequestHandler para um Request/Response e no caso do NotificationHandler podemos ter isso.

Antes de avançar, pegue aquela Fanta Uva misturada com Yakult e vem comigo.

Bora relembrar a estrutura do nosso projeto?

TheMediator.Core
├── DependencyInjection/
│   └── DependencyInjection.cs
├── Executors/
│   ├── FilterExecutor.cs
│   └── HandlerExecutor.cs
├── Inspectors/
│   └── ServiceInspector.cs
├── Models/
│   ├── ServiceCategory.cs
│   ├── ServiceDescriptor.cs
│   └── Void.cs
├── Registries/
│   ├── FilterRegistry.cs
│   └── HandlerRegistry.cs
├── IRequestFilter.cs
├── IRequestHandler.cs
├── ISender.cs
└── Usings.cs

Posto isso, precisamos criar nossa interface principal, um registrador e um executor de Notification Handlers além de ajustar nossa classe de injeção de dependência para suportar o registro de notificações.

A interface é simples e objetiva:

public interface IRequestHandler<in TRequest, TResponse>  
  where TRequest : notnull  
{  
    Task<TResponse> HandleAsync(TRequest request, CancellationToken cancellationToken);  
}  

public interface IRequestHandler<in TRequest>  
  where TRequest : notnull  
{  
    Task HandleAsync(TRequest request, CancellationToken cancellationToken);  
}

Basicamente temos duas interfaces, uma para handlers com TRequest e outra para Handlers com TRequest e TResponse. A essa altura do campeonato, com dois posts falando exaustivamente disso, acredito que não preciso explicar o motivo, certo? ;p

Agora vamos implementar nosso querido NotifierRegistry...

Arquivo Registries/NotifierRegistry.cs:

public class NotifierRegistry(IServiceCollection services)
{
    private readonly List<Models.ServiceDescriptor> _notifications = new();

    [ExcludeFromCodeCoverage]
    public NotificationConfiguration Configurations { get; set; } = NotificationConfiguration.None;

    public void Add<TService>()
        where TService : notnull
    {
        var mainType = typeof(TService);
        var service = ServiceInspector.CreateService(mainType, ServiceCategory.Notification);
        Add(service);
    }

    internal void Add(Models.ServiceDescriptor serviceDescriptor)
    {
        if (serviceDescriptor.Category != ServiceCategory.Notification)
            throw new InvalidOperationException($"{serviceDescriptor.MainType} is not a notification type!");

        var current = _notifications.FirstOrDefault(s =>
            s.MainType == serviceDescriptor.MainType &&
            s.RequestType == serviceDescriptor.RequestType &&
            s.ResponseType == Models.Void.Type &&
            s.Category == serviceDescriptor.Category);
        if (current is not null)
            throw new InvalidOperationException(
                $"Cannot register {serviceDescriptor.MainType}!.\n" +
                $"The Notification {current.MainType} is already registered");

        _notifications.Add(serviceDescriptor);
        services.AddSingleton(serviceDescriptor.MainType);
    }

    internal IReadOnlyCollection<Models.ServiceDescriptor> ListNotifiersByMessageType<TMessageType>()
        => _notifications.Where(s => s.RequestType == typeof(TMessageType) &&
                                     s.ResponseType == Models.Void.Type &&
                                     s.Category == ServiceCategory.Notification).ToList();
}

Essa classe é bem simples de se entender. Ela armazena uma lista de ServiceDescriptor através de dois métodos: Add() e Add(Models.ServiceDescriptor serviceDescriptor). O primeiro cria um ServiceDescriptor a partir do tipo de serviço (TService) e repassa ao segundo que por sua vez valida se esse mesmo já não foi registrado, caso não tenha sido, adiciona na lista. E ao armazenar, adiciona no sistema de injeção de dependências o tipo inserido.

Note que é possível ter vários tipos de Notification Handlers associados a uma mensagem, mas não podemos ter dois Notification Handlers iguais, isto é, como mesmo MainType.
Se não ficou claro, explico: Não é possível ter dois Notification Handlers do tipo EnviaEmailDeBoasVindasHandler registrados. A mesma classe não pode ser registrada duas vezes, é isso...

NotificationConfiguration é uma classe para configurar como a notificação vai ser enviada (e num futuro ter outras opções). Explico mais abaixo.

E por fim temos ListNotifiersByMessageType() que retorna uma lista de Notification Handlers a partir do TMessageType. Esse é o método usado pelo executor no momento de envio da notificação.

Um ponto importante é que não é necessário executar uma notificação em uma ordem específica, diferente dos filtros.

Arquivo Models/NotificationConfiguration.cs:

public class NotificationConfiguration(NotificationDeliveryMode deliveryMode)  
{  
  public NotificationDeliveryMode DeliveryMode { get; set; } = deliveryMode;  

  //TODO: implementar outras configurações...
}

public enum NotificationDeliveryMode  
{  
  FireAndForget,  
  WaitAll  
}

Ao enviar uma notificação podemos optar se esse envio vai ser FireAndForget, isto é, eu envio a notificação, mas não aguardo seu retorno, ou se eu espero todos os Notification Handlers executarem suas ações, WaitAll.

Esse comportamento é importante já que tem casos que eu não preciso me preocupar com quem está consumindo a mensagem, logo, seu retorno é indiferente.
Porém existem casos que é necessário aguardar o processamento da mensagem.
No caso da nossa biblioteca, a pessoa desenvolvedora escolhe...

Um ponto mais do que importante: Ao utilizar o modo FireAndForget caso o Notification Handler lance uma exception não tem como efetuar a captura por parte de quem enviou a notificação. Isso acontece pelo fato de não aguardarmos a finalização do método. Logo, é importante efetuar o tratamento de exceções dentro do Notification Handler...

E...

Outro ponto não menos importante: Caso escolha WaitAll tenha em mente que o processo todo pode demorar muito dependendo da quantidade Notification Handlers registrados. O lado positivo é que caso algum deles lance uma exceção é possível capturá-la. Não tem mágica: Chora Agora ou Ri depois...

Vamos ao executor...

Arquivo Executors/NotifierExecutor.cs:

public class NotifierExecutor(
    ILogger<NotifierExecutor> logger,
    IServiceProvider serviceProvider,
    NotifierRegistry notifierRegistry)
{
    public async Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
        where TMessage : notnull
    {
        var services = notifierRegistry.ListNotifiersByMessageType<TMessage>();
        var channel = Channel.CreateBounded<Func<Task>>(new BoundedChannelOptions(services.Count));

        var sender = Task.Run(() => SendNotificationsAsync(message, services, channel, cancellationToken), cancellationToken);
        var receiver = Task.Run(() => ExecuteHandlersAsync(channel, cancellationToken), cancellationToken);

        await Task.WhenAll(sender, receiver);
    }

    private async Task ExecuteHandlersAsync<TMessage>(
        TMessage message,
        IReadOnlyCollection<Models.ServiceDescriptor> services,
        Channel<Func<Task>> channel,
        CancellationToken cancellationToken)
        where TMessage : notnull
    {
        logger.LogInformation("Publishing message {MessageType} to {NotificationCount} notifications",
            typeof(TMessage).Name, notificationsType.Count);

        try
        {
            foreach (var serviceDescriptor in services)
            {
                var handler = (INotificationHandler<TMessage>)serviceProvider.GetRequiredService(serviceDescriptor.MainType);
                await channel.Writer.WriteAsync(() =>
                {
                    logger.LogInformation("Sending message {MessageType} to {HandlerType}",
                        typeof(TMessage).Name, serviceDescriptor.MainType.Name);

                    return handler.HandleAsync(message, cancellationToken);
                }, cancellationToken);
            }
        }
        finally
        {
            channel.Writer.Complete();
        }
    }

    private async Task ReceiveNotificationsAsync(
        Channel<Func<Task>> channel,
        CancellationToken cancellationToken)
    {
        while (await channel.Reader.WaitToReadAsync(cancellationToken))
        {
            while (channel.Reader.TryRead(out var handlerTask))
            {
                if (notifierRegistry.Configurations.DeliveryMode == NotificationDeliveryMode.FireAndForget)
                    _ = Task.Run(handlerTask, cancellationToken).ConfigureAwait(false);
                else
                    await handlerTask();
            }
        }
    }
}

Como você pode imaginar, a classe NotifierExecutor é responsável por publicar mensagens para notificações registradas, utilizando um mecanismo de processamento assíncrono baseado em channels. O uso de channels permite gerenciar a comunicação entre produtores (quem envia as notificações) e consumidores (quem processa as notificações) de forma eficiente e segura em cenários concorrentes, que é o nosso caso.

Eu fiz um post falando sobre channels: Você já usou o System.Threading.Channels?. Recomendo muito a leitura desse post...

Caso você não tenha lido o post, vou fazer um resumo resumido aqui sobre channels:

O Channel é uma estrutura do dotnet que fornece um buffer seguro para comunicação entre tarefas assíncronas, thread-safe na veia! Ele é composto por dois componentes principais: Writer, componente usado para adicionar itens ao canal e o Reader, usado para, vejam só, consumir itens do canal.

Basicamente, essa biblioteca permite que a gente transmita dados de forma segura e sincronizada entre threads, tudo isso com muita performance. Muita mesmo. Aliás, esse é um dos segredos por de trás da performance absurda do Asp.Net Core.

O mais legal é que com Channels podemos desacoplar a produção e o consumo de tarefas, permitindo que ambas as operações sejam executadas em velocidades diferentes sem bloquear diretamente uma à outra.
No caso do NotifierExecutor, isso é essencial para gerenciar múltiplos handlers de notificações de forma eficiente.

Avançando, temos um método chamado PublishAsync. É aqui que o canal é criado e delimitamos a sua capacidade (Channel.CreateBounded) para a quantidade de handlers registrados para a mensagem. Cada item no canal é uma função (Func) que representa a execução de um handler de notificação.

O método SendNotificationsAsync é responsável por "varrer" os serviços registrados e adicionar as Func ao canal que, quando executadas, chamam o método HandleAsync de cada handler. Isso é feito de forma assíncrona, garantindo que o canal seja preenchido com as tarefas a serem processadas. Aqui temos o Write.

Já o Reader é feito dentro do método ExecuteHandlersAsync. Nele as Func são recebidas e executadas. Dependendo da configuração de entrega (NotificationDeliveryMode, lembra?), as tarefas podem ser executadas no modo FireAndForget, não aguardando a conclusão, ou WaitAll, aguardando a conclusão de cada tarefa.

Aliás, na parte do FireAndForget você notou o .ConfigureAwait(false)? Sabe pra que ele serve? Se não sabe, fiz um post garboso demais sobre o assunto: Async/Await: Task.ConfigureAwait, Deadlock e Pink Floyd.

Uma coisa que eu acho super bacanuda é o fato de ser muito simples resolver esse problema de envio de mensagens para inúmeros handlers usando channels. Nós temos uma classe simples, com umas 60 linhas e que resolve de forma muito performática esse problema.

O dotnet tem um conjunto de ferramentas espetaculares e é por isso que eu sempre recomendo ir a fundo no framework. É possível criar qualquer coisa com muita performance. Qualquer coisa mesmo!

Avancemos!

Agora precisamos criar a interface para publicação e sua implementação.

Arquivo IPublisher.cs:

public interface IPublisher  
{  
    Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken)  
      where TMessage : notnull;  
}  

internal class Publisher(NotifierExecutor notifierExecutor) : IPublisher  
{  
    public Task PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken)  
      where TMessage : notnull  
      => notifierExecutor.PublishAsync(message, cancellationToken);  
}

O código é autoexplicativo. Pegamos o NotifierExecutor e invocamos o método PublishAsync. Simples assim. A lógica toda foi explicada logo acima.

Certo, chegamos à parte final do post. E consequentemente da série.

Para fechar a conta e passar a régua precisamos ajustar nossa classe de injeção de dependência.

Arquivo DependencyInjection/DependencyInjection.cs:

No método AddTheMediator adicionamos o NotifierRegistry e o IPublisher no sistema de injeção de dependências :

...
services.AddSingleton<NotifierRegistry>(_ => configuration.Notifiers);

...

services.AddSingleton<IPublisher, Publisher>();

E na classe Configuration que fica dentro da classe DependencyInjection temos:

...

public NotifierRegistry Notifiers { get; } = new(services);

public Configuration AddServicesFromAssemblies(params Assembly[] assemblies)  
{  
    if (assemblies is null || assemblies.Length == 0)  
       throw new ArgumentNullException(nameof(assemblies), "Assemblies cannot be null or empty.");  

    assemblies .Select(ServiceInspector.GetServicesByAssembly)  
               .SelectMany(serviceType => serviceType)  
               .ToList()  
               .ForEach(serviceDescriptor =>  
               {  
                   if (serviceDescriptor.Category == ServiceCategory.Handler)  
                       Handlers.Add(serviceDescriptor);  
                   if (serviceDescriptor.Category == ServiceCategory.Notification)  
                       Notifiers.Add(serviceDescriptor);  
               });  
    return this;  
}

Criamos uma propriedade chamada Notifiers que nada mais é que o NotifierRegistry e no método AddServicesFromAssemblies incluímos o serviceDescriptor na lista de notificações. Não tem segredo!

Fechamos a conta. Agora vamos passar a régua!

Vamos configurar nosso playground para emitir notificações.

Arquivo Program.cs:

builder.Services.AddTheMediator(configuration =>  
{  
    configuration.AddServicesFromAssemblies(typeof(Program).Assembly);  

    configuration.Notifiers.Configurations.DeliveryMode = NotificationDeliveryMode.FireAndForget;  

    configuration.Filters.Add<MeasureTimeRequestFilter>();  
    configuration.Filters.Add<LoggerRequestFilter>();  
});

A única diferença é o NotificationDeliveryMode configurando o modo de envio como FireAndForget.

Agora vamos criar nosso primeiro Notification Handler:

Arquivo Notifications/CreateProductNotification.cs:

public class CreateProductNotification(ILogger<CreateProductNotification> logger)  
    : INotificationHandler<ProductResponse>  
{  
    public async Task HandleAsync(ProductResponse notification, CancellationToken cancellationToken)  
    {  
        logger.LogInformation("{Class} Product {ProductName} created", nameof(CreateProductNotification),  
 notification.Name);  
        await Task.Delay(1500, cancellationToken);  
    }
 } 

E para fechar, vamos alterar nosso handler CreateProductRequestHandler:

public class CreateProductRequestHandler(IPublisher publisher, IProductsRepository repository)  
    : IRequestHandler<ProductRequest, ProductResponse>  
{  
    public async Task<ProductResponse> HandleAsync(ProductRequest request, CancellationToken cancellationToken)  
    {  
        var response = await repository.Create(request, cancellationToken);  
        await publisher.PublishAsync(response, cancellationToken);  
        return response;  
 }}

Injetamos o IPublisher e invocamos o método PublishAsync passando o Response. Simples, simples, simples!

Agora dê um F5 e veja a mágica acontecer no console da aplicação.

...
info: TheMediator.Core.Executors.HandlerExecutor[0]
      Handler CreateProductRequestHandler for request type ProductRequest and response type ProductResponse executed successfully
info: TheMediator.Core.Executors.NotifierExecutor[0]
      Publishing message ProductResponse to 1 notifications
info: TheMediator.Core.Executors.NotifierExecutor[0]
      Sending message ProductResponse to CreateProductNotification
info: TheMediator.Playground.Notifications.CreateProductNotification[0]
      CreateProductNotification Product Playstation 5 created
...

Lindo, não? Eu omito alguns logs para poder focar no que importa: O envio da notificação.

Ahhh, você deve ter reparado naquele await Task.Delay(1500, cancellationToken) da classe CreateProductNotification, certo?

Faça o seguinte teste: Configure o NotificationDeliveryMode para WaitAll e rode novamente o Playground. Você vai notar que o processo vai esperar pela execução da notificação.

Brinque com o playground. Crie outras notificações. Mude o NotificationDeliveryMode e lance exceções dentro dos Handler Notifications. Acompanhe o fluxo da informação. Isso vai te ajudar muito a entender como as coisas funcionam.

E chegamos ao fim da nossa saga (aventura e não o padrão de transação entre microservices). Espero que todo esse conteúdo tenha sido útil para você. Para mim foi muito divertido, muito mesmo.

Lembrando que o código fonte está disponível em https://github.com/angelobelchior/TheMediator.

Um ponto interessante é que o código do repositório está mais atualizado do que o código do post. Não tem nada de tão relevante, apenas pequenos ajustes e outras melhorias...

Divirta-se e forte abraço.