Change Data Capture with Debezium
You have been building event-driven systems, but there is a dangerous gap hiding in your architecture. When your application writes to the database and then publishes an event to Kafka, what happens if the app crashes between those two operations? The database has the data, but the event never reaches Kafka. Your downstream services never know the change happened.
This is the dual-write problem, and it has caused countless production incidents. Polling the database for changes does not solve it either--polls can miss changes, create duplicate events, and add significant load to your database. The solution is Change Data Capture (CDC): reading changes directly from the database transaction log, where every committed change is guaranteed to appear exactly once.
Debezium is the industry-standard CDC platform for Kafka. It reads the PostgreSQL Write-Ahead Log (WAL), transforms changes into events, and delivers them to Kafka topics with exactly-once semantics. Combined with the transactional outbox pattern, you can guarantee that database writes and event publishing are atomic--they either both succeed or both fail.
The Dual-Write Problem Visualized
Consider what happens when your Task API creates a task and publishes an event:
┌─────────────────────────────────────────────────────────────────┐
│ Traditional Approach (DANGEROUS) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. BEGIN TRANSACTION │
│ 2. INSERT INTO tasks (id, title) VALUES ('t-123', 'Buy milk') │
│ 3. COMMIT │
│ │
│ ─── Database write succeeded ─── │
│ │
│ 4. producer.produce('task-created', event) ← App crashes here │
│ │
│ Result: Task exists in database, but NO event in Kafka │
│ Notification service never knows task was created │
│ Audit log is missing the entry │
│ System is INCONSISTENT │
│ │
└─────────────────────────────────────────────────────────────────┘
The problem is fundamental: you cannot make a database write and a Kafka publish atomic using two-phase commit (2PC). Most databases and message brokers do not support it, and even when they do, the coupling creates fragile systems.
CDC: Reading the Transaction Log
Change Data Capture solves this by reading the database's own transaction log--the Write-Ahead Log (WAL) in PostgreSQL. Every committed change appears in the WAL, and Debezium reads it in near real-time:
┌─────────────────────────────────────────────────────────────────┐
│ CDC Approach (RELIABLE) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ WAL ┌──────────┐ Events ┌─────────┐ │
│ │PostgreSQL│ ────────── │ Debezium │ ───────── │ Kafka │ │
│ │ DB │ (commits) │ Connector│ │ │ │
│ └──────────┘ └──────────┘ └─────────┘ │
│ │
│ - Every committed change appears in WAL │
│ - Debezium reads WAL asynchronously │
│ - No changes are ever missed │
│ - Low overhead (no polling queries) │
│ - Near real-time latency (milliseconds) │
│ │
└─────────────────────────────────────────────────────────────────┘
Debezium acts as a PostgreSQL replication client. It subscribes to the WAL using logical replication, transforms each change into a structured event, and produces it to Kafka. The database handles the hard work of tracking changes; Debezium simply reads and forwards them.
Deploying the Debezium PostgreSQL Connector
Debezium runs as a Kafka Connect connector. With Strimzi, you deploy it using the KafkaConnector custom resource.
Step 1: Prepare PostgreSQL for Logical Replication
PostgreSQL must be configured to allow logical replication. Add these settings to your PostgreSQL configuration:
-- postgresql.conf settings (or via ConfigMap for Kubernetes)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Create a replication user for Debezium
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'dbz-secret';
-- Grant access to the database
GRANT CONNECT ON DATABASE taskdb TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
Output:
CREATE ROLE
GRANT
GRANT
GRANT
ALTER DEFAULT PRIVILEGES
Step 2: Build a Kafka Connect Image with Debezium
Strimzi requires a custom Kafka Connect image that includes the Debezium connector. Create a KafkaConnect resource that builds the image:
# kafka-connect-debezium.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: task-connect
namespace: kafka
annotations:
strimzi.io/use-connector-resources: "true" # Enable KafkaConnector CRDs
spec:
version: 4.1.1
replicas: 1
bootstrapServers: task-events-kafka-bootstrap:9092
config:
group.id: task-connect-cluster
offset.storage.topic: connect-offsets
config.storage.topic: connect-configs
status.storage.topic: connect-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
build:
output:
type: docker
image: my-registry/kafka-connect-debezium:latest
pushSecret: registry-credentials
plugins:
- name: debezium-postgres
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz
For local development without a registry, use an ephemeral output:
build:
output:
type: imagestream
image: kafka-connect-debezium:latest
plugins:
- name: debezium-postgres
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz
Apply the resource:
kubectl apply -f kafka-connect-debezium.yaml -n kafka
Output:
kafkaconnect.kafka.strimzi.io/task-connect created
Wait for the Connect cluster to be ready:
kubectl wait kafkaconnect/task-connect --for=condition=Ready --timeout=300s -n kafka
Step 3: Deploy the PostgreSQL Connector
Now deploy the connector itself using a KafkaConnector resource:
# debezium-postgres-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: task-postgres-connector
namespace: kafka
labels:
strimzi.io/cluster: task-connect # Must match KafkaConnect name
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
# Connection settings
database.hostname: postgres-service
database.port: "5432"
database.user: debezium
database.password: dbz-secret
database.dbname: taskdb
# Unique identifier for this connector
topic.prefix: taskdb
# Capture settings
plugin.name: pgoutput # PostgreSQL native logical decoding
slot.name: debezium_task_slot
publication.name: debezium_publication
# Which tables to capture
table.include.list: public.tasks,public.outbox
# Schema settings
schema.history.internal.kafka.bootstrap.servers: task-events-kafka-bootstrap:9092
schema.history.internal.kafka.topic: schema-changes.taskdb
Apply the connector:
kubectl apply -f debezium-postgres-connector.yaml -n kafka
Output:
kafkaconnector.kafka.strimzi.io/task-postgres-connector created
Step 4: Verify CDC is Working
Check the connector status:
kubectl get kafkaconnector task-postgres-connector -n kafka -o jsonpath='{.status.connectorStatus.connector.state}'
Output:
RUNNING
Insert a test record and verify it appears in Kafka:
# Insert via psql or your application
psql -h postgres-service -U debezium -d taskdb -c \
"INSERT INTO tasks (id, title, status) VALUES ('t-999', 'Test CDC', 'pending')"
# Consume from the CDC topic
kubectl run kafka-consumer --rm -it --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-4.1.1 \
-n kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server task-events-kafka-bootstrap:9092 \
--topic taskdb.public.tasks \
--from-beginning --max-messages 1
Output:
{
"before": null,
"after": {
"id": "t-999",
"title": "Test CDC",
"status": "pending"
},
"source": {
"version": "3.0.0.Final",
"connector": "postgresql",
"name": "taskdb",
"ts_ms": 1735344000000,
"db": "taskdb",
"schema": "public",
"table": "tasks"
},
"op": "c",
"ts_ms": 1735344000123
}
The "op": "c" indicates a create operation. Debezium uses operation codes: c (create), u (update), d (delete), and r (read/snapshot).
The Transactional Outbox Pattern
CDC captures all table changes, but capturing the tasks table directly has problems:
- Schema coupling: Consumer must understand your internal table schema
- Too much detail: Every column update becomes an event, even internal fields
- Wrong abstraction: Table rows are not the same as domain events
The transactional outbox pattern solves this. Instead of capturing the business table, you write domain events to an outbox table in the same transaction as your business data. Debezium captures the outbox table and transforms the records into proper events.
┌─────────────────────────────────────────────────────────────────┐
│ Transactional Outbox Pattern │
├─────────────────────────────────────────────────────────────────┤
│ │
│ BEGIN TRANSACTION │
│ │ │
│ ├── INSERT INTO tasks (id, title) │
│ │ VALUES ('t-123', 'Buy milk') │
│ │ │
│ └── INSERT INTO outbox (aggregate_id, event_type, payload) │
│ VALUES ('t-123', 'TaskCreated', '{"id":"t-123",...}') │
│ │ │
│ COMMIT ← Both writes succeed or both fail (ATOMIC) │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ Debezium reads outbox table from WAL │
│ │ │
│ └── Transforms outbox row into clean event │
│ └── Produces to: task-events topic │
│ │
│ Result: Database + Event are ALWAYS consistent │
│ │
└─────────────────────────────────────────────────────────────────┘
Creating the Outbox Table
Design your outbox table to hold domain events:
-- Outbox table for domain events
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL, -- e.g., 'Task', 'User'
aggregate_id VARCHAR(255) NOT NULL, -- e.g., task ID
event_type VARCHAR(255) NOT NULL, -- e.g., 'TaskCreated'
payload JSONB NOT NULL, -- Event data as JSON
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Index for potential cleanup queries
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
Output:
CREATE TABLE
CREATE INDEX
Writing to the Outbox
When your application creates a task, write to both tables in one transaction:
from sqlalchemy import text
from sqlalchemy.orm import Session
import json
import uuid
from datetime import datetime, timezone
def create_task_with_event(session: Session, title: str, owner_id: str) -> dict:
"""Create a task and its event atomically."""
task_id = str(uuid.uuid4())
# Domain event payload
event_payload = {
"task_id": task_id,
"title": title,
"owner_id": owner_id,
"created_at": datetime.now(timezone.utc).isoformat()
}
# Both inserts in the same transaction
session.execute(
text("""
INSERT INTO tasks (id, title, owner_id, status)
VALUES (:id, :title, :owner_id, 'pending')
"""),
{"id": task_id, "title": title, "owner_id": owner_id}
)
session.execute(
text("""
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (:agg_type, :agg_id, :event_type, :payload)
"""),
{
"agg_type": "Task",
"agg_id": task_id,
"event_type": "TaskCreated",
"payload": json.dumps(event_payload)
}
)
session.commit() # Both writes are atomic
return {"id": task_id, "title": title}
Output:
>>> create_task_with_event(session, "Buy groceries", "user-456")
{'id': 'a1b2c3d4-e5f6-7890-abcd-ef1234567890', 'title': 'Buy groceries'}
Configuring the Outbox Event Router
Debezium includes the Outbox Event Router transformation that converts outbox table records into properly formatted events. Update your connector configuration:
# debezium-outbox-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: task-outbox-connector
namespace: kafka
labels:
strimzi.io/cluster: task-connect
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
# Connection settings (same as before)
database.hostname: postgres-service
database.port: "5432"
database.user: debezium
database.password: dbz-secret
database.dbname: taskdb
topic.prefix: taskdb
plugin.name: pgoutput
slot.name: debezium_outbox_slot
publication.name: debezium_outbox_pub
# Only capture the outbox table
table.include.list: public.outbox
# Outbox Event Router transformation
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
# Route to topic based on aggregate type
transforms.outbox.route.topic.replacement: ${routedByValue}.events
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.payload: payload
# Delete outbox records after capture (optional)
transforms.outbox.table.expand.json.payload: true
# Schema history
schema.history.internal.kafka.bootstrap.servers: task-events-kafka-bootstrap:9092
schema.history.internal.kafka.topic: schema-changes.outbox
Apply the updated connector:
kubectl apply -f debezium-outbox-connector.yaml -n kafka
Output:
kafkaconnector.kafka.strimzi.io/task-outbox-connector created
With this configuration, when you insert a row into the outbox table with aggregate_type: "Task", Debezium produces an event to the Task.events Kafka topic. The event key is the aggregate_id, and the payload is the JSON from the payload column.
Verifying the Outbox Pipeline
Test the complete flow:
# Insert via your application or directly
psql -h postgres-service -U app_user -d taskdb -c "
BEGIN;
INSERT INTO tasks (id, title, owner_id, status)
VALUES ('t-abc', 'Test Outbox', 'u-123', 'pending');
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ('Task', 't-abc', 'TaskCreated',
'{\"task_id\":\"t-abc\",\"title\":\"Test Outbox\",\"owner_id\":\"u-123\"}');
COMMIT;
"
# Consume from the routed topic
kubectl run kafka-consumer --rm -it --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-4.1.1 \
-n kafka -- bin/kafka-console-consumer.sh \
--bootstrap-server task-events-kafka-bootstrap:9092 \
--topic Task.events \
--from-beginning --max-messages 1
Output:
{
"task_id": "t-abc",
"title": "Test Outbox",
"owner_id": "u-123"
}
The event is clean and domain-focused--no database metadata, no before/after snapshots, just the business event payload.
Handling Outbox Table Growth
One concern with the outbox pattern: the table grows with every event. There are three strategies:
| Strategy | How It Works | Pros | Cons |
|---|---|---|---|
| Debezium delete | Configure Debezium to delete rows after capture | Automatic cleanup | Requires DELETE permissions |
| Scheduled cleanup | Cron job deletes old rows | Simple, controllable | Slight delay before cleanup |
| Log-only outbox | Use pg_logical_emit_message() | No table growth at all | PostgreSQL-specific |
For most applications, scheduled cleanup is simplest:
-- Delete outbox entries older than 7 days
DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days';
Run this as a Kubernetes CronJob or PostgreSQL scheduled job.
Common Issues and Debugging
| Symptom | Likely Cause | Resolution |
|---|---|---|
Connector stuck in PAUSED | Replication slot missing | Check PostgreSQL logs; recreate slot |
| No events appearing | Wrong table.include.list | Verify table name matches exactly |
| Events have wrong schema | Outbox table structure mismatch | Verify column names match transformer config |
| High WAL disk usage | Connector not reading fast enough | Check connector lag; increase resources |
REPLICATION_SLOT_CONFLICT | Slot dropped while connector running | Restart connector; it will create new slot |
Check connector status with:
kubectl get kafkaconnector task-outbox-connector -n kafka -o yaml
Look at .status.connectorStatus for error messages.
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, implement the transactional outbox pattern for a Task API.
Does my skill generate the outbox table schema, the transactional insert code, and the Debezium outbox event router configuration?
Identify Gaps
Ask yourself:
- Did my skill explain the dual-write problem and how CDC solves it?
- Did it cover the transactional outbox pattern and Debezium outbox event router?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing CDC patterns (dual-write problem, transactional outbox, Debezium connector config).
Update it to include when to use CDC versus polling and how to implement the outbox pattern atomically.
Try With AI
Prompt 1: Design Your Outbox Schema
I'm implementing the transactional outbox pattern for my [describe your domain -
e.g., "e-commerce order system" or "user management service"].
My main entities are: [list 2-3 entities]
Events I need to publish: [list events like "OrderCreated", "UserRegistered"]
Help me design:
1. The outbox table schema for my specific events
2. A Python function that writes my business data + outbox entry atomically
3. The Debezium outbox event router configuration for my topic naming
Ask me clarifying questions about my requirements before designing.
What you're learning: Translating the outbox pattern from generic knowledge to your specific domain. AI helps you think through the schema decisions and naming conventions that fit your business events.
Prompt 2: Debug a CDC Issue
My Debezium PostgreSQL connector is in RUNNING state but I'm not seeing events
in Kafka. Here's my situation:
- PostgreSQL version: [your version]
- Connector config: [paste relevant parts]
- Topic I expect events on: [topic name]
Walk me through the debugging steps:
1. How do I verify PostgreSQL WAL is configured correctly?
2. How do I check if Debezium is actually reading the WAL?
3. How do I trace where events might be getting lost?
What you're learning: Systematic debugging of CDC pipelines. AI provides the specific commands and queries for each diagnostic step while you evaluate whether the outputs indicate problems.
Prompt 3: Evaluate CDC vs Polling Trade-offs
My team is debating whether to use Debezium CDC or simple polling for our
event publishing. Our context:
- Database: [PostgreSQL/MySQL/etc.]
- Event volume: [events per second/minute]
- Latency requirement: [how fast events must be published]
- Ops team experience: [familiar with Kafka Connect or not]
Help me build a decision framework. What questions should I ask to make
this choice? What are the hidden costs of each approach that might not
be obvious upfront?
What you're learning: Architectural decision-making. AI helps you identify considerations you might miss while you evaluate whether each factor applies to your specific context.
Safety note: Always test CDC configurations in a non-production environment first. Logical replication creates replication slots that consume WAL space--if the connector stops reading, WAL can fill your disk. Monitor replication slot lag in production.