Producer Deep Dive: Reliability
Your Lesson 5 producer successfully sent messages to Kafka. But successful delivery to the broker's memory is different from durable delivery to disk with replication. When your Task API creates a task and publishes a task.created event, what happens if a broker crashes before replicating the message? What if the network drops during transmission and your producer retries?
Production event-driven systems handle millions of events daily. A 0.1% message loss rate means losing 1,000 events per million. For critical workflows---payment processing, inventory updates, audit logs---that's unacceptable. This lesson covers the reliability configurations that separate prototype producers from production-grade ones.
We'll examine three critical dimensions: how many brokers must acknowledge your message (acks), how to prevent duplicate messages during retries (idempotent producer), and how to handle delivery failures gracefully. By the end, you'll configure producers that match your data's criticality.
Understanding Acknowledgment Levels
When your producer sends a message, Kafka can acknowledge it at three different points in the durability spectrum. Each level trades latency for safety.
The Three acks Settings
| Setting | Acknowledgment Point | Latency | Durability | Use Case |
|---|---|---|---|---|
acks=0 | None (fire and forget) | Lowest | None | Metrics, logs where loss is acceptable |
acks=1 | Leader only | Low | Leader crash = data loss | Analytics, non-critical events |
acks=all | All in-sync replicas | Higher | Survives broker failures | Critical business events |
How Each Level Works
acks=0 (Fire and Forget)
The producer sends the message and immediately considers it delivered. It doesn't wait for any acknowledgment:
Producer → [sends message] → Broker
← [nothing]
Producer: "Success!" (maybe)
Problem: The message might have been lost in transmission, the broker might have crashed receiving it, or the write to the partition log might have failed. You'll never know.
When it's acceptable: High-volume metrics collection where individual data points don't matter. If you're sending 10,000 sensor readings per second, losing a few readings is tolerable.
acks=1 (Leader Acknowledgment)
The producer waits for the partition leader to write the message to its local log:
Producer → [sends message] → Leader Broker
← [ack: written to leader log]
Producer: "Success!" (leader has it)
Problem: If the leader crashes before replicating to followers, the message is lost. A new leader is elected from followers, but they don't have your message.
When it's acceptable: Analytics events, user activity tracking, recommendation system inputs---events where occasional loss doesn't break business logic.
acks=all (Full ISR Acknowledgment)
The producer waits for all in-sync replicas (ISR) to acknowledge the write:
Producer → [sends message] → Leader Broker
→ [replicates to Follower 1]
→ [replicates to Follower 2]
← [ack: all ISR replicas have it]
Producer: "Success!" (survives broker failure)
Why it matters: Even if the leader crashes immediately after acknowledgment, followers have the message. The new leader (promoted from followers) won't lose data.
When to use: Payment events, order creation, inventory adjustments, audit logs---any event where loss causes business problems.
Configuring acks in Python
from confluent_kafka import Producer
# Critical business events: acks=all
critical_producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api-critical',
'acks': 'all' # Wait for all ISR replicas
})
# Analytics events: acks=1
analytics_producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api-analytics',
'acks': '1' # Wait for leader only
})
# Metrics (loss acceptable): acks=0
metrics_producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api-metrics',
'acks': '0' # Fire and forget
})
Output:
When you produce with acks=all, the delivery callback reports the full acknowledgment:
>>> critical_producer.produce('task-created', key='task-123', value='{"id": "task-123"}', callback=delivery_report)
>>> critical_producer.flush()
Delivered to task-created [0] @ 42
The offset (@ 42) confirms the message was durably written and replicated.
The Latency-Durability Trade-off
Here's what the acks settings mean in practice:
| acks | Typical Latency | Messages Lost on Leader Crash | Messages Lost on Network Error |
|---|---|---|---|
0 | ~1ms | All in flight | All in flight |
1 | ~5-10ms | Unreplicated ones | None (with retries) |
all | ~10-30ms | None | None (with retries) |
The latency difference between acks=1 and acks=all is typically 5-20ms---the time to replicate to followers. For most applications, this is negligible. The durability difference, however, is significant.
Decision Framework:
Ask yourself: "If this message is lost, what breaks?"
- Nothing breaks (metrics, logs):
acks=0oracks=1 - User experience degrades (recommendations, analytics):
acks=1 - Business logic fails (orders, payments, audit):
acks=all
Preventing Duplicates with Idempotent Producer
Even with acks=all, network failures can cause duplicates. Here's the scenario:
1. Producer sends message to broker
2. Broker writes message, starts replication
3. Network drops before ack reaches producer
4. Producer retries (thinks send failed)
5. Broker receives retry, writes again
6. Two copies of the same message in the partition
The Idempotent Producer Solution
Kafka's idempotent producer prevents this by assigning each message a sequence number. The broker detects and deduplicates retries:
from confluent_kafka import Producer
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True, # Prevent duplicates on retry
'max.in.flight.requests.per.connection': 5, # Required: <= 5 for idempotence
'retries': 2147483647 # Retry until delivery.timeout.ms
})
Output:
>>> producer.produce('task-created', key='task-123', value='{"id": "task-123"}', callback=delivery_report)
>>> producer.flush()
Delivered to task-created [0] @ 43
Even if the producer retried internally due to network issues, the partition contains exactly one copy of the message.
How Idempotence Works
Under the hood, Kafka maintains a <ProducerID, SequenceNumber> for each producer:
Producer (ID: 42)
├── Partition 0: Last sequence = 157
├── Partition 1: Last sequence = 89
└── Partition 2: Last sequence = 203
When producer sends:
- Message with sequence 158 to Partition 0 → Accepted
- Message with sequence 157 to Partition 0 → Deduplicated (already seen)
- Message with sequence 160 to Partition 0 → Error (gap in sequence)
Idempotent Producer Requirements
Idempotence has constraints:
| Setting | Required Value | Why |
|---|---|---|
acks | all | Must ensure leader + replicas agree on sequence |
max.in.flight.requests.per.connection | <= 5 | More concurrent requests can cause reordering |
retries | > 0 | Must retry on transient failures |
Configuration that violates requirements:
# WRONG: max.in.flight > 5 disables idempotence
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 10 # ERROR!
})
Output:
KafkaException: Idempotent producer requires max.in.flight.requests.per.connection <= 5
Handling Delivery Failures
Even with acks=all and idempotence, messages can fail to deliver. Your producer must handle these failures gracefully.
The Delivery Callback Pattern
Every produce() call should include a delivery callback:
from confluent_kafka import Producer, KafkaError
def delivery_report(err, msg):
"""Handle delivery result for each message."""
if err is not None:
# Delivery failed
if err.retriable():
# Transient error (network, broker restart)
print(f'Retriable error for {msg.key()}: {err}')
# In production: requeue or log for retry
else:
# Fatal error (authorization, invalid topic)
print(f'Fatal error for {msg.key()}: {err}')
# In production: alert, write to dead letter queue
else:
# Success
print(f'Delivered {msg.key()} to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'acks': 'all',
'enable.idempotence': True
})
producer.produce(
topic='task-created',
key='task-123',
value='{"id": "task-123", "title": "Buy groceries"}',
callback=delivery_report
)
# Process callbacks
producer.poll(0)
# Ensure all messages delivered before shutdown
producer.flush()
Output (success):
Delivered task-123 to task-created [0] @ 44
Output (network failure):
Retriable error for task-123: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Common Error Types and Responses
| Error | Retriable? | Cause | Response |
|---|---|---|---|
_MSG_TIMED_OUT | Yes | Network, slow broker | Retry automatically |
_NOT_ENOUGH_REPLICAS | Yes | ISR below min.insync.replicas | Wait, broker will recover |
_TOPIC_AUTHORIZATION_FAILED | No | Missing ACL permissions | Fix configuration |
_UNKNOWN_TOPIC_OR_PARTITION | No | Topic doesn't exist | Create topic first |
_MSG_SIZE_TOO_LARGE | No | Message exceeds max.message.bytes | Split message or increase limit |
Configuring Retry Behavior
The producer's retry behavior is controlled by several settings:
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'acks': 'all',
'enable.idempotence': True,
# Retry configuration
'retries': 2147483647, # Retry indefinitely
'retry.backoff.ms': 100, # Wait 100ms between retries
'delivery.timeout.ms': 120000, # Total time to try delivery (2 minutes)
# Request timeout (per attempt)
'request.timeout.ms': 30000 # 30 seconds per request
})
How timeouts interact:
Total delivery window: delivery.timeout.ms (120s)
├── Attempt 1: request.timeout.ms (30s) → fails
├── Wait: retry.backoff.ms (100ms)
├── Attempt 2: request.timeout.ms (30s) → fails
├── Wait: retry.backoff.ms (100ms)
├── ... continues until delivery.timeout.ms expires
└── Final: callback with _MSG_TIMED_OUT error
Dead Letter Queue Pattern
For messages that fail permanently, implement a dead letter queue (DLQ):
from confluent_kafka import Producer
import json
class ReliableProducer:
def __init__(self, config):
self.producer = Producer(config)
self.dlq_producer = Producer(config) # Same config for DLQ
def delivery_report(self, err, msg):
if err is not None:
if not err.retriable():
# Send to dead letter queue
dlq_message = {
'original_topic': msg.topic(),
'original_key': msg.key().decode() if msg.key() else None,
'original_value': msg.value().decode() if msg.value() else None,
'error': str(err),
'error_code': err.code()
}
self.dlq_producer.produce(
topic='dead-letter-queue',
key=msg.key(),
value=json.dumps(dlq_message)
)
print(f'Sent to DLQ: {msg.key()}')
else:
print(f'Delivered: {msg.key()} @ {msg.offset()}')
def send(self, topic, key, value):
self.producer.produce(
topic=topic,
key=key,
value=value,
callback=self.delivery_report
)
self.producer.poll(0)
def flush(self):
self.producer.flush()
self.dlq_producer.flush()
Output (authorization failure):
Sent to DLQ: task-456
The DLQ message contains all information needed to investigate and replay the failed event.
Putting It Together: Production Configuration
Here's the complete production-ready producer configuration:
from confluent_kafka import Producer
import json
def create_production_producer(bootstrap_servers: str, client_id: str) -> Producer:
"""Create a production-ready Kafka producer with reliability guarantees."""
return Producer({
# Connection
'bootstrap.servers': bootstrap_servers,
'client.id': client_id,
# Durability: Wait for all ISR replicas
'acks': 'all',
# Idempotence: Prevent duplicates on retry
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
# Retry behavior
'retries': 2147483647, # Retry until timeout
'retry.backoff.ms': 100,
'delivery.timeout.ms': 120000, # 2 minutes total
'request.timeout.ms': 30000, # 30 seconds per attempt
# Performance (optional tuning)
'linger.ms': 5, # Batch for 5ms
'batch.size': 16384, # 16KB batches
'compression.type': 'lz4' # Compress messages
})
# Usage
producer = create_production_producer(
bootstrap_servers='localhost:30092',
client_id='task-api'
)
def delivery_callback(err, msg):
if err:
print(f'DELIVERY FAILED: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
# Send critical business event
task_event = {
'event_type': 'task.created',
'task_id': 'task-789',
'title': 'Review quarterly report',
'created_at': '2025-01-15T10:30:00Z'
}
producer.produce(
topic='task-events',
key='task-789',
value=json.dumps(task_event),
callback=delivery_callback
)
producer.flush()
Output:
Delivered to task-events [2] @ 156
Refining Configuration Through Collaboration
You've configured a reliable producer, but is it right for your specific use case? Let's explore how to refine the configuration.
Your initial request:
"I need to configure Kafka for my Task API. Tasks must never be lost."
Exploring requirements:
Before settling on configuration, consider these questions:
- What's your acceptable message latency? (sub-100ms? sub-second?)
- How many brokers do you have? (affects replication options)
- What's your peak message rate? (affects batching strategy)
- Can your consumers handle duplicate messages? (affects exactly-once need)
Evaluating the initial configuration:
The configuration we built uses acks=all with idempotent producer. This provides:
- Durability: Messages survive broker failures
- No duplicates: Retries don't create copies
- Trade-off: Higher latency (~20-30ms per message)
Questioning the approach:
For a Task API handling hundreds of events per second, consider:
- Is the latency acceptable for your users?
- Do you need exactly-once semantics, or is at-least-once sufficient if consumers are idempotent?
- Should you use transactions for atomic multi-topic writes?
Refining based on production context:
If your notification consumer can handle duplicates (checking task_id before sending), you might simplify:
# At-least-once with idempotent consumers (simpler)
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'acks': 'all',
'enable.idempotence': True,
'delivery.timeout.ms': 30000 # Shorter timeout, faster failure
})
If you need atomic writes across multiple topics (task-created, audit-log), you need transactions (covered in Lesson 12).
What emerged from this exploration:
- Production configuration depends on specific requirements
acks=all+ idempotence is the baseline for critical data- Consumer idempotency can simplify producer complexity
- Transactions add complexity but enable atomic multi-topic writes
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, configure a producer for maximum reliability (no message loss allowed).
Does my skill set acks=all, enable.idempotence=true, and appropriate retry configuration?
Identify Gaps
Ask yourself:
- Did my skill explain the relationship between acks, retries, and idempotence?
- Did it cover the trade-offs between reliability and throughput?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing producer reliability configuration (acks, idempotence, retries).
Update it to include when to use acks=all vs acks=1 and how idempotence prevents duplicates.
Try With AI
Apply what you've learned by configuring producers for different scenarios.
Setup: Open Claude Code or your preferred AI assistant in your Kafka project directory.
Prompt 1: Analyze Your Use Case
I'm building a Task API with these requirements:
- Task creation events: must never be lost, but 50ms latency is acceptable
- Task analytics events: loss is acceptable for up to 0.1% of events
- Real-time activity metrics: high volume (1000/sec), individual loss is fine
For each event type, recommend:
1. acks setting
2. Whether to use idempotent producer
3. Retry timeout configuration
4. Justification for the choices
Show me the Python configuration for each.
What you're learning: Matching reliability configuration to business requirements. Different event types within the same application may need different producers.
Prompt 2: Debug a Delivery Failure
My Kafka producer is failing with this error:
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
My producer config:
- acks=all
- min.insync.replicas=2 (topic config)
- replication.factor=3
What's happening? How do I diagnose this? What are my options to resolve it?
What you're learning: Understanding the relationship between producer acks, topic replication, and ISR. This error is common in production when brokers are unhealthy.
Prompt 3: Design Error Handling Strategy
Review this delivery callback and improve it:
def delivery_report(err, msg):
if err:
print(f"Error: {err}")
else:
print(f"Delivered: {msg.offset()}")
I need:
1. Distinguish retriable vs fatal errors
2. Log with proper context for debugging
3. Dead letter queue for fatal errors
4. Metrics for monitoring delivery success rate
Show me the improved implementation with Python type hints.
What you're learning: Production error handling requires more than printing errors. You need observability, retry logic, and fallback mechanisms.
Safety Note: Always test producer configurations in a development environment before production. Incorrect timeout settings can cause message backlogs or premature failures during normal broker maintenance.