Connecting RDBs and Search Engines — Chapter 4 Part 1

Chapter 4 (Part 1): Outputting Kafka CDC Data to Console with Flink
In this chapter, we will process the CDC data delivered to Kafka using Flink SQL and print the output to the console. Before persisting to OpenSearch, we visually verify that Flink is correctly consuming and processing the data from Kafka.
1. Prerequisites
Ensure the following components are already running:
- PostgreSQL
- Apache Kafka
- Kafka Connect
- ZooKeeper
- Flink
Refer to Chapter 3 for details on setting up the Debezium → Kafka pipeline.
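A quick sanity check before proceeding (service names are assumptions based on the Chapter 3 compose file):

```bash
# every service should report a running state
docker compose ps
```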
2. Architecture Overview
```mermaid
graph TD
  subgraph Source
    PG[PostgreSQL]
  end
  subgraph CDC["Change Data Capture"]
    DBZ[Debezium Connect]
  end
  subgraph Platform["Stream Platform"]
    TOPIC[Kafka Topic]
  end
  subgraph Processing["Stream Processing"]
    FLINK[Flink SQL Kafka Source]
    PRINT[Flink Print Sink Console]
  end
  PG --> DBZ --> TOPIC --> FLINK --> PRINT
```
We verify that CDC events flow from Kafka into Flink and appear on standard output in Flink's changelog format, e.g. +I[...] for an insert (updates arrive as -U/+U pairs and deletes as -D).
3. Add Kafka Connector to Flink
Add the Kafka SQL connector JAR to Flink:
flink-sql-connector-kafka-3.3.0-1.19.jar
⚠️ To avoid interference with Flink's core libraries, place the JAR in flink/lib/ext/ and copy it into /opt/flink/lib/ at startup.
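One way to fetch the connector JAR, assuming the standard Maven Central layout for the flink-sql-connector-kafka artifact:

```bash
# download the Kafka SQL connector for Flink 1.19 into the host-side ext directory
curl -fL -o flink/lib/ext/flink-sql-connector-kafka-3.3.0-1.19.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.3.0-1.19/flink-sql-connector-kafka-3.3.0-1.19.jar
```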
docker-compose.yaml excerpt:

```yaml
flink-jobmanager:
  image: flink:1.19
  command: ["/bin/bash", "/jobmanager-entrypoint.sh"]
  volumes:
    - ./flink/sql:/opt/flink/sql
    - ./flink/lib/ext:/opt/flink/lib/ext
    - ./flink/jobmanager-entrypoint.sh:/jobmanager-entrypoint.sh
```
jobmanager-entrypoint.sh example:

```bash
#!/bin/bash
set -e
# promote the mounted connector JARs into Flink's classpath
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
# hand off to the stock image entrypoint
exec /docker-entrypoint.sh jobmanager
```
Apply a similar setup to the TaskManager so it also copies the JAR; a sketch follows below.
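A minimal TaskManager counterpart, mirroring the JobManager wiring above. The service name and the FLINK_PROPERTIES line are assumptions (the latter follows the official Flink Docker image convention for pointing a TaskManager at the JobManager); adjust to your Chapter 3 setup:

```yaml
flink-taskmanager:
  image: flink:1.19
  depends_on:
    - flink-jobmanager
  command: ["/bin/bash", "/taskmanager-entrypoint.sh"]
  environment:
    # assumption: tell the TaskManager where to find the JobManager
    FLINK_PROPERTIES: "jobmanager.rpc.address: flink-jobmanager"
  volumes:
    - ./flink/lib/ext:/opt/flink/lib/ext
    - ./flink/taskmanager-entrypoint.sh:/taskmanager-entrypoint.sh
```

taskmanager-entrypoint.sh:

```bash
#!/bin/bash
set -e
# same JAR promotion as on the JobManager, then start a TaskManager
cp /opt/flink/lib/ext/*.jar /opt/flink/lib/
exec /docker-entrypoint.sh taskmanager
```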
4. Write Flink SQL Script
Save the following SQL as flink/sql/cdc_to_console.sql:
```sql
-- source: Debezium CDC events from Kafka
CREATE TABLE cdc_source (
  id INT,
  message STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.testtable',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- sink: print every row to stdout
CREATE TABLE print_sink (
  id INT,
  message STRING
) WITH (
  'connector' = 'print'
);

-- continuous job: stream source rows into the sink
INSERT INTO print_sink
SELECT * FROM cdc_source;
```
Explanation of Flink SQL Tables
Kafka Source Table: cdc_source
| Property | Description |
|---|---|
| `'connector' = 'kafka'` | Reads data from a Kafka topic |
| `'format' = 'debezium-json'` | Decodes JSON messages in the Debezium envelope format |
| `'scan.startup.mode' = 'earliest-offset'` | Starts reading from the earliest available offset |
| `PRIMARY KEY (id) NOT ENFORCED` | Declares a primary key without having Flink enforce it |
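For reference, the debezium-json format expects each Kafka message to carry the Debezium change envelope (before/after state plus an operation code). A simplified sketch of an insert event, with illustrative field values and assuming the Chapter 3 connector writes the payload without the JSON schema wrapper (the format's default expectation):

```json
{
  "before": null,
  "after": { "id": 1, "message": "CDC test row" },
  "source": { "connector": "postgresql", "table": "testtable" },
  "op": "c",
  "ts_ms": 1700000000000
}
```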
Console Sink Table: print_sink
- Uses Flink's internal print connector to write output to stdout.
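If output from several tables ends up interleaved in the same log, the print connector's optional print-identifier property can tag each line (a minimal sketch; the prefix string is arbitrary):

```sql
CREATE TABLE print_sink (
  id INT,
  message STRING
) WITH (
  'connector' = 'print',
  'print-identifier' = 'cdc'  -- each output line is prefixed with "cdc"
);
```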
5. Run the Flink SQL Job
```bash
# open a shell in the JobManager container, then submit the script
docker compose exec flink-jobmanager bash
sql-client.sh -f /opt/flink/sql/cdc_to_console.sql
```
Verify the running jobs:

```bash
docker compose exec flink-jobmanager bash
flink list
```

Expected output:

```
------------------ Running/Restarting Jobs -------------------
: insert-into_default_catalog.default_database.print_sink (RUNNING)
```
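Note that INSERT INTO submits a detached streaming job that keeps running until you stop it. To stop it, cancel the job by the ID that flink list prints (shown below as a placeholder):

```bash
# run inside the JobManager container
flink cancel <job-id>
```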
6. Check the Output
```bash
docker compose logs flink-taskmanager
```

Expected output:

```
flink-taskmanager-1 | +I[1, CDC test row]
```
Why TaskManager?
- The print sink writes to the logs of the TaskManager that runs the job.
- Use docker compose logs flink-taskmanager to view the output.
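To see the changelog semantics in action, modify the row on the PostgreSQL side and watch the TaskManager log again. The service name, user, and database below are assumptions based on a typical Chapter 3 setup; adjust them to yours:

```bash
# update the test row in PostgreSQL (service/user/database names assumed)
docker compose exec postgres psql -U postgres -d postgres \
  -c "UPDATE testtable SET message = 'updated row' WHERE id = 1;"
```

An update should then appear as a retraction pair in the log, e.g. -U[1, CDC test row] followed by +U[1, updated row].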
7. Troubleshooting
SQL Client Doesn’t Start
- Check whether the JobManager is running:

```bash
docker compose logs flink-jobmanager
```

No CDC Output Appears
- Is the Kafka topic name correct?
- Is there data in the Kafka topic?
- Is scan.startup.mode set to earliest-offset?
Check the topic content:

```bash
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
```
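If the Kafka CLI is not installed on the host, you can run the consumer inside the Kafka container instead (the service name is an assumption from the Chapter 3 compose file, and some distributions ship the tool as kafka-console-consumer.sh):

```bash
docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
```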
Bonus: Observing Parallelism
If you scale Flink to use multiple TaskManagers, you’ll see output distributed across their logs.
This allows you to observe parallel execution, slot allocation, and subtask distribution.
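With Docker Compose, one way to try this is to scale the TaskManager service (a sketch, assuming the flink-taskmanager service from the excerpts above):

```bash
# start a second TaskManager
docker compose up -d --scale flink-taskmanager=2
```

Then, in the SQL client, raise the default parallelism before submitting the job, e.g. SET 'parallelism.default' = '2';, so that subtasks are actually spread across both TaskManagers.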
In this chapter, we confirmed the flow from Kafka → Flink → console output. Next, we will write the results to OpenSearch for persistence.
(Coming soon: Chapter 4 Part 2 — Integrating Kafka CDC Data with OpenSearch Using Flink)