Skip to main content
Updated Feb 23, 2026

Managing Workflows

You've authored your first workflow with activities. It runs, processes tasks, and returns results. But what happens when you need to:

  • Check if a workflow is still running or has failed?
  • Wait for a manager's approval before proceeding with a high-value transaction?
  • Cancel a workflow that's stuck or no longer needed?
  • Clean up old workflow data that's consuming state store space?

Authoring workflows is only half the story. Production systems require workflow management: starting instances on demand, querying their status, injecting external events, and cleaning up completed runs. This lesson gives you the operational toolkit for workflow lifecycle management.

The Workflow Management API

Dapr's DaprWorkflowClient provides the control plane for your workflows. While WorkflowRuntime handles execution (registering and running workflows), DaprWorkflowClient handles management (starting, querying, and controlling workflow instances).

Think of it like this:

ComponentRoleAnalogy
WorkflowRuntimeExecutes workflowsThe factory floor
DaprWorkflowClientManages workflow instancesThe control room

The control room operators don't assemble products; they monitor production lines, start new runs, and intervene when something goes wrong.

Starting Workflows

Using the Python SDK

The schedule_new_workflow method starts a new workflow instance:

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Start a workflow with auto-generated instance ID
instance_id = client.schedule_new_workflow(
workflow=task_processing_workflow,
input={"task_id": "task-123", "title": "Review PR #456"}
)
print(f"Started workflow: {instance_id}")

Output:

Started workflow: abc123-def456-ghi789

You can also specify a custom instance ID for easier tracking:

# Start with custom instance ID
instance_id = client.schedule_new_workflow(
workflow=task_processing_workflow,
input={"task_id": "task-123", "title": "Review PR #456"},
instance_id="task-123-workflow"
)
print(f"Started workflow: {instance_id}")

Output:

Started workflow: task-123-workflow

Custom instance IDs are valuable for:

  • Idempotency: Starting the same workflow twice with the same ID returns the existing instance
  • Correlation: Matching workflows to business entities (order-12345, approval-request-789)
  • Debugging: Finding specific workflow instances in logs and traces

Using the CLI

The Dapr CLI provides workflow management without writing code:

# Start a workflow with input data
dapr workflow run task_processing_workflow \
--app-id task-service \
--input '{"task_id": "task-123", "title": "Review PR"}'

Output:

Workflow instance started: abc123-def456-ghi789
# Start with specific instance ID
dapr workflow run task_processing_workflow \
--app-id task-service \
--instance-id task-123-workflow \
--input '{"task_id": "task-123", "title": "Review PR"}'

Output:

Workflow instance started: task-123-workflow

Querying Workflow Status

Once a workflow is running, you need visibility into its state. The get_workflow_state method returns comprehensive status information:

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Query workflow status
state = client.get_workflow_state(
instance_id="task-123-workflow",
fetch_payloads=True # Include input/output data
)

print(f"Instance ID: {state.instance_id}")
print(f"Workflow Name: {state.workflow_name}")
print(f"Status: {state.runtime_status}")
print(f"Created: {state.created_at}")
print(f"Last Updated: {state.last_updated_at}")

Output:

Instance ID: task-123-workflow
Workflow Name: task_processing_workflow
Status: RUNNING
Created: 2025-01-15 10:30:00+00:00
Last Updated: 2025-01-15 10:30:05+00:00

Runtime Status Values

StatusMeaning
RUNNINGWorkflow is actively executing or waiting for an event/timer
COMPLETEDWorkflow finished successfully with output
FAILEDWorkflow threw an unhandled exception
TERMINATEDWorkflow was forcibly stopped via terminate_workflow
SUSPENDEDWorkflow was paused via pause_workflow
PENDINGWorkflow is scheduled but hasn't started yet

CLI Status Query

# Get workflow status
dapr workflow get task-123-workflow --app-id task-service

Output:

Instance ID: task-123-workflow
Workflow Name: task_processing_workflow
Status: RUNNING
Created: 2025-01-15T10:30:00Z
Last Updated: 2025-01-15T10:30:05Z
# List all workflows (with optional filters)
dapr workflow list --app-id task-service --filter-status RUNNING

Output:

INSTANCE ID          WORKFLOW NAME              STATUS    CREATED
task-123-workflow task_processing_workflow RUNNING 2025-01-15T10:30:00Z
task-456-workflow task_processing_workflow RUNNING 2025-01-15T10:28:00Z

Waiting for Completion

For synchronous scenarios where you need to block until a workflow finishes:

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Start workflow
instance_id = client.schedule_new_workflow(
workflow=task_processing_workflow,
input={"task_id": "task-123"}
)

# Wait for completion (blocks until done or timeout)
final_state = client.wait_for_workflow_completion(
instance_id=instance_id,
timeout_in_seconds=60
)

print(f"Final status: {final_state.runtime_status}")
if final_state.runtime_status == "COMPLETED":
print(f"Output: {final_state.serialized_output}")

Output:

Final status: COMPLETED
Output: {"status": "completed", "assigned_to": "alice@example.com"}

Raising External Events

External events enable human-in-the-loop workflows and integration with external systems. A workflow pauses with wait_for_external_event, and an external process (API call, CLI command, another service) sends the event to resume execution.

Workflow Side: Waiting for Events

import dapr.ext.workflow as wf
from datetime import timedelta

@wfr.workflow(name="approval_workflow")
def approval_workflow(ctx: wf.DaprWorkflowContext, order: dict):
# Check if approval is required
if order["amount"] > 1000:
# Request approval from manager
yield ctx.call_activity(request_approval, input=order)

# Wait for external event OR timeout
approval_event = ctx.wait_for_external_event("approval_decision")
timeout = ctx.create_timer(timedelta(days=3))

# Race: whichever happens first wins
winner = yield wf.when_any([approval_event, timeout])

if winner == timeout:
return {"status": "rejected", "reason": "approval_timeout"}

# Get the approval decision
decision = approval_event.get_result()
if not decision.get("approved"):
return {"status": "rejected", "reason": decision.get("reason")}

# Process the order
yield ctx.call_activity(process_order, input=order)
return {"status": "completed"}

The workflow pauses at wait_for_external_event("approval_decision") until one of two things happens:

  1. Someone raises an event named approval_decision
  2. The 3-day timer expires

Raising Events via SDK

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Raise the approval event
client.raise_workflow_event(
instance_id="order-12345-workflow",
event_name="approval_decision",
data={"approved": True, "approver": "manager@company.com"}
)
print("Approval event sent")

Output:

Approval event sent

Raising Events via CLI

# Approve the order
dapr workflow raise-event order-12345-workflow/approval_decision \
--app-id order-service \
--input '{"approved": true, "approver": "manager@company.com"}'

Output:

Event 'approval_decision' raised for workflow 'order-12345-workflow'

Event Timing: What If Events Arrive Early?

A common question: "What if I raise an event before the workflow reaches wait_for_external_event?"

Dapr buffers events. If an event arrives before the workflow is ready to receive it, the event is stored and delivered when the workflow reaches the wait point. This prevents race conditions in asynchronous systems.

Pausing and Resuming Workflows

For administrative control, you can pause a running workflow and resume it later:

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Pause a running workflow
client.pause_workflow(instance_id="task-123-workflow")
print("Workflow paused")

# ... later ...

# Resume the paused workflow
client.resume_workflow(instance_id="task-123-workflow")
print("Workflow resumed")

Output:

Workflow paused
Workflow resumed

CLI Pause/Resume

# Pause with reason
dapr workflow suspend task-123-workflow \
--app-id task-service \
--reason "Waiting for database maintenance"

# Resume with reason
dapr workflow resume task-123-workflow \
--app-id task-service \
--reason "Maintenance complete"

Output:

Workflow 'task-123-workflow' suspended
Workflow 'task-123-workflow' resumed

Use cases for pause/resume:

  • Maintenance windows: Pause workflows before infrastructure changes
  • Manual intervention: Stop a workflow to investigate unexpected behavior
  • Batch processing: Pause workflows during peak hours, resume during off-peak

Terminating Workflows

Termination forcibly stops a workflow immediately. The workflow is marked as TERMINATED and no further activities execute.

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Terminate a stuck or unwanted workflow
client.terminate_workflow(instance_id="task-123-workflow")
print("Workflow terminated")

Output:

Workflow terminated

CLI Termination

dapr workflow terminate task-123-workflow \
--app-id task-service \
--output '{"reason": "Cancelled by customer"}'

Output:

Workflow 'task-123-workflow' terminated

Terminate vs Pause

OperationEffectRecoverable?Use When
PauseSuspends execution, preserves stateYes (resume)Temporary hold, maintenance
TerminateStops execution permanentlyNoWorkflow is broken or no longer needed

Warning: Termination does not run compensation logic. If you need cleanup (rollback transactions, release resources), implement a cancellation event pattern instead of hard termination.

Purging Workflow History

Completed, failed, and terminated workflows leave history in the state store. Over time, this accumulates. Purging removes workflow metadata permanently.

from dapr.ext.workflow import DaprWorkflowClient

client = DaprWorkflowClient()

# Purge a specific completed workflow
client.purge_workflow(instance_id="task-123-workflow")
print("Workflow history purged")

Output:

Workflow history purged

CLI Purge Operations

# Purge a specific instance
dapr workflow purge task-123-workflow --app-id task-service

Output:

Workflow 'task-123-workflow' purged
# Purge all completed workflows older than 30 days (720 hours)
dapr workflow purge --app-id task-service --all-older-than 720h

Output:

Purged 47 workflow instances older than 720h
# Purge all terminal workflows (COMPLETED, FAILED, TERMINATED)
dapr workflow purge --app-id task-service --all

Output:

Purged 156 workflow instances

Purge Constraints

You can only purge workflows in terminal states:

  • COMPLETED
  • FAILED
  • TERMINATED

Attempting to purge a RUNNING or SUSPENDED workflow returns an error. This prevents accidental deletion of active work.

Best Practice: Scheduled Purging

In production, schedule regular purge operations to manage state store size:

# Weekly cron job: purge workflows older than 90 days (2160 hours)
0 0 * * 0 dapr workflow purge --app-id task-service --all-older-than 2160h

Complete Management Example

Here's a FastAPI application exposing workflow management endpoints:

from fastapi import FastAPI, HTTPException
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from contextlib import asynccontextmanager

wfr = WorkflowRuntime()

@wfr.workflow(name="approval_workflow")
def approval_workflow(ctx: DaprWorkflowContext, order: dict):
if order["amount"] > 1000:
yield ctx.call_activity(request_approval_activity, input=order)
approval = ctx.wait_for_external_event("approval_decision")
decision = yield approval
if not decision.get("approved"):
return {"status": "rejected", "reason": decision.get("reason")}
yield ctx.call_activity(process_order_activity, input=order)
return {"status": "completed"}

@wfr.activity(name="request_approval_activity")
def request_approval_activity(ctx, order: dict):
print(f"Approval requested for order {order['order_id']}")
return {"requested": True}

@wfr.activity(name="process_order_activity")
def process_order_activity(ctx, order: dict):
print(f"Processing order {order['order_id']}")
return {"processed": True}

@asynccontextmanager
async def lifespan(app: FastAPI):
wfr.start()
yield
wfr.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/orders/{order_id}/start")
async def start_order_workflow(order_id: str, amount: float = 500):
client = DaprWorkflowClient()
instance_id = client.schedule_new_workflow(
workflow=approval_workflow,
input={"order_id": order_id, "amount": amount},
instance_id=f"order-{order_id}-workflow"
)
return {"instance_id": instance_id, "status": "started"}

@app.get("/orders/{order_id}/status")
async def get_order_status(order_id: str):
client = DaprWorkflowClient()
state = client.get_workflow_state(f"order-{order_id}-workflow")
if not state:
raise HTTPException(404, "Workflow not found")
return {
"instance_id": state.instance_id,
"status": state.runtime_status,
"created_at": str(state.created_at)
}

@app.post("/orders/{order_id}/approve")
async def approve_order(order_id: str, approved: bool = True, reason: str = None):
client = DaprWorkflowClient()
client.raise_workflow_event(
instance_id=f"order-{order_id}-workflow",
event_name="approval_decision",
data={"approved": approved, "reason": reason}
)
return {"event": "approval_decision", "sent": True}

@app.delete("/orders/{order_id}/workflow")
async def terminate_and_purge(order_id: str):
client = DaprWorkflowClient()
instance_id = f"order-{order_id}-workflow"
state = client.get_workflow_state(instance_id)
if state and state.runtime_status == "RUNNING":
client.terminate_workflow(instance_id)
client.purge_workflow(instance_id)
return {"instance_id": instance_id, "purged": True}

Testing the endpoints:

# Start high-value order (requires approval)
curl -X POST "http://localhost:8000/orders/12345/start?amount=1500"

Output:

{"instance_id": "order-12345-workflow", "status": "started"}
# Check status (waiting for approval)
curl "http://localhost:8000/orders/12345/status"

Output:

{"instance_id": "order-12345-workflow", "status": "RUNNING", "created_at": "2025-01-15 10:30:00"}
# Approve the order
curl -X POST "http://localhost:8000/orders/12345/approve?approved=true"

Output:

{"event": "approval_decision", "sent": true}

Reflect on Your Skill

Does your dapr-deployment skill understand workflow management operations?

Test Your Skill

Using my dapr-deployment skill, explain how to implement a human approval step
in a Dapr workflow. Show both the workflow code that waits for approval and
the client code that sends the approval event.

Your skill should cover:

  • wait_for_external_event in workflow code
  • raise_workflow_event from client code
  • The event buffering behavior (events can arrive before workflow is ready)

Identify Gaps

Ask yourself:

  • Did my skill explain the difference between terminate and purge?
  • Did it mention that purge only works on terminal workflows?
  • Did it cover the CLI commands for operational management?

Improve Your Skill

If gaps exist:

My dapr-deployment skill needs workflow management coverage. Update it to include:
- The DaprWorkflowClient API for start/query/event/terminate/purge
- The wait_for_external_event + raise_workflow_event pattern
- CLI commands: dapr workflow run, get, raise-event, terminate, purge
- The constraint that purge only works on COMPLETED/FAILED/TERMINATED workflows

Try With AI

Open your AI companion and explore workflow management patterns.

Prompt 1: Design an Approval Workflow

Help me design a purchase approval workflow with these requirements:
- Orders under $500: auto-approve
- Orders $500-$5000: requires manager approval within 3 days
- Orders over $5000: requires VP approval within 5 days

Show me the workflow code with wait_for_external_event and the FastAPI
endpoints for managers/VPs to submit their approval decisions.

Include proper timeout handling: what happens if approval doesn't arrive in time?

What you're learning: How to compose external events, timers, and conditional logic into real business workflows. The AI helps you structure multi-tier approval with escalation.

Prompt 2: Build a Workflow Dashboard

I want to build a simple dashboard for monitoring my Dapr workflows. Help me create:
1. A FastAPI endpoint that lists all running workflows with their status
2. An endpoint that shows workflow history (when it started, what activities ran)
3. Bulk operations: terminate all stuck workflows, purge all completed older than X days

Show me how to use get_workflow_state effectively and any CLI commands that
would help with operational visibility.

What you're learning: Operational patterns for production workflow systems. The AI guides you through monitoring and bulk management capabilities.

Prompt 3: Event Pattern Troubleshooting

I'm having trouble with external events in my Dapr workflow. My workflow calls
wait_for_external_event("payment_confirmed") but even after I call
raise_workflow_event with the same event name, the workflow doesn't continue.

Help me debug this:
- How do I verify the event was received?
- What are common mistakes with event names?
- How do I check if the workflow is actually at the wait point?
- What logs should I look at in the Dapr sidecar?

What you're learning: Debugging techniques for workflow event handling. The AI helps you systematically isolate event delivery issues.

Safety Note

Workflow management operations are powerful. terminate_workflow stops execution without compensation logic; purge_workflow permanently deletes history. In production:

  • Implement access controls on management endpoints
  • Log all management operations for audit trails
  • Test terminate/purge behavior in staging before production use
  • Consider soft-delete patterns (mark as cancelled) before hard termination