Transactions for Stream Processing
Your Task API publishes events to Kafka reliably---acks=all ensures durability, idempotent producers prevent duplicates on retry. But what happens when a single operation must update multiple topics atomically? When a task is created, you might need to write to task-events, audit-log, and notification-queue as a single unit of work. If the producer crashes after writing to task-events but before audit-log, you have an inconsistent state that no amount of retries can fix.
This is the exactly-once challenge for stream processing. Consider a payment processor that consumes from payment-requests, processes the payment, and produces to both payment-completed and ledger-updates. If it crashes after producing to payment-completed but before ledger-updates, restarting will re-process the same payment---potentially charging a customer twice or leaving the ledger inconsistent.
Kafka transactions solve this by making the read-process-write cycle atomic. Either all outputs are committed together, or none are. This lesson covers the transactional producer lifecycle, consumer isolation levels, and the zombie fencing mechanism that prevents duplicate processing from crashed producers.
The Problem: Partial Writes Break Consistency
Without Transactions: The Crash Window
Consider this stream processing pattern:
1. Consumer reads message from input-topic
2. Processor transforms message
3. Producer writes to output-topic-1
4. Producer writes to output-topic-2
5. Consumer commits offset
The crash window exists between steps 3 and 5. If the processor crashes after step 3:
┌─────────────────────────────────────────────────────────────────┐
│ Timeline of Partial Failure │
├─────────────────────────────────────────────────────────────────┤
│ │
│ READ PROCESS WRITE-1 CRASH! (never happens) │
│ ──────────────────────────────────────X─────────────────────────│
│ payment- transform payment- │ ledger-updates │
│ requests message completed │ offset commit │
│ ✓ │ │
│ │ │
│ On Restart: │ │
│ - offset not committed │ │
│ - payment-completed has message │ │
│ - ledger-updates missing message │ │
│ - re-processing creates DUPLICATE │ │
│ │
└─────────────────────────────────────────────────────────────────┘
The result: payment-completed has the message, ledger-updates doesn't, and on restart the processor will re-read and re-process---creating duplicates in payment-completed.
With Transactions: All-or-Nothing
Transactions wrap the entire read-process-write cycle:
┌─────────────────────────────────────────────────────────────────┐
│ Transaction Lifecycle │
├─────────────────────────────────────────────────────────────────┤
│ │
│ BEGIN_TXN READ PROCESS WRITE-1 WRITE-2 COMMIT_TXN │
│ ─────────────────────────────────────────────────────────────── │
│ [─────────────────── TRANSACTION ──────────────────────────] │
│ │
│ If crash before COMMIT: │
│ - All writes are ABORTED │
│ - Offset not committed │
│ - On restart: clean re-processing │
│ │
│ If COMMIT succeeds: │
│ - All writes are VISIBLE atomically │
│ - Offset committed within transaction │
│ - No re-processing needed │
│ │
└─────────────────────────────────────────────────────────────────┘
The Transactional Producer Lifecycle
Step 1: Configure with transactional.id
Every transactional producer needs a unique, stable identifier:
from confluent_kafka import Producer
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'transactional.id': 'task-processor-1', # REQUIRED: unique and stable
'enable.idempotence': True # Automatically enabled with transactional.id
})
Output:
>>> producer = Producer({...})
# Producer created, but transactions not yet initialized
Critical: The transactional.id must be:
- Unique per producer instance: Two producers with same ID cause fencing
- Stable across restarts: Same ID allows recovery of in-flight transactions
- Descriptive: Include service name and partition/shard for debugging
Step 2: Initialize Transactions
Before any transactional operations, call init_transactions() exactly once:
# Initialize transactions (one-time setup per producer instance)
producer.init_transactions()
Output:
>>> producer.init_transactions()
# Registers producer with transaction coordinator
# Aborts any pending transactions from previous instance with same transactional.id
What happens during init:
- Producer contacts transaction coordinator (a Kafka broker)
- Coordinator assigns a producer epoch (version number)
- Any pending transactions from previous epoch are aborted
- Producer is ready for transactional operations
Step 3: Transaction Lifecycle
Each transaction follows a strict lifecycle:
from confluent_kafka import Producer, KafkaError
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'transactional.id': 'task-processor-1'
})
producer.init_transactions()
try:
# Begin transaction
producer.begin_transaction()
# All produce calls are part of this transaction
producer.produce('task-events', key='task-123', value='{"event": "created"}')
producer.produce('audit-log', key='task-123', value='{"action": "task_created"}')
producer.produce('notification-queue', key='user-456', value='{"notify": "task_assigned"}')
# Commit makes all writes visible atomically
producer.commit_transaction()
print("Transaction committed successfully")
except KafkaError as e:
# Abort on any error - all writes are discarded
producer.abort_transaction()
print(f"Transaction aborted: {e}")
raise
Output (success):
>>> producer.begin_transaction()
>>> producer.produce('task-events', ...)
>>> producer.produce('audit-log', ...)
>>> producer.produce('notification-queue', ...)
>>> producer.commit_transaction()
Transaction committed successfully
Output (failure):
>>> producer.begin_transaction()
>>> producer.produce('task-events', ...)
>>> # Network error occurs
>>> producer.abort_transaction()
Transaction aborted: KafkaError{code=_TIMED_OUT,...}
The Transaction State Machine
┌─────────────────────────────────────────────────────────────────┐
│ Transaction State Machine │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ begin_transaction() ┌─────────────────┐ │
│ │ READY │ ───────────────────────────→│ IN_TRANSACTION │ │
│ └──────────┘ └─────────────────┘ │
│ ↑ │ │ │
│ │ │ │ │
│ │ commit_transaction() │ │ │
│ │ ┌──────────────────────────────────┘ │ │
│ │ │ │ │
│ │ │ abort_transaction() │ │
│ │ │ ┌────────────────────────────────────────┘ │
│ │ ↓ ↓ │
│ │ ┌──────────┐ │
│ └──│ COMPLETE │ │
│ └──────────┘ │
│ │
│ States: │
│ - READY: Can begin new transaction │
│ - IN_TRANSACTION: produce() calls are transactional │
│ - COMPLETE: Transaction finished, can begin new one │
│ │
└─────────────────────────────────────────────────────────────────┘
Zombie Fencing: Preventing Duplicates from Crashed Producers
The Zombie Producer Problem
What happens when a transactional producer crashes and restarts?
Timeline:
1. Producer-A (epoch 1) begins transaction
2. Producer-A produces to topic-1
3. Producer-A crashes (network partition or process failure)
4. Producer-A restarts, calls init_transactions() (epoch 2)
5. Original Producer-A (epoch 1) recovers, tries to commit
Without fencing: Both producers write, creating duplicates
With fencing: Epoch 1 is "fenced" - cannot commit
How Epoch-Based Fencing Works
The transactional.id maps to a monotonically increasing epoch:
┌─────────────────────────────────────────────────────────────────┐
│ Zombie Fencing with Epochs │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Transaction Coordinator State: │
│ transactional.id = "task-processor-1" │
│ current_epoch = 42 │
│ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ Producer-A │ │ Producer-A │ │
│ │ (crashed) │ │ (restarted) │ │
│ │ epoch = 41 │ │ epoch = 42 │ │
│ └────────────────┘ └────────────────┘ │
│ │ │ │
│ │ commit_transaction() │ produce() │
│ ↓ ↓ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Transaction Coordinator │ │
│ │ epoch 41 < current_epoch 42 │ │
│ │ → REJECT: ProducerFencedException │ │
│ │ │ │
│ │ epoch 42 = current_epoch 42 │ │
│ │ → ACCEPT: Valid producer │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Fencing in Practice
When the old producer tries to commit:
# Producer-A (epoch 41) - the "zombie"
try:
producer_old.commit_transaction()
except KafkaError as e:
print(f"Fenced: {e}")
# KafkaError{code=PRODUCER_FENCED, ...}
Output:
Fenced: KafkaError{code=PRODUCER_FENCED,val=90,str="Producer has been fenced by a newer producer instance"}
The zombie producer receives PRODUCER_FENCED and must shut down. Only the producer with the current epoch can commit transactions.
Consumer Isolation: read_committed
The Problem with Default Isolation
By default, consumers see messages as soon as they're written---even if the transaction hasn't committed yet:
# Default consumer sees uncommitted messages
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
# isolation.level defaults to 'read_uncommitted'
})
This consumer will see messages from in-progress transactions that might be aborted. If it processes and acts on an aborted message, data consistency is broken.
Configuring read_committed
To see only committed messages, set isolation.level:
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'isolation.level': 'read_committed' # Only see committed transactions
})
consumer.subscribe(['task-events'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
# This message is guaranteed to be from a committed transaction
print(f"Processing committed message: {msg.value().decode()}")
consumer.commit(message=msg)
Output:
# Producer begins transaction, writes message
# Consumer with read_committed: (waiting...)
# Producer commits transaction
Processing committed message: {"event": "task_created", "task_id": "123"}
Isolation Level Comparison
| Isolation Level | Sees Uncommitted | Sees Aborted | Latency | Use Case |
|---|---|---|---|---|
read_uncommitted | Yes | Yes (briefly) | Lowest | Monitoring, metrics |
read_committed | No | No | Higher | Business logic, exactly-once |
The latency trade-off: With read_committed, consumers must wait for transactions to complete. If a producer has a long-running transaction, consumers see a delay.
Complete Example: Atomic Order Processing
Here's a complete stream processor that consumes orders, processes them, and atomically writes to multiple output topics:
from confluent_kafka import Producer, Consumer, KafkaError
import json
class AtomicOrderProcessor:
"""Process orders with exactly-once semantics across multiple topics."""
def __init__(self, processor_id: str, bootstrap_servers: str):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'transactional.id': f'order-processor-{processor_id}',
'enable.idempotence': True
})
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': 'order-processors',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'isolation.level': 'read_committed'
})
# Initialize transactions once
self.producer.init_transactions()
self.consumer.subscribe(['incoming-orders'])
def process_orders(self):
"""Main processing loop with transactional semantics."""
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
try:
# Process within transaction
self._process_order_transactionally(msg)
except Exception as e:
print(f"Processing failed: {e}")
# Transaction already aborted in _process_order_transactionally
def _process_order_transactionally(self, msg):
"""Process single order atomically."""
try:
self.producer.begin_transaction()
# Parse order
order = json.loads(msg.value().decode())
order_id = order['order_id']
amount = order['amount']
# Business logic: validate, calculate, etc.
processed_order = {
'order_id': order_id,
'status': 'processed',
'amount': amount,
'tax': amount * 0.08
}
# Atomic writes to multiple topics
self.producer.produce(
'processed-orders',
key=order_id,
value=json.dumps(processed_order)
)
self.producer.produce(
'audit-log',
key=order_id,
value=json.dumps({
'action': 'order_processed',
'order_id': order_id,
'timestamp': '2025-01-15T10:30:00Z'
})
)
self.producer.produce(
'ledger-updates',
key=order_id,
value=json.dumps({
'order_id': order_id,
'debit': amount,
'credit': 0
})
)
# Commit transaction (makes all writes visible)
self.producer.commit_transaction()
print(f"Order {order_id} processed successfully")
except KafkaError as e:
self.producer.abort_transaction()
print(f"Transaction aborted for order: {e}")
raise
def close(self):
"""Clean shutdown."""
self.consumer.close()
# Usage
if __name__ == '__main__':
processor = AtomicOrderProcessor(
processor_id='1',
bootstrap_servers='localhost:30092'
)
try:
processor.process_orders()
finally:
processor.close()
Output (successful processing):
Order order-123 processed successfully
Order order-124 processed successfully
Order order-125 processed successfully
Output (transaction aborted):
Transaction aborted for order: KafkaError{code=_TIMED_OUT,...}
# No partial writes - all three topics remain consistent
Transaction Performance Considerations
Latency Impact
Transactions add latency at two points:
| Operation | Additional Latency | Why |
|---|---|---|
begin_transaction() | ~1-5ms | Coordinator round-trip |
commit_transaction() | ~10-50ms | Write transaction markers to all partitions |
For high-throughput systems, batch multiple operations within a single transaction:
# GOOD: Batch operations in one transaction
producer.begin_transaction()
for order in orders_batch: # Process 100 orders
producer.produce('processed-orders', ...)
producer.commit_transaction()
# One commit overhead for 100 messages
# AVOID: Transaction per message
for order in orders:
producer.begin_transaction()
producer.produce('processed-orders', ...)
producer.commit_transaction() # Commit overhead per message
Transaction Timeout
Transactions have a timeout (default 60 seconds). Long-running transactions risk timeout failures:
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'transactional.id': 'processor-1',
'transaction.timeout.ms': 60000 # 60 seconds default
})
If a transaction takes longer than transaction.timeout.ms, the coordinator aborts it automatically.
Exploring Transaction Design Through Collaboration
You've implemented transactional producers, but choosing when to use them requires careful analysis.
Your scenario:
Your Task API creates tasks and must notify three downstream systems:
- Email service (notification-queue)
- Audit system (audit-log)
- Analytics pipeline (analytics-events)
Evaluating the trade-offs:
Consider these questions before choosing transactions:
-
Must all writes succeed together?
- If email fails but audit succeeds, is that acceptable?
- If analytics misses an event, does that break anything?
-
What's your latency budget?
- Transactions add 10-50ms per commit
- Can your Task API wait that long?
-
Are your consumers idempotent?
- If analytics can deduplicate by task_id, you might not need exactly-once
Questioning the approach:
For the Task API scenario, consider:
- Audit-log: Probably needs transactions (compliance requirement)
- Analytics: Might tolerate at-least-once with deduplication
- Notifications: Duplicate emails are worse than missing ones
What emerged from this analysis:
You might use transactions for task-events + audit-log only, and publish to analytics-events separately with idempotent producer. This reduces transaction scope while maintaining critical consistency.
The decision isn't "use transactions everywhere" but rather "use transactions where atomicity is required, and simpler patterns elsewhere."
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, implement a stream processing pipeline that reads from one topic, transforms data, and writes to another topic atomically.
Does my skill show proper transactional producer/consumer configuration and read_committed isolation?
Identify Gaps
Ask yourself:
- Did my skill explain transactional.id and how it enables exactly-once stream processing?
- Did it show the relationship between transactions and consumer isolation levels?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing transactional patterns (transactional.id, read_committed, atomic read-process-write).
Update it to include when to use Kafka transactions and how they differ from database transactions.
Try With AI
Apply what you've learned by designing transactional systems for real scenarios.
Setup: Open Claude Code or your preferred AI assistant in your Kafka project directory.
Prompt 1: Design Transaction Boundaries
I'm building an e-commerce order processing system. When an order is placed:
1. Deduct inventory from stock-levels topic
2. Create payment record in payments topic
3. Add to shipping-queue topic
4. Update customer-orders topic
Help me decide:
- Should all four writes be in one transaction?
- Or should I group them differently?
- What if payment processing takes 2-3 seconds?
Explain the trade-offs and show me the transaction structure you recommend.
What you're learning: Transaction boundary design---grouping related writes while avoiding performance bottlenecks from overly broad transactions.
Prompt 2: Debug a Zombie Fencing Issue
My stream processor keeps getting ProducerFencedException:
KafkaError{code=PRODUCER_FENCED,val=90,str="Producer has been fenced..."}
My setup:
- 3 Kubernetes pods running the same processor
- Each pod uses transactional.id = "order-processor"
- Pods restart frequently due to memory limits
What's wrong? How should I configure transactional.id for horizontally-scaled processors?
What you're learning: Transactional.id uniqueness requirements in distributed systems. Each instance needs a unique ID, typically combining service name with partition assignment.
Prompt 3: Implement Consume-Transform-Produce
I need to implement exactly-once stream processing:
- Consume from: raw-events (5 partitions)
- Transform: enrich with metadata
- Produce to: enriched-events AND audit-log
Show me the code pattern that:
1. Consumes and processes in a transaction
2. Commits consumer offsets within the same transaction
3. Handles partition rebalancing correctly
Use confluent-kafka-python with proper error handling.
What you're learning: The full exactly-once pattern including send_offsets_to_transaction() for atomic offset commits---the most complex but most reliable stream processing pattern.
Safety Note: Transactional producers require proper cleanup. Always call abort_transaction() on errors and ensure transactions complete within timeout limits. Orphaned transactions can block consumers with read_committed isolation.