Combining Actors with Workflows
You've built actors that maintain state for individual entities. You've built workflows that orchestrate multi-step processes. Now the real power emerges: combining both to create systems where stateful entities participate in durable orchestration.
Consider an AI agent task management system. Each task needs its own state—title, status, deadline, assignee, history. That's perfect for a TaskActor. But completing a task isn't a single operation—it requires validation, assignment, notification, and status updates across multiple steps. That's perfect for a TaskProcessingWorkflow. The question isn't "actors OR workflows?" but "actors AND workflows working together."
This hybrid approach mirrors how real organizations work. Individual employees (actors) have their own responsibilities and state, while business processes (workflows) coordinate work across multiple employees. Your Digital FTE needs both: stateful agents that remember context and durable processes that orchestrate complex work reliably.
The Decision Framework
When should you use actors, workflows, or both? Apply this analysis:
Use Actors When
| Characteristic | Example |
|---|---|
| Entity has identity | User-123, Task-456, Device-789 |
| State belongs to the entity | Conversation history, task status, device settings |
| Turn-based access needed | Only one operation on this entity at a time |
| Timers/reminders on entity | Deadline notifications, session timeouts |
Actor examples: ChatActor (per user), TaskActor (per task), DeviceActor (per IoT device)
Use Workflows When
| Characteristic | Example |
|---|---|
| Multi-step process | Validate -> Assign -> Notify -> Track |
| Durability required | Must survive crashes, complete after restart |
| Parallel execution | Process 10 items concurrently, aggregate results |
| Compensation needed | Undo step 2 if step 3 fails (saga pattern) |
| Human approval | Wait hours/days for external event |
Workflow examples: OrderProcessingWorkflow, ApprovalWorkflow, BatchProcessingWorkflow
Use Both When
The most powerful systems combine actors and workflows:
| Actor Responsibility | Workflow Responsibility |
|---|---|
| Own entity state | Orchestrate multi-entity processes |
| Enforce turn-based access | Manage long-running operations |
| Handle entity-specific reminders | Coordinate parallel work |
| Maintain consistency | Implement compensation |
Hybrid example: TaskActor owns task state while TaskProcessingWorkflow orchestrates the task through its lifecycle.
Decision Tree
Does the concept have a unique identity that persists?
├── YES → Consider Actor
│ └── Does it also need multi-step orchestration?
│ ├── YES → Actor + Workflow (hybrid)
│ └── NO → Actor alone
└── NO → Is it a multi-step process?
├── YES → Workflow alone
└── NO → Neither (simple service logic)
Pattern 1: Workflow Calling Actors
The most common hybrid pattern: workflows invoke actor methods through activities to update entity state at each orchestration step.
Why Use Activities for Actor Calls
Workflow code must be deterministic for replay. Actor calls involve network I/O—you can't call them directly in workflow code. Instead, wrap actor invocations in activities:
from dapr.ext.workflow import WorkflowRuntime
from dapr.actor import ActorProxy, ActorId
from dataclasses import dataclass
wfr = WorkflowRuntime()
@dataclass
class TaskStatusUpdate:
task_id: str
status: str
message: str
# Activity that calls an actor
@wfr.activity
def update_task_status(ctx, update: TaskStatusUpdate) -> dict:
"""
Activity wrapping actor invocation.
Activities CAN be non-deterministic (IO, external calls).
"""
from task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(update.task_id),
TaskActorInterface
)
# Call actor method
result = proxy.UpdateStatus(update.status)
return {
"task_id": update.task_id,
"new_status": update.status,
"actor_response": result
}
Output:
Activity update_task_status called for task-123
Actor TaskActor/task-123 updated to: in_progress
Complete Workflow with Actor Updates
Here's a TaskProcessingWorkflow that updates a TaskActor at each step:
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext
import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import timedelta
wfr = WorkflowRuntime()
@dataclass
class TaskInput:
task_id: str
title: str
assignee: str
@dataclass
class TaskResult:
task_id: str
final_status: str
steps_completed: list
# Activity: Validate task
@wfr.activity
def validate_task(ctx, task: TaskInput) -> dict:
"""Validate task can be processed."""
# Validation logic here
if not task.title or not task.assignee:
return {"valid": False, "reason": "Missing required fields"}
return {"valid": True}
# Activity: Update actor status
@wfr.activity
def update_actor_status(ctx, data: dict) -> dict:
"""Update TaskActor with new status."""
from task_actor import TaskActorInterface
from dapr.actor import ActorProxy, ActorId
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
proxy.UpdateStatus(data["status"])
return {"updated": True, "task_id": data["task_id"]}
# Activity: Send notification
@wfr.activity
def send_notification(ctx, data: dict) -> dict:
"""Notify assignee about task."""
print(f"Notifying {data['assignee']} about task {data['task_id']}")
# In production: call notification service
return {"notified": True}
# The workflow orchestrating actor updates
@wfr.workflow
def task_processing_workflow(ctx: DaprWorkflowContext, task: TaskInput):
"""
Orchestrate task processing with actor state updates.
Each step updates the TaskActor's state.
"""
steps_completed = []
# Step 1: Validate
validation = yield ctx.call_activity(validate_task, input=task)
if not validation["valid"]:
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "rejected"}
)
return TaskResult(task.task_id, "rejected", ["validation_failed"])
steps_completed.append("validated")
# Step 2: Mark as in_progress in actor
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "in_progress"}
)
steps_completed.append("started")
# Step 3: Notify assignee
yield ctx.call_activity(
send_notification,
input={
"task_id": task.task_id,
"assignee": task.assignee,
"title": task.title
}
)
steps_completed.append("notified")
# Step 4: Mark as assigned in actor
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "assigned"}
)
steps_completed.append("assigned")
return TaskResult(task.task_id, "assigned", steps_completed)
Output (workflow execution):
[Workflow task-processing-task-123] Starting...
[Activity validate_task] Task valid
[Activity update_actor_status] TaskActor/task-123 -> in_progress
[Activity send_notification] Notifying alice@example.com about task-123
[Activity update_actor_status] TaskActor/task-123 -> assigned
[Workflow task-processing-task-123] Completed: assigned
Steps: ['validated', 'started', 'notified', 'assigned']
Runtime Setup
Register both workflow and activities in your FastAPI application:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient
wfr = WorkflowRuntime()
# Register workflow and activities
wfr.register_workflow(task_processing_workflow)
wfr.register_activity(validate_task)
wfr.register_activity(update_actor_status)
wfr.register_activity(send_notification)
@asynccontextmanager
async def lifespan(app: FastAPI):
wfr.start()
yield
wfr.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/tasks/{task_id}/process")
async def process_task(task_id: str, title: str, assignee: str):
"""Start workflow to process a task."""
client = DaprWorkflowClient()
instance_id = client.schedule_new_workflow(
workflow=task_processing_workflow,
input=TaskInput(task_id=task_id, title=title, assignee=assignee),
instance_id=f"task-processing-{task_id}"
)
return {"workflow_instance": instance_id, "task_id": task_id}
Output (API call):
POST /tasks/task-123/process?title=Review%20PR&assignee=alice@example.com
{
"workflow_instance": "task-processing-task-123",
"task_id": "task-123"
}
Pattern 2: Actor Triggering Workflow
Sometimes state changes in an actor should kick off a workflow. For example, when a TaskActor's status changes to "ready_for_processing", it should start the processing workflow.
Actor with Workflow Triggering
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.ext.workflow import DaprWorkflowClient
from datetime import datetime
class TaskActorInterface(ActorInterface):
@actormethod(name="SubmitForProcessing")
async def submit_for_processing(self, assignee: str) -> dict: ...
@actormethod(name="GetTask")
async def get_task(self) -> dict: ...
class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
async def _on_activate(self) -> None:
found, _ = await self._state_manager.try_get_state("task_data")
if not found:
await self._state_manager.set_state("task_data", {
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
"workflow_instance": None
})
await self._state_manager.save_state()
async def submit_for_processing(self, assignee: str) -> dict:
"""
Submit task for processing - triggers a workflow.
Actor coordinates with workflow for multi-step processing.
"""
task_data = await self._state_manager.get_state("task_data")
# Don't re-submit if already processing
if task_data.get("workflow_instance"):
return {
"success": False,
"reason": "Already submitted",
"workflow_instance": task_data["workflow_instance"]
}
# Start workflow
client = DaprWorkflowClient()
workflow_input = {
"task_id": self.id.id,
"title": task_data.get("title", "Untitled"),
"assignee": assignee
}
instance_id = client.schedule_new_workflow(
workflow="task_processing_workflow",
input=workflow_input,
instance_id=f"process-{self.id.id}"
)
# Update actor state with workflow reference
task_data["status"] = "processing"
task_data["workflow_instance"] = instance_id
task_data["assignee"] = assignee
task_data["submitted_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", task_data)
await self._state_manager.save_state()
return {
"success": True,
"workflow_instance": instance_id,
"task_id": self.id.id
}
async def get_task(self) -> dict:
task_data = await self._state_manager.get_state("task_data")
return {"id": self.id.id, **task_data}
Output (actor triggering workflow):
# Call actor method
proxy = ActorProxy.create("TaskActor", ActorId("task-456"), TaskActorInterface)
result = await proxy.SubmitForProcessing("bob@example.com")
print(result)
{
"success": true,
"workflow_instance": "process-task-456",
"task_id": "task-456"
}
Workflow Tracking in Actor State
The actor stores the workflow instance ID, creating traceability between entity state and orchestration:
async def get_processing_status(self) -> dict:
"""Check both actor state and workflow status."""
task_data = await self._state_manager.get_state("task_data")
result = {
"task_id": self.id.id,
"actor_status": task_data["status"],
"workflow_instance": task_data.get("workflow_instance")
}
# Query workflow status if active
if task_data.get("workflow_instance"):
client = DaprWorkflowClient()
workflow_state = client.get_workflow_state(
task_data["workflow_instance"],
fetch_payloads=True
)
result["workflow_status"] = workflow_state.runtime_status.name
return result
Output:
{
"task_id": "task-456",
"actor_status": "processing",
"workflow_instance": "process-task-456",
"workflow_status": "RUNNING"
}
Designing State Boundaries
The key to hybrid systems is clear ownership of state and responsibility.
What Actors Own
| State Type | Actor Responsibility |
|---|---|
| Entity attributes | Title, description, assignee |
| Current status | pending, in_progress, completed |
| Entity-specific history | Status changes, comments |
| Scheduled reminders | Deadline notifications |
| Workflow references | Which workflow is processing this |
What Workflows Own
| State Type | Workflow Responsibility |
|---|---|
| Process progress | Which step completed, which pending |
| Retry state | How many attempts, backoff timing |
| Compensation tracking | What to undo if later steps fail |
| External event waiting | Approval received, timeout handling |
| Aggregate results | Fan-out/fan-in collection |
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────┐
│ Task Management System │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ACTORS (Entity State) WORKFLOWS (Orchestration) │
│ ───────────────────── ──────────────────────── │
│ │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ TaskActor │ │ TaskProcessingWorkflow │ │
│ │ (task-123) │◄──────────────│ │ │
│ ├─────────────────┤ Calls via │ 1. Validate │ │
│ │ - status │ Activity │ 2. Update Actor ───────┤ │
│ │ - title │ │ 3. Notify │ │
│ │ - assignee │ │ 4. Update Actor ───────┤ │
│ │ - workflow_id │ │ 5. Complete │ │
│ │ - history[] │ └─────────────────────────┘ │
│ └────────┬────────┘ ▲ │
│ │ │ │
│ │ Triggers │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ TaskActor │ │ BatchWorkflow │ │
│ │ (task-456) │◄──────────────│ │ │
│ └─────────────────┘ │ Fan-out to 10 tasks │ │
│ │ Aggregate results │ │
│ ┌─────────────────┐ └─────────────────────────┘ │
│ │ TaskActor │ ▲ │
│ │ (task-789) │◄────────────────────────┘ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Reflect on Your Skill
Your dapr-deployment skill now covers actors and workflows separately. Extend it with hybrid patterns.
Test Your Skill
Using my dapr-deployment skill, design a system for processing customer orders:
- Each order needs to track its own status and items
- Orders go through: validate -> charge payment -> ship -> notify
Should I use actors, workflows, or both? Explain the responsibilities of each component.
Identify Gaps
Ask yourself:
- Does my skill explain when to use actors vs workflows?
- Does it show how workflows call actors through activities?
- Does it demonstrate actors triggering workflows?
- Can it design state boundaries between components?
Improve Your Skill
If you found gaps:
Update my dapr-deployment skill to include hybrid actor-workflow patterns:
- Decision framework: actors for entities, workflows for processes
- Activity pattern for workflow-to-actor communication
- Actor method for triggering workflows
- State boundary guidelines (what actors own vs workflows own)
Try With AI
Design a Hybrid System
I'm building a task management system for AI agents. Help me design the architecture:
Requirements:
- Each task has status, assignee, deadline, and history
- Tasks go through: create -> validate -> assign -> process -> complete
- If assignment fails, task should return to unassigned state
- Deadline reminders should fire 24 hours before due
What components do I need? Should I use actors, workflows, or both?
Show me the responsibilities of each component.
What you're learning: The decision framework helps you decompose requirements into entity state (actor responsibility) and process orchestration (workflow responsibility). Neither actors nor workflows alone can satisfy all these requirements efficiently.
Implement Workflow-to-Actor Communication
Show me how to implement a workflow activity that updates a TaskActor.
The workflow should:
1. Validate the task
2. Update TaskActor status to "validated"
3. Assign to a user
4. Update TaskActor status to "assigned"
Include the activity functions and workflow code.
What you're learning: Actor calls from workflows must go through activities because workflow code must be deterministic. Activities can perform non-deterministic operations (network calls, database access, actor invocation) and their results are recorded for replay.
Implement Actor-to-Workflow Triggering
Implement a TaskActor with a submit_for_processing method that:
1. Checks if task isn't already being processed
2. Starts a TaskProcessingWorkflow
3. Stores the workflow instance ID in actor state
4. Returns the workflow ID to the caller
Also show me how to query both actor state and workflow status together.
What you're learning: Actors can initiate workflows, creating a clean separation where actors own entity state and workflows own process state. Storing the workflow instance ID in the actor creates traceability between the two systems.
Safety note: When actors trigger workflows, ensure idempotency. The actor should check if a workflow is already running before starting a new one. Multiple workflow instances processing the same entity can cause state conflicts. Always store and check workflow instance IDs in actor state before triggering new workflows.