Implementando o Padrão Outbox com PostgreSQL no Brighter
Introdução Com base em nossa exploração anterior do padrão outbox com SQL Server, este artigo adapta o padrão para PostgreSQL, abordando suas limitações. O objetivo é garantir consistência transacional entre atualizações no banco de dados e publicação de mensagens. Utilizaremos .NET 8, Brighter e PostgreSQL para lidar com a criação de pedidos e publicação de eventos em sistemas distribuídos. Projeto A ideia principal deste projeto é enviar um comando para criar um pedido. Quando o pedido for criado, serão enviadas duas mensagens: OrderPlaced e OrderPaid. Em caso de falha, nenhuma mensagem deve ser enviada. Requisitos .NET 8+ Podman (ou Docker) para executar contêineres locais: PostgreSql RabbitMQ Conhecimento sobre Brighter e RabbitMQ Pacotes NuGet Paramore.Brighter.Extensions.DependencyInjection Paramore.Brighter.Extensions.Hosting Paramore.Brighter.MessagingGateway.RMQ Paramore.Brighter.Outbox.PostgreSql Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection Paramore.Brighter.ServiceActivator.Extensions.Hosting Mensagens Para este projeto, precisaremos de três mensagens: CreateNewOrder, OrderPlaced e OrderPaid. public class CreateNewOrder() : Command(Guid.NewGuid()) { public decimal Value { get; set; } } public class OrderPlaced() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; public decimal Value { get; set; } } public class OrderPaid() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; } Mapeadores de Mensagens Como apenas os eventos OrderPlaced e OrderPaid são publicados no RabbitMQ, precisamos implementar mapeadores para eles usando serialização JSON. public class OrderPlacedMapper : IAmAMessageMapper { public Message MapToMessage(OrderPlaced request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-placed"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPlaced MapToRequest(Message message) { return JsonSerializer.Deserialize(message.Body.Bytes)!; } } public class OrderPaidMapper : IAmAMessageMapper { public Message MapToMessage(OrderPaid request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-paid"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPaid MapToRequest(Message message) { return JsonSerializer.Deserialize(message.Body.Bytes)!; } } Manipuladores de Requisição Para OrderPlaced e OrderPaid, vamos registrar logs das mensagens recebidas. public class OrderPlaceHandler(ILogger logger) : RequestHandlerAsync { public override Task HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value); return base.HandleAsync(command, cancellationToken); } } public class OrderPaidHandler(ILogger logger) : RequestHandlerAsync { public override Task HandleAsync(OrderPaid command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} paid", command.OrderId); return base.HandleAsync(command, cancellationToken); } } Criar Novo Pedido O manipulador CreateNewOrder aguardará 10ms para emular um processo, depois publicará OrderPlaced. Se o valor for divisível por 3, uma exceção será lançada (simulando um erro de negócio), caso contrário, OrderPaid será publicado. public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger logger) : RequestHandlerAsync { public override async Task HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default) { try { string id = Guid.NewGuid().ToString(); logger.LogInformation("Creating a new order: {OrderId}", id); await Task.Delay(10, cancellationToken); // emulando um processo _ = commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value }); if (command.Value % 3 == 0) { throw new InvalidOperationException("invalid value"); } _ = commandProcessor.DepositPost(new OrderPaid { OrderId = id }); return await base.HandleAsync(command, cancellationToken); } catch { logger.LogError("Invalid data"); throw; } } } Configuração do PostgreSQL Para in

Introdução
Com base em nossa exploração anterior do padrão outbox com SQL Server, este artigo adapta o padrão para PostgreSQL, abordando suas limitações. O objetivo é garantir consistência transacional entre atualizações no banco de dados e publicação de mensagens. Utilizaremos .NET 8, Brighter e PostgreSQL para lidar com a criação de pedidos e publicação de eventos em sistemas distribuídos.
Projeto
A ideia principal deste projeto é enviar um comando para criar um pedido. Quando o pedido for criado, serão enviadas duas mensagens: OrderPlaced
e OrderPaid
. Em caso de falha, nenhuma mensagem deve ser enviada.
Requisitos
- .NET 8+
- Podman (ou Docker) para executar contêineres locais:
- PostgreSql
- RabbitMQ
- Conhecimento sobre Brighter e RabbitMQ
- Pacotes NuGet
Mensagens
Para este projeto, precisaremos de três mensagens: CreateNewOrder
, OrderPlaced
e OrderPaid
.
public class CreateNewOrder() : Command(Guid.NewGuid())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
}
Mapeadores de Mensagens
Como apenas os eventos OrderPlaced
e OrderPaid
são publicados no RabbitMQ, precisamos implementar mapeadores para eles usando serialização JSON.
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
public Message MapToMessage(OrderPlaced request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-placed";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPlaced MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
}
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
public Message MapToMessage(OrderPaid request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-paid";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPaid MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
}
Manipuladores de Requisição
Para OrderPlaced
e OrderPaid
, vamos registrar logs das mensagens recebidas.
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
return base.HandleAsync(command, cancellationToken);
}
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} paid", command.OrderId);
return base.HandleAsync(command, cancellationToken);
}
}
Criar Novo Pedido
O manipulador CreateNewOrder
aguardará 10ms para emular um processo, depois publicará OrderPlaced
. Se o valor for divisível por 3, uma exceção será lançada (simulando um erro de negócio), caso contrário, OrderPaid
será publicado.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
{
try
{
string id = Guid.NewGuid().ToString();
logger.LogInformation("Creating a new order: {OrderId}", id);
await Task.Delay(10, cancellationToken); // emulando um processo
_ = commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
if (command.Value % 3 == 0)
{
throw new InvalidOperationException("invalid value");
}
_ = commandProcessor.DepositPost(new OrderPaid { OrderId = id });
return await base.HandleAsync(command, cancellationToken);
}
catch
{
logger.LogError("Invalid data");
throw;
}
}
}
Configuração do PostgreSQL
Para integrar o padrão Outbox com PostgreSQL, primeiro garanta que a tabela OutboxMessages
exista.
1. Esquema da Tabela SQL
CREATE TABLE IF NOT EXISTS "outboxmessages"
(
"id" BIGSERIAL NOT NULL,
"messageid" UUID NOT NULL,
"topic" VARCHAR(255) NULL,
"messagetype" VARCHAR(32) NULL,
"timestamp" TIMESTAMP NULL,
"correlationid" UUID NULL,
"replyto" VARCHAR(255) NULL,
"contenttype" VARCHAR(128) NULL,
"dispatched" TIMESTAMP NULL,
"headerbag" TEXT NULL,
"body" TEXT NULL,
PRIMARY KEY (Id)
);
2. Configuração da Injeção de Dependência
Registre o outbox e a transação.
services
.AddServiceActivator(opt => { /* Configuração de assinatura */ })
.UsePostgreSqlOutbox(new PostgreSqlOutboxConfiguration(ConnectionString, "OutboxMessages"))
.UseOutboxSweeper(opt => opt.BatchSize = 10);
Por Que Isso Funciona:
-
UsePostgreSqlOutbox
vincula o outbox ao PostgreSQL. -
UseOutboxSweeper
configura a verificação periódica para mensagens não entregues.
3. Limitações e Soluções Alternativas
- Operações Síncronas Apenas: O outbox do PostgreSQL no Brighter não suporta métodos assíncronos.
- Transações Separadas: Mensagens e atualizações no banco de dados não compartilham transações, aumentando o risco de inconsistência em falhas (tentei fazer funcionar, mas sempre recebia erros sobre transação abortada).
Melhorias Futuras
A versão 10 do Brighter visa alinhar a implementação do outbox no PostgreSQL com as capacidades do SQL Server, incluindo suporte a assíncrono e garantias transacionais.
Conclusão
Apesar das limitações atuais na integração do PostgreSQL com o Brighter (como falta de suporte assíncrono e consistência transacional), ele ainda é viável para cenários de throughput moderado. Para cargas críticas que exigem garantias robustas, o SQL Server oferece uma implementação madura do padrão outbox com consistência comprovada. Ao entender essas compensações, equipes podem escolher as ferramentas adequadas com base em escalabilidade e confiabilidade.