Connecting RDBs and Search Engines — Chapter 4 Part 1


May 10, 2025 - 23:58

Chapter 4 (Part 1): Outputting Kafka CDC Data to Console with Flink

In this chapter, we will process the CDC data delivered to Kafka using Flink SQL and print the output to the console. Before persisting to OpenSearch, we visually verify that Flink is correctly consuming and processing the data from Kafka.

1. Prerequisites

Ensure the following components are already running:

  • PostgreSQL
  • Apache Kafka
  • Kafka Connect
  • ZooKeeper
  • Flink

Refer to Chapter 3 for details on setting up the Debezium → Kafka pipeline.

2. Architecture Overview

graph TD
  subgraph source
    PG[PostgreSQL]
  end
  subgraph Change Data Capture
    DBZ[Debezium Connect]
  end
  subgraph stream platform
    TOPIC[Kafka Topic]
  end
  subgraph stream processing
    FLINK[Flink SQL Kafka Source]
    PRINT[Flink Print Sink Console]
  end

  PG --> DBZ --> TOPIC --> FLINK --> PRINT

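We verify that CDC events flow from Kafka to Flink and appear on standard output as changelog rows such as +I[...]. The prefix is the row kind: +I for an insert, -U and +U for the before and after images of an update, and -D for a delete.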

3. Add Kafka Connector to Flink

Add the Kafka SQL connector JAR to Flink:

  • flink-sql-connector-kafka-3.3.0-1.19.jar

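⚠️ Bind-mounting a host directory directly onto /opt/flink/lib/ would hide the libraries bundled with the image. To avoid interfering with the Flink core libraries, place the JAR in flink/lib/ext/ on the host, mount that directory at /opt/flink/lib/ext, and copy the JAR into /opt/flink/lib/ at startup.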
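The chapter does not say where the JAR comes from. As a minimal sketch, assuming the connector is published on Maven Central under the standard repository layout for groupId org.apache.flink, the download URL can be derived from the version string. The helper name connector_jar_url is hypothetical, and the URL should be verified before you rely on it.

```shell
# Build the Maven Central URL for the Kafka SQL connector JAR.
# Assumption: standard Maven layout under org/apache/flink; not confirmed by this chapter.
connector_jar_url() {
  version="$1"   # e.g. 3.3.0-1.19 (connector version, then Flink version)
  artifact="flink-sql-connector-kafka"
  echo "https://repo1.maven.org/maven2/org/apache/flink/${artifact}/${version}/${artifact}-${version}.jar"
}

# Example (not executed here): download into the host directory that is
# mounted at /opt/flink/lib/ext.
#   mkdir -p flink/lib/ext
#   curl -fL -o flink/lib/ext/flink-sql-connector-kafka-3.3.0-1.19.jar \
#     "$(connector_jar_url 3.3.0-1.19)"
```

Deriving the URL from the version keeps the connector version and the Flink version visible in one place, which helps when the image tag changes.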

docker-compose.yaml Excerpt

flink-jobmanager:
  image: flink:1.19
  command: ["/bin/bash", "/jobmanager-entrypoint.sh"]
  volumes:
    - ./flink/sql:/opt/flink/sql
    - ./flink/lib/ext:/opt/flink/lib/ext
    - ./flink/jobmanager-entrypoint.sh:/jobmanager-entrypoint.sh

jobmanager-entrypoint.sh Example

#!/bin/bash
set -e
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh jobmanager

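Apply the same setup to the TaskManager service so that the JAR is copied there as well; the connector classes must be available on the TaskManagers that execute the job.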
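One way the TaskManager counterpart might look is sketched below. The file name flink/taskmanager-entrypoint.sh is an assumption that mirrors the JobManager script; the only functional difference is the taskmanager argument passed to the image's /docker-entrypoint.sh. The snippet writes the script on the host and marks it executable.

```shell
# Create a TaskManager entrypoint next to the JobManager one.
# The file name is hypothetical; adjust it to your project layout.
mkdir -p flink
cat > flink/taskmanager-entrypoint.sh <<'EOF'
#!/bin/bash
set -e
# Copy the connector JARs from the mounted ext directory into Flink's lib directory
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
# Hand over to the image's regular entrypoint in TaskManager mode
exec /docker-entrypoint.sh taskmanager
EOF
chmod +x flink/taskmanager-entrypoint.sh
```

The flink-taskmanager service would then need the same ./flink/lib/ext volume, a mount for this script, and a matching command entry, mirroring the flink-jobmanager excerpt.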

4. Write Flink SQL Script

Save the following SQL in flink/sql/cdc_to_console.sql:

CREATE TABLE cdc_source (
  id INT,
  message STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.testtable',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE print_sink (
  id INT,
  message STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT * FROM cdc_source;

Explanation of Flink SQL Tables

Kafka Source Table: cdc_source

Property                                Description
connector = 'kafka'                     Reads records from a Kafka topic
format = 'debezium-json'                Interprets each message as a Debezium change event in JSON
scan.startup.mode = 'earliest-offset'   Starts reading from the earliest available offset
PRIMARY KEY (...) NOT ENFORCED          Declares the primary key; Flink does not validate uniqueness

Console Sink Table: print_sink

  • Uses Flink's built-in print connector, which writes each row to the standard output of the TaskManager running the sink task.

5. Run the Flink SQL Job

docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_console.sql

Verify Running Jobs

docker compose exec flink-jobmanager bash
flink list

Expected output:

------------------ Running/Restarting Jobs -------------------
 : insert-into_default_catalog.default_database.print_sink (RUNNING)
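If you want to script this check rather than read the output by eye, a small filter over the flink list output is enough. This is a sketch, not part of the original setup: the function name job_is_running is hypothetical, and it only looks for the job name and the (RUNNING) marker on the same line.

```shell
# Succeeds if a line of `flink list` output on stdin names the job and shows (RUNNING).
job_is_running() {
  job_name="$1"
  grep -F "$job_name" | grep -q '(RUNNING)'
}

# Example (not executed here); -T disables TTY allocation so the output can be piped:
#   docker compose exec -T flink-jobmanager flink list | job_is_running print_sink \
#     && echo "job is running"
```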

6. Check the Output

docker compose logs flink-taskmanager

Expected output:

flink-taskmanager-1 | +I[1, CDC test row]

Why TaskManager?

  • The print sink outputs to the logs of the TaskManager running the job.
  • Use docker compose logs flink-taskmanager to view the output.
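TaskManager logs also contain startup and checkpoint messages, so the changelog rows can be hard to spot. A minimal sketch of a filter that keeps only lines containing a row-kind prefix (+I, -U, +U, -D) followed by an opening bracket; the function name filter_changelog_rows is hypothetical.

```shell
# Keep only log lines that contain a changelog row such as +I[...] or -D[...].
filter_changelog_rows() {
  grep -E '(\+I|-U|\+U|-D)\['
}

# Example (not executed here):
#   docker compose logs flink-taskmanager | filter_changelog_rows
```

The pattern is a heuristic: an unrelated log line that happens to contain the same character sequence would also pass through.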

7. Troubleshooting

SQL Client Doesn’t Start

  • Check if JobManager is running:
docker compose logs flink-jobmanager

No CDC Output Appears

  • Is the Kafka topic name correct?
  • Is there data in the Kafka topic?
  • Is scan.startup.mode set to earliest-offset?

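Check topic content (run this inside the Kafka container; depending on the image, the script may be named kafka-console-consumer.sh):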

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
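If the topic has data but Flink still prints nothing, one further thing worth checking is the message shape. When Kafka Connect's JSON converter runs with schemas enabled, each message is wrapped as {"schema": ..., "payload": ...}, and Flink's debezium-json format then needs 'debezium-json.schema-include' = 'true' in the source table. Whether this applies depends on the Chapter 3 configuration, which is not shown here. The sketch below is a rough heuristic on one message; the function name has_schema_wrapper and the service name kafka are assumptions.

```shell
# Succeeds if the first message on stdin looks like a schema/payload wrapper.
# Heuristic only: it matches on the leading keys instead of parsing the JSON.
has_schema_wrapper() {
  IFS= read -r msg || true
  case "$msg" in
    '{"schema":'*'"payload":'*) return 0 ;;
    *) return 1 ;;
  esac
}

# Example (not executed here), reading a single message from the topic:
#   docker compose exec -T kafka kafka-console-consumer \
#     --bootstrap-server localhost:9092 \
#     --topic dbserver1.public.testtable \
#     --from-beginning --max-messages 1 | has_schema_wrapper \
#     && echo "wrapped: consider 'debezium-json.schema-include' = 'true'"
```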

Bonus: Observing Parallelism

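If you scale Flink to multiple TaskManagers and run the job with a parallelism greater than 1, the output is distributed across their logs, provided the Kafka topic has more than one partition for the source subtasks to read.

This lets you observe parallel execution, slot allocation, and subtask distribution.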
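To see how the rows are spread out, you can count changelog rows per container. The sketch below assumes the docker compose log format in which each line starts with the container name, as in the expected output shown earlier; the function name count_rows_per_taskmanager is hypothetical.

```shell
# Count changelog rows per container, using the first whitespace-separated
# field of each log line (the container name) as the key.
count_rows_per_taskmanager() {
  grep -E '(\+I|-U|\+U|-D)\[' \
    | awk '{ n[$1]++ } END { for (k in n) print k, n[k] }' \
    | sort
}

# Example (not executed here): start two TaskManagers, then count.
#   docker compose up -d --scale flink-taskmanager=2
#   docker compose logs flink-taskmanager | count_rows_per_taskmanager
```

A count on only one container usually means the job still runs with parallelism 1 or the topic has a single partition.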

In this chapter, we confirmed the flow from Kafka → Flink → console output. Next, we will write the results to OpenSearch for persistence.

(Coming soon: Chapter 4 Part 2 — Integrating Kafka CDC Data with OpenSearch Using Flink)