Updated Feb 23, 2026

Workflow Patterns: Chaining & Fan-Out

You've built workflows with sequential activities and learned to manage their lifecycle. Now a business requirement arrives: process a batch of 50 task items overnight. Each item needs validation, enrichment, and scoring. Running them one at a time would take hours. Running them in parallel would take minutes.

This is where workflow patterns become essential. The right pattern transforms a multi-hour batch job into a quick parallel operation. The wrong pattern creates race conditions or ignores critical dependencies.

Dapr Workflows provides two fundamental patterns that cover most orchestration needs:

  • Task Chaining: When step B requires step A's output (sequential dependencies)
  • Fan-Out/Fan-In: When steps are independent and can run simultaneously (parallel processing)

Understanding when to use each pattern is the difference between an efficient workflow and a slow, brittle one.

Task Chaining: Sequential Data Pipelines

Task chaining executes activities in sequence, passing each activity's output as the next activity's input. Think of it as an assembly line: raw material enters, gets transformed at station 1, the transformed output moves to station 2, and so on until the final product emerges.

Input ──> [Step 1] ──> result1 ──> [Step 2] ──> result2 ──> [Step 3] ──> Final Output
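Stripped of the workflow machinery, chaining is plain function composition. A minimal Python sketch of the same shape (no Dapr involved, toy steps only):

```python
def chain(value, steps):
    """Run each step in order, feeding each step's output to the next."""
    for step in steps:
        value = step(value)
    return value


# Two toy stages: increment, then scale
result = chain(2, [lambda x: x + 1, lambda x: x * 10])
print(result)  # 30
```

A durable workflow adds what this loop lacks: each step's result is checkpointed, so a crash between steps resumes from the last completed stage instead of restarting the pipeline.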

When to Use Chaining

Use task chaining when:

  • Each step depends on the previous step's output
  • Order matters (you can't score before you validate)
  • Early failures should prevent later work
  • You need a clear audit trail of transformations

Implementing Task Chaining

Here's a complete task processing pipeline that validates, enriches, and scores tasks in sequence:

import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import datetime


@dataclass
class TaskInput:
    task_id: str
    title: str
    description: str


@dataclass
class TaskOutput:
    task_id: str
    status: str
    score: int
    processed_at: str


def task_pipeline_workflow(ctx: wf.DaprWorkflowContext, task: TaskInput):
    """Process task through validation, enrichment, and scoring stages."""

    # Step 1: Validate - check task meets requirements
    validation_result = yield ctx.call_activity(validate_task, input=task)

    if not validation_result["is_valid"]:
        return TaskOutput(
            task_id=task.task_id,
            status="rejected",
            score=0,
            processed_at=ctx.current_utc_datetime.isoformat()
        )

    # Step 2: Enrich - add metadata using validation result
    enriched_task = yield ctx.call_activity(
        enrich_task,
        input={
            "task": task,
            "validation": validation_result
        }
    )

    # Step 3: Score - calculate priority using enriched data
    score_result = yield ctx.call_activity(score_task, input=enriched_task)

    return TaskOutput(
        task_id=task.task_id,
        status="completed",
        score=score_result["score"],
        processed_at=ctx.current_utc_datetime.isoformat()
    )

Output:

Workflow started: task-pipeline-abc123
Step 1: Validating task task-001
-> Validation passed: {"is_valid": true, "issues": []}
Step 2: Enriching task task-001
-> Enriched with tags: ["high-priority", "backend"]
Step 3: Scoring task task-001
-> Score calculated: 85
Workflow completed: {"task_id": "task-001", "status": "completed", "score": 85}

Activity Implementations for Chaining

Each activity receives input and returns output that the next activity can use:

def validate_task(ctx, task: TaskInput) -> dict:
    """Validate task has required fields and meets criteria."""
    issues = []

    if len(task.title) < 5:
        issues.append("Title too short (min 5 characters)")
    if len(task.description) < 10:
        issues.append("Description too short (min 10 characters)")

    is_valid = len(issues) == 0
    print(f"Validated task {task.task_id}: valid={is_valid}")

    return {
        "is_valid": is_valid,
        "issues": issues,
        "validated_at": datetime.utcnow().isoformat()
    }


def enrich_task(ctx, data: dict) -> dict:
    """Add metadata based on validation results."""
    task = data["task"]
    validation = data["validation"]

    # Activities CAN use datetime.utcnow() - they're not replayed
    tags = []
    if "urgent" in task.title.lower():
        tags.append("high-priority")
    if "api" in task.description.lower():
        tags.append("backend")

    print(f"Enriched task {task.task_id} with tags: {tags}")

    return {
        "task_id": task.task_id,
        "title": task.title,
        "description": task.description,
        "tags": tags,
        "validated_at": validation["validated_at"],
        "enriched_at": datetime.utcnow().isoformat()
    }


def score_task(ctx, enriched_task: dict) -> dict:
    """Calculate priority score based on enriched data."""
    base_score = 50

    if "high-priority" in enriched_task.get("tags", []):
        base_score += 30
    if "backend" in enriched_task.get("tags", []):
        base_score += 5

    print(f"Scored task {enriched_task['task_id']}: {base_score}")

    return {
        "task_id": enriched_task["task_id"],
        "score": base_score,
        "scored_at": datetime.utcnow().isoformat()
    }

Output:

Validated task task-001: valid=True
Enriched task task-001 with tags: ['high-priority', 'backend']
Scored task task-001: 85

Chaining with Error Handling

Production workflows need error handling. Add compensation logic when a step fails:

def robust_pipeline_workflow(ctx: wf.DaprWorkflowContext, task: TaskInput):
    """Pipeline with error handling and cleanup."""

    try:
        validation = yield ctx.call_activity(validate_task, input=task)

        if not validation["is_valid"]:
            yield ctx.call_activity(log_rejection, input={
                "task_id": task.task_id,
                "reason": validation["issues"]
            })
            return {"status": "rejected", "issues": validation["issues"]}

        enriched = yield ctx.call_activity(enrich_task, input={
            "task": task,
            "validation": validation
        })

        scored = yield ctx.call_activity(score_task, input=enriched)

        return {"status": "completed", "score": scored["score"]}

    except Exception as e:
        # Log failure for investigation
        yield ctx.call_activity(log_failure, input={
            "task_id": task.task_id,
            "error": str(e)
        })
        raise  # Re-raise to mark workflow as failed
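The workflow above calls log_rejection and log_failure, which are not defined in this section. A minimal sketch of what such activities might look like (the names, fields, and print format are illustrative, not part of the Dapr API):

```python
from datetime import datetime, timezone


def log_rejection(ctx, data: dict) -> dict:
    """Record why a task was rejected (illustrative stub)."""
    print(f"REJECTED {data['task_id']}: {data['reason']}")
    return {
        "task_id": data["task_id"],
        "logged_at": datetime.now(timezone.utc).isoformat()
    }


def log_failure(ctx, data: dict) -> dict:
    """Record an unexpected pipeline failure (illustrative stub)."""
    print(f"FAILED {data['task_id']}: {data['error']}")
    return {
        "task_id": data["task_id"],
        "logged_at": datetime.now(timezone.utc).isoformat()
    }
```

In production these would typically write to a log aggregator or audit table rather than stdout.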

Fan-Out/Fan-In: Parallel Processing

Fan-out/fan-in schedules multiple activities simultaneously, waits for all to complete, then aggregates results. Think of it as a team working on independent tasks: everyone works in parallel, and you combine the results at the end.

            ┌──> [Process Item 1] ──┐
            │                       │
Input ──────┼──> [Process Item 2] ──┼──> Aggregate ──> Output
            │                       │
            └──> [Process Item 3] ──┘
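The same shape exists in plain Python with a thread pool. This miniature sketch (no Dapr, no durability; toy data) shows fan-out, fan-in, and aggregation:

```python
from concurrent.futures import ThreadPoolExecutor


def process(item: int) -> int:
    """Toy per-item work: double the input."""
    return item * 2


items = [1, 2, 3]
with ThreadPoolExecutor() as pool:
    # Fan-out: submit all items; fan-in: map collects results in input order
    results = list(pool.map(process, items))

total = sum(results)  # aggregate
print(results, total)  # [2, 4, 6] 12
```

The difference a workflow engine adds is durability: each completed activity is checkpointed, so a crash mid-batch resumes from where it left off instead of redoing finished work.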

When to Use Fan-Out/Fan-In

Use fan-out/fan-in when:

  • Tasks are independent (no data dependencies between items)
  • You want to minimize total execution time
  • Results need aggregation (sum, average, collect)
  • Partial failures are acceptable (or you want individual error handling)

Implementing Fan-Out/Fan-In

Here's a batch processor that analyzes multiple tasks in parallel:

import dapr.ext.workflow as wf


def batch_analysis_workflow(ctx: wf.DaprWorkflowContext, task_ids: list[str]):
    """Analyze multiple tasks in parallel and aggregate scores."""

    # Fan-out: Schedule all tasks in parallel
    # Each call_activity returns immediately (doesn't block)
    parallel_tasks = [
        ctx.call_activity(analyze_task, input=task_id)
        for task_id in task_ids
    ]

    # Fan-in: Wait for ALL tasks to complete
    results = yield wf.when_all(parallel_tasks)

    # Aggregate results
    total_score = sum(r["score"] for r in results)
    avg_score = total_score / len(results) if results else 0

    return {
        "processed_count": len(results),
        "total_score": total_score,
        "average_score": round(avg_score, 2),
        "individual_results": results
    }


def analyze_task(ctx, task_id: str) -> dict:
    """Analyze a single task - can run in parallel with others."""
    import time
    import random

    # Simulate variable processing time
    processing_time = random.uniform(0.5, 2.0)
    time.sleep(processing_time)

    # Calculate score (in reality, this might call external services)
    score = random.randint(60, 100)

    print(f"Analyzed {task_id}: score={score}, time={processing_time:.2f}s")

    return {
        "task_id": task_id,
        "score": score,
        "processing_time": round(processing_time, 2)
    }

Output:

Workflow started: batch-analysis-xyz789
Fan-out: Scheduling 5 parallel tasks
Analyzed task-003: score=82, time=0.63s
Analyzed task-001: score=95, time=1.12s
Analyzed task-005: score=71, time=1.34s
Analyzed task-002: score=88, time=1.67s
Analyzed task-004: score=79, time=1.89s
Fan-in: All 5 tasks completed
Workflow completed: {
  "processed_count": 5,
  "total_score": 415,
  "average_score": 83.0
}

Notice how tasks complete out of order (task-003 finished first, task-004 last). The workflow engine handles coordination; you just specify what to run and when to aggregate.

Time Savings from Parallelism

Sequential processing (task chaining) would take the sum of all processing times:

Sequential: 0.63 + 1.12 + 1.34 + 1.67 + 1.89 = 6.65 seconds

Parallel processing (fan-out) takes only as long as the slowest task:

Parallel: max(0.63, 1.12, 1.34, 1.67, 1.89) = 1.89 seconds

For a batch of 50 items averaging 2 seconds each:

Pattern                  Execution Time
Sequential (chaining)    100 seconds
Parallel (fan-out)       ~2 seconds
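The arithmetic above is easy to verify with a few lines of plain Python:

```python
durations = [0.63, 1.12, 1.34, 1.67, 1.89]

sequential = round(sum(durations), 2)  # chaining: total is the sum of all steps
parallel = max(durations)              # fan-out: the slowest task dominates

print(sequential, parallel)  # 6.65 1.89
```

In practice the parallel figure also includes scheduling overhead, and very large batches are limited by worker capacity, so treat max() as a lower bound.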

Handling Partial Failures

What if one task fails? The other scheduled activities still run, but an activity that raises surfaces its exception through when_all and fails the workflow. To report partial results instead of failing the whole batch, handle errors inside each activity so it never raises:

def resilient_batch_workflow(ctx: wf.DaprWorkflowContext, items: list[str]):
    """Process batch with individual error handling."""

    parallel_tasks = [
        ctx.call_activity(process_item_safely, input=item)
        for item in items
    ]

    results = yield wf.when_all(parallel_tasks)

    # Separate successes from failures
    successes = [r for r in results if r["status"] == "success"]
    failures = [r for r in results if r["status"] == "failed"]

    return {
        "total": len(results),
        "succeeded": len(successes),
        "failed": len(failures),
        "failure_ids": [f["item_id"] for f in failures]
    }


def process_item_safely(ctx, item: str) -> dict:
    """Process with internal error handling - never raises."""
    try:
        # Actual processing logic here
        result = do_processing(item)
        print(f"Processed {item}: success")
        return {"item_id": item, "status": "success", "result": result}
    except Exception as e:
        print(f"Processed {item}: FAILED - {e}")
        return {"item_id": item, "status": "failed", "error": str(e)}

Output:

Processed item-1: success
Processed item-2: success
Processed item-3: FAILED - Connection timeout
Processed item-4: success
Processed item-5: success

Workflow completed: {
  "total": 5,
  "succeeded": 4,
  "failed": 1,
  "failure_ids": ["item-3"]
}
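process_item_safely calls do_processing, which is left undefined above. A hypothetical stand-in that reproduces the sample run (item-3 hits a timeout, everything else succeeds) might be:

```python
def do_processing(item: str) -> dict:
    """Illustrative stand-in for real work; item-3 simulates a flaky dependency."""
    if item == "item-3":
        raise ConnectionError("Connection timeout")
    return {"item_id": item, "processed": True}
```

In a real system this would be the call to your database, queue, or external API, and the exception would come from that dependency rather than a hard-coded check.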

Combining Patterns: Chained Fan-Out

Real workflows often combine patterns. Consider processing tasks that each require validation (chained), where validations can run in parallel (fan-out):

def hybrid_workflow(ctx: wf.DaprWorkflowContext, tasks: list[dict]):
    """First validate all in parallel, then process valid ones in parallel."""

    # Stage 1: Fan-out for validation
    validation_tasks = [
        ctx.call_activity(validate_item, input=task)
        for task in tasks
    ]
    validations = yield wf.when_all(validation_tasks)

    # Filter to valid items only
    valid_items = [
        v["item"] for v in validations if v["is_valid"]
    ]

    if not valid_items:
        return {"status": "no_valid_items", "results": []}

    # Stage 2: Fan-out for processing (chained AFTER validation)
    processing_tasks = [
        ctx.call_activity(process_item, input=item)
        for item in valid_items
    ]
    results = yield wf.when_all(processing_tasks)

    return {
        "status": "completed",
        "validated": len(tasks),
        "processed": len(results),
        "results": results
    }
Execution flow:

Stage 1 (parallel): Validate all items
├── validate(item-1) ──> valid
├── validate(item-2) ──> invalid (filtered out)
└── validate(item-3) ──> valid

Stage 2 (parallel): Process valid items only
├── process(item-1)
└── process(item-3)

Result: 3 validated, 2 processed
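validate_item and process_item are placeholders in the workflow above. Minimal sketches consistent with the workflow's expectations (each validation must return the item plus an is_valid flag; the "name" field and the validity rule are invented for illustration) could be:

```python
def validate_item(ctx, task: dict) -> dict:
    """Illustrative validator: an item is valid when it has a non-empty name."""
    return {"item": task, "is_valid": bool(task.get("name"))}


def process_item(ctx, task: dict) -> dict:
    """Illustrative processor for items that passed validation."""
    return {"name": task["name"], "processed": True}
```

Whatever the real validation logic is, the key contract is that stage 1's output carries both the original item and the filter flag, so the workflow can build stage 2's input without re-fetching anything.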

Pattern Selection Decision Framework

Question                                 If Yes                       If No
Does step B need step A's output?        Chain them                   Consider parallel
Are items independent of each other?     Fan-out                      Chain or separate workflows
Do you need aggregated results?          Fan-in with when_all         Return individual results
Must all items complete for success?     when_all + error handling    when_any or per-activity try/except
Is order significant?                    Chain                        Fan-out (completion order undefined)
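The first two rows of the framework dominate most decisions, and can be encoded as a toy helper (illustrative only, not an API):

```python
def choose_pattern(needs_previous_output: bool, items_independent: bool) -> str:
    """Pick a pattern from the two dominant questions in the framework."""
    if needs_previous_output:
        return "chain"
    if items_independent:
        return "fan-out"
    return "separate workflows"


print(choose_pattern(True, False))   # chain
print(choose_pattern(False, True))   # fan-out
```

The remaining rows refine the choice (how to aggregate, how to handle failures) rather than change it.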

Real-World Pattern Applications

Scenario                                            Pattern    Reasoning
Order: Reserve inventory -> Charge payment -> Ship  Chain      Each step depends on previous success
Process 100 images for thumbnails                   Fan-out    Images are independent
ETL: Extract -> Transform -> Load                   Chain      Transform needs extracted data
Send notifications to 50 users                      Fan-out    Each notification is independent
Validate form -> Save to DB -> Send confirmation    Chain      Each step depends on previous
Run tests across 10 environments                    Fan-out    Environments are independent

Reflect on Your Skill

You extended your dapr-deployment skill in Lesson 0. Does it explain workflow patterns and when to use each?

Test Your Skill

Using my dapr-deployment skill, help me design a workflow for processing
customer orders. Each order needs: (1) inventory check, (2) payment processing,
(3) shipping label creation. Should these be chained or parallel?

Does your skill:

  • Recognize the sequential dependency (can't ship before payment)?
  • Recommend task chaining for this scenario?
  • Explain that fan-out would be wrong here?

Identify Gaps

Ask yourself:

  • Does my skill explain yield wf.when_all(parallel_tasks) for fan-in?
  • Does it show the list comprehension pattern for fan-out scheduling?
  • Does it include the pattern selection decision framework?

Improve Your Skill

If you found gaps:

My dapr-deployment skill needs workflow pattern guidance. Update it to include:
- Task chaining pattern with yield ctx.call_activity passing data between steps
- Fan-out/fan-in pattern with list comprehension and when_all
- Decision framework: chaining when dependencies exist, fan-out when independent
- Hybrid pattern example showing chained fan-out stages

Try With AI

Open your AI companion (Claude, ChatGPT, Gemini) and explore these workflow pattern scenarios.

Prompt 1: Design a Chained Pipeline

Help me implement a Dapr workflow for document processing. The pipeline needs:
1. Parse the document (extract text and metadata)
2. Classify the document type using the parsed content
3. Route to appropriate handler based on classification

Each step depends on the previous step's output. Show me the complete
workflow function and activities with proper data passing between stages.

Use Python with dapr-ext-workflow. Include the yield keyword for durability
and demonstrate how result1 becomes input to step2.

What you're learning: How to structure data flow through chained activities. The AI helps you understand that each yield ctx.call_activity(...) creates a checkpoint, and how to pass structured data between stages.

Prompt 2: Implement Parallel Processing

I need to analyze sentiment for 20 customer feedback items. Each analysis is
independent and takes about 2 seconds. Sequential processing would take 40+
seconds. Show me how to:

1. Fan-out: Schedule all 20 analyses in parallel
2. Fan-in: Wait for all to complete with when_all
3. Aggregate: Calculate average sentiment score

Include the list comprehension pattern for scheduling and demonstrate
the time savings of parallel vs sequential execution.

What you're learning: The mechanics of parallel scheduling and aggregation. The AI shows you how list comprehension with call_activity schedules tasks without blocking, and how when_all synchronizes completion.

Prompt 3: Pattern Selection Challenge

I'm building a workflow for an e-commerce checkout. The steps are:
- Validate cart items are in stock (checks inventory for each item)
- Calculate shipping cost (depends on item weights)
- Apply discount codes (depends on cart total)
- Process payment (depends on final total)
- Send confirmation email (depends on payment success)
- Update inventory (depends on payment success)

Help me decide: which steps should be chained, which can be parallel, and
which can be hybrid (like parallel inventory checks followed by sequential
payment processing)?

Draw out the dependency graph and show me the workflow structure.

What you're learning: Real-world pattern selection requires analyzing dependencies. The AI helps you identify that inventory checks can fan-out (independent per item), but payment must chain after total calculation. This builds your intuition for decomposing complex workflows.

Safety Note

When implementing fan-out patterns, be aware of resource limits. Scheduling 10,000 parallel tasks at once could overwhelm downstream services or exhaust workflow engine resources. For large batches, consider batching your fan-out (process 100 at a time) or using rate limiting in activities. AI suggestions for parallel patterns should be validated against your infrastructure capacity.