Skip to main content
Updated Feb 23, 2026

Saga Pattern for Multi-Step Workflows

Your Task API now publishes events when tasks are created. The notification service sends emails. The reminder service schedules follow-ups. The audit service logs everything. Each service works independently, consuming events and doing its job.

Then a user creates a high-priority task that requires team assignment. The workflow becomes: create task, assign to user, send notification, schedule reminder. Each step depends on the previous. What happens if notification fails after the user is already assigned? You can't leave the system half-done—the user expects an assignment notification. But you also can't use a distributed transaction across independent services.

This is where the saga pattern saves you. Instead of trying to make multiple services act as one atomic transaction, you embrace eventual consistency and design explicit compensation steps that undo work when things go wrong. In this lesson, you'll implement a choreography-based saga where services coordinate through events, each knowing how to reverse its own actions.

Why Distributed Transactions Don't Work

Traditional databases give you ACID transactions: either all changes commit, or none do. When you try to extend this across multiple services, you hit fundamental problems.

The two-phase commit trap:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Task API │ │ User Service │ │ Notification │
│ │ │ │ │ Service │
└──────┬───────┘ └──────┬────────┘ └──────┬───────┘
│ │ │
│ PREPARE │ PREPARE │ PREPARE
├────────────────────┼─────────────────────┤
│ │ │
│ READY │ READY │ TIMEOUT!
├────────────────────┼─────────────────────┤
│ │ │
│ ??? (Resources locked, waiting) │
│ │

The coordinator must lock resources across all services, wait for all to be ready, then commit. If any service is slow or fails, everything blocks. In microservices with network partitions and varying latencies, this approach creates cascading failures.

What we actually need:

Distributed TransactionSaga Pattern
All or nothing (atomic)Eventually consistent
Lock resources during coordinationNo locks, independent processing
Coordinator blocks until all readyServices react asynchronously
Failure = abort everythingFailure = compensate completed steps

The saga pattern trades atomicity for availability. You accept that the system may be temporarily inconsistent, but you guarantee it will reach a consistent state—either fully complete or fully rolled back.

The Saga Pattern: Sequence with Compensations

A saga is a sequence of local transactions where each step publishes an event that triggers the next. If a step fails, the saga executes compensation events in reverse order to undo completed work.

┌─────────────────────────────────────────────────────────────────────┐
│ Task Assignment Saga │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ FORWARD PATH (Happy Path): │
│ │
│ ┌─────────┐ ┌─────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Create │───▶│ Assign User │───▶│ Send │───▶│ Schedule │ │
│ │ Task │ │ │ │ Notification │ │ Reminder │ │
│ └─────────┘ └─────────────┘ └──────────────┘ └──────────┘ │
│ │
│ COMPENSATION PATH (Failure at notification): │
│ │
│ ┌─────────┐ ┌─────────────┐ │
│ │ Mark │◀───│ Unassign │◀─── Failure detected! │
│ │ Failed │ │ User │ │
│ └─────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

Key principle: Every forward action must have a corresponding compensation that can undo it. Not all actions are reversible the same way—you can't "unsend" an email—but you can send a correction or update status to reflect the failure.

Choreography vs Orchestration

There are two ways to coordinate a saga:

Choreography (decentralized): Each service knows the overall flow and reacts to events. No central coordinator.

task.created ──▶ User Service listens, assigns user, emits user.assigned
user.assigned ──▶ Notification Service listens, sends email, emits notification.sent
notification.sent ──▶ Reminder Service listens, schedules reminder

Orchestration (centralized): A saga coordinator tells each service what to do and tracks progress.

Saga Coordinator:
1. Tell User Service: "Assign user"
2. Wait for response
3. Tell Notification Service: "Send notification"
4. Wait for response
5. Tell Reminder Service: "Schedule reminder"
AspectChoreographyOrchestration
CoordinationImplicit in event flowExplicit coordinator
Service couplingLooser—services only know eventsTighter—coordinator knows all services
Adding servicesAdd subscriber, no code changes elsewhereUpdate coordinator logic
DebuggingHarder—trace events across servicesEasier—saga state in one place
Failure handlingEach service handles its compensationsCoordinator manages rollback

This lesson focuses on choreography because it's more aligned with Kafka's event-driven model and keeps services truly independent. Orchestration has its place for complex workflows with many conditional branches.

Implementing a Choreography Saga

Let's implement the task assignment saga with four services. Each service:

  1. Listens for relevant events
  2. Performs its local work
  3. Publishes success or failure events

Step 1: Define the Event Schema

First, establish the events that drive the saga:

# events.py - Event definitions for the saga

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

@dataclass
class SagaEvent:
"""Base class for saga events with correlation tracking."""
saga_id: str # Unique identifier for this saga instance
correlation_id: str # Traces the entire request
occurred_at: str # ISO timestamp

def to_json(self) -> str:
return json.dumps(self.__dict__)

@classmethod
def from_json(cls, data: str):
return cls(**json.loads(data))


@dataclass
class TaskCreated(SagaEvent):
task_id: str
title: str
assignee_id: str
priority: str


@dataclass
class UserAssigned(SagaEvent):
task_id: str
user_id: str
assignment_id: str


@dataclass
class UserAssignmentFailed(SagaEvent):
task_id: str
user_id: str
reason: str


@dataclass
class NotificationSent(SagaEvent):
task_id: str
notification_id: str
channel: str # email, slack, etc.


@dataclass
class NotificationFailed(SagaEvent):
task_id: str
reason: str


# Compensation events
@dataclass
class UserUnassigned(SagaEvent):
"""Compensation: Reverses a user assignment."""
task_id: str
user_id: str
assignment_id: str
compensation_reason: str


@dataclass
class TaskMarkedFailed(SagaEvent):
"""Compensation: Marks task creation as failed in saga."""
task_id: str
failed_step: str
reason: str

Output:

The event classes ensure every saga event carries:

  • saga_id: Links all events in one saga instance
  • correlation_id: Traces across sagas for request debugging
  • occurred_at: Timestamp for ordering and audit

Step 2: Task Service (Saga Initiator)

The task service starts the saga by creating a task and publishing the initiating event:

# task_service.py - Initiates the saga

from confluent_kafka import Producer
from datetime import datetime
import uuid
import json

class TaskService:
def __init__(self, producer: Producer):
self.producer = producer

def create_task_with_assignment(
self,
title: str,
assignee_id: str,
priority: str = "normal"
) -> str:
"""Create a task and initiate assignment saga."""

task_id = str(uuid.uuid4())
saga_id = str(uuid.uuid4())
correlation_id = str(uuid.uuid4())

# Local transaction: Create task in database
task = {
"id": task_id,
"title": title,
"assignee_id": assignee_id,
"priority": priority,
"status": "pending_assignment", # Saga in progress
"saga_id": saga_id
}
self._save_task(task)

# Publish event to start saga
event = {
"saga_id": saga_id,
"correlation_id": correlation_id,
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"title": title,
"assignee_id": assignee_id,
"priority": priority
}

self.producer.produce(
topic='task.created',
key=task_id,
value=json.dumps(event),
callback=self._delivery_report
)
self.producer.flush()

print(f"Saga {saga_id} started for task {task_id}")
return task_id

def handle_saga_failure(self, event: dict):
"""Compensation: Mark task as failed when saga cannot complete."""
task_id = event['task_id']
task = self._load_task(task_id)

task['status'] = 'assignment_failed'
task['failure_reason'] = event.get('reason', 'Unknown failure')
self._save_task(task)

print(f"Task {task_id} marked as failed: {task['failure_reason']}")

def _save_task(self, task: dict):
# In production: Save to database
print(f"Saved task: {task['id']} with status {task['status']}")

def _load_task(self, task_id: str) -> dict:
# In production: Load from database
return {"id": task_id, "status": "pending"}

def _delivery_report(self, err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Event delivered to {msg.topic()}")

Output:

Saved task: abc-123 with status pending_assignment
Event delivered to task.created
Saga 7f8a9b2c started for task abc-123

Step 3: User Service (First Saga Step)

The user service assigns users when tasks are created, or publishes a failure event if assignment fails:

# user_service.py - Handles user assignment step

from confluent_kafka import Consumer, Producer
import json
import uuid

class UserAssignmentService:
def __init__(self, consumer: Consumer, producer: Producer):
self.consumer = consumer
self.producer = producer
self.consumer.subscribe(['task.created', 'notification.failed'])

def run(self):
"""Process events in the saga."""
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue

topic = msg.topic()
event = json.loads(msg.value().decode())

if topic == 'task.created':
self._handle_task_created(event)
elif topic == 'notification.failed':
self._handle_notification_failed(event)

self.consumer.commit(message=msg)

def _handle_task_created(self, event: dict):
"""Forward step: Assign user to task."""
task_id = event['task_id']
user_id = event['assignee_id']
saga_id = event['saga_id']

try:
# Local transaction: Create assignment
assignment_id = self._create_assignment(task_id, user_id)

# Success: Publish user.assigned
success_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": user_id,
"assignment_id": assignment_id
}

self.producer.produce(
topic='user.assigned',
key=task_id,
value=json.dumps(success_event)
)
print(f"User {user_id} assigned to task {task_id}")

except Exception as e:
# Failure: Publish failure event
failure_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": user_id,
"reason": str(e)
}

self.producer.produce(
topic='user.assignment.failed',
key=task_id,
value=json.dumps(failure_event)
)
print(f"Assignment failed for task {task_id}: {e}")

self.producer.poll(0)

def _handle_notification_failed(self, event: dict):
"""Compensation: Unassign user when downstream step fails."""
task_id = event['task_id']
saga_id = event['saga_id']

# Load the assignment we made
assignment = self._load_assignment_for_task(task_id)

if assignment:
# Reverse the assignment
self._delete_assignment(assignment['id'])

# Publish compensation event
compensation_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"user_id": assignment['user_id'],
"assignment_id": assignment['id'],
"compensation_reason": f"Downstream failure: {event['reason']}"
}

self.producer.produce(
topic='user.unassigned',
key=task_id,
value=json.dumps(compensation_event)
)
print(f"Compensated: Unassigned user from task {task_id}")

self.producer.poll(0)

def _create_assignment(self, task_id: str, user_id: str) -> str:
# In production: Insert into database
assignment_id = str(uuid.uuid4())
print(f"Created assignment {assignment_id}")
return assignment_id

def _load_assignment_for_task(self, task_id: str) -> dict:
# In production: Query database
return {"id": "assign-123", "user_id": "user-456"}

def _delete_assignment(self, assignment_id: str):
# In production: Delete from database
print(f"Deleted assignment {assignment_id}")

Output (success path):

Created assignment 8f9a2b3c
User user-456 assigned to task abc-123
Event delivered to user.assigned

Output (compensation path):

Deleted assignment assign-123
Compensated: Unassigned user from task abc-123
Event delivered to user.unassigned

Step 4: Notification Service (Second Saga Step)

The notification service sends notifications and can trigger compensations:

# notification_service.py - Handles notification step

from confluent_kafka import Consumer, Producer
import json
from datetime import datetime
import uuid

class NotificationService:
def __init__(self, consumer: Consumer, producer: Producer):
self.consumer = consumer
self.producer = producer
self.consumer.subscribe(['user.assigned'])

def run(self):
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue

event = json.loads(msg.value().decode())
self._handle_user_assigned(event)
self.consumer.commit(message=msg)

def _handle_user_assigned(self, event: dict):
"""Forward step: Send assignment notification."""
task_id = event['task_id']
user_id = event['user_id']
saga_id = event['saga_id']

try:
# Local transaction: Send notification
notification_id = self._send_notification(user_id, task_id)

# Success: Publish notification.sent
success_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"notification_id": notification_id,
"channel": "email"
}

self.producer.produce(
topic='notification.sent',
key=task_id,
value=json.dumps(success_event)
)
print(f"Notification sent for task {task_id}")

except Exception as e:
# Failure: Trigger compensation chain
failure_event = {
"saga_id": saga_id,
"correlation_id": event['correlation_id'],
"occurred_at": datetime.utcnow().isoformat(),
"task_id": task_id,
"reason": str(e)
}

self.producer.produce(
topic='notification.failed',
key=task_id,
value=json.dumps(failure_event)
)
print(f"Notification failed for task {task_id}: {e}")

self.producer.poll(0)

def _send_notification(self, user_id: str, task_id: str) -> str:
# In production: Call email/SMS service
# Simulate occasional failure
import random
if random.random() < 0.1: # 10% failure rate
raise Exception("Email service unavailable")

notification_id = str(uuid.uuid4())
print(f"Sent email notification {notification_id}")
return notification_id

Output (success):

Sent email notification 3c4d5e6f
Notification sent for task abc-123

Output (failure - triggers compensation):

Notification failed for task abc-123: Email service unavailable

Tracking Saga State

In choreography, each service only sees its own events. To monitor saga progress, implement a saga state tracker:

# saga_tracker.py - Monitors saga progress across all events

from confluent_kafka import Consumer
import json
from datetime import datetime, timedelta
from enum import Enum

class SagaState(Enum):
STARTED = "started"
ASSIGNED = "user_assigned"
NOTIFIED = "notification_sent"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"

class SagaTracker:
def __init__(self, consumer: Consumer):
self.consumer = consumer
self.sagas: dict[str, dict] = {}

# Subscribe to ALL saga-related topics
self.consumer.subscribe([
'task.created',
'user.assigned',
'user.assignment.failed',
'notification.sent',
'notification.failed',
'user.unassigned',
'task.marked.failed'
])

def run(self):
while True:
msg = self.consumer.poll(1.0)
if msg is None:
self._check_timeouts()
continue
if msg.error():
continue

event = json.loads(msg.value().decode())
topic = msg.topic()
saga_id = event.get('saga_id')

if saga_id:
self._update_saga_state(saga_id, topic, event)

self.consumer.commit(message=msg)

def _update_saga_state(self, saga_id: str, topic: str, event: dict):
if saga_id not in self.sagas:
self.sagas[saga_id] = {
'saga_id': saga_id,
'started_at': event['occurred_at'],
'state': SagaState.STARTED,
'events': []
}

saga = self.sagas[saga_id]
saga['events'].append({
'topic': topic,
'occurred_at': event['occurred_at']
})
saga['last_updated'] = event['occurred_at']

# Update state based on event type
state_transitions = {
'task.created': SagaState.STARTED,
'user.assigned': SagaState.ASSIGNED,
'notification.sent': SagaState.NOTIFIED,
'notification.failed': SagaState.COMPENSATING,
'user.assignment.failed': SagaState.COMPENSATING,
'user.unassigned': SagaState.COMPENSATING,
'task.marked.failed': SagaState.FAILED
}

if topic in state_transitions:
saga['state'] = state_transitions[topic]
print(f"Saga {saga_id[:8]}... -> {saga['state'].value}")

def _check_timeouts(self):
"""Detect sagas stuck in intermediate states."""
timeout = timedelta(minutes=5)
now = datetime.utcnow()

for saga_id, saga in self.sagas.items():
last_update = datetime.fromisoformat(saga['last_updated'])
if now - last_update > timeout:
if saga['state'] not in [SagaState.COMPLETED, SagaState.FAILED]:
print(f"ALERT: Saga {saga_id[:8]}... stuck in {saga['state'].value}")

def get_saga_status(self, saga_id: str) -> dict:
return self.sagas.get(saga_id, {"error": "Saga not found"})

Output:

Saga 7f8a9b2c... -> started
Saga 7f8a9b2c... -> user_assigned
Saga 7f8a9b2c... -> notification_sent

Output (failure scenario):

Saga 7f8a9b2c... -> started
Saga 7f8a9b2c... -> user_assigned
Saga 7f8a9b2c... -> compensating
Saga 7f8a9b2c... -> failed

Designing Compensation Events

Not all actions have symmetric rollbacks. Here's how to design compensations for common scenarios:

Forward ActionCompensationNotes
Create recordDelete recordDirect reversal
Reserve inventoryRelease reservationDirect reversal
Send emailSend correction emailCan't unsend, but can correct
Charge paymentIssue refundMay have fees, timing constraints
Update statusRevert to previous statusNeed to store previous state

Compensation design principles:

  1. Idempotency: Compensations may run multiple times if failures occur during compensation. Design them to be safe to repeat.

  2. Store what you need: If reverting requires previous state, store it before the forward action.

  3. Accept semantic compensation: You can't truly undo "sent email"—but you can send a follow-up explaining the situation.

# Example: Compensation that handles repeated execution

def compensate_reservation(reservation_id: str, saga_id: str):
"""Release a reservation, safely handling double-execution."""

reservation = load_reservation(reservation_id)

# Already compensated? Don't release again
if reservation['status'] == 'released':
print(f"Reservation {reservation_id} already released")
return

# Haven't compensated yet? Release it
reservation['status'] = 'released'
reservation['released_at'] = datetime.utcnow().isoformat()
reservation['saga_id'] = saga_id
save_reservation(reservation)

print(f"Released reservation {reservation_id}")

Output (first call):

Released reservation rsv-789

Output (second call - idempotent):

Reservation rsv-789 already released

Handling Partial Failures

What happens if compensation itself fails? You need a fallback strategy:

class CompensationHandler:
def __init__(self, producer: Producer, max_retries: int = 3):
self.producer = producer
self.max_retries = max_retries
self.dead_letter_topic = 'saga.compensation.failed'

def compensate_with_retry(
self,
compensation_fn,
event: dict,
retry_count: int = 0
):
"""Execute compensation with retries and dead letter fallback."""

try:
compensation_fn(event)
print(f"Compensation successful for saga {event['saga_id'][:8]}...")

except Exception as e:
if retry_count < self.max_retries:
print(f"Compensation failed, retry {retry_count + 1}")
time.sleep(2 ** retry_count) # Exponential backoff
self.compensate_with_retry(
compensation_fn,
event,
retry_count + 1
)
else:
# All retries exhausted - send to dead letter for manual review
self._send_to_dead_letter(event, str(e))

def _send_to_dead_letter(self, event: dict, error: str):
"""Send failed compensation to dead letter topic for manual handling."""
dead_letter_event = {
**event,
"compensation_error": error,
"requires_manual_intervention": True
}

self.producer.produce(
topic=self.dead_letter_topic,
key=event['saga_id'],
value=json.dumps(dead_letter_event)
)
self.producer.flush()

print(f"MANUAL INTERVENTION REQUIRED: Saga {event['saga_id'][:8]}...")

Output:

Compensation failed, retry 1
Compensation failed, retry 2
Compensation failed, retry 3
MANUAL INTERVENTION REQUIRED: Saga 7f8a9b2c...

Complete Saga Flow Diagram

Here's the complete event flow for the task assignment saga:

SUCCESS PATH:
═════════════════════════════════════════════════════════════════════════

task.created
Task Service ────────────────────▶ User Service

│ user.assigned

Notification Service

│ notification.sent

Reminder Service

│ reminder.scheduled

SAGA COMPLETE


FAILURE PATH (notification fails):
═════════════════════════════════════════════════════════════════════════

task.created
Task Service ────────────────────▶ User Service

│ user.assigned

Notification Service

│ notification.failed
┌───────────────────────────────┘


User Service (compensates)

│ user.unassigned


Task Service (compensates)

│ task.marked.failed


SAGA ROLLED BACK

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, implement the Saga pattern for an order fulfillment workflow with compensation events.
Does my skill generate the event schemas for both forward steps and compensating transactions?

Identify Gaps

Ask yourself:

  • Did my skill explain choreography vs orchestration for Sagas?
  • Did it show how to design compensation events and handle partial failures?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing Saga pattern implementation (compensation events, choreography, orchestration).
Update it to include when to use Sagas and how to design reliable distributed workflows.

Try With AI

Setup: You're designing a saga for an order processing workflow in an e-commerce system.

Prompt 1: Design compensation events

I'm building an order saga with these steps:
1. Reserve inventory
2. Process payment
3. Ship order
4. Send confirmation

Design the compensation events for each step. Consider:
- What happens if payment fails after inventory is reserved?
- What if shipping fails after payment succeeds?
- Which compensations are direct reversals vs semantic corrections?

What you're learning: AI helps you think through asymmetric compensations—releasing inventory is straightforward, but refunding a payment has different timing and fee implications than never charging in the first place.

Prompt 2: Handle a tricky compensation scenario

In my saga, step 3 is "send welcome email to new user."
Email was sent, then step 4 (create user dashboard) failed.

I can't unsend the email. What are my options for compensation?
How do I design the saga to handle this gracefully?

What you're learning: AI collaborates on semantic compensation strategies—you can't reverse email, but you can adjust messaging, send follow-ups, or design the saga to make email the last step so failures earlier don't leave users confused.

Prompt 3: Apply to your domain

I'm building a [your system, e.g., "booking platform for fitness classes"].

The workflow is:
1. [first step, e.g., "Reserve spot in class"]
2. [second step, e.g., "Charge membership"]
3. [third step, e.g., "Send booking confirmation"]

Design the Kafka topics and event schemas for this saga.
Include compensation events and explain when each would trigger.

What you're learning: AI helps you map the saga pattern to your specific domain, identifying which steps have clean reversals and which need semantic compensation.

Safety note: When testing sagas, use isolated topic names (e.g., test.task.created) so compensation events don't interfere with production data. Always test the compensation path as thoroughly as the success path.