Updated Feb 23, 2026

Consumer Deep Dive: Groups and Rebalancing

You deployed a notification service consuming from task-created. It works perfectly with one consumer. Then traffic spikes during a product launch, and you scale to three consumers. Suddenly, messages are processed twice. A rebalance occurred, your consumer hadn't committed its offset, and another consumer reprocessed the same messages.

This is one of the most common Kafka production issues. Understanding consumer groups and rebalancing isn't optional—it's essential for reliable event processing. In this lesson, you'll learn why rebalances happen, how to handle them safely, and how to diagnose the consumer lag that often triggers scaling decisions.

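The pattern you'll learn here, committing offsets in the revocation callback, closes the most common source of duplicates in Kafka consumers: work that was processed but not yet committed when a partition moved. It won't eliminate every duplicate (a crash between processing and committing still causes one), but it handles the planned rebalances that scaling and deployments trigger.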

How Consumer Groups Distribute Work

When multiple consumers share a group.id, Kafka treats them as a team. Instead of every consumer receiving every message (which is what happens when consumers use different group IDs), Kafka assigns each partition to a single consumer in the group, so each message is delivered to only one member of the group.

```text
Topic: task-created (3 partitions)
┌──────────────┬──────────────┬──────────────┐
│ Partition 0  │ Partition 1  │ Partition 2  │
└──────┬───────┴──────┬───────┴──────┬───────┘
       │              │              │
       ▼              ▼              ▼
┌─────────────────────────────────────────────┐
│      Consumer Group: notification-svc       │
├──────────────┬──────────────┬───────────────┤
│  Consumer A  │  Consumer B  │  Consumer C   │
│     (P0)     │     (P1)     │     (P2)      │
└──────────────┴──────────────┴───────────────┘
```

Key rules for partition assignment:

| Rule | Implication |
|------|-------------|
| One partition = one consumer | A partition is never processed by multiple consumers in the same group |
| Consumers can handle multiple partitions | If you have 3 partitions and 2 consumers, one consumer gets 2 partitions |
| More consumers than partitions = idle consumers | Adding a 4th consumer to a 3-partition topic leaves one consumer doing nothing |

This design preserves ordering within a partition. With the default partitioner, messages with the same key go to the same partition (as long as the partition count doesn't change), so they are processed in order by whichever single consumer owns that partition.
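To make the assignment rules concrete, here is a toy round-robin assignor. It is a simplified sketch, not Kafka's real assignor code (confluent_kafka's built-in strategies are range, roundrobin, and cooperative-sticky), and the function name `round_robin_assign` is hypothetical.

```python
def round_robin_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Toy assignor (hypothetical): give each partition to exactly one consumer, in turn."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# 3 partitions, 2 consumers: one consumer owns two partitions.
print(round_robin_assign([0, 1, 2], ["A", "B"]))            # {'A': [0, 2], 'B': [1]}
# 3 partitions, 4 consumers: consumer D gets nothing and sits idle.
print(round_robin_assign([0, 1, 2], ["A", "B", "C", "D"]))  # {'A': [0], 'B': [1], 'C': [2], 'D': []}
```

Whatever the strategy, the invariant is the same: every partition has exactly one owner, and any consumer beyond the partition count gets an empty list.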

What Triggers Rebalancing

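A rebalance redistributes partitions when group membership or the set of subscribed partitions changes. The group coordinator (a Kafka broker) tracks membership and orchestrates the process; in the classic protocol, one consumer acting as group leader computes the new assignment.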

Triggers that cause rebalancing:

| Trigger | Example | Impact |
|---------|---------|--------|
| Consumer joins | Scaling up from 2 to 3 instances | Partitions redistributed to include the new consumer |
| Consumer leaves | Instance shutdown, crash, or scaling down | Orphaned partitions assigned to remaining consumers |
| Consumer timeout | Consumer fails to send a heartbeat within session.timeout.ms | Group assumes the consumer is dead |
| Poll timeout | Consumer doesn't call poll() within max.poll.interval.ms | Consumer leaves the group; its partitions are reassigned |
| Subscription change | Consumer calls subscribe() with different topics | Group rebalance to reassign partitions |
| Partition count change | Admin adds partitions to a subscribed topic | New partitions assigned to consumers |

The Rebalance Problem: Lost Offsets

Here's why rebalancing causes duplicate processing:

Timeline (a committed offset is the next offset to read, not the last one processed):
1. Consumer A polls messages 100-105 from Partition 0 (its last commit was offset 100)
2. Consumer A processes messages 100-103
3. REBALANCE STARTS - Partition 0 assigned to Consumer B
4. Consumer A loses Partition 0 before committing offset 104
5. Consumer B starts from the last committed offset (100)
6. Messages 100-103 processed AGAIN by Consumer B

The window between processing and committing is the danger zone. Without proper handling, any messages processed but not committed will be reprocessed.
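The timeline can be replayed with a small simulation. It is a toy model rather than client code: it assumes Kafka's convention that a committed offset is the next offset to read, and the function and parameter names are hypothetical.

```python
def reprocessed_after_handover(
    polled: list[int],
    processed: int,
    last_commit: int,
    commit_on_revoke: bool,
) -> list[int]:
    """Return the offsets the new owner will process a second time.

    polled: offsets poll() returned to the old owner (e.g. 100-105)
    processed: how many of those the old owner finished before the revoke
    last_commit: committed offset before this batch (next offset to read)
    commit_on_revoke: whether the old owner commits its progress when revoked
    """
    done = polled[:processed]
    committed = last_commit
    if commit_on_revoke and done:
        committed = done[-1] + 1  # commit the next offset to read: last processed + 1
    # The new owner resumes at `committed`; anything already done at or past it repeats.
    return [offset for offset in done if offset >= committed]


batch = list(range(100, 106))  # Consumer A polled messages 100-105
print(reprocessed_after_handover(batch, 4, last_commit=100, commit_on_revoke=False))  # [100, 101, 102, 103]
print(reprocessed_after_handover(batch, 4, last_commit=100, commit_on_revoke=True))   # []
```

Committing at revocation time shrinks the duplicate window to zero for a planned handover; only a crash between processing and committing can still leave one.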

Implementing Rebalance Callbacks

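The on_revoke callback is your opportunity to commit offsets before losing partitions; the on_assign callback lets you set up state for newly assigned partitions. confluent_kafka runs both callbacks from inside poll(), on the same thread as your processing loop, so in a single-threaded loop like the one below, every message poll() has already returned has finished processing by the time on_revoke runs.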

```python
from confluent_kafka import Consumer


def on_assign(consumer, partitions):
    """Called when partitions are assigned to this consumer."""
    partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
    print(f"Assigned partitions: {partition_list}")

    # Optional: override the starting position before consumption begins
    # (from confluent_kafka import OFFSET_BEGINNING). With cooperative-sticky,
    # call consumer.incremental_assign(partitions) instead of assign().
    # for p in partitions:
    #     p.offset = OFFSET_BEGINNING
    # consumer.assign(partitions)


def on_revoke(consumer, partitions):
    """Called when partitions are being revoked from this consumer.

    CRITICAL: Commit offsets synchronously here. An async commit may not
    complete before the partition is reassigned.
    """
    partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
    print(f"Revoking partitions: {partition_list}")

    # Synchronous commit: blocks until the broker confirms
    try:
        consumer.commit(asynchronous=False)
        print("Offsets committed successfully before revocation")
    except Exception as e:  # e.g. KafkaException when there is nothing to commit
        print(f"Failed to commit during revoke: {e}")


# Configure consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for safety
})

# Subscribe with callbacks
consumer.subscribe(
    ['task-created'],
    on_assign=on_assign,
    on_revoke=on_revoke,
)

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Error: {msg.error()}")
            continue

        # Process message
        print(f"Processing: {msg.value().decode()}")

        # Commit after successful processing (asynchronous by default)
        consumer.commit(message=msg)

finally:
    consumer.close()
```

Output (during normal operation, with this instance sharing the 3-partition topic with one other consumer):

Assigned partitions: ['task-created[0]', 'task-created[1]']
Processing: {"id": "task-1", "title": "Buy groceries"}
Processing: {"id": "task-2", "title": "Call dentist"}

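Output (during rebalance when a third consumer joins; with the default eager strategies, this consumer's whole assignment is revoked before the new one is handed out):

Revoking partitions: ['task-created[0]', 'task-created[1]']
Offsets committed successfully before revocation
Assigned partitions: ['task-created[0]']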

Why synchronous commit in on_revoke? Asynchronous commits may not complete before the partition is reassigned. The new consumer would start from an old offset, causing duplicates. The brief blocking during synchronous commit is worth the guarantee.
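With cooperative rebalancing, on_revoke receives only the partitions that are leaving, so you may prefer to commit just those. The sketch below assumes confluent_kafka's `Consumer.position()` and `Consumer.commit(offsets=..., asynchronous=False)`; `commit_revoked` is a hypothetical helper name. It also assumes processing happens synchronously in the poll loop, so a partition's position (the offset after the last message poll() returned) equals the offset after the last processed message.

```python
def commit_revoked(consumer, partitions):
    """Synchronously commit the current position of each partition being revoked.

    Sketch for use inside an on_revoke callback; `consumer` is a confluent_kafka.Consumer.
    Returns the list of TopicPartitions that were committed.
    """
    # position() reports the next offset to read for each partition.
    positions = consumer.position(partitions)
    # Partitions with nothing consumed yet report a negative sentinel (OFFSET_INVALID); skip them.
    to_commit = [tp for tp in positions if tp.offset >= 0]
    if to_commit:
        consumer.commit(offsets=to_commit, asynchronous=False)
    return to_commit


def on_revoke(consumer, partitions):
    try:
        committed = commit_revoked(consumer, partitions)
        print(f"Committed {len(committed)} revoked partition(s)")
    except Exception as e:  # e.g. a KafkaException raised by the commit
        print(f"Failed to commit during revoke: {e}")
```

Committing only the revoked partitions keeps the callback short, which matters because the rebalance waits for it to return.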

Cooperative vs Eager Rebalancing

Kafka supports two rebalancing protocols:

Eager Rebalancing (used by the range and roundrobin strategies, confluent_kafka's default):

1. ALL consumers stop processing
2. ALL partitions revoked from everyone
3. Group leader recalculates assignments
4. ALL partitions reassigned
5. ALL consumers resume

Duration: the whole group pauses, often for several seconds

Cooperative (Incremental) Rebalancing:

1. Only AFFECTED partitions identified
2. Only those partitions revoked
3. All other partitions keep being processed
4. Revoked partitions reassigned in a follow-up rebalance
5. Minimal disruption

Duration: partitions that don't move are never paused

Configure cooperative rebalancing in your consumer:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    'partition.assignment.strategy': 'cooperative-sticky',  # Cooperative mode
    'enable.auto.commit': False,
})
```


The cooperative-sticky strategy minimizes partition movement. If Consumer A had partitions 0 and 1 and Consumer B joins, the assignor might move only partition 1 to B, while Consumer A keeps processing partition 0 uninterrupted. Configure the same strategy on every consumer in the group.
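The difference in disruption can be sketched by comparing what each protocol revokes for the same assignment change. This is a toy model with a hypothetical function name; the real protocols also involve the group coordinator, and cooperative rebalancing uses a second rebalance round to hand out the revoked partitions.

```python
def revoked_by_protocol(
    old: dict[str, list[int]],
    new: dict[str, list[int]],
    protocol: str,
) -> dict[str, list[int]]:
    """Return the partitions each consumer must stop processing during the rebalance."""
    members = sorted(set(old) | set(new))
    if protocol == "eager":
        # Eager: every member gives up everything it currently owns.
        return {m: sorted(old.get(m, [])) for m in members}
    if protocol == "cooperative":
        # Cooperative: a member only gives up partitions that move to someone else.
        return {m: sorted(set(old.get(m, [])) - set(new.get(m, []))) for m in members}
    raise ValueError(f"unknown protocol: {protocol}")


before = {"A": [0, 1]}
after = {"A": [0], "B": [1]}  # Consumer B joins and takes partition 1
print(revoked_by_protocol(before, after, "eager"))        # {'A': [0, 1], 'B': []}
print(revoked_by_protocol(before, after, "cooperative"))  # {'A': [1], 'B': []}
```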

Static Membership: Reducing Unnecessary Rebalances

With dynamic membership, every consumer restart (even a brief one) triggers rebalances: one when it leaves and another when it rejoins. In Kubernetes, rolling updates cause a series of rebalances as pods restart. Static membership reduces this by giving each consumer a persistent identity.

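```python
import os

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    # Static identity: must be unique within the group and stable across restarts.
    # HOSTNAME is stable for StatefulSet pods (e.g. notification-0), but it changes
    # whenever a Deployment pod is replaced, which defeats static membership.
    'group.instance.id': f"notification-{os.environ.get('HOSTNAME', 'local')}",
    'session.timeout.ms': 45000,  # Longer timeout to cover restarts
    'enable.auto.commit': False,
})
```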

How static membership works:

| Without Static Membership | With Static Membership |
|---------------------------|------------------------|
| Consumer restarts | Consumer restarts |
| Immediate rebalance triggered | No rebalance during session.timeout.ms window |
| Partitions redistributed | Consumer rejoins with same instance ID |
| New assignment may differ | Gets same partitions back |

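When to use static membership:

  • Kubernetes StatefulSets (stable pod names) with rolling updates
  • Consumers with local state (cache, in-memory aggregations)
  • High-throughput systems where rebalance pauses are costly

The trade-off: if a static member has really died, its partitions go unprocessed until session.timeout.ms expires, so keep that timeout only as long as a normal restart needs.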

Understanding Consumer Lag

Consumer lag is the difference between the latest offset in a partition (the log-end offset) and the offset your consumer group has committed. It's one of the most important metrics for consumer health.

Partition 0:
Latest offset (log-end): 1000
Consumer offset: 850
LAG = 1000 - 850 = 150 messages behind
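You can compute the same numbers from Python. This sketch assumes confluent_kafka's `Consumer.committed()` and `Consumer.get_watermark_offsets()`; `partition_lag` is a hypothetical helper. When a partition has no committed offset yet, it assumes `auto.offset.reset=earliest` and counts everything still retained (high watermark minus low watermark) as lag.

```python
def partition_lag(consumer, partitions, timeout: float = 10.0) -> dict[tuple[str, int], int]:
    """Return {(topic, partition): lag} for the consumer's group.

    `consumer` is a confluent_kafka.Consumer; `partitions` is a list of TopicPartition.
    Lag = log-end offset (high watermark) - committed offset.
    """
    lags = {}
    for tp in consumer.committed(partitions, timeout=timeout):
        low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
        # A negative offset means the group has never committed for this partition.
        start = tp.offset if tp.offset >= 0 else low
        lags[(tp.topic, tp.partition)] = max(high - start, 0)
    return lags
```

For example, `partition_lag(consumer, [TopicPartition("task-created", p) for p in range(3)])` would report 150 for partition 0 given the numbers above.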

What lag tells you:

| Lag Level | Meaning | Action |
|-----------|---------|--------|
| 0 | Caught up | Healthy |
| Stable low number | Processing at production rate | Monitor |
| Growing | Producing faster than consuming | Scale consumers or optimize processing |
| Very large | Significant backlog | May need to skip or process in batches |
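The categories above depend on the trend, not a single reading. Here is a minimal sketch that classifies a series of lag samples for one partition; the function name and the `very_large` threshold are hypothetical values you would tune to your own throughput.

```python
def classify_lag(samples: list[int], very_large: int = 10_000) -> str:
    """Classify consumer lag from oldest-to-newest samples of one partition."""
    if not samples:
        raise ValueError("need at least one lag sample")
    latest = samples[-1]
    if latest == 0:
        return "caught up"
    if latest >= very_large:
        return "very large"
    # "Growing" here means every sample is higher than the one before it.
    if len(samples) > 1 and all(b > a for a, b in zip(samples, samples[1:])):
        return "growing"
    return "stable"


print(classify_lag([0, 0, 0]))         # caught up
print(classify_lag([40, 35, 42, 38]))  # stable
print(classify_lag([100, 150, 230]))   # growing
print(classify_lag([12_000, 11_500]))  # very large
```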

Checking lag with Kafka CLI:

```shell
# Inside Kafka pod or with kafka-consumer-groups.sh available
kubectl exec -it task-events-kafka-0 -n kafka -- \
  /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group notification-service
```

Output (CONSUMER-ID, HOST, and CLIENT-ID columns omitted):

```text
GROUP                 TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
notification-service  task-created  0          850             1000            150
notification-service  task-created  1          920             920             0
notification-service  task-created  2          780             800             20
```

This output shows:

  • Partition 0 is 150 messages behind (potential problem)
  • Partition 1 is caught up (healthy)
  • Partition 2 has minimal lag (healthy)
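To alert on this output from a script, you could parse it. This sketch assumes the whitespace-separated layout shown above (a header row starting with GROUP, a LAG column, and "-" where a value is unknown); `lagging_partitions` is a hypothetical name and the threshold is yours to choose.

```python
def lagging_partitions(describe_output: str, threshold: int) -> list[tuple[str, int, int]]:
    """Return (topic, partition, lag) for rows whose lag exceeds threshold."""
    rows = [line.split() for line in describe_output.splitlines() if line.strip()]
    header = next(row for row in rows if row[0] == "GROUP")
    topic_col = header.index("TOPIC")
    part_col = header.index("PARTITION")
    lag_col = header.index("LAG")
    result = []
    for row in rows:
        if len(row) <= lag_col:
            continue
        try:
            partition = int(row[part_col])
            lag = int(row[lag_col])
        except ValueError:
            continue  # header or info line, or unknown lag ("-")
        if lag > threshold:
            result.append((row[topic_col], partition, lag))
    return result


output = """
GROUP                 TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
notification-service  task-created  0          850             1000            150
notification-service  task-created  1          920             920             0
notification-service  task-created  2          780             800             20
"""
print(lagging_partitions(output, threshold=100))  # [('task-created', 0, 150)]
```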

Diagnosing Common Consumer Lag Issues

When you see growing lag, systematically diagnose:

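1. Processing too slow?

```python
import time

# Add timing around your handler to identify slow processing
# (process_message is your own function; msg comes from consumer.poll())
start = time.perf_counter()
process_message(msg)
duration = time.perf_counter() - start

if duration > 0.1:  # > 100ms per message
    print(f"Slow processing: {duration:.2f}s for key {msg.key()}")
```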

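2. Polling too infrequently?

poll() returns at most one message per call, so per-call overhead adds up. The Java client tunes batch size with max.poll.records, but confluent_kafka (librdkafka) does not support that property. Request a batch with consume() instead, and keep each batch small enough to finish within max.poll.interval.ms:

```python
# Fetch up to 500 messages per call (fewer if fewer arrive within the timeout)
messages = consumer.consume(num_messages=500, timeout=1.0)
for msg in messages:
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    process_message(msg)  # your own handler
```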

3. Not enough consumers?

If you have 3 partitions but only 1 consumer, that one consumer handles all the load. Scaling to 3 consumers lets each process one partition in parallel; a 4th would sit idle.
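A back-of-the-envelope check for how many consumers you need, assuming each consumer processes one message at a time and load spreads evenly across partitions (hot partitions break that second assumption). `consumers_needed` is a hypothetical helper.

```python
import math


def consumers_needed(msgs_per_sec: float, secs_per_msg: float, partitions: int) -> tuple[int, bool]:
    """Return (useful consumer count, whether the partition count is the bottleneck)."""
    # One sequential consumer handles at most 1 / secs_per_msg messages per second.
    needed = max(1, math.ceil(msgs_per_sec * secs_per_msg))
    # Consumers beyond the partition count would sit idle, so cap at `partitions`.
    return min(needed, partitions), needed > partitions


print(consumers_needed(8, 0.25, partitions=3))  # (2, False)
print(consumers_needed(20, 0.5, partitions=3))  # (3, True): 10 needed, so add partitions
```

When the second value is True, adding consumers alone won't keep up; you need more partitions or faster processing.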

4. Rebalancing too frequently?

Frequent rebalances interrupt processing. Check for:

  • Consumers with slow heartbeats (increase session.timeout.ms)
  • Deployments causing constant restarts (use static membership)
  • Processing taking longer than max.poll.interval.ms (raise it, or speed up processing)

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    'session.timeout.ms': 45000,      # 45 seconds
    'max.poll.interval.ms': 300000,   # 5 minutes for slow processing
    'heartbeat.interval.ms': 10000,   # 10 seconds
})
```
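Those three settings interact. Here is a minimal sketch of sanity checks based on two documented rules: Kafka recommends heartbeat.interval.ms be no more than one third of session.timeout.ms, and librdkafka rejects a max.poll.interval.ms lower than session.timeout.ms. `check_timeouts` is a hypothetical helper; the broker may additionally enforce group.min.session.timeout.ms and group.max.session.timeout.ms.

```python
def check_timeouts(
    session_timeout_ms: int,
    heartbeat_interval_ms: int,
    max_poll_interval_ms: int,
) -> list[str]:
    """Return a list of problems with a consumer's timeout settings (empty if none found)."""
    problems = []
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append("heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat.interval.ms is above 1/3 of session.timeout.ms")
    if max_poll_interval_ms < session_timeout_ms:
        problems.append("max.poll.interval.ms must be >= session.timeout.ms")
    return problems


print(check_timeouts(45000, 10000, 300000))  # []
print(check_timeouts(45000, 20000, 30000))   # two problems reported
```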

Putting It Together: Production Consumer Pattern

Here's the complete pattern combining rebalance callbacks, cooperative rebalancing, static membership, and slow-processing detection:

```python
import os
import signal
import time

from confluent_kafka import Consumer, KafkaError


class ProductionConsumer:
    def __init__(self, group_id: str, topics: list[str]):
        self.consumer = Consumer({
            'bootstrap.servers': os.environ.get(
                'KAFKA_BOOTSTRAP_SERVERS',
                'localhost:30092'
            ),
            'group.id': group_id,
            # Stable only if HOSTNAME is stable (e.g. StatefulSet pods)
            'group.instance.id': f"{group_id}-{os.environ.get('HOSTNAME', 'local')}",
            'partition.assignment.strategy': 'cooperative-sticky',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'session.timeout.ms': 45000,
            'max.poll.interval.ms': 300000,
        })

        self.consumer.subscribe(
            topics,
            on_assign=self._on_assign,
            on_revoke=self._on_revoke
        )

        self.running = True
        self.messages_processed = 0
        # Kubernetes sends SIGTERM on pod shutdown: finish the current message, then exit
        signal.signal(signal.SIGTERM, self._stop)

    def _stop(self, signum, frame):
        self.running = False

    def _on_assign(self, consumer, partitions):
        # With cooperative-sticky, `partitions` holds only the newly added partitions
        partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
        print(f"Assigned: {partition_list}")

    def _on_revoke(self, consumer, partitions):
        # With cooperative-sticky, `partitions` holds only the partitions being removed
        partition_list = [f"{p.topic}[{p.partition}]" for p in partitions]
        print(f"Revoking: {partition_list}")
        try:
            consumer.commit(asynchronous=False)
            print("Pre-revoke commit successful")
        except Exception as e:
            print(f"Pre-revoke commit failed: {e}")

    def process(self, msg):
        """Override this method with your processing logic. Return True on success."""
        print(f"Processing: {msg.value().decode()}")
        return True

    def run(self):
        try:
            while self.running:
                msg = self.consumer.poll(1.0)

                if msg is None:
                    continue

                if msg.error():
                    # Only reported when enable.partition.eof is true
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    print(f"Error: {msg.error()}")
                    continue

                start = time.time()
                success = self.process(msg)
                duration = time.time() - start

                if duration > 0.1:
                    print(f"Slow processing: {duration:.2f}s")

                # Note: committing a later message moves the offset past a failed one,
                # so add retries or a dead-letter topic if failures must not be skipped.
                if success:
                    self.consumer.commit(message=msg)
                    self.messages_processed += 1

        except KeyboardInterrupt:
            pass  # Ctrl+C: fall through to a clean shutdown
        finally:
            self.consumer.close()
            print(f"Shutdown. Processed {self.messages_processed} messages.")


# Usage
if __name__ == "__main__":
    consumer = ProductionConsumer(
        group_id='notification-service',
        topics=['task-created', 'task-completed']
    )
    consumer.run()
```

Output (with cooperative-sticky, on_revoke lists only the partition that moves to a new consumer; task-created[0] and task-completed[0] are never revoked):

Assigned: ['task-created[0]', 'task-created[1]', 'task-completed[0]']
Processing: {"id": "task-1", "title": "Buy groceries"}
Processing: {"id": "task-2", "title": "Call dentist"}
Revoking: ['task-created[1]']
Pre-revoke commit successful
Processing: {"id": "task-3", "title": "Review PR"}
^C
Shutdown. Processed 3 messages.

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, design a consumer group strategy for a notification service with 6 partitions and 3 consumer instances.
Does my skill explain partition assignment, rebalancing triggers, and how to minimize downtime during rebalancing?

Identify Gaps

Ask yourself:

  • Did my skill explain how consumer groups distribute partitions across instances?
  • Did it cover rebalancing triggers (consumer crash, new consumer, partition change)?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing consumer group mechanics (partition assignment, rebalancing, cooperative rebalancing).
Update it to include when rebalancing occurs and how to configure consumers to minimize rebalancing impact.

Try With AI

Setup: You have a Kafka consumer that's experiencing duplicate message processing after scaling events.

Prompt 1: Analyze a rebalance scenario

I have a Kafka consumer with this configuration:

```python
consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'order-processor',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000
})

consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg:
        process_order(msg)  # Takes 2-3 seconds
```

When I scale from 1 to 2 consumers, orders are processed twice.
Walk me through exactly what happens during the rebalance that causes duplicates.

What you're learning: AI will trace the timeline showing how auto-commit with a 5-second interval creates a window where messages are processed but not committed when revocation occurs.

Prompt 2: Debug consumer lag

My consumer group shows this lag:

```text
GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor  orders  0          45000           50000           5000
order-processor  orders  1          48000           50000           2000
order-processor  orders  2          42000           50000           8000
```

The lag on partition 2 keeps growing while partitions 0 and 1 are stable.
What are the possible causes and how do I diagnose further?

What you're learning: AI helps you think through asymmetric lag patterns—could be slow processing for certain message types, a hot partition with too much traffic, or a consumer that crashed and recovered.

Prompt 3: Design for your domain

I'm building a notification service that:
- Consumes from 'user-events' topic (10 partitions)
- Sends emails (slow: 500ms per message)
- Runs on Kubernetes with autoscaling (1-5 pods)
- Cannot send duplicate notifications

Design the consumer configuration and rebalance handling strategy.
Consider: session timeouts, static membership, commit patterns.

What you're learning: AI collaborates on applying consumer group patterns to your specific constraints, suggesting configurations that balance throughput with your no-duplicate requirement.

Safety note: When testing rebalance scenarios, use a separate consumer group from production. Joining a production group with test consumers triggers rebalances that affect real traffic.