Your First Consumer (Python)
You've built a producer that reliably publishes events to Kafka. Now you need something to receive those events. In the previous lesson, you verified message delivery by checking broker acknowledgments. But real systems need consumers that process messages, handle failures gracefully, and track their position in the event stream.
Consuming from Kafka is fundamentally different from calling an API. With an API, you request data and get an immediate response. With Kafka, you poll for messages continuously, process whatever arrives, and tell Kafka you're done. This poll-process-commit loop is the heart of every Kafka consumer.
The challenge is reliability. What happens if your consumer crashes after reading a message but before processing it? What if it processes successfully but crashes before confirming? These edge cases determine whether your system loses messages, processes them twice, or handles them exactly right. Your commit strategy makes this decision.
Consumer Fundamentals
Before writing code, understand what a consumer actually does:
| Concept | Description |
|---|---|
| Consumer Group | Named group of consumers that share work. Each partition is assigned to exactly one consumer in the group. |
| Subscription | Topics this consumer wants to receive messages from. |
| Poll Loop | Continuous loop calling poll() to fetch messages from assigned partitions. |
| Offset | Position in partition. Consumer tracks "where am I?" to resume after restart. |
| Commit | Tell Kafka "I've processed up to this offset." |
The Poll Loop Pattern
Every Kafka consumer follows this structure:
while running:
messages = poll(timeout)
for message in messages:
process(message)
commit(message) # or auto-commit
The poll() call:
- Fetches available messages from assigned partitions
- Handles rebalancing when consumers join/leave
- Returns
Noneif no messages available within timeout - Must be called regularly (Kafka considers consumer dead if
poll()stops)
Setting Up the Consumer
Install the same library you used for the producer:
uv add confluent-kafka
Connecting to Kafka
Like in Lesson 5, use the NodePort to connect from your local machine:
| Where Your Code Runs | Bootstrap Server |
|---|---|
| Local machine (Mac/Windows) | localhost:30092 |
| Pod in same namespace | task-events-kafka-bootstrap:9092 |
Basic Consumer Configuration
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
'bootstrap.servers': 'localhost:30092', # NodePort for local dev
'group.id': 'task-notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000
})
Output:
Consumer created with group.id: task-notification-service
Let's understand each configuration:
| Setting | Value | Purpose |
|---|---|---|
bootstrap.servers | Kafka cluster address | Where to connect |
group.id | Unique consumer group name | Identifies this consumer group for partition assignment |
auto.offset.reset | earliest or latest | Where to start if no committed offset exists |
enable.auto.commit | True or False | Whether Kafka auto-commits offsets periodically |
auto.commit.interval.ms | 5000 | How often to auto-commit (if enabled) |
The auto.offset.reset Decision
When a consumer group first subscribes to a topic (or when its committed offsets have expired), Kafka needs to know where to start reading:
| Value | Behavior | Use When |
|---|---|---|
earliest | Read from the beginning of the topic | You need to process all historical messages |
latest | Read only new messages from now on | You only care about future events |
Example scenario: Your notification service starts for the first time. With earliest, it processes all past task-created events (potentially thousands). With latest, it ignores history and only notifies for new tasks.
# Process all historical events (good for data pipelines, audit logs)
'auto.offset.reset': 'earliest'
# Only future events (good for real-time notifications)
'auto.offset.reset': 'latest'
Choose based on your business requirements, not technical preference.
The Complete Poll Loop
Here's a production-ready consumer with proper error handling:
from confluent_kafka import Consumer, KafkaError
import json
import signal
import sys
# Graceful shutdown handling
running = True
def signal_handler(sig, frame):
global running
print("Shutdown signal received...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Consumer configuration
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'task-notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000
})
# Subscribe to topic(s)
consumer.subscribe(['task-created'])
print("Subscribed to topic: task-created")
try:
while running:
# Poll for messages (1 second timeout)
msg = consumer.poll(timeout=1.0)
if msg is None:
# No message available within timeout
continue
if msg.error():
# Handle specific errors
if msg.error().code() == KafkaError._PARTITION_EOF:
# Reached end of partition (not an error, just informational)
print(f"Reached end of partition {msg.partition()}")
continue
else:
# Actual error
print(f"Consumer error: {msg.error()}")
continue
# Process the message
key = msg.key().decode('utf-8') if msg.key() else None
value = json.loads(msg.value().decode('utf-8'))
print(f"Received task: {value.get('title', 'unknown')}")
print(f" Key: {key}")
print(f" Partition: {msg.partition()}, Offset: {msg.offset()}")
# Your business logic here
# send_notification(value)
finally:
# Clean shutdown
print("Closing consumer...")
consumer.close()
print("Consumer closed.")
Output:
Subscribed to topic: task-created
Received task: Buy groceries
Key: task-123
Partition: 0, Offset: 0
Received task: Complete report
Key: task-456
Partition: 1, Offset: 0
Reached end of partition 0
Reached end of partition 1
^CShutdown signal received...
Closing consumer...
Consumer closed.
Understanding the Error Handling
The msg.error() check catches several conditions:
| Error Code | Meaning | Action |
|---|---|---|
_PARTITION_EOF | Reached end of available messages | Continue polling (more messages may arrive) |
_UNKNOWN_TOPIC_OR_PART | Topic doesn't exist | Check topic name, may need to create topic |
_ALL_BROKERS_DOWN | Can't reach any broker | Check network, broker health |
The _PARTITION_EOF error is not really an error. It means "you've caught up with this partition." Continue polling for new messages.
Why consumer.close() Matters
Always call consumer.close() when shutting down:
- Triggers immediate rebalance: Other consumers in the group get your partitions faster
- Commits pending offsets: Ensures processed messages aren't re-processed
- Releases resources: Cleans up connections and memory
Without close(), Kafka waits for session timeout (default 45 seconds) before reassigning partitions.
Auto-Commit vs Manual Commit
This is the most important decision for your consumer. It determines your message delivery guarantee.
Auto-Commit: Simple but Risky
With auto-commit enabled, Kafka periodically commits offsets in the background:
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'my-service',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000 # Every 5 seconds
})
The Problem: Consider this timeline:
T=0: poll() returns message at offset 100
T=1: Start processing message
T=2: Auto-commit happens -> Kafka records "processed up to 100"
T=3: Processing fails, consumer crashes
T=4: Consumer restarts, asks Kafka "where was I?"
T=5: Kafka says "offset 101" -> Message at offset 100 is LOST
Auto-commit committed the offset before processing completed. If processing fails, that message is never retried.
When auto-commit is acceptable:
- Processing is fast and unlikely to fail
- Losing occasional messages is acceptable
- You have separate retry/dead-letter mechanisms
Manual Commit: Control and Safety
Manual commit lets you commit AFTER successful processing:
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'task-audit-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Disable auto-commit
})
consumer.subscribe(['task-created'])
try:
while running:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue
# Process the message
try:
value = json.loads(msg.value().decode('utf-8'))
# Your business logic (e.g., write to database)
save_to_audit_log(value)
# Commit AFTER successful processing
consumer.commit(message=msg)
print(f"Processed and committed offset {msg.offset()}")
except Exception as e:
print(f"Processing failed: {e}")
# Don't commit - message will be reprocessed on restart
finally:
consumer.close()
Output:
Processed and committed offset 0
Processed and committed offset 1
Processed and committed offset 2
Processing failed: Database connection error
# Consumer restarts later...
Processed and committed offset 3 # Retry of failed message
Timeline with manual commit:
T=0: poll() returns message at offset 100
T=1: Start processing message
T=2: Processing succeeds
T=3: commit(message) -> Kafka records "processed up to 100"
T=4: If crash happened at T=2, message would be reprocessed
Commit Strategies Compared
| Strategy | Code | Guarantee | Risk |
|---|---|---|---|
| Auto-commit | enable.auto.commit: True | At-most-once (may lose) | Message loss if crash after poll |
| Commit per message | commit(message=msg) | At-least-once (may duplicate) | Slow, but safe |
| Commit per batch | commit() after N messages | At-least-once | Better throughput, batch may duplicate |
At-Least-Once Pattern (Recommended Default)
For most applications, at-least-once is the right choice. Process, then commit:
# Process message
result = process_task_event(value)
# Only commit if processing succeeded
if result.success:
consumer.commit(message=msg)
else:
# Log failure, don't commit
# Message will be reprocessed on next consumer restart
logger.error(f"Failed to process: {result.error}")
Your processing code must be idempotent. If the same message is processed twice (consumer crashed after processing but before commit), the result should be the same.
Idempotent processing example:
def process_task_event(event):
"""Idempotent: Uses event_id as database primary key."""
task_id = event['id']
# Check if already processed
if audit_log.exists(task_id):
return Result(success=True, skipped=True)
# Process and save
audit_log.insert(task_id, event)
return Result(success=True)
Synchronous vs Asynchronous Commit
The commit() method has two modes:
Synchronous Commit (Default)
consumer.commit(message=msg, asynchronous=False)
- Blocks until Kafka confirms the commit
- Slower but guarantees commit succeeded
- Use when you need certainty
Asynchronous Commit
consumer.commit(message=msg, asynchronous=True)
- Returns immediately, commit happens in background
- Faster but commit might fail silently
- Use when throughput matters more than certainty
For critical data, use synchronous. For high-throughput scenarios where occasional reprocessing is acceptable, use asynchronous.
Running Your Consumer
In Development (Local Port Forward)
# Forward Kafka bootstrap service to localhost
kubectl port-forward svc/task-events-kafka-bootstrap 9092:9092 -n kafka
# In another terminal, run your consumer
python consumer.py
In Kubernetes (as a Deployment)
Create a consumer deployment that runs alongside your producer:
# consumer-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-notification-consumer
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app: task-notification-consumer
template:
metadata:
labels:
app: task-notification-consumer
spec:
containers:
- name: consumer
image: python:3.12-slim
command: ["python", "/app/consumer.py"]
volumeMounts:
- name: app-code
mountPath: /app
volumes:
- name: app-code
configMap:
name: consumer-code
Verifying Consumer Behavior
Use the Kafka CLI to check consumer group status:
# Check consumer group lag
kubectl exec -it task-events-dual-role-0 -n kafka -- \
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group task-notification-service \
--describe
Output:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
task-notification-service task-created 0 15 15 0
task-notification-service task-created 1 12 12 0
task-notification-service task-created 2 18 18 0
| Column | Meaning |
|---|---|
| CURRENT-OFFSET | Where consumer has committed |
| LOG-END-OFFSET | Latest message in partition |
| LAG | Messages behind (LOG-END - CURRENT) |
A lag of 0 means your consumer is caught up. Growing lag means consumer can't keep up with producers.
Common Mistakes to Avoid
| Mistake | Problem | Solution |
|---|---|---|
Forgetting consumer.close() | Slow rebalancing, uncommitted offsets | Always close in finally block |
| Auto-commit with slow processing | Message loss on crash | Use manual commit for slow/risky processing |
Not handling _PARTITION_EOF | Treating it as error, stopping consumer | Continue polling, it's informational |
| Committing before processing | Message loss if processing fails | Always process first, commit second |
| Single consumer for high-volume topic | Can't keep up, growing lag | Scale with more consumers (up to partition count) |
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, generate Python consumer code that processes task events from a Kafka topic.
Does my skill include proper offset management, error handling, and graceful shutdown?
Identify Gaps
Ask yourself:
- Did my skill explain auto.offset.reset and enable.auto.commit?
- Did it show how to handle message processing errors without losing data?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing consumer implementation patterns (offset management, error handling, shutdown).
Update it to include when to use auto-commit vs manual commit and how to handle processing failures.
Try With AI
Prompt 1: Debug a Consumer That's Losing Messages
I have a Kafka consumer that seems to be losing messages. Here's my code:
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'my-service',
'enable.auto.commit': True,
'auto.commit.interval.ms': 1000
})
while True:
msg = consumer.poll(1.0)
if msg:
result = process_message(msg) # Takes 5-10 seconds
if not result.success:
print("Failed, will retry later")
When the consumer restarts after a crash, some messages are missing.
What's wrong with this code and how do I fix it?
What you're learning: Diagnosing the auto-commit timing problem and implementing proper at-least-once semantics.
Prompt 2: Design Idempotent Processing
I need to process task-created events and send email notifications.
Each event contains: task_id, title, owner_email, created_at.
My consumer uses at-least-once delivery, so duplicates are possible.
Help me design idempotent processing so users don't get duplicate emails.
What data should I track? Where should I store it?
Walk me through the logic step by step.
What you're learning: Designing idempotent consumers that handle duplicates gracefully using state tracking.
Prompt 3: Choose the Right Commit Strategy
I'm building three different consumers for the same task-events topic:
1. Real-time dashboard that shows live task counts
2. Audit log that must never miss an event
3. Analytics pipeline that aggregates daily statistics
For each consumer, help me decide:
- auto.offset.reset: earliest or latest?
- enable.auto.commit: true or false?
- If manual commit: per-message or batch?
Explain the trade-offs for each choice.
What you're learning: Matching commit strategies to business requirements, understanding that different consumers of the same topic may need different configurations.
Important: When working with Kafka consumers, always test your error handling by simulating failures. Kill your consumer mid-processing and verify it correctly resumes from the last committed offset.