Managing Workflows
You've authored your first workflow with activities. It runs, processes tasks, and returns results. But what happens when you need to:
- Check if a workflow is still running or has failed?
- Wait for a manager's approval before proceeding with a high-value transaction?
- Cancel a workflow that's stuck or no longer needed?
- Clean up old workflow data that's consuming state store space?
Authoring workflows is only half the story. Production systems require workflow management: starting instances on demand, querying their status, injecting external events, and cleaning up completed runs. This lesson gives you the operational toolkit for workflow lifecycle management.
The Workflow Management API
Dapr's DaprWorkflowClient provides the control plane for your workflows. While WorkflowRuntime handles execution (registering and running workflows), DaprWorkflowClient handles management (starting, querying, and controlling workflow instances).
Think of it like this:
| Component | Role | Analogy |
|---|---|---|
| WorkflowRuntime | Executes workflows | The factory floor |
| DaprWorkflowClient | Manages workflow instances | The control room |
The control room operators don't assemble products; they monitor production lines, start new runs, and intervene when something goes wrong.
Starting Workflows
Using the Python SDK
The schedule_new_workflow method starts a new workflow instance:
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Start a workflow with auto-generated instance ID
instance_id = client.schedule_new_workflow(
    workflow=task_processing_workflow,
    input={"task_id": "task-123", "title": "Review PR #456"}
)

print(f"Started workflow: {instance_id}")
```

Output:

```
Started workflow: abc123-def456-ghi789
```
You can also specify a custom instance ID for easier tracking:
```python
# Start with custom instance ID
instance_id = client.schedule_new_workflow(
    workflow=task_processing_workflow,
    input={"task_id": "task-123", "title": "Review PR #456"},
    instance_id="task-123-workflow"
)

print(f"Started workflow: {instance_id}")
```

Output:

```
Started workflow: task-123-workflow
```
Custom instance IDs are valuable for:
- Idempotency: Starting the same workflow twice with the same ID returns the existing instance
- Correlation: Matching workflows to business entities (order-12345, approval-request-789)
- Debugging: Finding specific workflow instances in logs and traces
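A small convention helper can keep these IDs consistent across services. The helper below is illustrative, not part of the Dapr SDK:

```python
# Illustrative helper (not a Dapr API): derive a deterministic instance ID
# from a business entity, so retries reuse the same workflow instance and
# logs/traces correlate directly to the entity.
def instance_id_for(entity: str, key: str) -> str:
    return f"{entity}-{key}-workflow"

print(instance_id_for("order", "12345"))  # order-12345-workflow
```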
Using the CLI
The Dapr CLI provides workflow management without writing code:
```bash
# Start a workflow with input data
dapr workflow run task_processing_workflow \
  --app-id task-service \
  --input '{"task_id": "task-123", "title": "Review PR"}'
```

Output:

```
Workflow instance started: abc123-def456-ghi789
```

```bash
# Start with specific instance ID
dapr workflow run task_processing_workflow \
  --app-id task-service \
  --instance-id task-123-workflow \
  --input '{"task_id": "task-123", "title": "Review PR"}'
```

Output:

```
Workflow instance started: task-123-workflow
```
Querying Workflow Status
Once a workflow is running, you need visibility into its state. The get_workflow_state method returns comprehensive status information:
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Query workflow status
state = client.get_workflow_state(
    instance_id="task-123-workflow",
    fetch_payloads=True  # Include input/output data
)

print(f"Instance ID: {state.instance_id}")
print(f"Workflow Name: {state.workflow_name}")
print(f"Status: {state.runtime_status}")
print(f"Created: {state.created_at}")
print(f"Last Updated: {state.last_updated_at}")
```

Output:

```
Instance ID: task-123-workflow
Workflow Name: task_processing_workflow
Status: RUNNING
Created: 2025-01-15 10:30:00+00:00
Last Updated: 2025-01-15 10:30:05+00:00
```
Runtime Status Values
| Status | Meaning |
|---|---|
| RUNNING | Workflow is actively executing or waiting for an event/timer |
| COMPLETED | Workflow finished successfully with output |
| FAILED | Workflow threw an unhandled exception |
| TERMINATED | Workflow was forcibly stopped via terminate_workflow |
| SUSPENDED | Workflow was paused via pause_workflow |
| PENDING | Workflow is scheduled but hasn't started yet |
CLI Status Query
```bash
# Get workflow status
dapr workflow get task-123-workflow --app-id task-service
```

Output:

```
Instance ID: task-123-workflow
Workflow Name: task_processing_workflow
Status: RUNNING
Created: 2025-01-15T10:30:00Z
Last Updated: 2025-01-15T10:30:05Z
```

```bash
# List all workflows (with optional filters)
dapr workflow list --app-id task-service --filter-status RUNNING
```

Output:

```
INSTANCE ID         WORKFLOW NAME             STATUS   CREATED
task-123-workflow   task_processing_workflow  RUNNING  2025-01-15T10:30:00Z
task-456-workflow   task_processing_workflow  RUNNING  2025-01-15T10:28:00Z
```
Waiting for Completion
For synchronous scenarios where you need to block until a workflow finishes:
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Start workflow
instance_id = client.schedule_new_workflow(
    workflow=task_processing_workflow,
    input={"task_id": "task-123"}
)

# Wait for completion (blocks until done or timeout)
final_state = client.wait_for_workflow_completion(
    instance_id=instance_id,
    timeout_in_seconds=60
)

print(f"Final status: {final_state.runtime_status}")
if final_state.runtime_status == "COMPLETED":
    print(f"Output: {final_state.serialized_output}")
```

Output:

```
Final status: COMPLETED
Output: {"status": "completed", "assigned_to": "alice@example.com"}
```
Raising External Events
External events enable human-in-the-loop workflows and integration with external systems. A workflow pauses with wait_for_external_event, and an external process (API call, CLI command, another service) sends the event to resume execution.
Workflow Side: Waiting for Events
```python
import dapr.ext.workflow as wf
from datetime import timedelta

wfr = wf.WorkflowRuntime()

@wfr.workflow(name="approval_workflow")
def approval_workflow(ctx: wf.DaprWorkflowContext, order: dict):
    # Check if approval is required
    if order["amount"] > 1000:
        # Request approval from manager
        yield ctx.call_activity(request_approval, input=order)

        # Wait for external event OR timeout
        approval_event = ctx.wait_for_external_event("approval_decision")
        timeout = ctx.create_timer(timedelta(days=3))

        # Race: whichever happens first wins
        winner = yield wf.when_any([approval_event, timeout])
        if winner == timeout:
            return {"status": "rejected", "reason": "approval_timeout"}

        # Get the approval decision
        decision = approval_event.get_result()
        if not decision.get("approved"):
            return {"status": "rejected", "reason": decision.get("reason")}

    # Process the order
    yield ctx.call_activity(process_order, input=order)
    return {"status": "completed"}
```
The workflow pauses at wait_for_external_event("approval_decision") until one of two things happens:
- Someone raises an event named approval_decision
- The 3-day timer expires
Raising Events via SDK
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Raise the approval event
client.raise_workflow_event(
    instance_id="order-12345-workflow",
    event_name="approval_decision",
    data={"approved": True, "approver": "manager@company.com"}
)

print("Approval event sent")
```

Output:

```
Approval event sent
```
Raising Events via CLI
```bash
# Approve the order
dapr workflow raise-event order-12345-workflow/approval_decision \
  --app-id order-service \
  --input '{"approved": true, "approver": "manager@company.com"}'
```

Output:

```
Event 'approval_decision' raised for workflow 'order-12345-workflow'
```
Event Timing: What If Events Arrive Early?
A common question: "What if I raise an event before the workflow reaches wait_for_external_event?"
Dapr buffers events. If an event arrives before the workflow is ready to receive it, the event is stored and delivered when the workflow reaches the wait point. This prevents race conditions in asynchronous systems.
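A toy model makes the buffering semantics concrete. This is an illustration of the behavior, not Dapr's actual implementation:

```python
# Toy model of event buffering (illustrative only, not Dapr internals):
# events raised before the workflow reaches its wait point are stored,
# then delivered as soon as the workflow starts waiting.
class EventBuffer:
    def __init__(self):
        self._pending = {}   # event_name -> payloads raised early
        self._waiters = {}   # event_name -> callback currently waiting

    def raise_event(self, name, data):
        if name in self._waiters:
            # Workflow is already at the wait point: deliver immediately
            self._waiters.pop(name)(data)
        else:
            # Workflow not waiting yet: buffer for later delivery
            self._pending.setdefault(name, []).append(data)

    def wait_for_event(self, name, callback):
        if self._pending.get(name):
            # An event arrived early: deliver the buffered payload now
            callback(self._pending[name].pop(0))
        else:
            self._waiters[name] = callback

received = []
buf = EventBuffer()
buf.raise_event("approval_decision", {"approved": True})  # event arrives early
buf.wait_for_event("approval_decision", received.append)  # workflow reaches wait point
print(received)  # [{'approved': True}]
```

Either ordering delivers the payload exactly once, which is why client code does not need to coordinate with the workflow's progress before raising an event.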
Pausing and Resuming Workflows
For administrative control, you can pause a running workflow and resume it later:
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Pause a running workflow
client.pause_workflow(instance_id="task-123-workflow")
print("Workflow paused")

# ... later ...

# Resume the paused workflow
client.resume_workflow(instance_id="task-123-workflow")
print("Workflow resumed")
```

Output:

```
Workflow paused
Workflow resumed
```
CLI Pause/Resume
```bash
# Pause with reason
dapr workflow suspend task-123-workflow \
  --app-id task-service \
  --reason "Waiting for database maintenance"

# Resume with reason
dapr workflow resume task-123-workflow \
  --app-id task-service \
  --reason "Maintenance complete"
```

Output:

```
Workflow 'task-123-workflow' suspended
Workflow 'task-123-workflow' resumed
```
Use cases for pause/resume:
- Maintenance windows: Pause workflows before infrastructure changes
- Manual intervention: Stop a workflow to investigate unexpected behavior
- Batch processing: Pause workflows during peak hours, resume during off-peak
Terminating Workflows
Termination forcibly stops a workflow immediately. The workflow is marked as TERMINATED and no further activities execute.
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Terminate a stuck or unwanted workflow
client.terminate_workflow(instance_id="task-123-workflow")
print("Workflow terminated")
```

Output:

```
Workflow terminated
```
CLI Termination
```bash
dapr workflow terminate task-123-workflow \
  --app-id task-service \
  --output '{"reason": "Cancelled by customer"}'
```

Output:

```
Workflow 'task-123-workflow' terminated
```
Terminate vs Pause
| Operation | Effect | Recoverable? | Use When |
|---|---|---|---|
| Pause | Suspends execution, preserves state | Yes (resume) | Temporary hold, maintenance |
| Terminate | Stops execution permanently | No | Workflow is broken or no longer needed |
Warning: Termination does not run compensation logic. If you need cleanup (rollback transactions, release resources), implement a cancellation event pattern instead of hard termination.
Purging Workflow History
Completed, failed, and terminated workflows leave history in the state store. Over time, this accumulates. Purging removes workflow metadata permanently.
```python
from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Purge a specific completed workflow
client.purge_workflow(instance_id="task-123-workflow")
print("Workflow history purged")
```

Output:

```
Workflow history purged
```
CLI Purge Operations
```bash
# Purge a specific instance
dapr workflow purge task-123-workflow --app-id task-service
```

Output:

```
Workflow 'task-123-workflow' purged
```

```bash
# Purge all completed workflows older than 30 days (720 hours)
dapr workflow purge --app-id task-service --all-older-than 720h
```

Output:

```
Purged 47 workflow instances older than 720h
```

```bash
# Purge all terminal workflows (COMPLETED, FAILED, TERMINATED)
dapr workflow purge --app-id task-service --all
```

Output:

```
Purged 156 workflow instances
```
Purge Constraints
You can only purge workflows in terminal states:
- COMPLETED
- FAILED
- TERMINATED
Attempting to purge a RUNNING or SUSPENDED workflow returns an error. This prevents accidental deletion of active work.
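You can also encode this constraint defensively on the client side. The safe_purge helper below is a sketch, not a Dapr API; it assumes runtime_status stringifies to a name like COMPLETED (or an enum form such as WorkflowStatus.COMPLETED):

```python
# Sketch of a client-side guard (safe_purge is not a Dapr API).
# Assumes str(state.runtime_status) ends with the status name.
TERMINAL_STATES = {"COMPLETED", "FAILED", "TERMINATED"}

def safe_purge(client, instance_id: str) -> bool:
    """Purge the instance only if it exists and is in a terminal state."""
    state = client.get_workflow_state(instance_id)
    if state is None:
        return False  # nothing to purge
    # Normalize either a plain string or an enum-style repr to the bare name
    status = str(state.runtime_status).split(".")[-1]
    if status not in TERMINAL_STATES:
        return False  # RUNNING/SUSPENDED/PENDING: refuse to purge active work
    client.purge_workflow(instance_id)
    return True
```

This turns the server-side error into an explicit boolean your cleanup jobs can log and skip past.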
Best Practice: Scheduled Purging
In production, schedule regular purge operations to manage state store size:
```
# Weekly cron job: purge workflows older than 90 days (2160 hours)
0 0 * * 0 dapr workflow purge --app-id task-service --all-older-than 2160h
```
Complete Management Example
Here's a FastAPI application exposing workflow management endpoints:
```python
from fastapi import FastAPI, HTTPException
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from contextlib import asynccontextmanager

wfr = WorkflowRuntime()

@wfr.workflow(name="approval_workflow")
def approval_workflow(ctx: DaprWorkflowContext, order: dict):
    if order["amount"] > 1000:
        yield ctx.call_activity(request_approval_activity, input=order)
        approval = ctx.wait_for_external_event("approval_decision")
        decision = yield approval
        if not decision.get("approved"):
            return {"status": "rejected", "reason": decision.get("reason")}
    yield ctx.call_activity(process_order_activity, input=order)
    return {"status": "completed"}

@wfr.activity(name="request_approval_activity")
def request_approval_activity(ctx, order: dict):
    print(f"Approval requested for order {order['order_id']}")
    return {"requested": True}

@wfr.activity(name="process_order_activity")
def process_order_activity(ctx, order: dict):
    print(f"Processing order {order['order_id']}")
    return {"processed": True}

@asynccontextmanager
async def lifespan(app: FastAPI):
    wfr.start()
    yield
    wfr.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/orders/{order_id}/start")
async def start_order_workflow(order_id: str, amount: float = 500):
    client = DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(
        workflow=approval_workflow,
        input={"order_id": order_id, "amount": amount},
        instance_id=f"order-{order_id}-workflow"
    )
    return {"instance_id": instance_id, "status": "started"}

@app.get("/orders/{order_id}/status")
async def get_order_status(order_id: str):
    client = DaprWorkflowClient()
    state = client.get_workflow_state(f"order-{order_id}-workflow")
    if not state:
        raise HTTPException(404, "Workflow not found")
    return {
        "instance_id": state.instance_id,
        "status": state.runtime_status,
        "created_at": str(state.created_at)
    }

@app.post("/orders/{order_id}/approve")
async def approve_order(order_id: str, approved: bool = True, reason: str | None = None):
    client = DaprWorkflowClient()
    client.raise_workflow_event(
        instance_id=f"order-{order_id}-workflow",
        event_name="approval_decision",
        data={"approved": approved, "reason": reason}
    )
    return {"event": "approval_decision", "sent": True}

@app.delete("/orders/{order_id}/workflow")
async def terminate_and_purge(order_id: str):
    client = DaprWorkflowClient()
    instance_id = f"order-{order_id}-workflow"
    state = client.get_workflow_state(instance_id)
    if state and state.runtime_status == "RUNNING":
        client.terminate_workflow(instance_id)
    client.purge_workflow(instance_id)
    return {"instance_id": instance_id, "purged": True}
```
Testing the endpoints:
```bash
# Start high-value order (requires approval)
curl -X POST "http://localhost:8000/orders/12345/start?amount=1500"
```

Output:

```
{"instance_id": "order-12345-workflow", "status": "started"}
```

```bash
# Check status (waiting for approval)
curl "http://localhost:8000/orders/12345/status"
```

Output:

```
{"instance_id": "order-12345-workflow", "status": "RUNNING", "created_at": "2025-01-15 10:30:00"}
```

```bash
# Approve the order
curl -X POST "http://localhost:8000/orders/12345/approve?approved=true"
```

Output:

```
{"event": "approval_decision", "sent": true}
```
Reflect on Your Skill
Does your dapr-deployment skill understand workflow management operations?
Test Your Skill
Using my dapr-deployment skill, explain how to implement a human approval step
in a Dapr workflow. Show both the workflow code that waits for approval and
the client code that sends the approval event.
Your skill should cover:
- wait_for_external_event in workflow code
- raise_workflow_event from client code
- The event buffering behavior (events can arrive before the workflow is ready)
Identify Gaps
Ask yourself:
- Did my skill explain the difference between terminate and purge?
- Did it mention that purge only works on terminal workflows?
- Did it cover the CLI commands for operational management?
Improve Your Skill
If gaps exist:
My dapr-deployment skill needs workflow management coverage. Update it to include:
- The DaprWorkflowClient API for start/query/event/terminate/purge
- The wait_for_external_event + raise_workflow_event pattern
- CLI commands: dapr workflow run, get, raise-event, terminate, purge
- The constraint that purge only works on COMPLETED/FAILED/TERMINATED workflows
Try With AI
Open your AI companion and explore workflow management patterns.
Prompt 1: Design an Approval Workflow
Help me design a purchase approval workflow with these requirements:
- Orders under $500: auto-approve
- Orders $500-$5000: requires manager approval within 3 days
- Orders over $5000: requires VP approval within 5 days
Show me the workflow code with wait_for_external_event and the FastAPI
endpoints for managers/VPs to submit their approval decisions.
Include proper timeout handling: what happens if approval doesn't arrive in time?
What you're learning: How to compose external events, timers, and conditional logic into real business workflows. The AI helps you structure multi-tier approval with escalation.
Prompt 2: Build a Workflow Dashboard
I want to build a simple dashboard for monitoring my Dapr workflows. Help me create:
1. A FastAPI endpoint that lists all running workflows with their status
2. An endpoint that shows workflow history (when it started, what activities ran)
3. Bulk operations: terminate all stuck workflows, purge all completed older than X days
Show me how to use get_workflow_state effectively and any CLI commands that
would help with operational visibility.
What you're learning: Operational patterns for production workflow systems. The AI guides you through monitoring and bulk management capabilities.
Prompt 3: Event Pattern Troubleshooting
I'm having trouble with external events in my Dapr workflow. My workflow calls
wait_for_external_event("payment_confirmed") but even after I call
raise_workflow_event with the same event name, the workflow doesn't continue.
Help me debug this:
- How do I verify the event was received?
- What are common mistakes with event names?
- How do I check if the workflow is actually at the wait point?
- What logs should I look at in the Dapr sidecar?
What you're learning: Debugging techniques for workflow event handling. The AI helps you systematically isolate event delivery issues.
Safety Note
Workflow management operations are powerful. terminate_workflow stops execution without compensation logic; purge_workflow permanently deletes history. In production:
- Implement access controls on management endpoints
- Log all management operations for audit trails
- Test terminate/purge behavior in staging before production use
- Consider soft-delete patterns (mark as cancelled) before hard termination