Skip to main content
Updated Feb 23, 2026

Combining Actors with Workflows

You've built actors that maintain state for individual entities. You've built workflows that orchestrate multi-step processes. Now the real power emerges: combining both to create systems where stateful entities participate in durable orchestration.

Consider an AI agent task management system. Each task needs its own state—title, status, deadline, assignee, history. That's perfect for a TaskActor. But completing a task isn't a single operation—it requires validation, assignment, notification, and status updates across multiple steps. That's perfect for a TaskProcessingWorkflow. The question isn't "actors OR workflows?" but "actors AND workflows working together."

This hybrid approach mirrors how real organizations work. Individual employees (actors) have their own responsibilities and state, while business processes (workflows) coordinate work across multiple employees. Your Digital FTE needs both: stateful agents that remember context and durable processes that orchestrate complex work reliably.


The Decision Framework

When should you use actors, workflows, or both? Apply this analysis:

Use Actors When

CharacteristicExample
Entity has identityUser-123, Task-456, Device-789
State belongs to the entityConversation history, task status, device settings
Turn-based access neededOnly one operation on this entity at a time
Timers/reminders on entityDeadline notifications, session timeouts

Actor examples: ChatActor (per user), TaskActor (per task), DeviceActor (per IoT device)

Use Workflows When

CharacteristicExample
Multi-step processValidate -> Assign -> Notify -> Track
Durability requiredMust survive crashes, complete after restart
Parallel executionProcess 10 items concurrently, aggregate results
Compensation neededUndo step 2 if step 3 fails (saga pattern)
Human approvalWait hours/days for external event

Workflow examples: OrderProcessingWorkflow, ApprovalWorkflow, BatchProcessingWorkflow

Use Both When

The most powerful systems combine actors and workflows:

Actor ResponsibilityWorkflow Responsibility
Own entity stateOrchestrate multi-entity processes
Enforce turn-based accessManage long-running operations
Handle entity-specific remindersCoordinate parallel work
Maintain consistencyImplement compensation

Hybrid example: TaskActor owns task state while TaskProcessingWorkflow orchestrates the task through its lifecycle.

Decision Tree

Does the concept have a unique identity that persists?
├── YES → Consider Actor
│ └── Does it also need multi-step orchestration?
│ ├── YES → Actor + Workflow (hybrid)
│ └── NO → Actor alone
└── NO → Is it a multi-step process?
├── YES → Workflow alone
└── NO → Neither (simple service logic)

Pattern 1: Workflow Calling Actors

The most common hybrid pattern: workflows invoke actor methods through activities to update entity state at each orchestration step.

Why Use Activities for Actor Calls

Workflow code must be deterministic for replay. Actor calls involve network I/O—you can't call them directly in workflow code. Instead, wrap actor invocations in activities:

from dapr.ext.workflow import WorkflowRuntime
from dapr.actor import ActorProxy, ActorId
from dataclasses import dataclass

wfr = WorkflowRuntime()

@dataclass
class TaskStatusUpdate:
task_id: str
status: str
message: str

# Activity that calls an actor
@wfr.activity
def update_task_status(ctx, update: TaskStatusUpdate) -> dict:
"""
Activity wrapping actor invocation.
Activities CAN be non-deterministic (IO, external calls).
"""
from task_actor import TaskActorInterface

proxy = ActorProxy.create(
"TaskActor",
ActorId(update.task_id),
TaskActorInterface
)

# Call actor method
result = proxy.UpdateStatus(update.status)

return {
"task_id": update.task_id,
"new_status": update.status,
"actor_response": result
}

Output:

Activity update_task_status called for task-123
Actor TaskActor/task-123 updated to: in_progress

Complete Workflow with Actor Updates

Here's a TaskProcessingWorkflow that updates a TaskActor at each step:

from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext
import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import timedelta

wfr = WorkflowRuntime()

@dataclass
class TaskInput:
task_id: str
title: str
assignee: str

@dataclass
class TaskResult:
task_id: str
final_status: str
steps_completed: list

# Activity: Validate task
@wfr.activity
def validate_task(ctx, task: TaskInput) -> dict:
"""Validate task can be processed."""
# Validation logic here
if not task.title or not task.assignee:
return {"valid": False, "reason": "Missing required fields"}
return {"valid": True}

# Activity: Update actor status
@wfr.activity
def update_actor_status(ctx, data: dict) -> dict:
"""Update TaskActor with new status."""
from task_actor import TaskActorInterface
from dapr.actor import ActorProxy, ActorId

proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)

proxy.UpdateStatus(data["status"])
return {"updated": True, "task_id": data["task_id"]}

# Activity: Send notification
@wfr.activity
def send_notification(ctx, data: dict) -> dict:
"""Notify assignee about task."""
print(f"Notifying {data['assignee']} about task {data['task_id']}")
# In production: call notification service
return {"notified": True}

# The workflow orchestrating actor updates
@wfr.workflow
def task_processing_workflow(ctx: DaprWorkflowContext, task: TaskInput):
"""
Orchestrate task processing with actor state updates.
Each step updates the TaskActor's state.
"""
steps_completed = []

# Step 1: Validate
validation = yield ctx.call_activity(validate_task, input=task)
if not validation["valid"]:
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "rejected"}
)
return TaskResult(task.task_id, "rejected", ["validation_failed"])
steps_completed.append("validated")

# Step 2: Mark as in_progress in actor
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "in_progress"}
)
steps_completed.append("started")

# Step 3: Notify assignee
yield ctx.call_activity(
send_notification,
input={
"task_id": task.task_id,
"assignee": task.assignee,
"title": task.title
}
)
steps_completed.append("notified")

# Step 4: Mark as assigned in actor
yield ctx.call_activity(
update_actor_status,
input={"task_id": task.task_id, "status": "assigned"}
)
steps_completed.append("assigned")

return TaskResult(task.task_id, "assigned", steps_completed)

Output (workflow execution):

[Workflow task-processing-task-123] Starting...
[Activity validate_task] Task valid
[Activity update_actor_status] TaskActor/task-123 -> in_progress
[Activity send_notification] Notifying alice@example.com about task-123
[Activity update_actor_status] TaskActor/task-123 -> assigned
[Workflow task-processing-task-123] Completed: assigned
Steps: ['validated', 'started', 'notified', 'assigned']

Runtime Setup

Register both workflow and activities in your FastAPI application:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient

wfr = WorkflowRuntime()

# Register workflow and activities
wfr.register_workflow(task_processing_workflow)
wfr.register_activity(validate_task)
wfr.register_activity(update_actor_status)
wfr.register_activity(send_notification)

@asynccontextmanager
async def lifespan(app: FastAPI):
wfr.start()
yield
wfr.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/tasks/{task_id}/process")
async def process_task(task_id: str, title: str, assignee: str):
"""Start workflow to process a task."""
client = DaprWorkflowClient()

instance_id = client.schedule_new_workflow(
workflow=task_processing_workflow,
input=TaskInput(task_id=task_id, title=title, assignee=assignee),
instance_id=f"task-processing-{task_id}"
)

return {"workflow_instance": instance_id, "task_id": task_id}

Output (API call):

POST /tasks/task-123/process?title=Review%20PR&assignee=alice@example.com

{
"workflow_instance": "task-processing-task-123",
"task_id": "task-123"
}

Pattern 2: Actor Triggering Workflow

Sometimes state changes in an actor should kick off a workflow. For example, when a TaskActor's status changes to "ready_for_processing", it should start the processing workflow.

Actor with Workflow Triggering

from dapr.actor import Actor, ActorInterface, actormethod
from dapr.ext.workflow import DaprWorkflowClient
from datetime import datetime

class TaskActorInterface(ActorInterface):
@actormethod(name="SubmitForProcessing")
async def submit_for_processing(self, assignee: str) -> dict: ...

@actormethod(name="GetTask")
async def get_task(self) -> dict: ...

class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)

async def _on_activate(self) -> None:
found, _ = await self._state_manager.try_get_state("task_data")
if not found:
await self._state_manager.set_state("task_data", {
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
"workflow_instance": None
})
await self._state_manager.save_state()

async def submit_for_processing(self, assignee: str) -> dict:
"""
Submit task for processing - triggers a workflow.
Actor coordinates with workflow for multi-step processing.
"""
task_data = await self._state_manager.get_state("task_data")

# Don't re-submit if already processing
if task_data.get("workflow_instance"):
return {
"success": False,
"reason": "Already submitted",
"workflow_instance": task_data["workflow_instance"]
}

# Start workflow
client = DaprWorkflowClient()

workflow_input = {
"task_id": self.id.id,
"title": task_data.get("title", "Untitled"),
"assignee": assignee
}

instance_id = client.schedule_new_workflow(
workflow="task_processing_workflow",
input=workflow_input,
instance_id=f"process-{self.id.id}"
)

# Update actor state with workflow reference
task_data["status"] = "processing"
task_data["workflow_instance"] = instance_id
task_data["assignee"] = assignee
task_data["submitted_at"] = datetime.utcnow().isoformat()

await self._state_manager.set_state("task_data", task_data)
await self._state_manager.save_state()

return {
"success": True,
"workflow_instance": instance_id,
"task_id": self.id.id
}

async def get_task(self) -> dict:
task_data = await self._state_manager.get_state("task_data")
return {"id": self.id.id, **task_data}

Output (actor triggering workflow):

# Call actor method
proxy = ActorProxy.create("TaskActor", ActorId("task-456"), TaskActorInterface)
result = await proxy.SubmitForProcessing("bob@example.com")
print(result)
{
"success": true,
"workflow_instance": "process-task-456",
"task_id": "task-456"
}

Workflow Tracking in Actor State

The actor stores the workflow instance ID, creating traceability between entity state and orchestration:

async def get_processing_status(self) -> dict:
"""Check both actor state and workflow status."""
task_data = await self._state_manager.get_state("task_data")

result = {
"task_id": self.id.id,
"actor_status": task_data["status"],
"workflow_instance": task_data.get("workflow_instance")
}

# Query workflow status if active
if task_data.get("workflow_instance"):
client = DaprWorkflowClient()
workflow_state = client.get_workflow_state(
task_data["workflow_instance"],
fetch_payloads=True
)
result["workflow_status"] = workflow_state.runtime_status.name

return result

Output:

{
"task_id": "task-456",
"actor_status": "processing",
"workflow_instance": "process-task-456",
"workflow_status": "RUNNING"
}

Designing State Boundaries

The key to hybrid systems is clear ownership of state and responsibility.

What Actors Own

State TypeActor Responsibility
Entity attributesTitle, description, assignee
Current statuspending, in_progress, completed
Entity-specific historyStatus changes, comments
Scheduled remindersDeadline notifications
Workflow referencesWhich workflow is processing this

What Workflows Own

State TypeWorkflow Responsibility
Process progressWhich step completed, which pending
Retry stateHow many attempts, backoff timing
Compensation trackingWhat to undo if later steps fail
External event waitingApproval received, timeout handling
Aggregate resultsFan-out/fan-in collection

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│ Task Management System │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ACTORS (Entity State) WORKFLOWS (Orchestration) │
│ ───────────────────── ──────────────────────── │
│ │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ TaskActor │ │ TaskProcessingWorkflow │ │
│ │ (task-123) │◄──────────────│ │ │
│ ├─────────────────┤ Calls via │ 1. Validate │ │
│ │ - status │ Activity │ 2. Update Actor ───────┤ │
│ │ - title │ │ 3. Notify │ │
│ │ - assignee │ │ 4. Update Actor ───────┤ │
│ │ - workflow_id │ │ 5. Complete │ │
│ │ - history[] │ └─────────────────────────┘ │
│ └────────┬────────┘ ▲ │
│ │ │ │
│ │ Triggers │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ TaskActor │ │ BatchWorkflow │ │
│ │ (task-456) │◄──────────────│ │ │
│ └─────────────────┘ │ Fan-out to 10 tasks │ │
│ │ Aggregate results │ │
│ ┌─────────────────┐ └─────────────────────────┘ │
│ │ TaskActor │ ▲ │
│ │ (task-789) │◄────────────────────────┘ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

Reflect on Your Skill

Your dapr-deployment skill now covers actors and workflows separately. Extend it with hybrid patterns.

Test Your Skill

Using my dapr-deployment skill, design a system for processing customer orders:
- Each order needs to track its own status and items
- Orders go through: validate -> charge payment -> ship -> notify

Should I use actors, workflows, or both? Explain the responsibilities of each component.

Identify Gaps

Ask yourself:

  • Does my skill explain when to use actors vs workflows?
  • Does it show how workflows call actors through activities?
  • Does it demonstrate actors triggering workflows?
  • Can it design state boundaries between components?

Improve Your Skill

If you found gaps:

Update my dapr-deployment skill to include hybrid actor-workflow patterns:
- Decision framework: actors for entities, workflows for processes
- Activity pattern for workflow-to-actor communication
- Actor method for triggering workflows
- State boundary guidelines (what actors own vs workflows own)

Try With AI

Design a Hybrid System

I'm building a task management system for AI agents. Help me design the architecture:

Requirements:
- Each task has status, assignee, deadline, and history
- Tasks go through: create -> validate -> assign -> process -> complete
- If assignment fails, task should return to unassigned state
- Deadline reminders should fire 24 hours before due

What components do I need? Should I use actors, workflows, or both?
Show me the responsibilities of each component.

What you're learning: The decision framework helps you decompose requirements into entity state (actor responsibility) and process orchestration (workflow responsibility). Neither actors nor workflows alone can satisfy all these requirements efficiently.


Implement Workflow-to-Actor Communication

Show me how to implement a workflow activity that updates a TaskActor.
The workflow should:
1. Validate the task
2. Update TaskActor status to "validated"
3. Assign to a user
4. Update TaskActor status to "assigned"

Include the activity functions and workflow code.

What you're learning: Actor calls from workflows must go through activities because workflow code must be deterministic. Activities can perform non-deterministic operations (network calls, database access, actor invocation) and their results are recorded for replay.


Implement Actor-to-Workflow Triggering

Implement a TaskActor with a submit_for_processing method that:
1. Checks if task isn't already being processed
2. Starts a TaskProcessingWorkflow
3. Stores the workflow instance ID in actor state
4. Returns the workflow ID to the caller

Also show me how to query both actor state and workflow status together.

What you're learning: Actors can initiate workflows, creating a clean separation where actors own entity state and workflows own process state. Storing the workflow instance ID in the actor creates traceability between the two systems.

Safety note: When actors trigger workflows, ensure idempotency. The actor should check if a workflow is already running before starting a new one. Multiple workflow instances processing the same entity can cause state conflicts. Always store and check workflow instance IDs in actor state before triggering new workflows.