Kafka Connect: Building Data Pipelines
You've built producers and consumers in Python. They work, but now you're asked to integrate Kafka with a PostgreSQL database, an Elasticsearch cluster, and an S3 bucket. Writing custom producers and consumers for each integration means maintaining three separate codebases, handling retries, managing offsets, and implementing fault tolerance—for each one.
This is exactly what Kafka Connect solves. It's a framework for building standardized, scalable data pipelines without writing custom code. Over 200 pre-built connectors exist for databases, message queues, cloud storage, and search engines. Instead of implementing integration logic, you deploy a connector with configuration.
In this lesson, you'll deploy Kafka Connect on Kubernetes using Strimzi and configure connectors using declarative YAML—the same GitOps pattern you've used throughout this book. The pattern you learn here lets you build data pipelines that would take weeks to code manually, deployed in minutes.
Understanding Kafka Connect Architecture
Kafka Connect consists of workers that execute connectors and tasks. This separation lets Connect distribute work across multiple nodes for scalability and fault tolerance.
┌─────────────────────────────────────────────────────┐
│                Kafka Connect Cluster                │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────────────────┐   ┌─────────────────────┐  │
│  │      Worker 1       │   │      Worker 2       │  │
│  │  ┌───────────────┐  │   │  ┌───────────────┐  │  │
│  │  │  Connector A  │  │   │  │  Connector A  │  │  │
│  │  │    Task 0     │  │   │  │    Task 1     │  │  │
│  │  └───────────────┘  │   │  └───────────────┘  │  │
│  │  ┌───────────────┐  │   │  ┌───────────────┐  │  │
│  │  │  Connector B  │  │   │  │  Connector C  │  │  │
│  │  │    Task 0     │  │   │  │    Task 0     │  │  │
│  │  └───────────────┘  │   │  └───────────────┘  │  │
│  └─────────────────────┘   └─────────────────────┘  │
│                                                     │
└─────────────────────────────────────────────────────┘
                           │
                           ▼
                   ┌───────────────┐
                   │     Kafka     │
                   │    Cluster    │
                   └───────────────┘
Key components:
| Component | Role | Scaling |
|---|---|---|
| Worker | JVM process that hosts connector tasks | Add workers for more capacity |
| Connector | Logical job definition (what to integrate) | One connector can have many tasks |
| Task | Unit of work doing actual data movement | Connector decides task count based on parallelism needs |
| Converter | Transforms data between Kafka format and connector format | Configured per connector |
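Once a Connect cluster is running (you'll deploy one below), this worker/connector/task layout is visible through the REST API that every Connect worker exposes on port 8083. A minimal check, assuming a Strimzi-managed cluster named task-connect in the kafka namespace:

# Forward the Connect REST API to your machine (run in a separate terminal)
kubectl port-forward svc/task-connect-connect-api 8083:8083 -n kafka

# List connectors with status expanded: each task shows the worker running it
curl -s "http://localhost:8083/connectors?expand=status"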
Source vs Sink Connectors
Connectors come in two types based on data flow direction:
                Source Connectors
                (Data INTO Kafka)
                        │
                        ▼
┌──────────────┐                   ┌──────────────┐
│  PostgreSQL  │ ────────────────▶ │              │
└──────────────┘                   │              │
                                   │    Kafka     │
┌──────────────┐                   │    Topics    │
│      S3      │ ────────────────▶ │              │
└──────────────┘                   │              │
                                   └──────────────┘
                                           │
                       ┌───────────────────┼───────────────────┐
                       ▼                   ▼                   ▼
               ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
               │Elasticsearch │    │    Redis     │    │      S3      │
               └──────────────┘    └──────────────┘    └──────────────┘
                                    Sink Connectors
                                  (Data OUT OF Kafka)
Source connectors produce data into Kafka topics:
| Use Case | Source Connector | Data Flow |
|---|---|---|
| Database CDC | Debezium PostgreSQL | DB changes → Kafka |
| File ingestion | File Source | Files → Kafka |
| JDBC polling | JDBC Source | SQL query results → Kafka |
| HTTP endpoints | HTTP Source | API responses → Kafka |
Sink connectors consume from Kafka topics and write to external systems:
| Use Case | Sink Connector | Data Flow |
|---|---|---|
| Search indexing | Elasticsearch Sink | Kafka → Elasticsearch |
| Data lake | S3 Sink | Kafka → S3 buckets |
| Database sync | JDBC Sink | Kafka → Database tables |
| Caching | Redis Sink | Kafka → Redis |
Deploying Kafka Connect with Strimzi
Strimzi provides two CRDs for Kafka Connect:
- KafkaConnect: Deploys a Kafka Connect cluster
- KafkaConnector: Configures individual connectors
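Both CRDs are installed with the Strimzi operator. A quick sanity check before you start:

# Confirm the Connect-related CRDs exist in the cluster
kubectl get crd kafkaconnects.kafka.strimzi.io kafkaconnectors.kafka.strimzi.io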
Step 1: Create KafkaConnect Resource
The KafkaConnect CRD deploys Connect workers and optionally builds a custom image with connector plugins:
# kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: task-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"  # Enable KafkaConnector CRD
spec:
  version: 4.1.1
  replicas: 2  # Two workers for fault tolerance
  bootstrapServers: task-events-kafka-bootstrap:9092
  # Build connector image with plugins
  build:
    output:
      type: docker
      image: localhost:5000/task-connect:latest  # Local registry or your registry
    plugins:
      - name: file-source
        artifacts:
          - type: maven
            group: org.apache.kafka
            artifact: connect-file
            version: 4.1.1
      - name: debezium-postgres
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 2.7.0.Final
  config:
    # Worker configuration
    group.id: task-connect-cluster
    offset.storage.topic: task-connect-offsets
    config.storage.topic: task-connect-configs
    status.storage.topic: task-connect-status
    # Replication for internal topics
    offset.storage.replication.factor: 1
    config.storage.replication.factor: 1
    status.storage.replication.factor: 1
    # Converters (how data is serialized)
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
  resources:
    requests:
      memory: 512Mi
      cpu: 250m
    limits:
      memory: 1Gi
      cpu: 500m
Key configuration explained:
| Setting | Purpose |
|---|---|
| `strimzi.io/use-connector-resources` | Enables managing connectors via KafkaConnector CRDs instead of the REST API |
| `build.plugins` | Downloads connector JARs from Maven and builds them into the Connect image |
| `offset.storage.topic` | Kafka topic storing connector offset positions |
| `config.storage.topic` | Kafka topic storing connector configurations |
| `status.storage.topic` | Kafka topic storing connector status |
| `key.converter` / `value.converter` | How messages are serialized (JSON, Avro, String) |
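The converter settings deserve a second look: with JsonConverter, `schemas.enable` controls whether every record is wrapped in a schema envelope. A sketch of the difference (the task fields are made up for illustration):

# With value.converter.schemas.enable: false, only the raw payload is written:
#   {"task_id": 42, "title": "Write docs"}
# With value.converter.schemas.enable: true, each record carries a schema envelope:
#   {"schema": {"type": "struct", "fields": [...]}, "payload": {"task_id": 42, "title": "Write docs"}}

Sink connectors that need type information (a JDBC sink, for example) typically require the envelope or a schema registry; plain JSON consumers usually want it off.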
Step 2: Apply and Verify Deployment
# Apply KafkaConnect resource
kubectl apply -f kafka-connect.yaml -n kafka
# Watch for Connect pods to become ready (takes 2-5 minutes for image build)
kubectl get pods -n kafka -l strimzi.io/cluster=task-connect -w
Output:
NAME                         READY   STATUS      RESTARTS   AGE
task-connect-connect-0       1/1     Running     0          3m
task-connect-connect-1       1/1     Running     0          3m
task-connect-build-1-build   0/1     Completed   0          5m
The build pod assembles the connector image (downloading the plugin JARs from Maven), then the Connect workers start from that image.
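Before creating connectors, it's worth confirming the plugins actually made it into the image and that the internal topics were auto-created. One way to check, assuming the names used in this lesson (the broker pod name depends on your Kafka cluster and node pool naming; adjust as needed):

# List the connector plugins installed in the Connect image
kubectl port-forward svc/task-connect-connect-api 8083:8083 -n kafka &
curl -s http://localhost:8083/connector-plugins

# Confirm the three internal storage topics exist
kubectl exec -n kafka task-events-kafka-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | grep task-connect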
Step 3: Configure a Source Connector
With Kafka Connect running, deploy a connector using KafkaConnector CRD. This example uses the File Source connector (useful for testing):
# file-source-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-source
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect  # Links to KafkaConnect resource
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  config:
    file: /opt/kafka/data/input.txt
    topic: file-events
# Apply the connector
kubectl apply -f file-source-connector.yaml -n kafka
# Check connector status
kubectl get kafkaconnectors -n kafka
Output:
NAME          CLUSTER        CONNECTOR CLASS                                            MAX TASKS   READY
file-source   task-connect   org.apache.kafka.connect.file.FileStreamSourceConnector   1           True
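To watch the source connector work, append a line to the watched file and read it back from the topic. A sketch with two caveats: the task may be scheduled on either worker pod (check the status output's worker_id), and the broker pod name depends on your cluster naming:

# Append a line to the file the connector tails (on the pod running the task)
kubectl exec -n kafka task-connect-connect-0 -- \
  sh -c 'echo "hello from file source" >> /opt/kafka/data/input.txt'

# Consume it back from the file-events topic
kubectl exec -n kafka task-events-kafka-0 -- \
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic file-events --from-beginning --max-messages 1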
Step 4: Configure a Sink Connector
Sink connectors write Kafka data to external systems. This example writes to a file (for demonstration):
# file-sink-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 1
  config:
    file: /opt/kafka/data/output.txt
    topics: file-events
Now data flows: input.txt → file-events topic → output.txt
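A quick way to verify, assuming the sink task landed on the first worker (check worker_id in the connector status if not):

# Read the sink's output file from inside the worker pod
kubectl exec -n kafka task-connect-connect-0 -- cat /opt/kafka/data/output.txt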
Practical Example: Database to Elasticsearch Pipeline
A realistic pipeline might sync database changes to Elasticsearch for search. Here's how you'd configure it. Two prerequisites worth noting: the Elasticsearch sink plugin would need to be added to build.plugins the same way as Debezium above, and the ${file:...} secret references below require the file configuration provider to be enabled in the KafkaConnect config:
# postgres-source.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-tasks-source
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres.default.svc.cluster.local
    database.port: "5432"
    database.user: ${file:/opt/kafka/external-secrets/postgres:username}
    database.password: ${file:/opt/kafka/external-secrets/postgres:password}
    database.dbname: taskdb
    table.include.list: public.tasks
    topic.prefix: cdc
    plugin.name: pgoutput
---
# elasticsearch-sink.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: elasticsearch-tasks-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 2
  config:
    connection.url: http://elasticsearch.default.svc.cluster.local:9200
    topics: cdc.public.tasks
    type.name: _doc
    key.ignore: "false"
    schema.ignore: "true"
Data flow:
- User creates a task in PostgreSQL
- Debezium reads the PostgreSQL WAL (write-ahead log)
- Change event is published to the `cdc.public.tasks` topic
- Elasticsearch sink writes it to the `cdc.public.tasks` index (the index defaults to the topic name)
- Task is searchable in Elasticsearch within seconds
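To exercise the pipeline end to end, insert a row and query the index a few seconds later. This is a sketch only: the Postgres deployment name, credentials, and table columns below are illustrative stand-ins for your actual setup:

# Insert a task in PostgreSQL (adjust deployment name, user, and columns)
kubectl exec -n default deploy/postgres -- \
  psql -U taskuser -d taskdb -c \
  "INSERT INTO tasks (title, status) VALUES ('test cdc', 'open');"

# Query Elasticsearch for the change event
kubectl run es-query --rm -it --restart=Never --image=curlimages/curl -- \
  curl -s "http://elasticsearch.default.svc.cluster.local:9200/cdc.public.tasks/_search?q=title:test"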
When to Use Kafka Connect vs Custom Code
Not every integration needs Kafka Connect. Here's a decision framework:
| Factor | Use Kafka Connect | Use Custom Producer/Consumer |
|---|---|---|
| Standard system | Databases, S3, Elasticsearch | Custom APIs, proprietary systems |
| Connector exists | 200+ pre-built connectors | No connector available |
| Configuration-only | YAML/JSON config sufficient | Complex transformation logic |
| Operations team | Prefers declarative config | Prefers code review process |
| Maintenance | Connector vendor maintains | You maintain |
| Performance tuning | Standard throughput needs | Extreme optimization required |
| Domain logic | Simple field mapping | Business rules, validation |
Choose Kafka Connect when:
- A connector exists for your source/sink
- Configuration can express your requirements
- You want ecosystem support and community updates
Choose custom code when:
- No connector exists for your system
- Complex business logic during data movement
- You need fine-grained control over batching, retries, error handling
- Performance requirements exceed connector capabilities
Monitoring Connector Status
Check connector and task status:
# Connector-level status
kubectl get kafkaconnectors -n kafka
# Detailed status including tasks
kubectl get kafkaconnector file-source -n kafka -o yaml
Output (status section):
status:
  conditions:
  - lastTransitionTime: "2025-01-15T10:30:00Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083
    name: file-source
    tasks:
    - id: 0
      state: RUNNING
      worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083
      type: source
Common connector states:
| State | Meaning | Action |
|---|---|---|
| RUNNING | Healthy | None |
| PAUSED | Temporarily stopped | Check if intentional |
| FAILED | Task crashed | Check logs, fix config |
| UNASSIGNED | No worker picked up task | Check worker health |
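For deeper debugging, the Connect REST API returns the full error trace and lets you restart individual tasks. A sketch using the standard Connect endpoints:

# Forward the Connect REST API, then inspect the failing connector
kubectl port-forward svc/task-connect-connect-api 8083:8083 -n kafka &
curl -s http://localhost:8083/connectors/file-source/status

# Restart a single failed task without recreating the connector
curl -s -X POST http://localhost:8083/connectors/file-source/tasks/0/restart

Strimzi also supports this declaratively: annotating the KafkaConnector with strimzi.io/restart-task="0" triggers the same restart through the operator.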
View connector logs:
kubectl logs task-connect-connect-0 -n kafka | grep -i "file-source"
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, design a Kafka Connect pipeline that captures PostgreSQL changes and sinks them to Elasticsearch.
Does my skill generate KafkaConnect and KafkaConnector CRDs? Does it explain source vs sink connectors?
Identify Gaps
Ask yourself:
- Did my skill explain Kafka Connect architecture (workers, tasks, connectors)?
- Did it cover when to use Kafka Connect versus custom producers/consumers?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing Kafka Connect patterns (source/sink connectors, KafkaConnect/KafkaConnector CRDs).
Update it to include when to use pre-built connectors versus writing custom integration code.
Try With AI
Setup: You need to build a data pipeline integrating multiple systems with Kafka.
Prompt 1: Design a pipeline architecture
I need to build a pipeline that:
1. Captures changes from a PostgreSQL "orders" table
2. Enriches data by joining with a "customers" topic
3. Writes to both Elasticsearch (for search) and S3 (for analytics)
Help me design this with Kafka Connect. What connectors do I need?
What Kafka topics should exist? Where does the enrichment happen?
What you're learning: AI helps you think through pipeline topology. It will likely point out that enrichment (joining streams) happens in Kafka Streams or ksqlDB, not in Connect itself—Connect just moves data, transformation is a separate concern.
Prompt 2: Troubleshoot a connector failure
My KafkaConnector shows this status:
status:
  connectorStatus:
    connector:
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException:
        Unable to connect to database: Connection refused"
    tasks: []
What are the possible causes and how do I debug this step by step?
Include kubectl commands I should run.
What you're learning: AI walks through systematic debugging—network connectivity, credentials, service discovery in Kubernetes, and how to test database access from within the Connect pod.
Prompt 3: Evaluate for your use case
I'm building an integration between our Task API and a third-party
CRM system. The CRM has a REST API but no Kafka connector exists.
I need to:
- Push new tasks to CRM when created
- Pull CRM contact updates back to our system
Should I write custom producers/consumers or try to make Kafka Connect work?
What are the tradeoffs? Be specific about maintenance, complexity, and operations.
What you're learning: AI helps you apply the Connect vs custom code decision framework to your specific constraints, likely recommending custom code for the REST API integration while suggesting Connect patterns you might adopt in your custom implementation.
Safety note: When testing connectors, especially source connectors reading from databases, use read-only credentials or a replica database. A misconfigured connector can create load on production databases.