Implementing the Outbox Pattern with PostgreSQL in Brighter
Introduction Building on our previous exploration of the outbox pattern with SQL Server, this article adapts the pattern to PostgreSQL while addressing its limitations. The goal is to ensure transactional consistency between database updates and message publishing. We’ll use .NET 8, Brighter, and PostgreSQL to handle order creation and event publishing across distributed systems. Project The main idea of this project is to send a command to create an order, when the order is create, it'll send 2 messages OrderPlaced & OrderPaid, in case we have a failure, we shouldn't send any message. Requirement .NET 8+ Podman (or Docker) to run local containers: PostgreSql RabbitMQ Brighter knowledge about RabbitMQ Nuget packages Paramore.Brighter.Extensions.DependencyInjection Paramore.Brighter.Extensions.Hosting Paramore.Brighter.MessagingGateway.RMQ Paramore.Brighter.Outbox.PostgreSql Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection Paramore.Brighter.ServiceActivator.Extensions.Hosting Messages For this project we will need these 3 message: CreateNewOrder, OrderPlaced and OrderPaid public class CreateNewOrder() : Command(Guid.NewGuid()) { public decimal Value { get; set; } } public class OrderPlaced() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; public decimal Value { get; set; } } public class OrderPaid() : Event(Guid.NewGuid()) { public string OrderId { get; set; } = string.Empty; } Message Mappers Since only OrderPlaced and OrderPaid events are published to RabbitMQ, we need to implement mappers for them using JSON serialization public class OrderPlacedMapper : IAmAMessageMapper { public Message MapToMessage(OrderPlaced request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-placed"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPlaced MapToRequest(Message message) { return JsonSerializer.Deserialize(message.Body.Bytes)!; } } public class OrderPaidMapper : IAmAMessageMapper { public Message MapToMessage(OrderPaid request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "order-paid"; header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public OrderPaid MapToRequest(Message message) { return JsonSerializer.Deserialize(message.Body.Bytes)!; } } Request Handlers For OrderPlaced and OrderPaid we are going to log the received message. public class OrderPlaceHandler(ILogger logger) : RequestHandlerAsync { public override Task HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value); return base.HandleAsync(command, cancellationToken); } } public class OrderPaidHandler(ILogger logger) : RequestHandlerAsync { public override Task HandleAsync(OrderPaid command, CancellationToken cancellationToken = default) { logger.LogInformation("{OrderId} paid", command.OrderId); return base.HandleAsync(command, cancellationToken); } } Create New Order The CreateNewOrder handler is going to wait for 10ms to emulate a process, then publish the OrderPlaced, if the value is mod 3 throw an exception (emulation a business error), otherwise publish OrderPaid. public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor, ILogger logger) : RequestHandlerAsync { public override async Task HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default) { try { string id = Guid.NewGuid().ToString(); logger.LogInformation("Creating a new order: {OrderId}", id); await Task.Delay(10, cancellationToken); // emulating an process _ = commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value }); if (command.Value % 3 == 0) { throw new InvalidOperationException("invalid value"); } _ = commandProcessor.DepositPost(new OrderPaid { OrderId = id }); return await base.HandleAsync(command, cancellationToken); } catch { logger.LogError("Invalid data"); throw; } } } Configuring PostgresSql To integrate the Outbox Pattern with PostgresSql, first ensure the OutboxMessages table exists. 1. SQL Table Schem

Introduction
Building on our previous exploration of the outbox pattern with SQL Server, this article adapts the pattern to PostgreSQL while addressing its limitations. The goal is to ensure transactional consistency between database updates and message publishing. We’ll use .NET 8, Brighter, and PostgreSQL to handle order creation and event publishing across distributed systems.
Project
The main idea of this project is to send a command to create an order, when the order is create, it'll send 2 messages OrderPlaced
& OrderPaid
, in case we have a failure, we shouldn't send any message.
Requirement
- .NET 8+
- Podman (or Docker) to run local containers:
- PostgreSql
- RabbitMQ
- Brighter knowledge about RabbitMQ
- Nuget packages
Messages
For this project we will need these 3 message: CreateNewOrder
, OrderPlaced
and OrderPaid
public class CreateNewOrder() : Command(Guid.NewGuid())
{
public decimal Value { get; set; }
}
public class OrderPlaced() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
public decimal Value { get; set; }
}
public class OrderPaid() : Event(Guid.NewGuid())
{
public string OrderId { get; set; } = string.Empty;
}
Message Mappers
Since only OrderPlaced
and OrderPaid
events are published to RabbitMQ, we need to implement mappers for them using JSON serialization
public class OrderPlacedMapper : IAmAMessageMapper<OrderPlaced>
{
public Message MapToMessage(OrderPlaced request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-placed";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPlaced MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPlaced>(message.Body.Bytes)!;
}
}
public class OrderPaidMapper : IAmAMessageMapper<OrderPaid>
{
public Message MapToMessage(OrderPaid request)
{
var header = new MessageHeader();
header.Id = request.Id;
header.TimeStamp = DateTime.UtcNow;
header.Topic = "order-paid";
header.MessageType = MessageType.MT_EVENT;
var body = new MessageBody(JsonSerializer.Serialize(request));
return new Message(header, body);
}
public OrderPaid MapToRequest(Message message)
{
return JsonSerializer.Deserialize<OrderPaid>(message.Body.Bytes)!;
}
}
Request Handlers
For OrderPlaced
and OrderPaid
we are going to log the received message.
public class OrderPlaceHandler(ILogger<OrderPlaceHandler> logger) : RequestHandlerAsync<OrderPlaced>
{
public override Task<OrderPlaced> HandleAsync(OrderPlaced command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} placed with value {OrderValue}", command.OrderId, command.Value);
return base.HandleAsync(command, cancellationToken);
}
}
public class OrderPaidHandler(ILogger<OrderPaidHandler> logger) : RequestHandlerAsync<OrderPaid>
{
public override Task<OrderPaid> HandleAsync(OrderPaid command, CancellationToken cancellationToken = default)
{
logger.LogInformation("{OrderId} paid", command.OrderId);
return base.HandleAsync(command, cancellationToken);
}
}
Create New Order
The CreateNewOrder
handler is going to wait for 10ms to emulate a process, then publish the OrderPlaced
, if the value is mod 3 throw an exception (emulation a business error), otherwise publish OrderPaid
.
public class CreateNewOrderHandler(IAmACommandProcessor commandProcessor,
ILogger<CreateNewOrderHandler> logger) : RequestHandlerAsync<CreateNewOrder>
{
public override async Task<CreateNewOrder> HandleAsync(CreateNewOrder command, CancellationToken cancellationToken = default)
{
try
{
string id = Guid.NewGuid().ToString();
logger.LogInformation("Creating a new order: {OrderId}", id);
await Task.Delay(10, cancellationToken); // emulating an process
_ = commandProcessor.DepositPost(new OrderPlaced { OrderId = id, Value = command.Value });
if (command.Value % 3 == 0)
{
throw new InvalidOperationException("invalid value");
}
_ = commandProcessor.DepositPost(new OrderPaid { OrderId = id });
return await base.HandleAsync(command, cancellationToken);
}
catch
{
logger.LogError("Invalid data");
throw;
}
}
}
Configuring PostgresSql
To integrate the Outbox Pattern with PostgresSql, first ensure the OutboxMessages
table exists.
1. SQL Table Schema
CREATE TABLE IF NOT EXISTS "outboxmessages"
(
"id" BIGSERIAL NOT NULL,
"messageid" UUID NOT NULL,
"topic" VARCHAR(255) NULL,
"messagetype" VARCHAR(32) NULL,
"timestamp" TIMESTAMP NULL,
"correlationid" UUID NULL,
"replyto" VARCHAR(255) NULL,
"contenttype" VARCHAR(128) NULL,
"dispatched" TIMESTAMP NULL,
"headerbag" TEXT NULL,
"body" TEXT NULL,
PRIMARY KEY (Id)
);
2. Dependency Injection Setup
Register the outbox and transaction.
services
.AddServiceActivator(opt => { // Subscription setup (see previous article) })
.UsePostgreSqlOutbox(new PostgreSqlOutboxConfiguration(ConnectionString, "OutboxMessages"))
.UseOutboxSweeper(opt => opt.BatchSize = 10);
Why This Works:
-
UsePostgreSqlOutbox
links the outbox to PostgreSql. -
UseOutboxSweeper
configures background polling for undelivered messages.
3. Limitations and Workarounds
- Sync-Only Operations: Brighter’s PostgreSQL outbox does not support async methods.
- Transaction Barriers: Messages and database updates cannot share transactions, increasing the risk of inconsistency during failures(I tried to make it work, but I was always getting error about transaction was aborted).
Future Improvements
Brighter v10 aims to align PostgreSQL’s outbox implementation with SQL Server’s capabilities, including async support and transactional guarantees.
Conclusion
While PostgreSQL’s current Brighter integration has limitations(such as the lack of async support and transactional consistency) it remains viable for scenarios with moderate throughput. For critical workloads requiring robust transactional guarantees, SQL Server offers a mature implementation of the outbox pattern with proven consistency. By understanding these trade-offs, teams can select the appropriate tools based on their scalability and reliability requirements.