Connecting RDBs and Search Engines — Chapter 3


Chapter 3: Verifying the Flow from PostgreSQL to Debezium to Kafka

In this chapter, we will test the end-to-end process of capturing change data from PostgreSQL with Debezium and delivering it to Kafka.

1. Starting the Environment

Before beginning this chapter, ensure that all necessary services are running via Docker Compose:

docker compose up -d

Verify that the following components are running:

  • PostgreSQL
  • Apache Kafka
  • Debezium Connect (Kafka Connect)
  • ZooKeeper

See docker-compose.yaml in the repository for service configuration details.
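
A quick way to confirm everything came up is to list the services and check their state (the service names below assume the compose file calls them postgres, kafka, connect, and zookeeper; adjust to your docker-compose.yaml):

# Each service defined in docker-compose.yaml should report a running / Up state
docker compose ps

# If a service is missing or keeps restarting, inspect its logs, for example:
docker compose logs connect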

2. PostgreSQL Setup

First, we create the table that Debezium will monitor. Add the following to postgres/00-init.sql:

-- Create Debezium user
CREATE ROLE debezium WITH LOGIN PASSWORD 'dbz' REPLICATION;

-- Create target table
CREATE TABLE testtable (
  id INTEGER PRIMARY KEY,
  message VARCHAR(255)
);
INSERT INTO testtable (id, message) VALUES (1, 'CDC test row');

-- Grant SELECT permission
GRANT SELECT ON testtable TO debezium;

-- Create publication (for pgoutput plugin)
CREATE PUBLICATION debezium_pub FOR TABLE testtable;

This script runs only once, when the container's data volume is first created.
To apply changes to the script, delete the persistent volumes and restart:

docker compose down --volumes
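
If you want to make sure the script actually ran, a quick check from inside the container is to list the publications and the seed row (this assumes the default postgres superuser and database, as used throughout this guide):

# \dRp is the psql meta-command that lists publications; debezium_pub should appear
docker compose exec postgres psql -U postgres -c "\dRp"

# The seed row inserted by 00-init.sql should be returned
docker compose exec postgres psql -U postgres -c "SELECT * FROM testtable;"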

3. PostgreSQL Container Configuration

To enable CDC via the WAL (Write-Ahead Log), add the following to the postgres service in docker-compose.yaml so the server starts with logical decoding enabled:

command: >
  postgres
  -c wal_level=logical
  -c max_replication_slots=4
  -c max_wal_senders=4

To restart PostgreSQL without losing data:

docker compose restart postgres

To rerun initialization SQL, you’ll need to remove volumes:

docker compose down --volumes
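
Once the container is up with the new command, it is worth confirming that logical decoding is really enabled (again assuming the default postgres user):

# Should print: logical
docker compose exec postgres psql -U postgres -c "SHOW wal_level;"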

4. Registering the Debezium Connector

Once Debezium Connect is running, register the connector with the following command. The config targets the Debezium 1.9 PostgreSQL connector and sets both database.server.name (the Debezium 1.x property) and topic.prefix (its replacement in Debezium 2.x), so the server name used for topic naming is dbserver1 either way:

curl -X POST "localhost:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.dbname": "postgres",
      "database.server.name": "dbserver1",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "slot.name": "debezium_slot",
      "slot.drop.on.stop": "true",
      "table.include.list": "public.testtable",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'
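
If you need to adjust the configuration later, the standard Kafka Connect REST API can be used to inspect or remove the connector before registering it again (the connector name matches the one used above):

# List registered connectors
curl http://localhost:8083/connectors

# Remove the connector, e.g. before re-registering it with new settings
curl -X DELETE http://localhost:8083/connectors/postgres-connector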

About snapshot.mode

The snapshot.mode option controls whether the connector reads the existing contents of the captured tables when it first starts.

  • initial: Load all existing data once, then capture changes (default)
  • never: Skip initial load; capture only subsequent changes

In this guide, we use initial, so existing rows are sent to Kafka with op: r (read).

5. Checking Connector Status

Check the connector status:

curl http://localhost:8083/connectors/postgres-connector/status

Expected output:

{
  "name": "postgres-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.4.1.81:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.4.1.81:8083"
    }
  ],
  "type": "source"
}
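
If jq happens to be installed, the same check can be trimmed down to the fields that matter; this is purely a convenience and the plain curl above is enough:

# Both the connector state and every task state should be RUNNING
curl -s http://localhost:8083/connectors/postgres-connector/status \
  | jq '{connector: .connector.state, tasks: [.tasks[].state]}'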

6. Checking Kafka Topics

List Kafka topics:

# Enter Kafka container
docker compose exec kafka bash

# List topics
kafka-topics --bootstrap-server localhost:9092 --list

Expected output includes the following (internal topics created by Kafka and Kafka Connect will also be listed):

dbserver1.public.testtable
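
While still inside the Kafka container, you can also inspect the topic itself (partition count, replication factor, and so on) with the same CLI:

kafka-topics --bootstrap-server localhost:9092 \
  --describe --topic dbserver1.public.testtable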

7. Viewing CDC Events from Kafka

Use the following to consume events from the topic:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning

Sample snapshot output:

{
  "before": null,
  "after": {
    "id": 1,
    "message": "CDC test row"
  },
  "op": "r"
}
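
Debezium also writes each row's primary key as the Kafka message key. If you want to see it alongside the value, the console consumer can print keys too (standard console-consumer properties); with the JSON key converter configured above, the key for the snapshot row should look like {"id": 1}:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dbserver1.public.testtable \
  --from-beginning \
  --property print.key=true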

8. Confirming Live Data Changes

Add a new row in PostgreSQL:

# Log into PostgreSQL
docker compose exec postgres psql -U postgres

-- Then, at the psql prompt, insert a new row
INSERT INTO testtable (id, message) VALUES (2, 'inserted row');

Expected Kafka output:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}

Debezium JSON Event Format

Debezium indicates the type of each change with the op field of the CDC message:

  • r: snapshot read (initial data)
  • c: insert (create)
  • u: update
  • d: delete

For example, the following JSON means a new row was inserted:

{
  "before": null,
  "after": {
    "id": 2,
    "message": "inserted row"
  },
  "op": "c"
}
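
For comparison, an update event (op: u) populates both before and after. How much of the old row appears in before depends on the table's REPLICA IDENTITY setting in PostgreSQL: with the default identity, before may be null, while with REPLICA IDENTITY FULL an update would look roughly like this (values illustrative, abridged to the same fields as the samples above):

{
  "before": {
    "id": 2,
    "message": "inserted row"
  },
  "after": {
    "id": 2,
    "message": "updated row"
  },
  "op": "u"
}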

Troubleshooting

Topic Not Created in Kafka

If dbserver1.public.testtable doesn’t appear, check:

  • That debezium_pub exists in PostgreSQL:

    SELECT * FROM pg_publication;

  • That publication.name and slot.name are correctly set in the connector config

  • That the 00-init.sql script ran (reset if needed):

    docker compose down --volumes
    docker compose up -d

  • That Kafka Connect logs are error-free:

    docker compose logs connect
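
One more check that often helps here: confirm that the replication slot was actually created on the PostgreSQL side (the slot name is the one set in the connector config):

# debezium_slot should be listed with plugin pgoutput
docker compose exec postgres psql -U postgres \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"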

No Events Displayed from Kafka

If CDC events aren’t visible in Kafka:

  • Ensure the connector is RUNNING (see step 5)
  • Ensure the inserted row used a new primary key (an INSERT that violates the primary key fails, so no event is produced)
  • Make sure you’re consuming from the beginning:

    kafka-console-consumer --bootstrap-server localhost:9092 \
      --topic dbserver1.public.testtable \
      --from-beginning

In this chapter, we confirmed the flow of change data from PostgreSQL to Kafka using Debezium. In the next chapter, we'll use Flink to process this data.

(Coming soon: Chapter 4 Part 1 — Outputting Kafka CDC Data to Console with Flink)