Consumer Deep Dive: Groups and Rebalancing
You deployed a notification service consuming from task-created. It works perfectly with one consumer. Then traffic spikes during a product launch, and you scale to three consumers. Suddenly, messages are processed twice. A rebalance occurred, your consumer hadn't committed its offset, and another consumer reprocessed the same messages.
This is one of the most common Kafka production issues. Understanding consumer groups and rebalancing isn't optional—it's essential for reliable event processing. In this lesson, you'll learn why rebalances happen, how to handle them safely, and how to diagnose the consumer lag that often triggers scaling decisions.
The pattern you'll learn here—committing offsets during rebalance callbacks—prevents duplicate processing in virtually every Kafka consumer you'll ever write. Master this, and you've solved one of distributed messaging's trickiest problems.
How Consumer Groups Distribute Work
When multiple consumers share a group.id, Kafka treats them as a team working together. Instead of each consumer receiving every message (like a pub/sub topic), Kafka assigns partitions to individual consumers so each message is processed exactly once within the group.
Topic: task-created (3 partitions)
┌──────────────┬──────────────┬──────────────┐
│ Partition 0 │ Partition 1 │ Partition 2 │
└──────┬───────┴──────┬───────┴──────┬───────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Consumer Group: notification-svc │
├─────────────┬─────────────┬─────────────────┤
│ Consumer A │ Consumer B │ Consumer C │
│ (P0) │ (P1) │ (P2) │
└─────────────┴─────────────┴─────────────────┘
Key rules for partition assignment:
| Rule | Implication |
|---|---|
| One partition = one consumer | A partition is never processed by multiple consumers in the same group |
| Consumers can handle multiple partitions | If you have 3 partitions and 2 consumers, one consumer gets 2 partitions |
| More consumers than partitions = idle consumers | Adding a 4th consumer to a 3-partition topic leaves one consumer doing nothing |
This design ensures ordering within a partition. Messages with the same key always go to the same partition, so they're always processed by the same consumer in order.
What Triggers Rebalancing
A rebalance redistributes partitions when the group membership changes. The group coordinator (a Kafka broker) detects changes and orchestrates the redistribution.
Triggers that cause rebalancing:
| Trigger | Example | Impact |
|---|---|---|
| Consumer joins | Scaling up from 2 to 3 instances | Partitions redistributed to include new consumer |
| Consumer leaves | Instance shutdown, crash, or scaling down | Orphaned partitions assigned to remaining consumers |
| Consumer timeout | Consumer fails to send heartbeat within session.timeout.ms | Group assumes consumer is dead |
| Subscription change | Consumer calls subscribe() with different topics | Full group rebalance |
| Partition count change | Admin adds partitions to topic | New partitions assigned to consumers |
The Rebalance Problem: Lost Offsets
Here's why rebalancing causes duplicate processing:
Timeline:
1. Consumer A polls messages 100-105 from Partition 0
2. Consumer A processes messages 100-103
3. REBALANCE STARTS - Partition 0 assigned to Consumer B
4. Consumer A loses Partition 0 before committing offset 103
5. Consumer B starts from last committed offset (99)
6. Messages 100-103 processed AGAIN by Consumer B
The window between processing and committing is the danger zone. Without proper handling, any messages processed but not committed will be reprocessed.
Implementing Rebalance Callbacks
The on_revoke callback is your opportunity to commit offsets before losing partitions. The on_assign callback lets you set up state for newly assigned partitions.
from confluent_kafka import Consumer, TopicPartition
def on_assign(consumer, partitions):
"""Called when partitions are assigned to this consumer."""
partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
print(f"Assigned partitions: {partition_list}")
# Optional: Seek to specific offset or reset position
# for partition in partitions:
# consumer.seek(TopicPartition(partition.topic, partition.partition, 0))
def on_revoke(consumer, partitions):
"""Called when partitions are being revoked from this consumer.
CRITICAL: Commit offsets synchronously here. Async commit may not
complete before partition is reassigned.
"""
partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
print(f"Revoking partitions: {partition_list}")
# Synchronous commit - blocks until broker confirms
try:
consumer.commit(asynchronous=False)
print("Offsets committed successfully before revocation")
except Exception as e:
print(f"Failed to commit during revoke: {e}")
# Configure consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manual commit for safety
})
# Subscribe with callbacks
consumer.subscribe(
['task-created'],
on_assign=on_assign,
on_revoke=on_revoke
)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
# Process message
print(f"Processing: {msg.value().decode()}")
# Commit after successful processing
consumer.commit(message=msg)
finally:
consumer.close()
Output (during normal operation):
Assigned partitions: ['task-created[0]', 'task-created[1]']
Processing: {"id": "task-1", "title": "Buy groceries"}
Processing: {"id": "task-2", "title": "Call dentist"}
Output (during rebalance when new consumer joins):
Revoking partitions: ['task-created[1]']
Offsets committed successfully before revocation
Assigned partitions: ['task-created[0]']
Why synchronous commit in on_revoke? Asynchronous commits may not complete before the partition is reassigned. The new consumer would start from an old offset, causing duplicates. The brief blocking during synchronous commit is worth the guarantee.
Cooperative vs Eager Rebalancing
Kafka supports two rebalancing protocols:
Eager Rebalancing (Legacy):
1. ALL consumers stop processing
2. ALL partitions revoked from everyone
3. Coordinator recalculates assignments
4. ALL partitions reassigned
5. ALL consumers resume
Duration: Several seconds of complete stoppage
Cooperative (Incremental) Rebalancing (Modern):
1. Only AFFECTED partitions identified
2. Only those partitions revoked
3. Other consumers continue processing
4. Revoked partitions reassigned
5. Minimal disruption
Duration: Milliseconds for unaffected consumers
Configure cooperative rebalancing in your consumer:
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'partition.assignment.strategy': 'cooperative-sticky', # Cooperative mode
'enable.auto.commit': False
})
Output:
The cooperative-sticky strategy minimizes partition movement. If Consumer A had partitions 0 and 1, and Consumer B joins, the coordinator might only move partition 1 to B—Consumer A keeps processing partition 0 uninterrupted.
Static Membership: Reducing Unnecessary Rebalances
Every time a consumer restarts (even briefly), it triggers a rebalance. In Kubernetes, rolling updates cause repeated rebalances as pods restart. Static membership solves this by giving each consumer a persistent identity.
import os
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'group.instance.id': f"notification-{os.environ.get('HOSTNAME', 'local')}", # Static identity
'session.timeout.ms': 45000, # Longer timeout for restarts
'enable.auto.commit': False
})
How static membership works:
| Without Static Membership | With Static Membership |
|---|---|
| Consumer restarts | Consumer restarts |
| Immediate rebalance triggered | No rebalance during session.timeout.ms window |
| Partitions redistributed | Consumer rejoins with same instance ID |
| New assignment may differ | Gets same partitions back |
When to use static membership:
- Kubernetes deployments with rolling updates
- Consumers with local state (cache, in-memory aggregations)
- High-throughput systems where rebalance pauses are costly
Understanding Consumer Lag
Consumer lag is the difference between the latest offset in a partition and the offset your consumer has processed. It's the most important metric for consumer health.
Partition 0:
Latest offset (log-end): 1000
Consumer offset: 850
LAG = 1000 - 850 = 150 messages behind
What lag tells you:
| Lag Level | Meaning | Action |
|---|---|---|
| 0 | Caught up | Healthy |
| Stable low number | Processing at production rate | Monitor |
| Growing | Producing faster than consuming | Scale consumers or optimize processing |
| Very large | Significant backlog | May need to skip or process in batches |
Checking lag with Kafka CLI:
# Inside Kafka pod or with kafka-consumer-groups.sh available
kubectl exec -it task-events-kafka-0 -n kafka -- \
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group notification-service
Output:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
notification-service task-created 0 850 1000 150
notification-service task-created 1 920 920 0
notification-service task-created 2 780 800 20
This output shows:
- Partition 0 is 150 messages behind (potential problem)
- Partition 1 is caught up (healthy)
- Partition 2 has minimal lag (healthy)
Diagnosing Common Consumer Lag Issues
When you see growing lag, systematically diagnose:
1. Processing too slow?
import time
# Add timing to identify slow processing
start = time.time()
process_message(msg)
duration = time.time() - start
if duration > 0.1: # > 100ms per message
print(f"Slow processing: {duration:.2f}s for {msg.key()}")
2. Polling too infrequently?
# Increase max.poll.records to process more per poll
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'max.poll.records': 500, # Default is 500, adjust based on processing speed
})
3. Not enough consumers?
If you have 3 partitions but only 1 consumer, one consumer handles all load. Scale to 3 consumers for parallel processing.
4. Rebalancing too frequently?
Frequent rebalances interrupt processing. Check for:
- Consumers with slow heartbeats (increase
session.timeout.ms) - Deployments causing constant restarts (use static membership)
- Processing taking longer than
max.poll.interval.ms
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'session.timeout.ms': 45000, # 45 seconds
'max.poll.interval.ms': 300000, # 5 minutes for slow processing
'heartbeat.interval.ms': 10000 # 10 seconds
})
Putting It Together: Production Consumer Pattern
Here's the complete pattern combining rebalance callbacks, cooperative rebalancing, static membership, and lag awareness:
from confluent_kafka import Consumer, KafkaError
import os
import time
class ProductionConsumer:
def __init__(self, group_id: str, topics: list[str]):
self.consumer = Consumer({
'bootstrap.servers': os.environ.get(
'KAFKA_BOOTSTRAP_SERVERS',
'localhost:30092'
),
'group.id': group_id,
'group.instance.id': f"{group_id}-{os.environ.get('HOSTNAME', 'local')}",
'partition.assignment.strategy': 'cooperative-sticky',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 45000,
'max.poll.interval.ms': 300000,
})
self.consumer.subscribe(
topics,
on_assign=self._on_assign,
on_revoke=self._on_revoke
)
self.running = True
self.messages_processed = 0
self.last_lag_check = time.time()
def _on_assign(self, consumer, partitions):
partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
print(f"Assigned: {partition_list}")
def _on_revoke(self, consumer, partitions):
partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
print(f"Revoking: {partition_list}")
try:
consumer.commit(asynchronous=False)
print("Pre-revoke commit successful")
except Exception as e:
print(f"Pre-revoke commit failed: {e}")
def process(self, msg):
"""Override this method with your processing logic."""
print(f"Processing: {msg.value().decode()}")
return True
def run(self):
try:
while self.running:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue
start = time.time()
success = self.process(msg)
duration = time.time() - start
if duration > 0.1:
print(f"Slow processing: {duration:.2f}s")
if success:
self.consumer.commit(message=msg)
self.messages_processed += 1
finally:
self.consumer.close()
print(f"Shutdown. Processed {self.messages_processed} messages.")
# Usage
if __name__ == "__main__":
consumer = ProductionConsumer(
group_id='notification-service',
topics=['task-created', 'task-completed']
)
consumer.run()
Output:
Assigned: ['task-created[0]', 'task-created[1]', 'task-completed[0]']
Processing: {"id": "task-1", "title": "Buy groceries"}
Processing: {"id": "task-2", "title": "Call dentist"}
Revoking: ['task-created[1]']
Pre-revoke commit successful
Assigned: ['task-created[0]', 'task-completed[0]']
Processing: {"id": "task-3", "title": "Review PR"}
^C
Shutdown. Processed 3 messages.
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, design a consumer group strategy for a notification service with 6 partitions and 3 consumer instances.
Does my skill explain partition assignment, rebalancing triggers, and how to minimize downtime during rebalancing?
Identify Gaps
Ask yourself:
- Did my skill explain how consumer groups distribute partitions across instances?
- Did it cover rebalancing triggers (consumer crash, new consumer, partition change)?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing consumer group mechanics (partition assignment, rebalancing, cooperative rebalancing).
Update it to include when rebalancing occurs and how to configure consumers to minimize rebalancing impact.
Try With AI
Setup: You have a Kafka consumer that's experiencing duplicate message processing after scaling events.
Prompt 1: Analyze a rebalance scenario
I have a Kafka consumer with this configuration:
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'order-processor',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg:
process_order(msg) # Takes 2-3 seconds
When I scale from 1 to 2 consumers, orders are processed twice.
Walk me through exactly what happens during the rebalance that causes duplicates.
What you're learning: AI will trace the timeline showing how auto-commit with a 5-second interval creates a window where messages are processed but not committed when revocation occurs.
Prompt 2: Debug consumer lag
My consumer group shows this lag:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
order-processor orders 0 45000 50000 5000
order-processor orders 1 48000 50000 2000
order-processor orders 2 42000 50000 8000
The lag on partition 2 keeps growing while partitions 0 and 1 are stable.
What are the possible causes and how do I diagnose further?
What you're learning: AI helps you think through asymmetric lag patterns—could be slow processing for certain message types, a hot partition with too much traffic, or a consumer that crashed and recovered.
Prompt 3: Design for your domain
I'm building a notification service that:
- Consumes from 'user-events' topic (10 partitions)
- Sends emails (slow: 500ms per message)
- Runs on Kubernetes with autoscaling (1-5 pods)
- Cannot send duplicate notifications
Design the consumer configuration and rebalance handling strategy.
Consider: session timeouts, static membership, commit patterns.
What you're learning: AI collaborates on applying consumer group patterns to your specific constraints, suggesting configurations that balance throughput with your no-duplicate requirement.
Safety note: When testing rebalance scenarios, use a separate consumer group from production. Joining a production group with test consumers triggers rebalances that affect real traffic.