Brighter and Kafka: How to Set Up and Use Brighter with Apache Kafka

This is the first article in a series where I’ll show how to connect and configure systems using Brighter. In this article, we’ll focus on integrating Brighter with Apache Kafka.
Quick Introduction to Kafka
Kafka is a distributed streaming platform designed for high-throughput, real-time message processing. Key concepts include:
- Streams: Process messages individually, with concurrency limited by the number of topic partitions.
- Core Components:
  - Topic: A category/feed to which messages are published.
  - Partition: A shard of a topic that enables parallel processing.
  - Consumer Group: A set of consumers that collaboratively process messages from a topic.
To integrate Kafka with Brighter, you’ll need:
- Topic Name: The target Kafka topic (e.g., greeting.topic).
- Number of Partitions: Determines concurrency limits.
- Consumer Group ID: Ensures message distribution among consumers.
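To make the partition limit concrete, here is a minimal sketch in plain C# (no Kafka involved). It assumes a simplified round-robin assignment, which is not Kafka's actual assignor, and the `Assign` helper is hypothetical. It only illustrates that a partition is owned by one consumer of a group at a time, so consumers beyond the partition count have nothing to read.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model: partition p is owned by consumer p % consumers.
// Kafka's real assignors are more involved; this only shows the upper bound.
static List<int>[] Assign(int partitions, int consumers)
{
    var owned = Enumerable.Range(0, consumers)
        .Select(_ => new List<int>())
        .ToArray();

    for (var p = 0; p < partitions; p++)
    {
        owned[p % consumers].Add(p);
    }

    return owned;
}

// Two partitions shared by three consumers of the same group.
var assignment = Assign(partitions: 2, consumers: 3);

for (var c = 0; c < assignment.Length; c++)
{
    var partitionList = string.Join(", ", assignment[c]);
    var state = assignment[c].Count == 0 ? "idle" : "partitions [" + partitionList + "]";
    Console.WriteLine("consumer " + c + ": " + state);
}
```

With two partitions and three consumers, the third consumer ends up with an empty list. This is why the number of partitions is the practical ceiling for parallel consumers.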
Requirements
- .NET 8 or later
- A .NET project with these NuGet packages:
  - Paramore.Brighter.MessagingGateway.Kafka: Enables Kafka integration.
  - Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Simplifies dependency injection.
  - Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hosts Brighter as a background service.
  - Serilog.AspNetCore: For structured logging.
- Docker or Podman: For local Kafka setup.
Local Kafka Setup with Docker/Podman
Use this docker-compose.yml to spin up Kafka, Zookeeper, and a UI:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    healthcheck:
      test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
      interval: 10s
      timeout: 10s
      retries: 5
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9997:9997"
    expose:
      - "29092"
      - "9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_MIN_INSYNC_REPLICAS: "1"
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8088:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: "true"
Steps:
- Run podman-compose -f docker-compose.yml up -d (or the docker-compose equivalent).
- Access the Kafka UI at http://localhost:8088.
Note: This setup uses PLAINTEXT for simplicity. Use SSL or SASL_SSL in production.
Brighter Recap
Before continuing with the Kafka configuration, let's recap the Brighter building blocks involved.
Request (Command/Event)
Brighter uses IRequest to mark objects for processing. Extend Command or Event:
public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}
Message Mapper
Translates between Brighter requests and Kafka messages:
public class GreetingMapper : IAmAMessageMapper<Greeting>
{
    public Message MapToMessage(Greeting request)
    {
        var header = new MessageHeader();
        header.Id = request.Id;
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "greeting.topic"; // The topic to publish to
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public Greeting MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!;
    }
}
Request Handler
Processes incoming messages:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        return base.Handle(command);
    }
}
Configuring Kafka with Brighter
Kafka Connection
Define connection settings:
var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample", // Application name
    BootStrapServers = ["localhost:9092"], // Broker address
    SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL or SASL_SSL in production
    // The SASL settings below only take effect with a SASL security protocol
    SaslMechanisms = SaslMechanism.Plain,
    SaslUsername = "admin",
    SaslPassword = "admin-secret"
};
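For a secured broker, the same configuration object might look like the sketch below. It reuses only the properties shown above. Two things are assumptions: the `SecurityProtocol.SaslSsl` member name (assumed to follow the same naming as `Plaintext`, so check it against your Brighter version) and the environment variable names, which are hypothetical.

```csharp
using System;
using Paramore.Brighter.MessagingGateway.Kafka;

// Hypothetical environment variable names; use whatever your deployment provides.
var bootstrap = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "localhost:9092";

var secureConnection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = [bootstrap],
    // Assumption: the enum exposes a SaslSsl member for the SASL_SSL protocol.
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanisms = SaslMechanism.Plain,
    // Keep credentials out of source control by reading them at startup.
    SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USERNAME"),
    SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD")
};
```

Reading the credentials from the environment keeps the hard-coded "admin" values limited to local experiments.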
Kafka Consumer
Configure subscriptions and channels:
.AddServiceActivator(opt =>
{
    opt.Subscriptions =
    [
        new KafkaSubscription<Greeting>(
            new SubscriptionName("kafka.greeting.subscription"), // Subscription name; internal to Brighter, so any value works
            new ChannelName("greeting.topic"), // The topic name
            new RoutingKey("greeting.topic"), // The topic name
            groupId: "some-consumer-group", // The Kafka consumer group ID
            makeChannels: OnMissingChannel.Create, // What Brighter should do when the topic does not exist
            numOfPartitions: 2, // Number of partitions; only used when Brighter creates the topic
            noOfPerformers: 2, // Number of consumers running in parallel; performers beyond the partition count stay idle
            isAsync: false // Set to true when the handler derives from RequestHandlerAsync
        ),
    ];
    opt.ChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
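If the subscription sets isAsync: true, the handler needs to derive from RequestHandlerAsync instead. A minimal sketch, assuming the Brighter v9 `HandleAsync` signature and the `Greeting` event defined earlier. The class name `GreetingHandlerAsync` is only illustrative, and it would replace the synchronous handler rather than sit alongside it.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter;

// Async counterpart of GreetingHandler; Greeting is the event defined in the recap above.
public class GreetingHandlerAsync(ILogger<GreetingHandlerAsync> logger) : RequestHandlerAsync<Greeting>
{
    public override async Task<Greeting> HandleAsync(
        Greeting command,
        CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Hello {Name}", command.Name);

        // Pass the request on to the rest of the handler pipeline.
        return await base.HandleAsync(command, cancellationToken);
    }
}
```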
Kafka Producer
Set up the external bus for publishing:
.UseExternalBus(new KafkaProducerRegistryFactory(connection,
    [
        new KafkaPublication
        {
            MakeChannels = OnMissingChannel.Create,
            NumPartitions = 2, // Number of partitions; only used when Brighter creates the topic
            Topic = new RoutingKey("greeting.topic"), // The topic name; use the same value in the message mapper
        },
    ]).Create()
);
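The fragments above can be wired together in a single Program.cs. The following is a sketch, not the article's exact code: it assumes Brighter v9 extension methods (`AddServiceActivator`, `UseExternalBus`, `AutoFromAssemblies`) and the generic host, and it expects `Greeting`, `GreetingMapper` and `GreetingHandler` from the recap to live in the same assembly. Publishing goes through `IAmACommandProcessor.Post`.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;
using Paramore.Brighter.MessagingGateway.Kafka;
using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection;
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample",
    BootStrapServers = ["localhost:9092"],
    SecurityProtocol = SecurityProtocol.Plaintext
};

using var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services
            // Runs the Brighter message pumps as a background service.
            .AddHostedService<ServiceActivatorHostedService>()
            .AddServiceActivator(opt =>
            {
                opt.Subscriptions =
                [
                    new KafkaSubscription<Greeting>(
                        new SubscriptionName("kafka.greeting.subscription"),
                        new ChannelName("greeting.topic"),
                        new RoutingKey("greeting.topic"),
                        groupId: "some-consumer-group",
                        makeChannels: OnMissingChannel.Create,
                        numOfPartitions: 2,
                        noOfPerformers: 2
                    ),
                ];
                opt.ChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
            })
            .UseExternalBus(new KafkaProducerRegistryFactory(connection,
                [
                    new KafkaPublication
                    {
                        MakeChannels = OnMissingChannel.Create,
                        NumPartitions = 2,
                        Topic = new RoutingKey("greeting.topic"),
                    },
                ]).Create())
            // Scans loaded assemblies for handlers and message mappers.
            .AutoFromAssemblies();
    })
    .Build();

await host.StartAsync();

// Publish one event; GreetingMapper routes it to greeting.topic.
var processor = host.Services.GetRequiredService<IAmACommandProcessor>();
processor.Post(new Greeting { Name = "Brighter" });

await host.WaitForShutdownAsync();
```

Once the host is running, the posted event should show up under greeting.topic in the Kafka UI, and the handler should log the greeting when the consumer picks it up.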
Conclusion
Integrating Brighter with Kafka simplifies building scalable, message-driven systems.
Reference
- Full code: GitHub Repository