Updated Feb 23, 2026

Your First Consumer (Python)

You've built a producer that reliably publishes events to Kafka. Now you need something to receive those events. In the previous lesson, you verified message delivery by checking broker acknowledgments. But real systems need consumers that process messages, handle failures gracefully, and track their position in the event stream.

Consuming from Kafka is fundamentally different from calling an API. With an API, you request data and get an immediate response. With Kafka, you poll for messages continuously, process whatever arrives, and tell Kafka you're done. This poll-process-commit loop is the heart of every Kafka consumer.

The challenge is reliability. What happens if your consumer crashes after reading a message but before processing it? What if it processes successfully but crashes before confirming? These edge cases determine whether your system loses messages, processes them twice, or handles them exactly right. Your commit strategy makes this decision.

Consumer Fundamentals

Before writing code, understand what a consumer actually does:

| Concept | Description |
|---|---|
| Consumer Group | Named group of consumers that share work. Each partition is assigned to exactly one consumer in the group. |
| Subscription | Topics this consumer wants to receive messages from. |
| Poll Loop | Continuous loop calling poll() to fetch messages from assigned partitions. |
| Offset | Position in a partition. The consumer tracks "where am I?" so it can resume after a restart. |
| Commit | Telling Kafka "I've processed up to this offset." |
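To make "each partition is assigned to exactly one consumer" concrete, here is a toy round-robin assignment. It is a simplified sketch, not Kafka's actual assignor (the built-in strategies such as range and cooperative-sticky follow their own rules), and `assign_round_robin` is a hypothetical helper. It shows why adding consumers beyond the partition count leaves some of them idle.

```python
def assign_round_robin(partitions, consumers):
    """Toy model: give each partition to exactly one consumer in the group.

    Simplified round-robin for illustration only; Kafka's real assignors
    use their own strategies.
    """
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        owner = consumers[i % len(consumers)]  # cycle through the group
        assignment[owner].append(p)
    return assignment


# Three partitions, two consumers: one consumer handles two partitions.
print(assign_round_robin([0, 1, 2], ["consumer-a", "consumer-b"]))
# {'consumer-a': [0, 2], 'consumer-b': [1]}

# Four consumers but only three partitions: one consumer sits idle.
print(assign_round_robin([0, 1, 2], ["c1", "c2", "c3", "c4"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```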

The Poll Loop Pattern

Every Kafka consumer follows this structure:

while running:
    msg = poll(timeout)       # returns one message, or None
    if msg is None:
        continue
    process(msg)
    commit(msg)               # or let auto-commit handle it

The poll() call:

  • Fetches the next available message from assigned partitions
  • Handles rebalancing when consumers join or leave
  • Returns None if no message arrives within the timeout
  • Must be called regularly: if the gap between poll() calls exceeds max.poll.interval.ms (default 5 minutes), Kafka considers the consumer failed and reassigns its partitions
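The poll-process-commit cycle and "resume from the committed offset" can be sketched with a toy in-memory partition. `ToyPartition` and `run_consumer` are hypothetical names for illustration, not part of confluent-kafka; a real consumer calls `Consumer.poll()` against the broker. The key detail it shows is that a commit records the *next* offset to read.

```python
class ToyPartition:
    """Hypothetical in-memory stand-in for one Kafka partition (not the Kafka API)."""

    def __init__(self, messages):
        self.log = list(messages)   # offset == list index
        self.committed = None       # the group has no committed offset yet

    def poll(self, position):
        """Return (offset, value) at position, or None if caught up."""
        if position < len(self.log):
            return position, self.log[position]
        return None


def run_consumer(partition, max_messages):
    """Poll-process-commit loop that resumes from the committed offset."""
    # No committed offset: start at 0 (like auto.offset.reset=earliest).
    position = partition.committed if partition.committed is not None else 0
    processed = []
    for _ in range(max_messages):
        record = partition.poll(position)
        if record is None:                 # nothing new within the "timeout"
            break
        offset, value = record
        processed.append(value)            # process(message)
        partition.committed = offset + 1   # commit: next offset to read
        position = offset + 1
    return processed


p = ToyPartition(["task-1", "task-2", "task-3"])
print(run_consumer(p, max_messages=2))   # ['task-1', 'task-2']
print(p.committed)                       # 2
# A "restarted" consumer picks up where the last commit left off.
print(run_consumer(p, max_messages=10))  # ['task-3']
```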

Setting Up the Consumer

Install the same library you used for the producer:

uv add confluent-kafka

Connecting to Kafka

Like in Lesson 5, use the NodePort to connect from your local machine:

| Where Your Code Runs | Bootstrap Server |
|---|---|
| Local machine (Mac/Windows) | localhost:30092 |
| Pod in same namespace | task-events-kafka-bootstrap:9092 |

Basic Consumer Configuration

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',  # NodePort for local dev
    'group.id': 'task-notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000
})
print("Consumer created with group.id: task-notification-service")

Output:

Consumer created with group.id: task-notification-service

Let's understand each configuration:

| Setting | Value | Purpose |
|---|---|---|
| bootstrap.servers | Kafka cluster address | Where to connect |
| group.id | Consumer group name, shared by all instances of the service | Identifies this consumer group for partition assignment |
| auto.offset.reset | earliest or latest | Where to start if the group has no committed offset |
| enable.auto.commit | True or False | Whether the client commits offsets automatically in the background |
| auto.commit.interval.ms | 5000 (milliseconds) | How often to auto-commit, if enabled |

The auto.offset.reset Decision

When a consumer group first subscribes to a topic (or when its committed offsets have expired), Kafka needs to know where to start reading:

| Value | Behavior | Use When |
|---|---|---|
| earliest | Read from the beginning of the topic | You need to process all historical messages |
| latest | Read only new messages from now on | You only care about future events |

Example scenario: Your notification service starts for the first time. With earliest, it processes all past task-created events (potentially thousands). With latest, it ignores history and only notifies for new tasks.

# Process all historical events (good for data pipelines, audit logs)
'auto.offset.reset': 'earliest'

# Only future events (good for real-time notifications)
'auto.offset.reset': 'latest'

Choose based on your business requirements, not technical preference.
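The decision Kafka makes for each partition can be summarized in a few lines. This is a simplified model under stated assumptions: `starting_offset` is a hypothetical helper, `log_start`/`log_end` stand for the oldest retained offset and the next offset to be written, and the fallback applies both when no offset is committed and when the committed offset is no longer in range (for example, deleted by retention). Its default of `"latest"` matches Kafka's own default.

```python
def starting_offset(committed, log_start, log_end, reset="latest"):
    """Where a consumer begins reading one partition (simplified model).

    committed: the group's committed offset, or None if there is none.
    log_start: oldest offset still retained in the partition.
    log_end: offset the next produced message will get.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed   # resume where the group left off
    if reset == "earliest":
        return log_start   # replay everything still retained
    if reset == "latest":
        return log_end     # only messages produced from now on
    raise ValueError(f"unsupported auto.offset.reset: {reset!r}")


print(starting_offset(None, 0, 1500, "earliest"))  # 0    (first start, replay history)
print(starting_offset(None, 0, 1500, "latest"))    # 1500 (first start, new events only)
print(starting_offset(1200, 0, 1500, "latest"))    # 1200 (committed offset wins)
print(starting_offset(50, 100, 1500, "earliest"))  # 100  (committed offset expired)
```

Note that `auto.offset.reset` only matters when the committed offset is missing or invalid; once the group has committed, restarts always resume from it.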

The Complete Poll Loop

Here's a complete consumer with error handling and graceful shutdown:

from confluent_kafka import Consumer, KafkaError
import json
import signal

# Graceful shutdown handling
running = True

def signal_handler(sig, frame):
    global running
    print("Shutdown signal received...")
    running = False

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Consumer configuration
consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'task-notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
    'enable.partition.eof': True  # Emit _PARTITION_EOF events (off by default)
})

# Subscribe to topic(s)
consumer.subscribe(['task-created'])
print("Subscribed to topic: task-created")

try:
    while running:
        # Poll for a message (1 second timeout)
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            # No message available within timeout
            continue

        if msg.error():
            # Handle specific errors
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached end of partition (not an error, just informational)
                print(f"Reached end of partition {msg.partition()}")
            else:
                # Actual error
                print(f"Consumer error: {msg.error()}")
            continue

        # Process the message
        key = msg.key().decode('utf-8') if msg.key() else None
        value = json.loads(msg.value().decode('utf-8'))

        print(f"Received task: {value.get('title', 'unknown')}")
        print(f"  Key: {key}")
        print(f"  Partition: {msg.partition()}, Offset: {msg.offset()}")

        # Your business logic here
        # send_notification(value)

finally:
    # Clean shutdown
    print("Closing consumer...")
    consumer.close()
    print("Consumer closed.")

Output:

Subscribed to topic: task-created
Received task: Buy groceries
  Key: task-123
  Partition: 0, Offset: 0
Received task: Complete report
  Key: task-456
  Partition: 1, Offset: 0
Reached end of partition 0
Reached end of partition 1
^CShutdown signal received...
Closing consumer...
Consumer closed.

Understanding the Error Handling

The msg.error() check catches several conditions:

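| Error Code | Meaning | Action |
|---|---|---|
| _PARTITION_EOF | Reached the end of the available messages | Keep polling (more messages may arrive) |
| UNKNOWN_TOPIC_OR_PART | Topic doesn't exist | Check the topic name; the topic may need to be created |
| _ALL_BROKERS_DOWN | Can't reach any broker | Check network and broker health |

In confluent-kafka, codes with a leading underscore are generated locally by the client library; codes without one are returned by the broker.

The _PARTITION_EOF error is not really an error. It means "you've caught up with this partition." Continue polling for new messages. The client only emits it when enable.partition.eof is set to True, which is off by default.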

Why consumer.close() Matters

Always call consumer.close() when shutting down:

  1. Leaves the group immediately: a rebalance starts right away, so other consumers in the group get your partitions faster
  2. Commits final offsets when auto-commit is enabled: with manual commit, only the offsets you already committed are saved
  3. Releases resources: Cleans up connections and memory

Without close(), Kafka waits for the session timeout (session.timeout.ms, 45 seconds by default in recent versions) before reassigning partitions.

Auto-Commit vs Manual Commit

This is the most important decision for your consumer. It determines your message delivery guarantee.

Auto-Commit: Simple but Risky

With auto-commit enabled, Kafka periodically commits offsets in the background:

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'my-service',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000  # Every 5 seconds
})

The Problem: Consider this timeline:

T=0: poll() returns message at offset 100
T=1: Start processing message
T=2: Auto-commit happens -> Kafka records committed offset 101 ("processed up to 100")
T=3: Processing fails, consumer crashes
T=4: Consumer restarts, asks Kafka "where was I?"
T=5: Kafka says "offset 101" -> Message at offset 100 is LOST

Auto-commit committed the offset before processing completed. If processing fails, that message is never retried.

When auto-commit is acceptable:

  • Processing is fast and unlikely to fail
  • Losing occasional messages is acceptable
  • You have separate retry/dead-letter mechanisms

Manual Commit: Control and Safety

Manual commit lets you commit AFTER successful processing:

from confluent_kafka import Consumer, KafkaError
import json

running = True  # Set to False by the signal handler from the previous example

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'task-audit-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False  # Disable auto-commit
})

consumer.subscribe(['task-created'])

try:
    while running:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f"Error: {msg.error()}")
            continue

        # Process the message
        try:
            value = json.loads(msg.value().decode('utf-8'))

            # Your business logic (e.g., write to database);
            # save_to_audit_log() stands in for your own function
            save_to_audit_log(value)

            # Commit AFTER successful processing (synchronously, so the
            # commit is confirmed before moving on)
            consumer.commit(message=msg, asynchronous=False)
            print(f"Processed and committed offset {msg.offset()}")

        except Exception as e:
            print(f"Processing failed: {e}")
            # Don't commit, and stop consuming: committing any later message
            # would move the group's offset past this one and skip it.
            # The uncommitted message is reprocessed on restart.
            break

finally:
    consumer.close()

Output:

Processed and committed offset 0
Processed and committed offset 1
Processed and committed offset 2
Processing failed: Database connection error
# Consumer restarts later...
Processed and committed offset 3 # Retry of failed message

Timeline with manual commit:

T=0: poll() returns message at offset 100
T=1: Start processing message
T=2: Processing succeeds
T=3: commit(message=msg) -> Kafka records committed offset 101
T=4: Had the consumer crashed before T=3, offset 100 would be reprocessed after restart: a duplicate, not a loss
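The two timelines can be compared side by side with a small simulation. This is a toy model (the `simulate` function is hypothetical): a crash hits while processing one offset, the consumer restarts from whatever was committed, and `commit_before_processing=True` stands in for an auto-commit that fires before processing finishes.

```python
def simulate(messages, crash_at, commit_before_processing):
    """Toy model: crash while processing offset `crash_at`, then restart.

    commit_before_processing=True mimics auto-commit firing between poll()
    and the end of processing; False mimics manual commit after success.
    Returns the offsets that were successfully processed across both runs.
    """
    committed = 0    # next offset to read
    processed = []

    # First run: crashes while processing `crash_at`.
    for offset in range(committed, len(messages)):
        if commit_before_processing:
            committed = offset + 1     # offset recorded before work is done
        if offset == crash_at:
            break                      # simulated crash mid-processing
        processed.append(offset)
        if not commit_before_processing:
            committed = offset + 1     # commit only after success

    # Second run: restart from the committed offset; no crash this time.
    for offset in range(committed, len(messages)):
        processed.append(offset)
        committed = offset + 1
    return processed


msgs = ["a", "b", "c", "d"]
print(simulate(msgs, crash_at=2, commit_before_processing=True))   # [0, 1, 3]: offset 2 lost
print(simulate(msgs, crash_at=2, commit_before_processing=False))  # [0, 1, 2, 3]: offset 2 retried
```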

Commit Strategies Compared

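| Strategy | Code | Guarantee | Risk |
|---|---|---|---|
| Auto-commit | enable.auto.commit: True | No firm guarantee (may lose or duplicate) | Loss if a commit lands before processing finishes; duplicates if a crash hits between commits |
| Commit per message | commit(message=msg) | At-least-once (may duplicate) | Slower, but safe |
| Commit per batch | commit() after N messages | At-least-once | Better throughput; a crash may reprocess up to a whole batch |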
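For batch commits, the offset you commit per partition is the highest processed offset plus one, because Kafka stores the *next* offset to read (`commit(message=msg)` adds the one for you). A minimal sketch, assuming a hypothetical `Record` tuple in place of real messages; with confluent-kafka, each resulting pair would become a `TopicPartition(topic, partition, offset)` passed to `consumer.commit(offsets=[...])`.

```python
from collections import namedtuple

# Hypothetical stand-in for the (topic, partition, offset) of a consumed message.
Record = namedtuple("Record", ["topic", "partition", "offset"])


def offsets_to_commit(batch):
    """Return {(topic, partition): next_offset} for a processed batch.

    The committed offset is the next offset to read, so each entry is the
    highest processed offset in that partition plus one.
    """
    result = {}
    for rec in batch:
        key = (rec.topic, rec.partition)
        result[key] = max(result.get(key, 0), rec.offset + 1)
    return result


batch = [
    Record("task-created", 0, 15),
    Record("task-created", 1, 12),
    Record("task-created", 0, 16),
]
print(offsets_to_commit(batch))
# {('task-created', 0): 17, ('task-created', 1): 13}
```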

For most applications, at-least-once is the right choice. Process, then commit:

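# Process message
result = process_task_event(value)

# Only commit if processing succeeded
if result.success:
    consumer.commit(message=msg)
else:
    # Log the failure and don't commit. Also stop the poll loop:
    # committing a later message would move the offset past this one.
    # The message will be reprocessed on the next consumer restart.
    logger.error(f"Failed to process: {result.error}")
    running = False  # exit the poll loop after this iteration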

Your processing code must be idempotent. If the same message is processed twice (consumer crashed after processing but before commit), the result should be the same.

Idempotent processing example:

def process_task_event(event):
    """Idempotent: uses the task id as the audit log's primary key."""
    task_id = event['id']

    # Check if already processed (audit_log and Result are placeholders
    # for your own storage layer and result type)
    if audit_log.exists(task_id):
        return Result(success=True, skipped=True)

    # Process and save
    audit_log.insert(task_id, event)
    return Result(success=True)
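A check-then-insert like the one above can race if two consumers see the same event at once. One common alternative, sketched here with Python's built-in `sqlite3` and an in-memory database, lets the database's primary-key constraint reject the duplicate atomically. The table name, schema, and helper names are hypothetical; swap in your real database.

```python
import sqlite3


def make_audit_db():
    """In-memory audit log; task_id is the PRIMARY KEY (hypothetical schema)."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE audit_log (task_id TEXT PRIMARY KEY, title TEXT)")
    return db


def process_task_event(db, event):
    """Idempotent insert: a redelivered event hits the primary key and is ignored.

    Returns True if this call stored the event, False if it was a duplicate.
    """
    with db:  # commits the transaction on success
        cur = db.execute(
            "INSERT OR IGNORE INTO audit_log (task_id, title) VALUES (?, ?)",
            (event["id"], event.get("title")),
        )
    return cur.rowcount == 1


db = make_audit_db()
event = {"id": "task-123", "title": "Buy groceries"}
print(process_task_event(db, event))  # True  (first delivery)
print(process_task_event(db, event))  # False (redelivery is a no-op)
print(db.execute("SELECT COUNT(*) FROM audit_log").fetchone()[0])  # 1
```

Either way, the consumer can safely commit after a duplicate is skipped, because the side effect already happened once.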

Synchronous vs Asynchronous Commit

In confluent-kafka, commit() has two modes, selected by the asynchronous argument:

Synchronous Commit

consumer.commit(message=msg, asynchronous=False)

  • Blocks until Kafka confirms the commit
  • Slower, but you know whether the commit succeeded (a failure raises an exception)
  • Use when you need certainty

Asynchronous Commit (Default)

consumer.commit(message=msg, asynchronous=True)

  • This is what commit() does when you don't pass asynchronous
  • Returns immediately; the commit happens in the background
  • Faster, but a failed commit goes unnoticed unless you register an on_commit callback
  • Use when throughput matters more than certainty

For critical data, use synchronous. For high-throughput scenarios where occasional reprocessing is acceptable, use asynchronous.
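If you choose asynchronous commits, you can still surface failures. confluent-kafka accepts an `on_commit` callback in the consumer configuration, called with `(err, partitions)` from a later `poll()` (or `close()`) once the commit completes. A minimal sketch; the `commit_failures` list is a hypothetical hook for your own logging or alerting, and you would pass `config` to `Consumer(config)`.

```python
commit_failures = []  # hypothetical sink for failed async commits


def on_commit(err, partitions):
    """Invoked by confluent-kafka when an asynchronous commit finishes."""
    if err is not None:
        # The commit failed: those offsets may be reprocessed after a restart.
        print(f"Async commit failed: {err}")
        commit_failures.append(err)


config = {
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'task-audit-service',
    'enable.auto.commit': False,
    'on_commit': on_commit,  # report results of commit(..., asynchronous=True)
}
# consumer = Consumer(config)
# ...
# consumer.commit(message=msg, asynchronous=True)
```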

Running Your Consumer

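In Development (Local Machine)

Your consumer connects through the NodePort listener (localhost:30092), so no port-forward is needed:

# Run your consumer from your local machine
python consumer.py

A kubectl port-forward to task-events-kafka-bootstrap on port 9092 generally isn't enough on its own: after the initial bootstrap connection, the client connects to the broker addresses that listener advertises, which are cluster-internal names your machine can't resolve.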

In Kubernetes (as a Deployment)

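Create a consumer deployment that runs alongside your producer. Inside the cluster, consumer.py must use task-events-kafka-bootstrap:9092 instead of localhost:30092, and the manifest below expects the code in a ConfigMap named consumer-code (for example, created with kubectl create configmap consumer-code --from-file=consumer.py -n kafka):

# consumer-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-notification-consumer
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: task-notification-consumer
  template:
    metadata:
      labels:
        app: task-notification-consumer
    spec:
      containers:
      - name: consumer
        image: python:3.12-slim
        # The slim image doesn't include confluent-kafka, so install it at startup
        # (fine for a lab; bake it into your own image for real deployments)
        command: ["sh", "-c", "pip install confluent-kafka && python /app/consumer.py"]
        volumeMounts:
        - name: app-code
          mountPath: /app
      volumes:
      - name: app-code
        configMap:
          name: consumer-code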

Verifying Consumer Behavior

Use the Kafka CLI to check consumer group status:

# Check consumer group lag
kubectl exec -it task-events-dual-role-0 -n kafka -- \
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group task-notification-service \
--describe

Output:

GROUP                      TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
task-notification-service  task-created   0          15              15              0
task-notification-service  task-created   1          12              12              0
task-notification-service  task-created   2          18              18              0
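
| Column | Meaning |
|---|---|
| CURRENT-OFFSET | The group's committed offset (the next offset it will read) |
| LOG-END-OFFSET | The offset the next produced message will get (one past the latest message) |
| LAG | Messages not yet consumed (LOG-END-OFFSET - CURRENT-OFFSET) |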

A lag of 0 means your consumer is caught up. Lag that keeps growing means your consumer can't keep up with the producers.
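Lag is simple arithmetic on the two offset columns. This sketch (with a hypothetical `consumer_lag` helper) computes it from the values in the kafka-consumer-groups.sh output above, then from a made-up later snapshot where producers are outpacing the consumer.

```python
def consumer_lag(rows):
    """rows: (partition, current_offset, log_end_offset) -> {partition: lag}."""
    return {p: end - current for p, current, end in rows}


# Values from the kafka-consumer-groups.sh output above.
rows = [(0, 15, 15), (1, 12, 12), (2, 18, 18)]
print(consumer_lag(rows))                 # {0: 0, 1: 0, 2: 0}
print(sum(consumer_lag(rows).values()))   # 0 -> caught up

# Hypothetical later snapshot: producers outpacing the consumer.
later = [(0, 20, 35), (1, 18, 30), (2, 25, 41)]
print(sum(consumer_lag(later).values()))  # 43
```

Watching the total over several snapshots is what tells you whether lag is growing or just momentarily non-zero.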

Common Mistakes to Avoid

| Mistake | Problem | Solution |
|---|---|---|
| Forgetting consumer.close() | Slow rebalancing, uncommitted offsets | Always close in a finally block |
| Auto-commit with slow processing | Message loss on crash | Use manual commit for slow or risky processing |
| Not handling _PARTITION_EOF | Treating it as an error and stopping the consumer | Keep polling; it's informational |
| Committing before processing | Message loss if processing fails | Always process first, commit second |
| Single consumer for a high-volume topic | Can't keep up; lag grows | Scale out with more consumers (up to the partition count) |

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, generate Python consumer code that processes task events from a Kafka topic.
Does my skill include proper offset management, error handling, and graceful shutdown?

Identify Gaps

Ask yourself:

  • Did my skill explain auto.offset.reset and enable.auto.commit?
  • Did it show how to handle message processing errors without losing data?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing consumer implementation patterns (offset management, error handling, shutdown).
Update it to include when to use auto-commit vs manual commit and how to handle processing failures.

Try With AI

Prompt 1: Debug a Consumer That's Losing Messages

I have a Kafka consumer that seems to be losing messages. Here's my code:

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'my-service',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000
})

while True:
    msg = consumer.poll(1.0)
    if msg:
        result = process_message(msg)  # Takes 5-10 seconds
        if not result.success:
            print("Failed, will retry later")

When the consumer restarts after a crash, some messages are missing.
What's wrong with this code and how do I fix it?

What you're learning: Diagnosing the auto-commit timing problem and implementing proper at-least-once semantics.

Prompt 2: Design Idempotent Processing

I need to process task-created events and send email notifications.
Each event contains: task_id, title, owner_email, created_at.

My consumer uses at-least-once delivery, so duplicates are possible.
Help me design idempotent processing so users don't get duplicate emails.

What data should I track? Where should I store it?
Walk me through the logic step by step.

What you're learning: Designing idempotent consumers that handle duplicates gracefully using state tracking.

Prompt 3: Choose the Right Commit Strategy

I'm building three different consumers for the same task-events topic:

1. Real-time dashboard that shows live task counts
2. Audit log that must never miss an event
3. Analytics pipeline that aggregates daily statistics

For each consumer, help me decide:
- auto.offset.reset: earliest or latest?
- enable.auto.commit: true or false?
- If manual commit: per-message or batch?

Explain the trade-offs for each choice.

What you're learning: Matching commit strategies to business requirements, understanding that different consumers of the same topic may need different configurations.

Important: When working with Kafka consumers, always test your error handling by simulating failures. Kill your consumer mid-processing and verify it correctly resumes from the last committed offset.