
Kafka Connect: Building Data Pipelines

You've built producers and consumers in Python. They work, but now you're asked to integrate Kafka with a PostgreSQL database, an Elasticsearch cluster, and an S3 bucket. Writing custom producers and consumers for each integration means maintaining three separate codebases, handling retries, managing offsets, and implementing fault tolerance—for each one.

This is exactly what Kafka Connect solves. It's a framework for building standardized, scalable data pipelines without writing custom code. Over 200 pre-built connectors exist for databases, message queues, cloud storage, and search engines. Instead of implementing integration logic, you deploy a connector with configuration.

In this lesson, you'll deploy Kafka Connect on Kubernetes using Strimzi and configure connectors using declarative YAML—the same GitOps pattern you've used throughout this book. The pattern you learn here lets you build data pipelines that would take weeks to code manually, deployed in minutes.

Understanding Kafka Connect Architecture

Kafka Connect consists of workers that execute connectors and tasks. This separation lets Connect distribute work across multiple nodes for scalability and fault tolerance.

┌───────────────────────────────────────────────────────┐
│                 Kafka Connect Cluster                 │
├───────────────────────────────────────────────────────┤
│                                                       │
│  ┌──────────────────────┐   ┌──────────────────────┐  │
│  │       Worker 1       │   │       Worker 2       │  │
│  │  ┌────────────────┐  │   │  ┌────────────────┐  │  │
│  │  │ Connector A    │  │   │  │ Connector A    │  │  │
│  │  │   Task 0       │  │   │  │   Task 1       │  │  │
│  │  └────────────────┘  │   │  └────────────────┘  │  │
│  │  ┌────────────────┐  │   │  ┌────────────────┐  │  │
│  │  │ Connector B    │  │   │  │ Connector C    │  │  │
│  │  │   Task 0       │  │   │  │   Task 0       │  │  │
│  │  └────────────────┘  │   │  └────────────────┘  │  │
│  └──────────────────────┘   └──────────────────────┘  │
│                                                       │
└───────────────────────────┬───────────────────────────┘
                            │
                            ▼
                    ┌───────────────┐
                    │     Kafka     │
                    │    Cluster    │
                    └───────────────┘

Key components:

| Component | Role                                                       | Scaling                                                  |
|-----------|------------------------------------------------------------|----------------------------------------------------------|
| Worker    | JVM process that hosts connector tasks                     | Add workers for more capacity                            |
| Connector | Logical job definition (what to integrate)                 | One connector can have many tasks                        |
| Task      | Unit of work doing actual data movement                    | Connector decides task count based on parallelism needs  |
| Converter | Transforms data between Kafka format and connector format  | Configured per connector                                 |

Source vs Sink Connectors

Connectors come in two types based on data flow direction:

                  Source Connectors
                 (Data INTO Kafka)

┌──────────────┐                      ┌──────────────┐
│  PostgreSQL  │ ───────────────────▶ │              │
└──────────────┘                      │    Kafka     │
                                      │    Topics    │
┌──────────────┐                      │              │
│      S3      │ ───────────────────▶ │              │
└──────────────┘                      └──────┬───────┘
                                             │
                 ┌───────────────────────────┼───────────────────────────┐
                 ▼                           ▼                           ▼
         ┌──────────────┐            ┌──────────────┐            ┌──────────────┐
         │Elasticsearch │            │    Redis     │            │      S3      │
         └──────────────┘            └──────────────┘            └──────────────┘

                                Sink Connectors
                              (Data OUT OF Kafka)

Source connectors produce data into Kafka topics:

| Use Case       | Source Connector    | Data Flow                 |
|----------------|---------------------|---------------------------|
| Database CDC   | Debezium PostgreSQL | DB changes → Kafka        |
| File ingestion | File Source         | Files → Kafka             |
| JDBC polling   | JDBC Source         | SQL query results → Kafka |
| HTTP endpoints | HTTP Source         | API responses → Kafka     |

Sink connectors consume from Kafka topics and write to external systems:

| Use Case        | Sink Connector     | Data Flow               |
|-----------------|--------------------|-------------------------|
| Search indexing | Elasticsearch Sink | Kafka → Elasticsearch   |
| Data lake       | S3 Sink            | Kafka → S3 buckets      |
| Database sync   | JDBC Sink          | Kafka → Database tables |
| Caching         | Redis Sink         | Kafka → Redis           |

Deploying Kafka Connect with Strimzi

Strimzi provides two CRDs for Kafka Connect:

  • KafkaConnect: Deploys a Kafka Connect cluster
  • KafkaConnector: Configures individual connectors
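
Both CRDs come with the Strimzi installation. To confirm they are present before creating resources, a quick check (assuming the standard Strimzi CRD names):

# Confirm the Strimzi Connect CRDs exist in the cluster
kubectl get crd kafkaconnects.kafka.strimzi.io kafkaconnectors.kafka.strimzi.io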

Step 1: Create KafkaConnect Resource

The KafkaConnect CRD deploys Connect workers and optionally builds a custom image with connector plugins:

# kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: task-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"  # Enable KafkaConnector CRD
spec:
  version: 4.1.1
  replicas: 2  # Two workers for fault tolerance
  bootstrapServers: task-events-kafka-bootstrap:9092

  # Build connector image with plugins
  build:
    output:
      type: docker
      image: localhost:5000/task-connect:latest  # Local registry or your registry
    plugins:
      - name: file-source
        artifacts:
          - type: maven
            group: org.apache.kafka
            artifact: connect-file
            version: 4.1.1
      - name: debezium-postgres
        artifacts:
          - type: maven
            group: io.debezium
            artifact: debezium-connector-postgres
            version: 2.7.0.Final

  config:
    # Worker configuration
    group.id: task-connect-cluster
    offset.storage.topic: task-connect-offsets
    config.storage.topic: task-connect-configs
    status.storage.topic: task-connect-status

    # Replication for internal topics
    offset.storage.replication.factor: 1
    config.storage.replication.factor: 1
    status.storage.replication.factor: 1

    # Converters (how data is serialized)
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false

  resources:
    requests:
      memory: 512Mi
      cpu: 250m
    limits:
      memory: 1Gi
      cpu: 500m

Key configuration explained:

| Setting                             | Purpose                                                                      |
|-------------------------------------|------------------------------------------------------------------------------|
| strimzi.io/use-connector-resources  | Enables managing connectors via KafkaConnector CRDs instead of the REST API   |
| build.plugins                       | Downloads connector JARs from Maven and builds them into the Connect image    |
| offset.storage.topic                | Kafka topic storing connector offset positions                                 |
| config.storage.topic                | Kafka topic storing connector configurations                                   |
| status.storage.topic                | Kafka topic storing connector status                                           |
| key.converter / value.converter     | How messages are serialized (JSON, Avro, String)                               |

Step 2: Apply and Verify Deployment

# Apply KafkaConnect resource
kubectl apply -f kafka-connect.yaml -n kafka

# Watch for Connect pods to become ready (takes 2-5 minutes for image build)
kubectl get pods -n kafka -l strimzi.io/cluster=task-connect -w

Output:

NAME                         READY   STATUS      RESTARTS   AGE
task-connect-connect-0       1/1     Running     0          3m
task-connect-connect-1       1/1     Running     0          3m
task-connect-build-1-build   0/1     Completed   0          5m

The build pod downloads the plugin artifacts and assembles the connector image; once it completes, the Connect workers start from that image.
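
Once the workers are Running, you can confirm that the plugins from the build section made it into the image by asking the Connect REST API which connector classes it knows about. A sketch, assuming Strimzi exposes the REST API through a service named task-connect-connect-api (the default naming for a KafkaConnect cluster called task-connect):

# Forward the Connect REST API locally, then list the installed connector plugins
kubectl port-forward -n kafka service/task-connect-connect-api 8083:8083 &
curl -s http://localhost:8083/connector-plugins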

Step 3: Configure a Source Connector

With Kafka Connect running, deploy a connector using KafkaConnector CRD. This example uses the File Source connector (useful for testing):

# file-source-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-source
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect  # Links to KafkaConnect resource
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  config:
    file: /opt/kafka/data/input.txt
    topic: file-events

# Apply the connector
kubectl apply -f file-source-connector.yaml -n kafka

# Check connector status
kubectl get kafkaconnectors -n kafka

Output:

NAME          CLUSTER        CONNECTOR CLASS                                            MAX TASKS   READY
file-source   task-connect   org.apache.kafka.connect.file.FileStreamSourceConnector   1           True
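
To see the connector move data, append a line to the watched file inside a worker pod and read it back from the file-events topic. A minimal sketch, assuming the task was assigned to task-connect-connect-0 and that the Kafka broker pod is named task-events-kafka-0 (adjust both names to your environment):

# Append a test line to the file watched by the source connector
kubectl exec -n kafka task-connect-connect-0 -- \
  sh -c 'echo "hello from file source" >> /opt/kafka/data/input.txt'

# Read it back from the file-events topic with a console consumer
kubectl exec -n kafka task-events-kafka-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic file-events --from-beginning --max-messages 1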

Step 4: Configure a Sink Connector

Sink connectors write Kafka data to external systems. This example writes to a file (for demonstration):

# file-sink-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 1
  config:
    file: /opt/kafka/data/output.txt
    topics: file-events

Now data flows: input.txt → file-events topic → output.txt
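
To confirm the sink side, check the output file. A sketch under the same assumption that the sink task runs on task-connect-connect-0:

# Verify that records consumed from file-events landed in the output file
kubectl exec -n kafka task-connect-connect-0 -- cat /opt/kafka/data/output.txt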

Practical Example: Database to Elasticsearch Pipeline

A realistic pipeline might sync database changes to Elasticsearch for search. Here's how you'd configure it:

# postgres-source.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-tasks-source
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres.default.svc.cluster.local
    database.port: "5432"
    database.user: ${file:/opt/kafka/external-secrets/postgres:username}
    database.password: ${file:/opt/kafka/external-secrets/postgres:password}
    database.dbname: taskdb
    database.server.name: taskdb
    table.include.list: public.tasks
    topic.prefix: cdc
    plugin.name: pgoutput
---
# elasticsearch-sink.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: elasticsearch-tasks-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 2
  config:
    connection.url: http://elasticsearch.default.svc.cluster.local:9200
    topics: cdc.public.tasks
    type.name: _doc
    key.ignore: "false"
    schema.ignore: "true"

Data flow:

  1. User creates task in PostgreSQL
  2. Debezium reads PostgreSQL WAL (transaction log)
  3. Change event published to cdc.public.tasks topic
  4. Elasticsearch sink writes to tasks index
  5. Task searchable in Elasticsearch within seconds
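
To watch the change events as they arrive, tail the CDC topic with a console consumer. A minimal sketch, assuming the Kafka broker pod is named task-events-kafka-0:

# Tail Debezium change events for the tasks table
kubectl exec -n kafka task-events-kafka-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc.public.tasks --from-beginning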

When to Use Kafka Connect vs Custom Code

Not every integration needs Kafka Connect. Here's a decision framework:

| Factor             | Use Kafka Connect            | Use Custom Producer/Consumer     |
|--------------------|------------------------------|----------------------------------|
| Standard system    | Databases, S3, Elasticsearch | Custom APIs, proprietary systems |
| Connector exists   | 200+ pre-built connectors    | No connector available           |
| Configuration-only | YAML/JSON config sufficient  | Complex transformation logic     |
| Operations team    | Prefers declarative config   | Prefers code review process      |
| Maintenance        | Connector vendor maintains   | You maintain                     |
| Performance tuning | Standard throughput needs    | Extreme optimization required    |
| Domain logic       | Simple field mapping         | Business rules, validation       |

Choose Kafka Connect when:

  • A connector exists for your source/sink
  • Configuration can express your requirements
  • You want ecosystem support and community updates

Choose custom code when:

  • No connector exists for your system
  • Complex business logic during data movement
  • You need fine-grained control over batching, retries, error handling
  • Performance requirements exceed connector capabilities

Monitoring Connector Status

Check connector and task status:

# Connector-level status
kubectl get kafkaconnectors -n kafka

# Detailed status including tasks
kubectl get kafkaconnector file-source -n kafka -o yaml

Output (status section):

status:
  conditions:
    - lastTransitionTime: "2025-01-15T10:30:00Z"
      status: "True"
      type: Ready
  connectorStatus:
    connector:
      state: RUNNING
      worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083
    name: file-source
    tasks:
      - id: 0
        state: RUNNING
        worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083
    type: source
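
When scripting health checks, you can extract just the state field from that status with jsonpath:

# Print only the connector state (e.g. RUNNING or FAILED)
kubectl get kafkaconnector file-source -n kafka \
  -o jsonpath='{.status.connectorStatus.connector.state}{"\n"}'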

Common connector states:

| State      | Meaning                  | Action                 |
|------------|--------------------------|------------------------|
| RUNNING    | Healthy                  | None                   |
| PAUSED     | Temporarily stopped      | Check if intentional   |
| FAILED     | Task crashed             | Check logs, fix config |
| UNASSIGNED | No worker picked up task | Check worker health    |
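
After fixing the underlying problem for a FAILED connector, you can ask the operator to restart it by annotating the KafkaConnector resource; Strimzi removes the annotation once the restart completes. A sketch, assuming the strimzi.io/restart and strimzi.io/restart-task annotations supported by recent Strimzi releases:

# Restart the whole connector
kubectl annotate kafkaconnector file-source -n kafka strimzi.io/restart="true"

# Restart only task 0 instead of the whole connector
kubectl annotate kafkaconnector file-source -n kafka strimzi.io/restart-task="0"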

View connector logs:

kubectl logs task-connect-connect-0 -n kafka | grep -i "file-source"

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, design a Kafka Connect pipeline that captures PostgreSQL changes and sinks them to Elasticsearch.
Does my skill generate KafkaConnect and KafkaConnector CRDs? Does it explain source vs sink connectors?

Identify Gaps

Ask yourself:

  • Did my skill explain Kafka Connect architecture (workers, tasks, connectors)?
  • Did it cover when to use Kafka Connect versus custom producers/consumers?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing Kafka Connect patterns (source/sink connectors, KafkaConnect/KafkaConnector CRDs).
Update it to include when to use pre-built connectors versus writing custom integration code.

Try With AI

Setup: You need to build a data pipeline integrating multiple systems with Kafka.

Prompt 1: Design a pipeline architecture

I need to build a pipeline that:
1. Captures changes from a PostgreSQL "orders" table
2. Enriches data by joining with a "customers" topic
3. Writes to both Elasticsearch (for search) and S3 (for analytics)

Help me design this with Kafka Connect. What connectors do I need?
What Kafka topics should exist? Where does the enrichment happen?

What you're learning: AI helps you think through pipeline topology. It will likely point out that enrichment (joining streams) happens in Kafka Streams or ksqlDB, not in Connect itself—Connect just moves data, transformation is a separate concern.

Prompt 2: Troubleshoot a connector failure

My KafkaConnector shows this status:

status:
  connectorStatus:
    connector:
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException:
        Unable to connect to database: Connection refused"
    tasks: []

What are the possible causes and how do I debug this step by step?
Include kubectl commands I should run.

What you're learning: AI walks through systematic debugging—network connectivity, credentials, service discovery in Kubernetes, and how to test database access from within the Connect pod.

Prompt 3: Evaluate for your use case

I'm building an integration between our Task API and a third-party
CRM system. The CRM has a REST API but no Kafka connector exists.
I need to:
- Push new tasks to CRM when created
- Pull CRM contact updates back to our system

Should I write custom producers/consumers or try to make Kafka Connect work?
What are the tradeoffs? Be specific about maintenance, complexity, and operations.

What you're learning: AI helps you apply the Connect vs custom code decision framework to your specific constraints, likely recommending custom code for the REST API integration while suggesting Connect patterns you might adopt in your custom implementation.

Safety note: When testing connectors, especially source connectors reading from databases, use read-only credentials or a replica database. A misconfigured connector can create load on production databases.