Workflow Patterns: Saga & Monitor
Your task processing system handles thousands of operations daily. Most complete successfully. But what happens when step 3 of a 5-step workflow fails? Do you leave step 1 and 2 in an inconsistent state? What about a health monitoring job that needs to run forever, checking service status every 5 minutes? Can a workflow really run for months without running out of memory?
These are the problems that saga and monitor patterns solve. The saga pattern ensures transactional consistency across distributed operations without traditional database transactions. The monitor pattern creates eternal workflows that can run indefinitely without accumulating unbounded history. Together with human interaction patterns, they handle the complex, long-running scenarios that real agent systems encounter.
Distributed systems don't have traditional ACID transactions. You can't wrap a Redis write, a Kafka publish, and an external API call in a single BEGIN TRANSACTION / COMMIT. The saga pattern provides an alternative: track what you've done, and if something fails, undo it step by step. The monitor pattern addresses a different challenge: workflows that need to poll, check, and alert forever without ever "completing."
The Saga Pattern: Compensation for Consistency
Traditional database transactions follow ACID properties: if any step fails, everything rolls back automatically. But in distributed systems, each step might touch a different service, each with its own database. There's no global transaction coordinator.
The saga pattern solves this by recording compensating actions as you go. For each step that succeeds, you remember how to undo it. If a later step fails, you execute those compensations in reverse order.
Why Reverse Order Matters
Consider an order processing workflow:
- Reserve inventory (compensation: release inventory)
- Process payment (compensation: refund payment)
- Ship order (compensation: cancel shipment)
If shipping fails, you must undo in reverse: first cancel shipment (nothing to cancel, it failed), then refund payment, then release inventory. If you compensated in forward order, you'd release inventory before refunding payment, potentially allowing someone else to buy inventory before the refund completes.
Saga Implementation
Here's a task processing saga that handles failures gracefully:
import dapr.ext.workflow as wf
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class TaskOrder:
task_id: str
title: str
assignee: str
priority: str
@dataclass
class SagaResult:
status: str
task_id: str
error: str | None = None
def task_processing_saga(ctx: wf.DaprWorkflowContext, order: TaskOrder):
"""Saga workflow with compensation on failure."""
compensations: List[Tuple[str, dict]] = []
try:
# Step 1: Create task record
yield ctx.call_activity(create_task_record, input=order)
compensations.append(("delete_task_record", {"task_id": order.task_id}))
# Step 2: Reserve assignee capacity
yield ctx.call_activity(reserve_assignee_capacity, input=order)
compensations.append(("release_assignee_capacity", {"task_id": order.task_id, "assignee": order.assignee}))
# Step 3: Send notification to assignee
yield ctx.call_activity(notify_assignee, input=order)
compensations.append(("send_cancellation_notice", {"task_id": order.task_id, "assignee": order.assignee}))
# Step 4: Update dashboard (might fail due to external service)
yield ctx.call_activity(update_dashboard, input=order)
return SagaResult(status="success", task_id=order.task_id)
except Exception as e:
# Compensate in reverse order
for comp_name, comp_data in reversed(compensations):
try:
yield ctx.call_activity(comp_name, input=comp_data)
except Exception as comp_error:
# Log but continue compensating
# In production, you might want to alert on compensation failures
pass
return SagaResult(status="failed", task_id=order.task_id, error=str(e))
Output:
# Success case:
SagaResult(status='success', task_id='task-123', error=None)
# Failure case (dashboard update failed):
Creating task record: task-123
Reserving capacity for alice@example.com
Notifying assignee: alice@example.com
Dashboard update failed: Service unavailable
Compensating: send_cancellation_notice for task-123
Compensating: release_assignee_capacity for alice@example.com
Compensating: delete_task_record for task-123
SagaResult(status='failed', task_id='task-123', error='Dashboard service unavailable')
Activity Definitions for Saga
Each activity represents a step that can be compensated:
def create_task_record(ctx, order: TaskOrder) -> dict:
"""Create task in database."""
print(f"Creating task record: {order.task_id}")
# In production: INSERT INTO tasks ...
return {"created": True, "task_id": order.task_id}
def delete_task_record(ctx, data: dict) -> None:
"""Compensation: delete task record."""
print(f"Compensating: delete_task_record for {data['task_id']}")
# In production: DELETE FROM tasks WHERE id = ...
def reserve_assignee_capacity(ctx, order: TaskOrder) -> dict:
"""Reserve slot in assignee's workload."""
print(f"Reserving capacity for {order.assignee}")
return {"reserved": True}
def release_assignee_capacity(ctx, data: dict) -> None:
"""Compensation: release reserved capacity."""
print(f"Compensating: release_assignee_capacity for {data['assignee']}")
def notify_assignee(ctx, order: TaskOrder) -> dict:
"""Send task assignment notification."""
print(f"Notifying assignee: {order.assignee}")
return {"notified": True}
def send_cancellation_notice(ctx, data: dict) -> None:
"""Compensation: notify assignee of cancellation."""
print(f"Compensating: send_cancellation_notice for {data['task_id']}")
def update_dashboard(ctx, order: TaskOrder) -> dict:
"""Update external dashboard (might fail)."""
print(f"Updating dashboard for {order.task_id}")
# Simulate potential failure
import random
if random.random() < 0.3: # 30% failure rate for demo
raise Exception("Dashboard service unavailable")
return {"updated": True}
Key Saga Principles
| Principle | Description |
|---|---|
| Track compensations as you go | Don't wait until failure to figure out rollback |
| Compensate in reverse order | Undo most recent operations first |
| Compensations should be idempotent | Running compensation twice should be safe |
| Handle compensation failures | Log and continue; don't stop mid-compensation |
| Keep compensation logic simple | Complex compensation is a design smell |
The Monitor Pattern: Eternal Workflows
Some workflows need to run forever: health monitors, SLA checkers, quota enforcers. You might think of using a while True: loop, but that's an anti-pattern in Dapr Workflows. Each iteration would add to the workflow history, eventually exhausting memory.
The continue_as_new method solves this. It restarts the workflow from the beginning with new state, discarding the accumulated history. The workflow remains logically continuous while staying resource-bounded.
Monitor Implementation
from dataclasses import dataclass
from datetime import timedelta
@dataclass
class MonitorState:
job_id: str
is_healthy: bool = True
check_count: int = 0
consecutive_failures: int = 0
def health_monitor_workflow(ctx: wf.DaprWorkflowContext, state: MonitorState):
"""Eternal monitoring workflow with continue_as_new."""
# Check current status
status = yield ctx.call_activity(check_service_status, input=state.job_id)
# Determine sleep interval and actions based on status
if status == "healthy":
state.is_healthy = True
state.consecutive_failures = 0
sleep_interval = timedelta(minutes=60) # Check less frequently when healthy
else:
if state.is_healthy:
# Transition from healthy to unhealthy - send alert
state.is_healthy = False
yield ctx.call_activity(send_alert, input={
"job_id": state.job_id,
"message": f"Service {state.job_id} became unhealthy",
"severity": "warning"
})
state.consecutive_failures += 1
# Escalate if failures persist
if state.consecutive_failures >= 3:
yield ctx.call_activity(send_alert, input={
"job_id": state.job_id,
"message": f"Service {state.job_id} unhealthy for {state.consecutive_failures} checks",
"severity": "critical"
})
sleep_interval = timedelta(minutes=5) # Check more frequently when unhealthy
# Sleep until next check
yield ctx.create_timer(sleep_interval)
# Update check count
state.check_count += 1
# Restart workflow with new state (keeps history bounded)
ctx.continue_as_new(state)
Output:
# Workflow runs indefinitely:
Check #1: task-service healthy, sleeping 60 minutes
Check #2: task-service healthy, sleeping 60 minutes
Check #3: task-service UNHEALTHY, alerting (warning), sleeping 5 minutes
Check #4: task-service UNHEALTHY, sleeping 5 minutes
Check #5: task-service UNHEALTHY, alerting (critical - 3 consecutive), sleeping 5 minutes
Check #6: task-service healthy, sleeping 60 minutes
...continues forever...
Monitor Activities
def check_service_status(ctx, job_id: str) -> str:
"""Check if service is healthy."""
# In production: actual health check (HTTP, TCP, custom logic)
import httpx
try:
response = httpx.get(f"http://{job_id}:8000/health", timeout=5)
return "healthy" if response.status_code == 200 else "unhealthy"
except Exception:
return "unhealthy"
def send_alert(ctx, data: dict) -> None:
"""Send alert via notification service."""
print(f"[{data['severity'].upper()}] {data['message']}")
# In production: PagerDuty, Slack, email, etc.
Why continue_as_new?
| Approach | Problem |
|---|---|
| while True loop | History grows unbounded; workflow eventually OOMs |
| Recurring timers only | Same problem; each timer adds to history |
| continue_as_new | Restarts fresh; history stays constant size |
The key insight: continue_as_new doesn't just "continue" - it starts a brand new workflow instance with the provided input, preserving your logical workflow while resetting the execution history.
Human Interaction: Waiting for Approval
Real workflows often need human input: approvals, reviews, decisions. Dapr Workflows support this through external events. Your workflow pauses, waiting for a human (or external system) to raise an event, with an optional timeout.
Approval Workflow Implementation
@dataclass
class ApprovalRequest:
request_id: str
requester: str
amount: float
description: str
@dataclass
class ApprovalDecision:
approved: bool
approver: str
reason: str | None = None
def approval_workflow(ctx: wf.DaprWorkflowContext, request: ApprovalRequest):
"""Workflow that waits for human approval with timeout."""
# Auto-approve small amounts
if request.amount < 1000:
return {
"status": "auto_approved",
"request_id": request.request_id,
"reason": "Amount below approval threshold"
}
# Request approval from manager
yield ctx.call_activity(send_approval_request, input=request)
# Wait for approval or timeout
approval_event = ctx.wait_for_external_event("approval_received")
timeout = ctx.create_timer(timedelta(days=3))
winner = yield wf.when_any([approval_event, timeout])
if winner == timeout:
# Approval timed out
yield ctx.call_activity(send_timeout_notification, input=request)
return {
"status": "timeout",
"request_id": request.request_id,
"reason": "No approval received within 3 days"
}
# Got approval decision
decision: ApprovalDecision = approval_event.get_result()
if not decision.approved:
yield ctx.call_activity(send_rejection_notification, input={
"request": request,
"decision": decision
})
return {
"status": "rejected",
"request_id": request.request_id,
"approver": decision.approver,
"reason": decision.reason
}
# Approved - proceed with action
yield ctx.call_activity(execute_approved_action, input=request)
yield ctx.call_activity(send_approval_confirmation, input={
"request": request,
"decision": decision
})
return {
"status": "approved",
"request_id": request.request_id,
"approver": decision.approver
}
Output:
# Scenario 1: Small amount (auto-approved)
{"status": "auto_approved", "request_id": "req-001", "reason": "Amount below approval threshold"}
# Scenario 2: Approved by manager
Sending approval request for req-002 ($5000) to manager
[3 hours later, manager approves via API]
Executing approved action for req-002
{"status": "approved", "request_id": "req-002", "approver": "manager@company.com"}
# Scenario 3: Timeout (no response)
Sending approval request for req-003 ($5000) to manager
[3 days pass with no response]
Sending timeout notification for req-003
{"status": "timeout", "request_id": "req-003", "reason": "No approval received within 3 days"}
Raising External Events
From outside the workflow (API endpoint, webhook handler, etc.):
from dapr.ext.workflow import DaprWorkflowClient
# In your FastAPI endpoint
@app.post("/approve/{instance_id}")
async def approve_request(instance_id: str, decision: ApprovalDecision):
client = DaprWorkflowClient()
await client.raise_workflow_event(
instance_id=instance_id,
event_name="approval_received",
data={
"approved": decision.approved,
"approver": decision.approver,
"reason": decision.reason
}
)
return {"status": "event_raised", "instance_id": instance_id}
Or via CLI:
dapr workflow raise-event request-002/approval_received \
--app-id approval-service \
--input '{"approved": true, "approver": "manager@company.com"}'
Human Interaction Activities
def send_approval_request(ctx, request: ApprovalRequest) -> None:
"""Send approval request to manager."""
print(f"Sending approval request for {request.request_id} (${request.amount}) to manager")
# In production: email, Slack, dashboard notification
def send_timeout_notification(ctx, request: ApprovalRequest) -> None:
"""Notify requester that approval timed out."""
print(f"Sending timeout notification for {request.request_id}")
def send_rejection_notification(ctx, data: dict) -> None:
"""Notify requester of rejection."""
print(f"Sending rejection notification: {data['decision'].reason}")
def send_approval_confirmation(ctx, data: dict) -> None:
"""Confirm approval to requester."""
print(f"Sending approval confirmation for {data['request'].request_id}")
def execute_approved_action(ctx, request: ApprovalRequest) -> None:
"""Execute the approved action."""
print(f"Executing approved action for {request.request_id}")
Pattern Comparison
| Pattern | Use Case | Key Mechanism |
|---|---|---|
| Saga | Multi-step transactions needing rollback | Compensation list, reverse execution |
| Monitor | Eternal polling/checking | continue_as_new, bounded history |
| Human Interaction | Approval workflows, reviews | wait_for_external_event, timeout |
Common Mistakes
| Mistake | Problem | Fix |
|---|---|---|
| Compensating in forward order | Inconsistent state during rollback | Always reverse the compensations list |
Using while True for monitor | Unbounded history growth | Use continue_as_new |
| No timeout on external events | Workflow stuck forever | Always pair with create_timer |
| Complex compensation logic | Hard to verify correctness | Keep compensations simple and idempotent |
| Ignoring compensation failures | Silent data inconsistency | Log, alert, and consider manual intervention |
Reflect on Your Skill
You've been extending your dapr-deployment skill throughout this chapter. Does it understand saga and monitor patterns?
Test Your Skill
Using my dapr-deployment skill, explain when I should use the saga pattern vs
just retrying failed operations. My task processing has 4 steps, and step 3
sometimes fails due to external API timeouts.
Does your skill cover:
- When saga compensation is needed vs simple retries?
- How to track and execute compensations?
- The monitor pattern for eternal workflows?
- Human interaction with timeouts?
Identify Gaps
Ask yourself:
- Does my skill explain why compensations must execute in reverse order?
- Does it know about
continue_as_newfor bounded history? - Does it understand
when_anyfor combining events with timeouts?
Improve Your Skill
If you found gaps:
My dapr-deployment skill needs saga and monitor pattern knowledge. Update it to include:
- Saga pattern with compensation tracking and reverse execution
- Monitor pattern with continue_as_new for eternal workflows
- Human interaction using wait_for_external_event with timeout
- Common mistakes: forward compensation, while True loops, missing timeouts
Try With AI
Open your AI companion and explore these advanced workflow patterns.
Prompt 1: Design a Saga for Your Domain
I'm building a task management system with these steps:
1. Create task record in database
2. Reserve capacity from assignee's workload
3. Send notification to assignee
4. Update external analytics dashboard
Help me design a saga workflow that:
- Tracks compensation for each step
- Handles the case where step 4 (dashboard) fails
- Compensates in the correct order
Show me the Python code using Dapr Workflows and explain why compensation
order matters for data consistency.
What you're learning: How to identify compensation actions for each step in your specific domain. The AI helps you map your business operations to saga semantics, understanding what "undo" means for each step.
Prompt 2: Implement an Eternal Monitor
I need a workflow that monitors my AI agent's health every 5 minutes forever.
When it's healthy, it should check every 60 minutes. When unhealthy, check
every 5 minutes and send an alert.
The key requirement: this must run for months without running out of memory.
Show me how to implement this using Dapr Workflows with continue_as_new.
Explain what happens to workflow history with vs without continue_as_new.
What you're learning: How to design eternal workflows that don't accumulate unbounded state. The AI explains the technical mechanism of continue_as_new and helps you understand why naive approaches fail.
Prompt 3: Build an Approval Workflow
My task system needs manager approval for high-priority tasks. Design a
workflow that:
1. Auto-approves low-priority tasks
2. Requests manager approval for high-priority tasks
3. Waits up to 48 hours for approval
4. Escalates to director if manager doesn't respond
5. Times out after 72 hours total
Show me the code using wait_for_external_event and create_timer. Include the
API endpoint that managers would call to approve/reject.
What you're learning: How to combine external events with timeouts to create robust approval workflows. The AI demonstrates the pattern of racing events against timers and handling escalation logic.
Safety Note
As you implement saga patterns, remember that compensation logic is critical for data consistency. Test your compensations thoroughly: run them twice (idempotency), run them out of order (should still work), and verify they actually undo the original operation. Monitor patterns that run eternally can accumulate operational costs; ensure your health checks and alerts are appropriately tuned to avoid alert fatigue. External event patterns require secure API endpoints to prevent unauthorized approval injection.