Workflow Patterns: Chaining & Fan-Out
You've built workflows with sequential activities and learned to manage their lifecycle. Now a business requirement arrives: process a batch of 50 task items overnight. Each item needs validation, enrichment, and scoring. Running them one at a time would take hours. Running them in parallel would take minutes.
This is where workflow patterns become essential. The right pattern transforms a multi-hour batch job into a quick parallel operation. The wrong pattern creates race conditions or ignores critical dependencies.
Dapr Workflows provides two fundamental patterns that cover most orchestration needs:
- Task Chaining: When step B requires step A's output (sequential dependencies)
- Fan-Out/Fan-In: When steps are independent and can run simultaneously (parallel processing)
Understanding when to use each pattern is the difference between an efficient workflow and a slow, brittle one.
Task Chaining: Sequential Data Pipelines
Task chaining executes activities in sequence, passing each activity's output as the next activity's input. Think of it as an assembly line: raw material enters, gets transformed at station 1, the transformed output moves to station 2, and so on until the final product emerges.
Input ──> [Step 1] ──> result1 ──> [Step 2] ──> result2 ──> [Step 3] ──> Final Output
When to Use Chaining
Use task chaining when:
- Each step depends on the previous step's output
- Order matters (you can't score before you validate)
- Early failures should prevent later work
- You need a clear audit trail of transformations
Implementing Task Chaining
Here's a complete task processing pipeline that validates, enriches, and scores tasks in sequence:
import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TaskInput:
task_id: str
title: str
description: str
@dataclass
class TaskOutput:
task_id: str
status: str
score: int
processed_at: str
def task_pipeline_workflow(ctx: wf.DaprWorkflowContext, task: TaskInput):
"""Process task through validation, enrichment, and scoring stages."""
# Step 1: Validate - check task meets requirements
validation_result = yield ctx.call_activity(validate_task, input=task)
if not validation_result["is_valid"]:
return TaskOutput(
task_id=task.task_id,
status="rejected",
score=0,
processed_at=ctx.current_utc_datetime.isoformat()
)
# Step 2: Enrich - add metadata using validation result
enriched_task = yield ctx.call_activity(
enrich_task,
input={
"task": task,
"validation": validation_result
}
)
# Step 3: Score - calculate priority using enriched data
score_result = yield ctx.call_activity(score_task, input=enriched_task)
return TaskOutput(
task_id=task.task_id,
status="completed",
score=score_result["score"],
processed_at=ctx.current_utc_datetime.isoformat()
)
Output:
Workflow started: task-pipeline-abc123
Step 1: Validating task task-001
-> Validation passed: {"is_valid": true, "issues": []}
Step 2: Enriching task task-001
-> Enriched with tags: ["high-priority", "backend"]
Step 3: Scoring task task-001
-> Score calculated: 85
Workflow completed: {"task_id": "task-001", "status": "completed", "score": 85}
Activity Implementations for Chaining
Each activity receives input and returns output that the next activity can use:
def validate_task(ctx, task: TaskInput) -> dict:
"""Validate task has required fields and meets criteria."""
issues = []
if len(task.title) < 5:
issues.append("Title too short (min 5 characters)")
if len(task.description) < 10:
issues.append("Description too short (min 10 characters)")
is_valid = len(issues) == 0
print(f"Validated task {task.task_id}: valid={is_valid}")
return {
"is_valid": is_valid,
"issues": issues,
"validated_at": datetime.utcnow().isoformat()
}
def enrich_task(ctx, data: dict) -> dict:
"""Add metadata based on validation results."""
task = data["task"]
validation = data["validation"]
# Activities CAN use datetime.utcnow() - they're not replayed
tags = []
if "urgent" in task.title.lower():
tags.append("high-priority")
if "api" in task.description.lower():
tags.append("backend")
print(f"Enriched task {task.task_id} with tags: {tags}")
return {
"task_id": task.task_id,
"title": task.title,
"description": task.description,
"tags": tags,
"validated_at": validation["validated_at"],
"enriched_at": datetime.utcnow().isoformat()
}
def score_task(ctx, enriched_task: dict) -> dict:
"""Calculate priority score based on enriched data."""
base_score = 50
if "high-priority" in enriched_task.get("tags", []):
base_score += 30
if "backend" in enriched_task.get("tags", []):
base_score += 5
print(f"Scored task {enriched_task['task_id']}: {base_score}")
return {
"task_id": enriched_task["task_id"],
"score": base_score,
"scored_at": datetime.utcnow().isoformat()
}
Output:
Validated task task-001: valid=True
Enriched task task-001 with tags: ['high-priority', 'backend']
Scored task task-001: 85
Chaining with Error Handling
Production workflows need error handling. Add compensation logic when a step fails:
def robust_pipeline_workflow(ctx: wf.DaprWorkflowContext, task: TaskInput):
"""Pipeline with error handling and cleanup."""
try:
validation = yield ctx.call_activity(validate_task, input=task)
if not validation["is_valid"]:
yield ctx.call_activity(log_rejection, input={
"task_id": task.task_id,
"reason": validation["issues"]
})
return {"status": "rejected", "issues": validation["issues"]}
enriched = yield ctx.call_activity(enrich_task, input={
"task": task,
"validation": validation
})
scored = yield ctx.call_activity(score_task, input=enriched)
return {"status": "completed", "score": scored["score"]}
except Exception as e:
# Log failure for investigation
yield ctx.call_activity(log_failure, input={
"task_id": task.task_id,
"error": str(e)
})
raise # Re-raise to mark workflow as failed
Fan-Out/Fan-In: Parallel Processing
Fan-out/fan-in schedules multiple activities simultaneously, waits for all to complete, then aggregates results. Think of it as a team working on independent tasks: everyone works in parallel, and you combine the results at the end.
┌──> [Process Item 1] ──┐
│ │
Input ──────┼──> [Process Item 2] ──┼──> Aggregate ──> Output
│ │
└──> [Process Item 3] ──┘
When to Use Fan-Out/Fan-In
Use fan-out/fan-in when:
- Tasks are independent (no data dependencies between items)
- You want to minimize total execution time
- Results need aggregation (sum, average, collect)
- Partial failures are acceptable (or you want individual error handling)
Implementing Fan-Out/Fan-In
Here's a batch processor that analyzes multiple tasks in parallel:
import dapr.ext.workflow as wf
def batch_analysis_workflow(ctx: wf.DaprWorkflowContext, task_ids: list[str]):
"""Analyze multiple tasks in parallel and aggregate scores."""
# Fan-out: Schedule all tasks in parallel
# Each call_activity returns immediately (doesn't block)
parallel_tasks = [
ctx.call_activity(analyze_task, input=task_id)
for task_id in task_ids
]
# Fan-in: Wait for ALL tasks to complete
results = yield wf.when_all(parallel_tasks)
# Aggregate results
total_score = sum(r["score"] for r in results)
avg_score = total_score / len(results) if results else 0
return {
"processed_count": len(results),
"total_score": total_score,
"average_score": round(avg_score, 2),
"individual_results": results
}
def analyze_task(ctx, task_id: str) -> dict:
"""Analyze a single task - can run in parallel with others."""
import time
import random
# Simulate variable processing time
processing_time = random.uniform(0.5, 2.0)
time.sleep(processing_time)
# Calculate score (in reality, this might call external services)
score = random.randint(60, 100)
print(f"Analyzed {task_id}: score={score}, time={processing_time:.2f}s")
return {
"task_id": task_id,
"score": score,
"processing_time": round(processing_time, 2)
}
Output:
Workflow started: batch-analysis-xyz789
Fan-out: Scheduling 5 parallel tasks
Analyzed task-003: score=82, time=0.63s
Analyzed task-001: score=95, time=1.12s
Analyzed task-005: score=71, time=1.34s
Analyzed task-002: score=88, time=1.67s
Analyzed task-004: score=79, time=1.89s
Fan-in: All 5 tasks completed
Workflow completed: {
"processed_count": 5,
"total_score": 415,
"average_score": 83.0
}
Notice how tasks complete out of order (task-003 finished first, task-004 last). The workflow engine handles coordination; you just specify what to run and when to aggregate.
Time Savings from Parallelism
Sequential processing (task chaining) would take the sum of all processing times:
Sequential: 0.63 + 1.12 + 1.34 + 1.67 + 1.89 = 6.65 seconds
Parallel processing (fan-out) takes only as long as the slowest task:
Parallel: max(0.63, 1.12, 1.34, 1.67, 1.89) = 1.89 seconds
For a batch of 50 items averaging 2 seconds each:
| Pattern | Execution Time |
|---|---|
| Sequential (chaining) | 100 seconds |
| Parallel (fan-out) | ~2 seconds |
Handling Partial Failures
What if one task fails? By default, when_all waits for all tasks, but individual failures don't stop others:
def resilient_batch_workflow(ctx: wf.DaprWorkflowContext, items: list[str]):
"""Process batch with individual error handling."""
parallel_tasks = [
ctx.call_activity(process_item_safely, input=item)
for item in items
]
results = yield wf.when_all(parallel_tasks)
# Separate successes from failures
successes = [r for r in results if r["status"] == "success"]
failures = [r for r in results if r["status"] == "failed"]
return {
"total": len(results),
"succeeded": len(successes),
"failed": len(failures),
"failure_ids": [f["item_id"] for f in failures]
}
def process_item_safely(ctx, item: str) -> dict:
"""Process with internal error handling - never raises."""
try:
# Actual processing logic here
result = do_processing(item)
return {"item_id": item, "status": "success", "result": result}
except Exception as e:
return {"item_id": item, "status": "failed", "error": str(e)}
Output:
Processed item-1: success
Processed item-2: success
Processed item-3: FAILED - Connection timeout
Processed item-4: success
Processed item-5: success
Workflow completed: {
"total": 5,
"succeeded": 4,
"failed": 1,
"failure_ids": ["item-3"]
}
Combining Patterns: Chained Fan-Out
Real workflows often combine patterns. Consider processing tasks that each require validation (chained), where validations can run in parallel (fan-out):
def hybrid_workflow(ctx: wf.DaprWorkflowContext, tasks: list[dict]):
"""First validate all in parallel, then process valid ones in parallel."""
# Stage 1: Fan-out for validation
validation_tasks = [
ctx.call_activity(validate_item, input=task)
for task in tasks
]
validations = yield wf.when_all(validation_tasks)
# Filter to valid items only
valid_items = [
v["item"] for v in validations if v["is_valid"]
]
if not valid_items:
return {"status": "no_valid_items", "results": []}
# Stage 2: Fan-out for processing (chained AFTER validation)
processing_tasks = [
ctx.call_activity(process_item, input=item)
for item in valid_items
]
results = yield wf.when_all(processing_tasks)
return {
"status": "completed",
"validated": len(tasks),
"processed": len(results),
"results": results
}
Stage 1 (parallel): Validate all items
├── validate(item-1) ──> valid
├── validate(item-2) ──> invalid (filtered out)
└── validate(item-3) ──> valid
Stage 2 (parallel): Process valid items only
├── process(item-1)
└── process(item-3)
Result: 3 validated, 2 processed
Pattern Selection Decision Framework
| Question | If Yes | If No |
|---|---|---|
| Does step B need step A's output? | Chain them | Consider parallel |
| Are items independent of each other? | Fan-out | Chain or separate workflows |
| Do you need aggregated results? | Fan-in with when_all | Return individual results |
| Must all items complete for success? | when_all + error handling | when_any or individual try/catch |
| Is order significant? | Chain | Fan-out (order undefined) |
Real-World Pattern Applications
| Scenario | Pattern | Reasoning |
|---|---|---|
| Order: Reserve inventory -> Charge payment -> Ship | Chain | Each step depends on previous success |
| Process 100 images for thumbnails | Fan-out | Images are independent |
| ETL: Extract -> Transform -> Load | Chain | Transform needs extracted data |
| Send notifications to 50 users | Fan-out | Each notification is independent |
| Validate form -> Save to DB -> Send confirmation | Chain | Each step depends on previous |
| Run tests across 10 environments | Fan-out | Environments are independent |
Reflect on Your Skill
You extended your dapr-deployment skill in Lesson 0. Does it explain workflow patterns and when to use each?
Test Your Skill
Using my dapr-deployment skill, help me design a workflow for processing
customer orders. Each order needs: (1) inventory check, (2) payment processing,
(3) shipping label creation. Should these be chained or parallel?
Does your skill:
- Recognize the sequential dependency (can't ship before payment)?
- Recommend task chaining for this scenario?
- Explain that fan-out would be wrong here?
Identify Gaps
Ask yourself:
- Does my skill explain
yield wf.when_all(parallel_tasks)for fan-in? - Does it show the list comprehension pattern for fan-out scheduling?
- Does it include the pattern selection decision framework?
Improve Your Skill
If you found gaps:
My dapr-deployment skill needs workflow pattern guidance. Update it to include:
- Task chaining pattern with yield ctx.call_activity passing data between steps
- Fan-out/fan-in pattern with list comprehension and when_all
- Decision framework: chaining when dependencies exist, fan-out when independent
- Hybrid pattern example showing chained fan-out stages
Try With AI
Open your AI companion (Claude, ChatGPT, Gemini) and explore these workflow pattern scenarios.
Prompt 1: Design a Chained Pipeline
Help me implement a Dapr workflow for document processing. The pipeline needs:
1. Parse the document (extract text and metadata)
2. Classify the document type using the parsed content
3. Route to appropriate handler based on classification
Each step depends on the previous step's output. Show me the complete
workflow function and activities with proper data passing between stages.
Use Python with dapr-ext-workflow. Include the yield keyword for durability
and demonstrate how result1 becomes input to step2.
What you're learning: How to structure data flow through chained activities. The AI helps you understand that each yield ctx.call_activity(...) creates a checkpoint, and how to pass structured data between stages.
Prompt 2: Implement Parallel Processing
I need to analyze sentiment for 20 customer feedback items. Each analysis is
independent and takes about 2 seconds. Sequential processing would take 40+
seconds. Show me how to:
1. Fan-out: Schedule all 20 analyses in parallel
2. Fan-in: Wait for all to complete with when_all
3. Aggregate: Calculate average sentiment score
Include the list comprehension pattern for scheduling and demonstrate
the time savings of parallel vs sequential execution.
What you're learning: The mechanics of parallel scheduling and aggregation. The AI shows you how list comprehension with call_activity schedules tasks without blocking, and how when_all synchronizes completion.
Prompt 3: Pattern Selection Challenge
I'm building a workflow for an e-commerce checkout. The steps are:
- Validate cart items are in stock (checks inventory for each item)
- Calculate shipping cost (depends on item weights)
- Apply discount codes (depends on cart total)
- Process payment (depends on final total)
- Send confirmation email (depends on payment success)
- Update inventory (depends on payment success)
Help me decide: which steps should be chained, which can be parallel, and
which can be hybrid (like parallel inventory checks followed by sequential
payment processing)?
Draw out the dependency graph and show me the workflow structure.
What you're learning: Real-world pattern selection requires analyzing dependencies. The AI helps you identify that inventory checks can fan-out (independent per item), but payment must chain after total calculation. This builds your intuition for decomposing complex workflows.
Safety Note
When implementing fan-out patterns, be aware of resource limits. Scheduling 10,000 parallel tasks at once could overwhelm downstream services or exhaust workflow engine resources. For large batches, consider batching your fan-out (process 100 at a time) or using rate limiting in activities. AI suggestions for parallel patterns should be validated against your infrastructure capacity.