Brighter e Kafka: Como Configurar e utilizar o Framework com o Apache Kafka

Este é o primeiro artigo de uma série onde mostrarei como conectar e configurar sistemas usando o Brighter. Neste artigo, focaremos na integração do Brighter com o Apache Kafka. Introdução Rápida ao Kafka O Kafka é uma plataforma distribuída de processamento de streaming projetada para alta taxa de transferência e processamento de mensagens em tempo real. Conceitos-chave incluem: Fluxos: Processamento de mensagens individualmente, com concorrência limitada pelo número de partições do tópico. Componentes Principais: Tópico (Topic): Uma categoria/canal para o qual mensagens são publicadas. Partição (Partition): Uma divisão de um tópico que permite processamento paralelo. Grupo de Consumidores (Consumer Group): Conjunto de consumidores que processam mensagens de um tópico de forma colaborativa. Para integrar o Kafka com o Brighter, você precisará: Nome do Tópico: O tópico Kafka alvo (ex: greeting.topic). Número de Partições: Define os limites de concorrência. Consumer Group ID: Garante a distribuição de mensagens entre os consumidores. Requisitos .NET 8 ou superior. Um projeto .NET com esses pacotes NuGet: Paramore.Brighter.MessagingGateway.Kafka: Permite integração com Kafka. Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Simplifica a injeção de dependência. Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hospeda o Brighter como serviço em segundo plano. Serilog.AspNetCore: Para registro estruturado. Docker/podman: Para configuração local do Kafka. Configuração Local do Kafka com Docker/Podman Use esse docker-compose.yml para inicializar o Kafka, Zookeeper e uma interface: services: zookeeper: image: confluentinc/cp-zookeeper ports: - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka depends_on: - zookeeper healthcheck: test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1 interval: 10s timeout: 10s retries: 5 ports: - "9092:9092" - "29092:29092" - "9997:9997" expose: - "29092" - "9092" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" KAFKA_MIN_INSYNC_REPLICAS: "1" kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui depends_on: - kafka ports: - "8088:8080" environment: KAFKA_CLUSTERS_0_NAME: kafka KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 DYNAMIC_CONFIG_ENABLED: "true" Passos: Execute podman-compose -f docker-compose.yml up -d (ou docker-compose). Acesse a interface do Kafka em http://localhost:8088. Nota: Este exemplo usa PLAINTEXT para simplicidade. Use SSL/SASL_SSL em produção. Recapitulação do Brighter Antes de prosseguir com a configuração do Kafka, vamos recapitular o que já sabemos sobre o Brighter. Request (Comando/Evento) O Brighter usa IRequest para marcar objetos para processamento. Herde de Command ou Event: public class Greeting : Event(Guid.NewGuid()) { public string Name { get; set; } = string.Empty; } Mapeador de Mensagens Traduz entre request do Brighter e mensagens do Kafka: public class GreetingMapper : IAmAMessageMapper { public Message MapToMessage(Greeting request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "greeting.topic"; // Tópico de destino para publicação header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public Greeting MapToRequest(Message message) { return JsonSerializer.Deserialize(message.Body.Bytes)!; } } Request Handler Processa mensagens recebidas: public class GreetingHandler(ILogger logger) : RequestHandler { public override Greeting Handle(Greeting command) { logger.LogInformation("Hello {Name}", command.Name); return base.Handle(command); } } Configurando o Kafka com o Brighter Conexão com o Kafka Defina as configurações de conexão: var connection = new KafkaMessagingGatewayConfiguration { Name = "sample", // Nome da aplicação BootStrapServers = ["localhost:9092"], // Endereço do broker SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL em produção SaslMechanisms = SaslMechanism.Plain, SaslUsername = "admin", // Para autenticação SASL SaslPassword = "admin-secret" }; Consumidor Kafka Configu

Apr 8, 2025 - 09:12
 0
Brighter e Kafka: Como Configurar e utilizar o Framework com o Apache Kafka

Este é o primeiro artigo de uma série onde mostrarei como conectar e configurar sistemas usando o Brighter. Neste artigo, focaremos na integração do Brighter com o Apache Kafka.

Introdução Rápida ao Kafka

O Kafka é uma plataforma distribuída de processamento de streaming projetada para alta taxa de transferência e processamento de mensagens em tempo real. Conceitos-chave incluem:

  • Fluxos: Processamento de mensagens individualmente, com concorrência limitada pelo número de partições do tópico.
  • Componentes Principais:
    • Tópico (Topic): Uma categoria/canal para o qual mensagens são publicadas.
    • Partição (Partition): Uma divisão de um tópico que permite processamento paralelo.
    • Grupo de Consumidores (Consumer Group): Conjunto de consumidores que processam mensagens de um tópico de forma colaborativa.

Para integrar o Kafka com o Brighter, você precisará:
Nome do Tópico: O tópico Kafka alvo (ex: greeting.topic).
Número de Partições: Define os limites de concorrência.
Consumer Group ID: Garante a distribuição de mensagens entre os consumidores.

Requisitos

  1. .NET 8 ou superior.
  2. Um projeto .NET com esses pacotes NuGet:
  3. Docker/podman: Para configuração local do Kafka.

Configuração Local do Kafka com Docker/Podman

Use esse docker-compose.yml para inicializar o Kafka, Zookeeper e uma interface:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - "2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    healthcheck:
      test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
      interval: 10s
      timeout: 10s
      retries: 5
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9997:9997"
    expose:
      - "29092"
      - "9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_MIN_INSYNC_REPLICAS: "1"

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8088:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: "true"

Passos:

  1. Execute podman-compose -f docker-compose.yml up -d (ou docker-compose).
  2. Acesse a interface do Kafka em http://localhost:8088.

Nota: Este exemplo usa PLAINTEXT para simplicidade. Use SSL/SASL_SSL em produção.

Recapitulação do Brighter

Antes de prosseguir com a configuração do Kafka, vamos recapitular o que já sabemos sobre o Brighter.

Request (Comando/Evento)

O Brighter usa IRequest para marcar objetos para processamento. Herde de Command ou Event:

public class Greeting : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}

Mapeador de Mensagens

Traduz entre request do Brighter e mensagens do Kafka:

public class GreetingMapper : IAmAMessageMapper<Greeting>
{
    public Message MapToMessage(Greeting request)
    {
        var header = new MessageHeader();
        header.Id = request.Id; 
        header.TimeStamp = DateTime.UtcNow;
        header.Topic = "greeting.topic"; // Tópico de destino para publicação
        header.MessageType = MessageType.MT_EVENT;

        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public Greeting MapToRequest(Message message)
    {
        return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!;
    }
}

Request Handler

Processa mensagens recebidas:

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Hello {Name}", command.Name);
        return base.Handle(command);
    }
}

Configurando o Kafka com o Brighter

Conexão com o Kafka

Defina as configurações de conexão:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample", // Nome da aplicação
    BootStrapServers = ["localhost:9092"], // Endereço do broker
    SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL em produção
    SaslMechanisms = SaslMechanism.Plain,
    SaslUsername = "admin", // Para autenticação SASL
    SaslPassword = "admin-secret"
};

Consumidor Kafka

Configure Subscription e Channel:

.AddServiceActivator(opt =>
{
    opt.Subscriptions =
    [
        new KafkaSubscription<Greeting>(
            new SubscriptionName("kafka.greeting.subscription"), // Nome da assinatura (interno)
            new ChannelName("greeting.topic"), // Nome do tópico
            new RoutingKey("greeting.topic"),  // Nome do tópico
            groupId: "some-consumer-group", // ID do grupo de consumidores
            makeChannels: OnMissingChannel.Create, // Ação ao verificar se tópico existe
            numOfPartitions: 2, // Número de partições do tópico (útil ao criar via código)
            noOfPerformers: 2, // Número de subscription em paralelo (não deve ultrapassar o número de partições)
            isAsync: false, // true quando quiser RequestHandlerAsync
        ),
    ];

    opt.ChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})

Produtor Kafka

Configure o external bus para publicação:

.UseExternalBus(new KafkaProducerRegistryFactory(connection,
 [
     new KafkaPublication
     {
         MakeChannels = OnMissingChannel.Create,
         NumPartitions = 2, // Número de partições do tópico (útil ao criar via código)
         Topic = new RoutingKey("greeting.topic"), // Nome do tópico (deve ser o mesmo usado no mapeamento)
     },
 ]).Create()
);

Conclusão

A integração do Brighter com o Kafka simplifica a construção de sistemas escaláveis e orientados a mensagens.

Referências