Connecting RDBs and Search Engines — Chapter 5
Chapter 5: Implementing a CDC Join Pipeline with Flink SQL
In this chapter, we build a full pipeline that captures CDC streams from PostgreSQL via Debezium, joins the data using Flink SQL, and stores the results in OpenSearch.
Architecture Overview
PostgreSQL
↓ (CDC)
Debezium (Kafka Connect)
↓ (debezium-json)
Kafka Topic (orders, products)
↓
Flink SQL
↓
OpenSearch (orders_with_products)
PostgreSQL Table Setup
Add the following to postgres/01-init.sql:
-- Products table
CREATE TABLE products (
  product_id VARCHAR(32) PRIMARY KEY,
  product_name VARCHAR(255),
  category_id VARCHAR(32)
);
-- FULL so that update/delete events carry the complete old row image
ALTER TABLE products REPLICA IDENTITY FULL;

-- Initial product data
INSERT INTO products (product_id, product_name, category_id) VALUES
  ('P001', 'Sneaker X', 'C001'),
  ('P002', 'Jacket Y', 'C002');

-- Orders table
CREATE TABLE orders (
  order_id VARCHAR(32) PRIMARY KEY,
  order_time TIMESTAMP,
  customer_id VARCHAR(32),
  product_id VARCHAR(32),
  quantity INT,
  price NUMERIC(10,2)
);
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Initial order data
INSERT INTO orders (order_id, order_time, customer_id, product_id, quantity, price) VALUES
  ('O1001', '2025-04-27 10:00:00', 'CUST01', 'P001', 1, 9800.00),
  ('O1002', '2025-04-27 10:05:00', 'CUST02', 'P002', 2, 15800.00);

-- Grant permissions to the Debezium user
GRANT SELECT ON products, orders TO debezium;

-- Add the tables to the existing publication
ALTER PUBLICATION debezium_pub ADD TABLE products, orders;
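Before moving on, it is worth confirming that both tables were actually added to the publication. A quick check from psql (the container, user, and database names here are assumptions; adjust to your compose setup):

docker compose exec postgres psql -U postgres -d postgres \
  -c "SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_pub';"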
Register Debezium Connector
After Kafka Connect is running, register the connector using a script such as scripts/02-debezium-table-table-join.sh. Ensure the config includes:
"table.include.list": "public.testtable,public.products,public.orders"
Create OpenSearch Index
Prepare opensearch/orders_with_products-mapping.json:
{
  "mappings": {
    "properties": {
      "order_id": { "type": "keyword" },
      "product_id": { "type": "keyword" },
      "product_name": {
        "type": "text",
        "fields": { "raw": { "type": "keyword" } }
      },
      "category_id": { "type": "keyword" },
      "quantity": { "type": "integer" },
      "price": { "type": "double" }
    }
  }
}
Create the index:
curl -X PUT "http://localhost:9200/orders_with_products" \
-H "Content-Type: application/json" \
-d @opensearch/orders_with_products-mapping.json
✅ product_name uses a multi-field (text + keyword) for full-text and exact-match support.
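To see the multi-field in action once data has flowed in: a match query on product_name matches analyzed tokens, while a term query on product_name.raw requires the exact stored value (both are standard OpenSearch query DSL):

curl -s -X GET "http://localhost:9200/orders_with_products/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "match": { "product_name": "sneaker" } } }'

curl -s -X GET "http://localhost:9200/orders_with_products/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "product_name.raw": "Sneaker X" } } }'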
Example Flink SQL
Create flink/sql/table-table-join.sql:
-- Products table (changelog source decoded from the Debezium envelope)
CREATE TABLE products (
  product_id STRING,
  product_name STRING,
  category_id STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Orders table
CREATE TABLE orders (
  order_id STRING,
  order_time STRING,
  customer_id STRING,
  product_id STRING,
  quantity INT,
  price DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.public.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset'
);

-- OpenSearch sink (the composite primary key becomes the document id,
-- joined with the key delimiter, e.g. "O1001$P001")
CREATE TABLE orders_with_products (
  order_id STRING,
  product_id STRING,
  product_name STRING,
  category_id STRING,
  quantity INT,
  price DOUBLE,
  PRIMARY KEY (order_id, product_id) NOT ENFORCED
) WITH (
  'connector' = 'opensearch-2',
  'hosts' = 'http://opensearch:9200',
  'allow-insecure' = 'true',
  'index' = 'orders_with_products',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-size' = '1mb'
);

-- Join view
CREATE VIEW orders_with_products_view AS
SELECT
  o.order_id,
  o.product_id,
  p.product_name,
  p.category_id,
  o.quantity,
  o.price
FROM orders o
INNER JOIN products p ON o.product_id = p.product_id;

-- Insert into OpenSearch
INSERT INTO orders_with_products
SELECT * FROM orders_with_products_view;
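Note that this is a regular (non-temporal) join, so Flink keeps every row from both sides in state indefinitely by default. For long-running jobs, consider bounding that state with a TTL by placing a SET statement at the top of the SQL file; this is a trade-off, since rows whose state has expired can no longer be matched by later updates:

SET 'table.exec.state.ttl' = '24 h';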
Run the Flink Job
Execute from inside the Flink container:
sql-client.sh -f /opt/flink/sql/table-table-join.sql
Or via Docker:
docker compose exec flink-jobmanager sql-client.sh -f /opt/flink/sql/table-table-join.sql
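Either way, you can confirm the job was submitted and is RUNNING via the Flink CLI (the JobManager also serves a web UI, typically published on port 8081):

docker compose exec flink-jobmanager flink list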
Validation Steps
- Verify CDC data in Kafka topics:
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.orders --from-beginning
docker compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.public.products --from-beginning
- Verify data in OpenSearch:
curl -X GET "http://localhost:9200/orders_with_products/_search?pretty"
With a custom query:
curl -s -X GET "http://localhost:9200/orders_with_products/_search?pretty" \
-H 'Content-Type: application/json' \
-d '{ "query": { "match_all": {} } }'
For convenience, use a script like scripts/opensearch-query.sh:
#!/bin/bash
# osq <index> [size]: match_all query against the given index
# (defaults: all indices, 10 hits)
osq() {
  local index="${1:-_all}"
  local size="${2:-10}"
  curl -s -X GET "http://localhost:9200/${index}/_search?pretty" \
    -H 'Content-Type: application/json' \
    -d "{ \"query\": { \"match_all\": {} }, \"size\": ${size} }"
}
Example usage:
source scripts/opensearch-query.sh
osq orders_with_products
osq orders_with_products 20
osq orders_with_products 100 | jq '.hits.hits[]._id'
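Finally, verify that updates propagate end to end. Apply an UPDATE on the products side (the psql user and database names below are assumptions) and query again; because the sink is keyed on (order_id, product_id), the existing document for O1001 should be overwritten with the new product name:

docker compose exec postgres psql -U postgres -d postgres \
  -c "UPDATE products SET product_name = 'Sneaker X v2' WHERE product_id = 'P001';"
osq orders_with_products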
In Chapter 6 and beyond, we will explore topics such as deduplication, batch processing, DLQ (Dead Letter Queue), and OpenSearch index partitioning strategies for production-grade pipelines.