Connecting RDBs and Search Engines — Chapter 3
Chapter 3: Verifying the Flow from PostgreSQL to Debezium to Kafka
In this chapter, we will test the end-to-end process of capturing change data from PostgreSQL with Debezium and delivering it to Kafka.
1. Starting the Environment
Before beginning this chapter, ensure that all necessary services are running via Docker Compose:
docker compose up -d
Verify that the following components are running:
- PostgreSQL
- Apache Kafka
- Debezium Connect (Kafka Connect)
- ZooKeeper
See docker-compose.yaml in the repository for service configuration details.
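For orientation, a minimal sketch of the four services might look like the following. The image names, versions, and environment values here are illustrative assumptions; the repository file is authoritative:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1   # assumed image/version
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.0.1       # assumed image/version
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  postgres:
    image: postgres:14                       # assumed version
    environment:
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgres:/docker-entrypoint-initdb.d   # init scripts such as 00-init.sql
  connect:
    image: debezium/connect:1.9              # assumed image/version
    depends_on: [kafka, postgres]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses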
2. PostgreSQL Setup
First, we create the table that Debezium will monitor. Add the following to postgres/00-init.sql:
-- Create Debezium user
CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;
-- Create target table
CREATE TABLE testtable (
    id      INTEGER PRIMARY KEY,
    message VARCHAR(255)
);
INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');
-- Grant SELECT permission
GRANT SELECT ON testtable TO debezium;
-- Create publication (for pgoutput plugin)
CREATE PUBLICATION debezium_pub FOR TABLE testtable;
This script runs only once, when the PostgreSQL data volume is first created.
To apply changes to the script, you must delete the persistent volumes and restart:
docker compose down --volumes
3. PostgreSQL Container Configuration
To enable CDC via the WAL (Write-Ahead Log), add the following to the postgres service in docker-compose.yaml:
command: >
  postgres
  -c wal_level=logical
  -c max_replication_slots=4
  -c max_wal_senders=4
To apply the new settings, recreate the PostgreSQL container (data in the volume is preserved; note that docker compose restart alone does not pick up compose-file changes):
docker compose up -d postgres
To rerun initialization SQL, you’ll need to remove volumes:
docker compose down --volumes
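To confirm the WAL settings took effect, query the running server with psql (SHOW is a standard PostgreSQL command):

docker compose exec postgres psql -U postgres -c "SHOW wal_level;"
#  wal_level
# -----------
#  logical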
4. Registering the Debezium Connector
Once Debezium Connect is running, register the connector with the following command (Debezium 1.9 format; database.server.name was later renamed to topic.prefix in Debezium 2.x, which is why both appear below):
curl -X POST "localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.dbname": "postgres",
      "database.server.name": "dbserver1",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "slot.name": "debezium_slot",
      "slot.drop.on.stop": "true",
      "table.include.list": "public.testtable",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'
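On success, Connect responds with HTTP 201 and echoes the connector configuration back. You can also confirm the registration by listing connectors through the same REST API:

curl http://localhost:8083/connectors
# Expected: ["postgres-connector"]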
About snapshot.mode
The snapshot.mode option controls whether the connector loads the full contents of the captured tables at startup.
- initial: load all existing data once, then capture changes (default)
- never: skip the initial load and capture only subsequent changes
In this guide we use initial, so existing rows are sent to Kafka with op: r (read).
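Because slot.name is set to debezium_slot, the connector creates a logical replication slot of that name when it starts. You can verify this from psql:

-- Run inside psql (docker compose exec postgres psql -U postgres)
SELECT slot_name, plugin, active FROM pg_replication_slots;
--    slot_name   |  plugin  | active
-- ---------------+----------+--------
--  debezium_slot | pgoutput | t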
5. Checking Connector Status
Check the connector status:
curl http://localhost:8083/connectors/postgres-connector/status
Expected output (your worker_id will differ):
{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.4.1.81:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.4.1.81:8083"
    }
  ],
  "type": "source"
}
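If the state shows FAILED instead, the trace field in this same response contains the error. After fixing the cause, the connector and its task can be restarted through the REST API:

# Restart the connector, then its task if needed
curl -X POST http://localhost:8083/connectors/postgres-connector/restart
curl -X POST http://localhost:8083/connectors/postgres-connector/tasks/0/restart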
6. Checking Kafka Topics
List Kafka topics:
# Enter Kafka container
docker compose exec kafka bash
# List topics
kafka-topics --bootstrap-server localhost:9092 --list
Expected output (internal topics such as __consumer_offsets will also appear in the list):
dbserver1.public.testtable
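To inspect the topic's partition and replication settings, the same tool supports --describe:

kafka-topics --bootstrap-server localhost:9092 \
  --describe --topic dbserver1.public.testtable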
7. Viewing CDC Events from Kafka
Use the following to consume events from the topic:
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
Sample snapshot output (simplified; real events also include source and ts_ms metadata fields):
{
  "before": null,
  "after": {
    "id": 1,
    "message": "CDC test row"
  },
  "op": "r"
}
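Each event's Kafka message key is the row's primary key. To print keys alongside values, pass the console consumer's standard formatter properties (the output line shown is a sketch):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" => "
# Example: {"id":1} => {"before":null,"after":{"id":1,"message":"CDC test row"},...,"op":"r"}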
8. Confirming Live Data Changes
Add a new row in PostgreSQL:
# Log into PostgreSQL
docker compose exec postgres psql -U postgres
-- Insert a new row (inside psql)
INSERT INTO testtable (id, message) VALUES (2, 'inserted row');
Expected Kafka output:
{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}
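To observe the remaining op codes described below, run an UPDATE and a DELETE against the same row. Note that under PostgreSQL's default replica identity, the before field of update/delete events does not carry the full old row; the optional ALTER TABLE shown here opts in to full before-images:

-- Run inside psql: produces events with "op": "u" and "op": "d"
UPDATE testtable SET message = 'updated row' WHERE id = 2;
DELETE FROM testtable WHERE id = 2;

-- Optional: include the complete old row in "before"
ALTER TABLE testtable REPLICA IDENTITY FULL;

Because tombstones.on.delete is set to false in our connector config, the delete event is not followed by a null-value tombstone record.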
Debezium JSON Event Format
Debezium uses the following format for CDC messages:
| op value | Meaning |
| --- | --- |
| r | Snapshot read (initial data) |
| c | Insert (create) |
| u | Update |
| d | Delete |
For example, the op: "c" event shown in step 8 indicates that a new row was inserted.
Troubleshooting
Topic Not Created in Kafka
If dbserver1.public.testtable doesn’t appear, check:
- That debezium_pub exists in PostgreSQL:
SELECT * FROM pg_publication;
- That publication.name and slot.name are set correctly in the connector config
- That the 00-init.sql script ran (reset if needed):
docker compose down --volumes
docker compose up -d
- That Kafka Connect logs are error-free:
docker compose logs connect
No Events Displayed from Kafka
If CDC events aren’t visible in Kafka:
- Ensure the connector is RUNNING (see step 5)
- Ensure the inserted row has a unique ID (an INSERT that violates the primary key fails and produces no event)
- Make sure you’re consuming from the beginning:
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning
In this chapter, we confirmed the flow of change data from PostgreSQL to Kafka using Debezium. In the next chapter, we'll use Flink to process this data.
(Coming soon: Chapter 4 Part 1 — Outputting Kafka CDC Data to Console with Flink)