
Authoring Workflows

You understand workflow architecture from the previous lessons: durable execution through event sourcing, replay-based recovery, and the determinism rules that make it all work. Now it's time to write actual workflow code.

Your Task API needs to process tasks through multiple stages: validation, assignment, and notification. Each stage might fail. The process might take hours if it requires human approval. Your pod might restart mid-processing. With traditional code, you'd lose state. With a Dapr workflow, you pick up exactly where you left off.

The dapr-ext-workflow Python SDK gives you the building blocks: workflow functions that orchestrate, activity functions that execute units of work, and a runtime that manages durability. By the end of this lesson, you'll have a working workflow that survives restarts and chains activities together seamlessly.

Installing the Workflow SDK

Start by adding the workflow extension to your project:

pip install dapr-ext-workflow

Output:

Successfully installed dapr-ext-workflow-x.x.x durabletask-x.x.x

This installs dapr-ext-workflow and its dependency durabletask, which provides the underlying durable execution framework.
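You can verify the install with a quick import check (a one-liner; the success message is just illustrative):

python -c "import dapr.ext.workflow; print('workflow SDK available')"

If the import succeeds, both the SDK and its durabletask dependency are on your path.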

Activities: Your Units of Work

Activities are where real work happens. They call APIs, access databases, generate timestamps, and read environment variables: all the non-deterministic operations that workflows cannot perform directly.

An activity is a simple function decorated with @wfr.activity:

from dapr.ext.workflow import WorkflowRuntime, WorkflowActivityContext
from datetime import datetime

wfr = WorkflowRuntime()

@wfr.activity(name="validate_task")
def validate_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Validate that task has required fields."""
    print(f"Validating task: {task['task_id']}")

    # Activities CAN use non-deterministic operations
    validated_at = datetime.utcnow().isoformat()

    if not task.get("title"):
        return {"valid": False, "reason": "Missing title", "validated_at": validated_at}
    if not task.get("assignee"):
        return {"valid": False, "reason": "Missing assignee", "validated_at": validated_at}

    return {"valid": True, "validated_at": validated_at}

Key points about activities:

Aspect         Detail
-----------    -------------------------------------------------------------------
Decorator      @wfr.activity(name="activity_name") registers the activity
Context        WorkflowActivityContext provides workflow and task IDs
Inputs         Second parameter receives data passed from the workflow
Outputs        Return value goes back to the workflow
Determinism    Activities CAN be non-deterministic (timestamps, random, API calls)
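Because activities may perform I/O freely, calling an external service from an activity is a common pattern. Here's a minimal sketch, assuming the httpx library and a hypothetical user-service endpoint (neither is part of this lesson's pipeline):

import httpx

@wfr.activity(name="fetch_user_profile")
def fetch_user_profile(ctx: WorkflowActivityContext, user_id: str) -> dict:
    """Hypothetical activity: call an external user service."""
    # Network I/O is fine here; the activity's result gets checkpointed,
    # so the call is not repeated during workflow replay.
    response = httpx.get(f"http://user-service/users/{user_id}", timeout=5.0)
    response.raise_for_status()
    return response.json()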

Let's add two more activities to complete our task processing pipeline:

@wfr.activity(name="assign_task")
def assign_task(ctx: WorkflowActivityContext, task: dict) -> dict:
"""Assign task to the specified user."""
print(f"Assigning task {task['task_id']} to {task['assignee']}")

# Simulate assignment (would call a user service in production)
assigned_at = datetime.utcnow().isoformat()

return {
"assigned": True,
"assignee": task["assignee"],
"assigned_at": assigned_at
}

@wfr.activity(name="send_notification")
def send_notification(ctx: WorkflowActivityContext, notification: dict) -> dict:
"""Send notification about task assignment."""
print(f"Sending notification to {notification['recipient']}: {notification['message']}")

# Would call notification service in production
sent_at = datetime.utcnow().isoformat()

return {"sent": True, "sent_at": sent_at}

Output when activities run:

Validating task: task-123
Assigning task task-123 to alice@example.com
Sending notification to alice@example.com: You have been assigned task: Review PR

Notice that activities print output, use datetime.utcnow(), and could call external services. This is exactly what workflows CANNOT do directly. Activities are the escape hatch for real-world operations.
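To make that boundary concrete, here's the same timestamp need handled both ways (a sketch; get_timestamp is a hypothetical helper, not part of the pipeline):

# WRONG: non-deterministic call inside the workflow function itself.
# On replay, datetime.utcnow() would return a different value than the
# one recorded in history, corrupting the workflow's state.
#
#     started_at = datetime.utcnow().isoformat()  # inside a workflow: breaks replay

# RIGHT: push the timestamp into an activity; its result is checkpointed
# and replayed verbatim.
@wfr.activity(name="get_timestamp")
def get_timestamp(ctx: WorkflowActivityContext, _input: None = None) -> str:
    """Return the current time from inside an activity, where non-determinism is allowed."""
    return datetime.utcnow().isoformat()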

Workflows: Your Orchestrators

A workflow function orchestrates activities. It defines the sequence, handles branching logic, and manages data flow. The magic happens with yield: each yield ctx.call_activity(...) is a durability checkpoint.

from dapr.ext.workflow import DaprWorkflowContext
from dataclasses import dataclass

@dataclass
class TaskInput:
    task_id: str
    title: str
    assignee: str

@dataclass
class TaskResult:
    task_id: str
    status: str
    message: str

@wfr.workflow(name="task_processing_workflow")
def task_processing_workflow(ctx: DaprWorkflowContext, task_input: TaskInput):
    """Orchestrate task processing: validate -> assign -> notify."""

    # Prepare task data
    task = {
        "task_id": task_input.task_id,
        "title": task_input.title,
        "assignee": task_input.assignee
    }

    # Step 1: Validate the task
    validation = yield ctx.call_activity(validate_task, input=task)

    if not validation["valid"]:
        return TaskResult(
            task_id=task_input.task_id,
            status="rejected",
            message=f"Validation failed: {validation['reason']}"
        )

    # Step 2: Assign the task
    assignment = yield ctx.call_activity(assign_task, input=task)

    # Step 3: Send notification
    notification = {
        "recipient": task_input.assignee,
        "message": f"You have been assigned task: {task_input.title}"
    }
    notify_result = yield ctx.call_activity(send_notification, input=notification)

    return TaskResult(
        task_id=task_input.task_id,
        status="completed",
        message=f"Task assigned to {assignment['assignee']} at {assignment['assigned_at']}"
    )

Understanding the yield Keyword

The yield keyword is not just Python's generator mechanism. In Dapr workflows, each yield is a durability checkpoint:

Workflow starts
      │
      ▼
yield ctx.call_activity(validate_task, ...)
      ├─► Engine records: "activity validate_task called"
      ├─► Activity executes
      └─► Engine records: "activity validate_task completed, result={...}"
      │
      ▼
yield ctx.call_activity(assign_task, ...)
      ├─► Engine records: "activity assign_task called"
      └─► ... POD CRASHES HERE ...

... Pod restarts ...

Workflow replays:

yield ctx.call_activity(validate_task, ...)
      └─► Engine sees: "already completed" → returns cached result (no re-execution)

yield ctx.call_activity(assign_task, ...)
      └─► Engine sees: "was called but didn't complete" → re-executes activity

Each yield creates a checkpoint. If the workflow restarts, the engine replays from history: completed activities return cached results instantly, incomplete activities re-execute.
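One practical consequence: any code between yields runs again on every replay. If you log from the workflow body, guard it with the context's is_replaying flag so messages appear only on the first execution. A small sketch, reusing the validate_task activity from above:

@wfr.workflow(name="replay_aware_workflow")
def replay_aware_workflow(ctx: DaprWorkflowContext, task: dict):
    # This line re-executes on every replay; the guard keeps the log to one occurrence.
    if not ctx.is_replaying:
        print(f"Processing task {task['task_id']}")

    validation = yield ctx.call_activity(validate_task, input=task)
    return validation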

Setting Up the WorkflowRuntime

Activities and workflows must be registered with a WorkflowRuntime before they can execute. Here's a complete FastAPI application with proper lifecycle management:

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient

# Create the runtime instance
wfr = WorkflowRuntime()

# ... (activities and workflow defined above using @wfr.activity and @wfr.workflow)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage workflow runtime lifecycle."""
    # Startup: Start the workflow runtime
    wfr.start()
    print("Workflow runtime started")

    yield

    # Shutdown: Stop the workflow runtime
    wfr.shutdown()
    print("Workflow runtime stopped")

app = FastAPI(lifespan=lifespan)

Output on startup:

Workflow runtime started
INFO: Uvicorn running on http://0.0.0.0:8000

Output on shutdown:

Workflow runtime stopped
INFO: Shutting down

Why Lifespan Matters

The workflow runtime needs to:

  1. Connect to Dapr sidecar on startup (port 50001 gRPC)
  2. Register workflows and activities so Dapr knows what code to execute
  3. Clean up connections on shutdown

Using FastAPI's lifespan context manager ensures the runtime starts before your app accepts requests and shuts down gracefully when the app terminates.
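If you aren't running a web framework, you can manage the same lifecycle directly. A minimal standalone sketch:

import time

if __name__ == "__main__":
    wfr.start()
    print("Workflow runtime started")
    try:
        # Keep the process alive so the runtime can receive work from the sidecar.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        wfr.shutdown()
        print("Workflow runtime stopped")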

Starting Workflows with DaprWorkflowClient

The DaprWorkflowClient schedules new workflow instances and queries their status:

from dapr.ext.workflow import DaprWorkflowClient

class StartTaskRequest(BaseModel):
    task_id: str
    title: str
    assignee: str

class WorkflowResponse(BaseModel):
    instance_id: str
    status: str

@app.post("/tasks/process", response_model=WorkflowResponse)
async def start_task_processing(request: StartTaskRequest):
    """Start a new task processing workflow."""

    # Create workflow client
    client = DaprWorkflowClient()

    # Prepare input
    task_input = TaskInput(
        task_id=request.task_id,
        title=request.title,
        assignee=request.assignee
    )

    # Schedule the workflow
    instance_id = client.schedule_new_workflow(
        workflow=task_processing_workflow,
        input=task_input,
        instance_id=f"task-{request.task_id}"  # Optional: custom instance ID
    )

    return WorkflowResponse(
        instance_id=instance_id,
        status="scheduled"
    )

Test it with curl:

curl -X POST http://localhost:8000/tasks/process \
-H "Content-Type: application/json" \
-d '{"task_id": "123", "title": "Review PR", "assignee": "alice@example.com"}'

Output:

{
  "instance_id": "task-123",
  "status": "scheduled"
}

Workflow logs:

Validating task: 123
Assigning task 123 to alice@example.com
Sending notification to alice@example.com: You have been assigned task: Review PR

Querying Workflow Status

Add an endpoint to check workflow progress:

@app.get("/tasks/{instance_id}/status")
async def get_task_status(instance_id: str):
"""Get the status of a task processing workflow."""

client = DaprWorkflowClient()

state = client.get_workflow_state(instance_id, fetch_payloads=True)

if not state:
raise HTTPException(status_code=404, detail="Workflow not found")

return {
"instance_id": instance_id,
"status": state.runtime_status.name,
"output": state.serialized_output if state.runtime_status.name == "COMPLETED" else None
}

Output for running workflow:

{
  "instance_id": "task-123",
  "status": "RUNNING",
  "output": null
}

Output for completed workflow:

{
  "instance_id": "task-123",
  "status": "COMPLETED",
  "output": "{\"task_id\": \"123\", \"status\": \"completed\", \"message\": \"Task assigned to alice@example.com at 2025-01-15T10:30:00\"}"
}
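If a caller wants the final result rather than a point-in-time snapshot, the client can also block until the workflow finishes. A sketch using wait_for_workflow_completion (the 60-second timeout is an arbitrary choice, and the /result route is not part of the lesson's API):

@app.get("/tasks/{instance_id}/result")
async def get_task_result(instance_id: str):
    """Block until the workflow completes, then return its output."""
    client = DaprWorkflowClient()
    state = client.wait_for_workflow_completion(
        instance_id,
        fetch_payloads=True,
        timeout_in_seconds=60  # arbitrary cap for illustration
    )
    if not state:
        raise HTTPException(status_code=404, detail="Workflow not found")
    return {"status": state.runtime_status.name, "output": state.serialized_output}

Note that this call blocks while waiting; in an async endpoint you would typically run it in a thread pool so the event loop stays responsive.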

Complete Working Example

Here's the full application combining everything:

from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.workflow import (
    WorkflowRuntime,
    DaprWorkflowContext,
    WorkflowActivityContext,
    DaprWorkflowClient
)

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class TaskInput:
    task_id: str
    title: str
    assignee: str

@dataclass
class TaskResult:
    task_id: str
    status: str
    message: str

# =============================================================================
# Workflow Runtime
# =============================================================================

wfr = WorkflowRuntime()

# =============================================================================
# Activities
# =============================================================================

@wfr.activity(name="validate_task")
def validate_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Validate task has required fields."""
    print(f"[Activity] Validating task: {task['task_id']}")
    validated_at = datetime.utcnow().isoformat()

    if not task.get("title"):
        return {"valid": False, "reason": "Missing title", "validated_at": validated_at}
    if not task.get("assignee"):
        return {"valid": False, "reason": "Missing assignee", "validated_at": validated_at}

    return {"valid": True, "validated_at": validated_at}

@wfr.activity(name="assign_task")
def assign_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Assign task to user."""
    print(f"[Activity] Assigning task {task['task_id']} to {task['assignee']}")
    assigned_at = datetime.utcnow().isoformat()

    return {
        "assigned": True,
        "assignee": task["assignee"],
        "assigned_at": assigned_at
    }

@wfr.activity(name="send_notification")
def send_notification(ctx: WorkflowActivityContext, notification: dict) -> dict:
    """Send notification to user."""
    print(f"[Activity] Notifying {notification['recipient']}: {notification['message']}")
    sent_at = datetime.utcnow().isoformat()

    return {"sent": True, "sent_at": sent_at}

# =============================================================================
# Workflow
# =============================================================================

@wfr.workflow(name="task_processing_workflow")
def task_processing_workflow(ctx: DaprWorkflowContext, task_input: TaskInput):
    """Orchestrate task processing through validation, assignment, and notification."""

    task = {
        "task_id": task_input.task_id,
        "title": task_input.title,
        "assignee": task_input.assignee
    }

    # Step 1: Validate
    validation = yield ctx.call_activity(validate_task, input=task)
    if not validation["valid"]:
        return TaskResult(task_input.task_id, "rejected", validation["reason"])

    # Step 2: Assign
    assignment = yield ctx.call_activity(assign_task, input=task)

    # Step 3: Notify
    notification = {
        "recipient": task_input.assignee,
        "message": f"You have been assigned: {task_input.title}"
    }
    yield ctx.call_activity(send_notification, input=notification)

    return TaskResult(
        task_input.task_id,
        "completed",
        f"Assigned to {assignment['assignee']} at {assignment['assigned_at']}"
    )

# =============================================================================
# FastAPI Application
# =============================================================================

@asynccontextmanager
async def lifespan(app: FastAPI):
    wfr.start()
    print("Workflow runtime started")
    yield
    wfr.shutdown()
    print("Workflow runtime stopped")

app = FastAPI(title="Task Workflow Service", lifespan=lifespan)

class StartTaskRequest(BaseModel):
    task_id: str
    title: str
    assignee: str

@app.post("/tasks/process")
async def start_task_processing(request: StartTaskRequest):
    """Start task processing workflow."""
    client = DaprWorkflowClient()

    instance_id = client.schedule_new_workflow(
        workflow=task_processing_workflow,
        input=TaskInput(request.task_id, request.title, request.assignee),
        instance_id=f"task-{request.task_id}"
    )

    return {"instance_id": instance_id, "status": "scheduled"}

@app.get("/tasks/{instance_id}/status")
async def get_task_status(instance_id: str):
    """Get workflow status."""
    client = DaprWorkflowClient()
    state = client.get_workflow_state(instance_id, fetch_payloads=True)

    if not state:
        raise HTTPException(status_code=404, detail="Workflow not found")

    return {
        "instance_id": instance_id,
        "status": state.runtime_status.name,
        "output": state.serialized_output if state.runtime_status.name == "COMPLETED" else None
    }

Run with Dapr:

dapr run --app-id task-workflow --app-port 8000 --dapr-grpc-port 50001 -- uvicorn main:app --host 0.0.0.0 --port 8000

Output:

== APP == Workflow runtime started
== APP == INFO: Uvicorn running on http://0.0.0.0:8000

Reflect on Your Skill

You extended your dapr-deployment skill with actor and workflow patterns in Lesson 0. Does it now cover workflow authoring?

Test Your Skill

Using my dapr-deployment skill, generate a workflow that processes customer orders
with three activities: validate_order, reserve_inventory, and process_payment.
Include the WorkflowRuntime setup and a FastAPI endpoint to start the workflow.

Does your skill generate:

  • Proper @wfr.activity decorators with WorkflowActivityContext?
  • A @wfr.workflow decorator with DaprWorkflowContext?
  • Correct yield statements for each activity call?
  • WorkflowRuntime with lifespan integration?
  • DaprWorkflowClient usage for scheduling?

Identify Gaps

Ask yourself:

  • Did the generated activities use dataclasses or dicts for input/output?
  • Did the workflow handle validation failure by returning early?
  • Did the FastAPI endpoint use async/await correctly?
  • Did the code include proper type hints?

Improve Your Skill

If you found gaps:

My dapr-deployment skill needs better workflow authoring patterns. Update it to include:
- Activity definition with @wfr.activity and WorkflowActivityContext
- Workflow definition with @wfr.workflow and yield for durability
- WorkflowRuntime setup with FastAPI lifespan
- DaprWorkflowClient for scheduling workflows
- Data passing between activities using input/return values

Try With AI

Open your AI companion (Claude, ChatGPT, Gemini) and explore these scenarios.

Prompt 1: Understand the yield Mechanism

Explain what happens at each yield statement in a Dapr workflow. I have this code:

validation = yield ctx.call_activity(validate_task, input=task)
assignment = yield ctx.call_activity(assign_task, input=task)

Walk me through:
1. What the workflow engine records when each yield is reached
2. What happens if my pod crashes between the two yield statements
3. How replay uses the recorded history to skip completed activities

Use a timeline or step-by-step breakdown to show the durability mechanism.

What you're learning: How yield creates durability checkpoints. The AI helps you trace through the event sourcing mechanism that makes workflows fault-tolerant.

Prompt 2: Design a Multi-Activity Workflow

Help me design a workflow for processing job applications. The workflow should:
1. Validate the application (check required fields)
2. Screen for keywords (skills matching)
3. Score the candidate (0-100 based on criteria)
4. Route based on score (high: interview, medium: review, low: reject)
5. Send appropriate notification

Show me the activities, workflow function, and data passing between steps.
Each activity should return data that the next activity needs.

What you're learning: Designing data flow between activities. The AI demonstrates how to structure input/output types so each activity has what it needs.

Prompt 3: Debug a Workflow Issue

My workflow isn't progressing past the first activity. Here's what I see in logs:

[Activity] Validating task: 123
... nothing after this ...

My workflow code:

@wfr.workflow(name="task_workflow")
def task_workflow(ctx: DaprWorkflowContext, task: dict):
    validation = yield ctx.call_activity(validate_task, input=task)
    print(f"Validation result: {validation}")  # This never prints
    # ... more activities

Help me debug:
1. What could cause the workflow to stop after the first activity?
2. How do I check if the activity returned an error?
3. What logs should I look at in the Dapr sidecar?

What you're learning: Debugging workflow execution. The AI walks you through common issues: activity exceptions, serialization errors, and sidecar connectivity problems.

Safety Note

When authoring workflows, remember that workflow code must be deterministic. Never use datetime.now(), random, or direct API calls inside the workflow function itself. Always put non-deterministic operations inside activities. The workflow engine replays your workflow code multiple times; non-deterministic operations will cause replay failures and corrupt workflow state.
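For values like the current time, the workflow context also offers a deterministic alternative; a short sketch (replay-safe because the engine fixes the value in the workflow's history):

@wfr.workflow(name="deterministic_time_workflow")
def deterministic_time_workflow(ctx: DaprWorkflowContext, task: dict):
    # ctx.current_utc_datetime is replay-safe: it comes from the orchestration
    # history, so every replay sees the same value.
    started_at = ctx.current_utc_datetime.isoformat()
    validation = yield ctx.call_activity(validate_task, input=task)
    return {"started_at": started_at, "valid": validation["valid"]}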