Saga Pattern for Multi-Step Workflows
Your Task API now publishes events when tasks are created. The notification service sends emails. The reminder service schedules follow-ups. The audit service logs everything. Each service works independently, consuming events and doing its job.
Then a user creates a high-priority task that requires team assignment. The workflow becomes: create task, assign to user, send notification, schedule reminder. Each step depends on the previous. What happens if notification fails after the user is already assigned? You can't leave the system half-done—the user expects an assignment notification. But you also can't use a distributed transaction across independent services.
This is where the saga pattern saves you. Instead of trying to make multiple services act as one atomic transaction, you embrace eventual consistency and design explicit compensation steps that undo work when things go wrong. In this lesson, you'll implement a choreography-based saga where services coordinate through events, each knowing how to reverse its own actions.
Why Distributed Transactions Don't Work
Traditional databases give you ACID transactions: either all changes commit, or none do. When you try to extend this across multiple services, you hit fundamental problems.
The two-phase commit trap:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Task API │ │ User Service │ │ Notification │
│ │ │ │ │ Service │
└──────┬───────┘ └──────┬────────┘ └──────┬───────┘
│ │ │
│ PREPARE │ PREPARE │ PREPARE
├────────────────────┼─────────────────────┤
│ │ │
│ READY │ READY │ TIMEOUT!
├────────────────────┼─────────────────────┤
│ │ │
│ ??? (Resources locked, waiting) │
│ │
The coordinator must lock resources across all services, wait for all to be ready, then commit. If any service is slow or fails, everything blocks. In microservices with network partitions and varying latencies, this approach creates cascading failures.
What we actually need:
| Distributed Transaction | Saga Pattern |
|---|---|
| All or nothing (atomic) | Eventually consistent |
| Lock resources during coordination | No locks, independent processing |
| Coordinator blocks until all ready | Services react asynchronously |
| Failure = abort everything | Failure = compensate completed steps |
The saga pattern trades atomicity for availability. You accept that the system may be temporarily inconsistent, but you guarantee it will reach a consistent state—either fully complete or fully rolled back.
The Saga Pattern: Sequence with Compensations
A saga is a sequence of local transactions where each step publishes an event that triggers the next. If a step fails, the saga executes compensation events in reverse order to undo completed work.
┌─────────────────────────────────────────────────────────────────────┐
│ Task Assignment Saga │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ FORWARD PATH (Happy Path): │
│ │
│ ┌─────────┐ ┌─────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Create │───▶│ Assign User │───▶│ Send │───▶│ Schedule │ │
│ │ Task │ │ │ │ Notification │ │ Reminder │ │
│ └─────────┘ └─────────────┘ └──────────────┘ └──────────┘ │
│ │
│ COMPENSATION PATH (Failure at notification): │
│ │
│ ┌─────────┐ ┌─────────────┐ │
│ │ Mark │◀───│ Unassign │◀─── Failure detected! │
│ │ Failed │ │ User │ │
│ └─────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Key principle: Every forward action must have a corresponding compensation that can undo it. Not all actions are reversible the same way—you can't "unsend" an email—but you can send a correction or update status to reflect the failure.
Choreography vs Orchestration
There are two ways to coordinate a saga:
Choreography (decentralized): Each service knows the overall flow and reacts to events. No central coordinator.
task.created ──▶ User Service listens, assigns user, emits user.assigned
user.assigned ──▶ Notification Service listens, sends email, emits notification.sent
notification.sent ──▶ Reminder Service listens, schedules reminder
Orchestration (centralized): A saga coordinator tells each service what to do and tracks progress.
Saga Coordinator:
1. Tell User Service: "Assign user"
2. Wait for response
3. Tell Notification Service: "Send notification"
4. Wait for response
5. Tell Reminder Service: "Schedule reminder"
| Aspect | Choreography | Orchestration |
|---|---|---|
| Coordination | Implicit in event flow | Explicit coordinator |
| Service coupling | Looser—services only know events | Tighter—coordinator knows all services |
| Adding services | Add subscriber, no code changes elsewhere | Update coordinator logic |
| Debugging | Harder—trace events across services | Easier—saga state in one place |
| Failure handling | Each service handles its compensations | Coordinator manages rollback |
This lesson focuses on choreography because it's more aligned with Kafka's event-driven model and keeps services truly independent. Orchestration has its place for complex workflows with many conditional branches.
Implementing a Choreography Saga
Let's implement the task assignment saga with four services. Each service:
- Listens for relevant events
- Performs its local work
- Publishes success or failure events
Step 1: Define the Event Schema
First, establish the events that drive the saga:
# events.py - Event definitions for the saga
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json
@dataclass
class SagaEvent:
"""Base class for saga events with correlation tracking."""
saga_id: str # Unique identifier for this saga instance
correlation_id: str # Traces the entire request
occurred_at: str # ISO timestamp
def to_json(self) -> str:
return json.dumps(self.__dict__)
@classmethod
def from_json(cls, data: str):
return cls(**json.loads(data))
@dataclass
class TaskCreated(SagaEvent):
task_id: str
title: str
assignee_id: str
priority: str
@dataclass
class UserAssigned(SagaEvent):
task_id: str
user_id: str
assignment_id: str
@dataclass
class UserAssignmentFailed(SagaEvent):
task_id: str
user_id: str
reason: str
@dataclass
class NotificationSent(SagaEvent):
task_id: str
notification_id: str
channel: str # email, slack, etc.
@dataclass
class NotificationFailed(SagaEvent):
task_id: str
reason: str
# Compensation events
@dataclass
class UserUnassigned(SagaEvent):
"""Compensation: Reverses a user assignment."""
task_id: str
user_id: str
assignment_id: str
compensation_reason: str
@dataclass
class TaskMarkedFailed(SagaEvent):
"""Compensation: Marks task creation as failed in saga."""
task_id: str
failed_step: str
reason: str
Output:
The event classes ensure every saga event carries:
saga_id: Links all events in one saga instancecorrelation_id: Traces across sagas for request debuggingoccurred_at: Timestamp for ordering and audit
Step 2: Task Service (Saga Initiator)
The task service starts the saga by creating a task and publishing the initiating event:
# task_service.py - Initiates the saga
from confluent_kafka import Producer
from datetime import datetime
import uuid
import json
class TaskService:
def __init__(self, producer: Producer):
self.producer = producer
def create_task_with_assignment(
self,
title: str,
assignee_id: str,
priority: str = "normal"
) -> str:
"""Create a task and initiate assignment saga."""
task_id = str(uuid.uuid4())
saga_id = str(uuid.uuid4())
correlation_id = str(uuid.uuid4())
# Local transaction: Create task in database
task = {
"id": task_id,
"title": title,
"assignee_id": assignee_id,
"priority": priority,
"status": "pending_assignment", # Saga in progress
"saga_id": saga_id
}
self._save_task(task)
# Publish event to start saga
event = {
"saga_id": saga_id,
"correlation_id": correlation_id,
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"title": title,
"assignee_id": assignee_id,
"priority": priority
}
self.producer.produce(
topic='task.created',
key=task_id,
value=json.dumps(event),
callback=self._delivery_report
)
self.producer.flush()
print(f"Saga {saga_id} started for task {task_id}")
return task_id
def handle_saga_failure(self, event: dict):
"""Compensation: Mark task as failed when saga cannot complete."""
task_id = event['task_id']
task = self._load_task(task_id)
task['status'] = 'assignment_failed'
task['failure_reason'] = event.get('reason', 'Unknown failure')
self._save_task(task)
print(f"Task {task_id} marked as failed: {task['failure_reason']}")
def _save_task(self, task: dict):
# In production: Save to database
print(f"Saved task: {task['id']} with status {task['status']}")
def _load_task(self, task_id: str) -> dict:
# In production: Load from database
return {"id": task_id, "status": "pending"}
def _delivery_report(self, err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Event delivered to {msg.topic()}")
Output:
Saved task: abc-123 with status pending_assignment
Event delivered to task.created
Saga 7f8a9b2c started for task abc-123
Step 3: User Service (First Saga Step)
The user service assigns users when tasks are created, or publishes a failure event if assignment fails:
# user_service.py - Handles user assignment step
from confluent_kafka import Consumer, Producer
import json
import uuid
class UserAssignmentService:
def __init__(self, consumer: Consumer, producer: Producer):
self.consumer = consumer
self.producer = producer
self.consumer.subscribe(['task.created', 'notification.failed'])
def run(self):
"""Process events in the saga."""
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
topic = msg.topic()
event = json.loads(msg.value().decode())
if topic == 'task.created':
self._handle_task_created(event)
elif topic == 'notification.failed':
self._handle_notification_failed(event)
self.consumer.commit(message=msg)
def _handle_task_created(self, event: dict):
"""Forward step: Assign user to task."""
task_id = event['task_id']
user_id = event['assignee_id']
saga_id = event['saga_id']
try:
# Local transaction: Create assignment
assignment_id = self._create_assignment(task_id, user_id)
# Success: Publish user.assigned
success_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": user_id,
"assignment_id": assignment_id
}
self.producer.produce(
topic='user.assigned',
key=task_id,
value=json.dumps(success_event)
)
print(f"User {user_id} assigned to task {task_id}")
except Exception as e:
# Failure: Publish failure event
failure_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": user_id,
"reason": str(e)
}
self.producer.produce(
topic='user.assignment.failed',
key=task_id,
value=json.dumps(failure_event)
)
print(f"Assignment failed for task {task_id}: {e}")
self.producer.poll(0)
def _handle_notification_failed(self, event: dict):
"""Compensation: Unassign user when downstream step fails."""
task_id = event['task_id']
saga_id = event['saga_id']
# Load the assignment we made
assignment = self._load_assignment_for_task(task_id)
if assignment:
# Reverse the assignment
self._delete_assignment(assignment['id'])
# Publish compensation event
compensation_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": assignment['user_id'],
"assignment_id": assignment['id'],
"compensation_reason": f"Downstream failure: {event['reason']}"
}
self.producer.produce(
topic='user.unassigned',
key=task_id,
value=json.dumps(compensation_event)
)
print(f"Compensated: Unassigned user from task {task_id}")
self.producer.poll(0)
def _create_assignment(self, task_id: str, user_id: str) -> str:
# In production: Insert into database
assignment_id = str(uuid.uuid4())
print(f"Created assignment {assignment_id}")
return assignment_id
def _load_assignment_for_task(self, task_id: str) -> dict:
# In production: Query database
return {"id": "assign-123", "user_id": "user-456"}
def _delete_assignment(self, assignment_id: str):
# In production: Delete from database
print(f"Deleted assignment {assignment_id}")
Output (success path):
Created assignment 8f9a2b3c
User user-456 assigned to task abc-123
Event delivered to user.assigned
Output (compensation path):
Deleted assignment assign-123
Compensated: Unassigned user from task abc-123
Event delivered to user.unassigned
Step 4: Notification Service (Second Saga Step)
The notification service sends notifications and can trigger compensations:
# notification_service.py - Handles notification step
from confluent_kafka import Consumer, Producer
import json
from datetime import datetime
import uuid
class NotificationService:
def __init__(self, consumer: Consumer, producer: Producer):
self.consumer = consumer
self.producer = producer
self.consumer.subscribe(['user.assigned'])
def run(self):
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
event = json.loads(msg.value().decode())
self._handle_user_assigned(event)
self.consumer.commit(message=msg)
def _handle_user_assigned(self, event: dict):
"""Forward step: Send assignment notification."""
task_id = event['task_id']
user_id = event['user_id']
saga_id = event['saga_id']
try:
# Local transaction: Send notification
notification_id = self._send_notification(user_id, task_id)
# Success: Publish notification.sent
success_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"notification_id": notification_id,
"channel": "email"
}
self.producer.produce(
topic='notification.sent',
key=task_id,
value=json.dumps(success_event)
)
print(f"Notification sent for task {task_id}")
except Exception as e:
# Failure: Trigger compensation chain
failure_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"reason": str(e)
}
self.producer.produce(
topic='notification.failed',
key=task_id,
value=json.dumps(failure_event)
)
print(f"Notification failed for task {task_id}: {e}")
self.producer.poll(0)
def _send_notification(self, user_id: str, task_id: str) -> str:
# In production: Call email/SMS service
# Simulate occasional failure
import random
if random.random() < 0.1: # 10% failure rate
raise Exception("Email service unavailable")
notification_id = str(uuid.uuid4())
print(f"Sent email notification {notification_id}")
return notification_id
Output (success):
Sent email notification 3c4d5e6f
Notification sent for task abc-123
Output (failure - triggers compensation):
Notification failed for task abc-123: Email service unavailable
Tracking Saga State
In choreography, each service only sees its own events. To monitor saga progress, implement a saga state tracker:
# saga_tracker.py - Monitors saga progress across all events
from confluent_kafka import Consumer
import json
from datetime import datetime, timedelta
from enum import Enum
class SagaState(Enum):
STARTED = "started"
ASSIGNED = "user_assigned"
NOTIFIED = "notification_sent"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class SagaTracker:
def __init__(self, consumer: Consumer):
self.consumer = consumer
self.sagas: dict[str, dict] = {}
# Subscribe to ALL saga-related topics
self.consumer.subscribe([
'task.created',
'user.assigned',
'user.assignment.failed',
'notification.sent',
'notification.failed',
'user.unassigned',
'task.marked.failed'
])
def run(self):
while True:
msg = self.consumer.poll(1.0)
if msg is None:
self._check_timeouts()
continue
if msg.error():
continue
event = json.loads(msg.value().decode())
topic = msg.topic()
saga_id = event.get('saga_id')
if saga_id:
self._update_saga_state(saga_id, topic, event)
self.consumer.commit(message=msg)
def _update_saga_state(self, saga_id: str, topic: str, event: dict):
if saga_id not in self.sagas:
self.sagas[saga_id] = {
'saga_id': saga_id,
'started_at': event['occurred_at'],
'state': SagaState.STARTED,
'events': []
}
saga = self.sagas[saga_id]
saga['events'].append({
'topic': topic,
'occurred_at': event['occurred_at']
})
saga['last_updated'] = event['occurred_at']
# Update state based on event type
state_transitions = {
'task.created': SagaState.STARTED,
'user.assigned': SagaState.ASSIGNED,
'notification.sent': SagaState.NOTIFIED,
'notification.failed': SagaState.COMPENSATING,
'user.assignment.failed': SagaState.COMPENSATING,
'user.unassigned': SagaState.COMPENSATING,
'task.marked.failed': SagaState.FAILED
}
if topic in state_transitions:
saga['state'] = state_transitions[topic]
print(f"Saga {saga_id[:8]}... -> {saga['state'].value}")
def _check_timeouts(self):
"""Detect sagas stuck in intermediate states."""
timeout = timedelta(minutes=5)
now = datetime.utcnow()
for saga_id, saga in self.sagas.items():
last_update = datetime.fromisoformat(saga['last_updated'])
if now - last_update > timeout:
if saga['state'] not in [SagaState.COMPLETED, SagaState.FAILED]:
print(f"ALERT: Saga {saga_id[:8]}... stuck in {saga['state'].value}")
def get_saga_status(self, saga_id: str) -> dict:
return self.sagas.get(saga_id, {"error": "Saga not found"})
Output:
Saga 7f8a9b2c... -> started
Saga 7f8a9b2c... -> user_assigned
Saga 7f8a9b2c... -> notification_sent
Output (failure scenario):
Saga 7f8a9b2c... -> started
Saga 7f8a9b2c... -> user_assigned
Saga 7f8a9b2c... -> compensating
Saga 7f8a9b2c... -> failed
Designing Compensation Events
Not all actions have symmetric rollbacks. Here's how to design compensations for common scenarios:
| Forward Action | Compensation | Notes |
|---|---|---|
| Create record | Delete record | Direct reversal |
| Reserve inventory | Release reservation | Direct reversal |
| Send email | Send correction email | Can't unsend, but can correct |
| Charge payment | Issue refund | May have fees, timing constraints |
| Update status | Revert to previous status | Need to store previous state |
Compensation design principles:
-
Idempotency: Compensations may run multiple times if failures occur during compensation. Design them to be safe to repeat.
-
Store what you need: If reverting requires previous state, store it before the forward action.
-
Accept semantic compensation: You can't truly undo "sent email"—but you can send a follow-up explaining the situation.
# Example: Compensation that handles repeated execution
def compensate_reservation(reservation_id: str, saga_id: str):
"""Release a reservation, safely handling double-execution."""
reservation = load_reservation(reservation_id)
# Already compensated? Don't release again
if reservation['status'] == 'released':
print(f"Reservation {reservation_id} already released")
return
# Haven't compensated yet? Release it
reservation['status'] = 'released'
reservation['released_at'] = datetime.utcnow().isoformat()
reservation['saga_id'] = saga_id
save_reservation(reservation)
print(f"Released reservation {reservation_id}")
Output (first call):
Released reservation rsv-789
Output (second call - idempotent):
Reservation rsv-789 already released
Handling Partial Failures
What happens if compensation itself fails? You need a fallback strategy:
class CompensationHandler:
def __init__(self, producer: Producer, max_retries: int = 3):
self.producer = producer
self.max_retries = max_retries
self.dead_letter_topic = 'saga.compensation.failed'
def compensate_with_retry(
self,
compensation_fn,
event: dict,
retry_count: int = 0
):
"""Execute compensation with retries and dead letter fallback."""
try:
compensation_fn(event)
print(f"Compensation successful for saga {event['saga_id'][:8]}...")
except Exception as e:
if retry_count < self.max_retries:
print(f"Compensation failed, retry {retry_count + 1}")
time.sleep(2 ** retry_count) # Exponential backoff
self.compensate_with_retry(
compensation_fn,
event,
retry_count + 1
)
else:
# All retries exhausted - send to dead letter for manual review
self._send_to_dead_letter(event, str(e))
def _send_to_dead_letter(self, event: dict, error: str):
"""Send failed compensation to dead letter topic for manual handling."""
dead_letter_event = {
**event,
"compensation_error": error,
"requires_manual_intervention": True
}
self.producer.produce(
topic=self.dead_letter_topic,
key=event['saga_id'],
value=json.dumps(dead_letter_event)
)
self.producer.flush()
print(f"MANUAL INTERVENTION REQUIRED: Saga {event['saga_id'][:8]}...")
Output:
Compensation failed, retry 1
Compensation failed, retry 2
Compensation failed, retry 3
MANUAL INTERVENTION REQUIRED: Saga 7f8a9b2c...
Complete Saga Flow Diagram
Here's the complete event flow for the task assignment saga:
SUCCESS PATH:
═════════════════════════════════════════════════════════════════════════
task.created
Task Service ────────────────────▶ User Service
│
│ user.assigned
▼
Notification Service
│
│ notification.sent
▼
Reminder Service
│
│ reminder.scheduled
▼
SAGA COMPLETE
FAILURE PATH (notification fails):
═════════════════════════════════════════════════════════════════════════
task.created
Task Service ────────────────────▶ User Service
│
│ user.assigned
▼
Notification Service
│
│ notification.failed
┌───────────────────────────────┘
│
▼
User Service (compensates)
│
│ user.unassigned
│
▼
Task Service (compensates)
│
│ task.marked.failed
│
▼
SAGA ROLLED BACK
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, implement the Saga pattern for an order fulfillment workflow with compensation events.
Does my skill generate the event schemas for both forward steps and compensating transactions?
Identify Gaps
Ask yourself:
- Did my skill explain choreography vs orchestration for Sagas?
- Did it show how to design compensation events and handle partial failures?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing Saga pattern implementation (compensation events, choreography, orchestration).
Update it to include when to use Sagas and how to design reliable distributed workflows.
Try With AI
Setup: You're designing a saga for an order processing workflow in an e-commerce system.
Prompt 1: Design compensation events
I'm building an order saga with these steps:
1. Reserve inventory
2. Process payment
3. Ship order
4. Send confirmation
Design the compensation events for each step. Consider:
- What happens if payment fails after inventory is reserved?
- What if shipping fails after payment succeeds?
- Which compensations are direct reversals vs semantic corrections?
What you're learning: AI helps you think through asymmetric compensations—releasing inventory is straightforward, but refunding a payment has different timing and fee implications than never charging in the first place.
Prompt 2: Handle a tricky compensation scenario
In my saga, step 3 is "send welcome email to new user."
Email was sent, then step 4 (create user dashboard) failed.
I can't unsend the email. What are my options for compensation?
How do I design the saga to handle this gracefully?
What you're learning: AI collaborates on semantic compensation strategies—you can't reverse email, but you can adjust messaging, send follow-ups, or design the saga to make email the last step so failures earlier don't leave users confused.
Prompt 3: Apply to your domain
I'm building a [your system, e.g., "booking platform for fitness classes"].
The workflow is:
1. [first step, e.g., "Reserve spot in class"]
2. [second step, e.g., "Charge membership"]
3. [third step, e.g., "Send booking confirmation"]
Design the Kafka topics and event schemas for this saga.
Include compensation events and explain when each would trigger.
What you're learning: AI helps you map the saga pattern to your specific domain, identifying which steps have clean reversals and which need semantic compensation.
Safety note: When testing sagas, use isolated topic names (e.g., test.task.created) so compensation events don't interfere with production data. Always test the compensation path as thoroughly as the success path.