Skip to main content
Updated Feb 23, 2026

Event-Driven Actors

Your TaskActor stores state. It processes method calls. But it sits idle, waiting for someone to invoke it. In real systems, actors don't just wait---they react. A new task arrives in a queue, and the actor springs to life. A cron schedule fires, and the actor performs cleanup. A webhook triggers, and the actor processes external data.

This is the difference between a responsive actor and a reactive one. Responsive actors wait for requests. Reactive actors respond to events from the world around them.

In Lesson 5, you learned about timers and reminders---internal scheduling within an actor. Now you'll connect actors to the broader event ecosystem: Dapr pub/sub for internal service events, and Dapr bindings for external triggers like cron schedules and webhooks.


The Actor Event Architecture

Actors and events work together in a specific pattern. The actor handles state and logic. Events trigger and communicate between actors and services.

                                Event Sources

┌─────────────────────────┼─────────────────────────┐
│ │ │
┌──────▼──────┐ ┌───────▼───────┐ ┌──────▼──────┐
│ Pub/Sub │ │ Bindings │ │ Service │
│ Topics │ │ (cron, http) │ │ Invocation │
└──────┬──────┘ └───────┬───────┘ └──────┬──────┘
│ │ │
└─────────────────────────┼─────────────────────────┘

┌──────▼──────┐
│ FastAPI │
│ Handlers │
└──────┬──────┘

┌──────▼──────┐
│ ActorProxy │
│ Routing │
└──────┬──────┘

┌──────────────────────┼──────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ TaskActor │ │ TaskActor │ │ TaskActor │
│ task-001 │ │ task-002 │ │ task-003 │
└─────────────┘ └─────────────┘ └─────────────┘

The critical insight: Dapr delivers events to your FastAPI application, not directly to actors. You create handlers that receive events and route them to the appropriate actor instance using ActorProxy.


Actors Responding to Pub/Sub Events

When a pub/sub message arrives, Dapr calls a subscription endpoint on your FastAPI application. Your handler extracts the actor ID from the message and invokes the correct actor.

The Subscription Handler Pattern

Subscription YAML:

# components/subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-events-subscription
spec:
topic: task-events
pubsubname: pubsub
routes:
default: /TaskActor/ProcessEvent
scopes:
- task-actor-service

FastAPI Subscription Handler:

from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from dapr.actor import ActorProxy, ActorId

app = FastAPI()
dapr_app = DaprApp(app)


@dapr_app.subscribe(pubsub='pubsub', topic='task-events', route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
"""Receive task events and route to the appropriate TaskActor."""
print(f"Received task event: {event_data}")

# Extract actor ID from event payload
data = event_data.get('data', {})
task_id = data.get('task_id')

if not task_id:
print("No task_id in event, skipping")
return {"status": "SKIPPED"}

# Route to the correct actor instance
proxy = ActorProxy.create(
'TaskActor',
ActorId(task_id),
TaskActorInterface
)

# Invoke actor method
await proxy.ProcessEvent(data)

return {"status": "SUCCESS"}

Output:

>>> # Event published to task-events topic
>>> # Dapr calls your subscription handler
Received task event: {'data': {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}}

>>> # Handler routes to TaskActor instance
>>> # TaskActor task-123 processes the event
TaskActor task-123: Processing event task.assigned

Why This Pattern?

You might wonder: "Why can't Dapr deliver messages directly to actors?"

The answer lies in actor semantics:

  1. Actors are addressed by ID. Pub/sub topics don't know about actor IDs.
  2. Actors have typed methods. Events need routing logic to determine which method to call.
  3. Events might create actors. Your handler can activate actors on-demand when events arrive.

The subscription handler is the bridge---it maps event semantics (topic, payload) to actor semantics (ID, method).


Actors Publishing Events

Actors can also publish events outbound. This enables actors to communicate without knowing about each other.

Publishing from an Actor Method

from dapr.actor import Actor, ActorInterface, actormethod
from dapr.clients import DaprClient
import json


class TaskActorInterface(ActorInterface):
@actormethod(name="CompleteTask")
async def complete_task(self) -> dict: ...


class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)

async def complete_task(self) -> dict:
"""Mark task complete and publish event."""
actor_id = self.id.id

# Update internal state
state = await self._state_manager.get_state("task_data")
state["status"] = "completed"
await self._state_manager.set_state("task_data", state)

# Publish event for other services/actors
with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': actor_id,
'event_type': 'task.completed',
'completed_at': state.get('completed_at')
}),
data_content_type='application/json'
)

print(f"TaskActor {actor_id}: Published task.completed event")
return {"status": "completed", "task_id": actor_id}

Output:

>>> # Client invokes actor method
await proxy.CompleteTask()

>>> # Actor updates state and publishes event
TaskActor task-123: Published task.completed event

>>> # Event flows to subscribers (other actors, services, etc.)
>>> # NotificationActor, AnalyticsService, etc. receive the event

Event Publishing Best Practices

When publishing events from actors, follow these patterns:

async def _publish_event(self, event_type: str, payload: dict):
"""Reusable event publishing helper."""
event_data = {
"actor_id": self.id.id,
"actor_type": "TaskActor",
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
**payload
}

with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps(event_data),
data_content_type='application/json'
)

Include enough context in events for subscribers to route correctly:

FieldPurpose
actor_idIdentifies which actor instance published
actor_typeAllows routing to same actor type
event_typeDetermines handler logic
timestampEnables ordering and deduplication

Input Bindings Triggering Actors

Input bindings connect actors to external event sources. A cron schedule fires, and an actor wakes up. A webhook arrives, and an actor processes it.

Cron Binding to Actor

Binding Component:

# components/scheduler-cron.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: scheduler-cron
namespace: default
spec:
type: bindings.cron
version: v1
metadata:
- name: schedule
value: "*/5 * * * *" # Every 5 minutes
- name: direction
value: "input"

FastAPI Handler:

@app.post("/scheduler-cron")
async def handle_scheduler_cron():
"""Triggered by cron, invokes SchedulerActor."""
print("Cron triggered, invoking SchedulerActor")

# Use a well-known actor ID for singleton scheduler
proxy = ActorProxy.create(
'SchedulerActor',
ActorId('global-scheduler'),
SchedulerActorInterface
)

# Trigger scheduled work
result = await proxy.ProcessScheduledTasks()

return {"status": "OK", "result": result}

SchedulerActor Implementation:

class SchedulerActorInterface(ActorInterface):
@actormethod(name="ProcessScheduledTasks")
async def process_scheduled_tasks(self) -> dict: ...


class SchedulerActor(Actor, SchedulerActorInterface):
async def process_scheduled_tasks(self) -> dict:
"""Find and process tasks with passed deadlines."""
print(f"SchedulerActor: Checking for scheduled tasks")

# Get list of tasks to process
task_ids = await self._get_due_tasks()

# Delegate to individual TaskActors
for task_id in task_ids:
proxy = ActorProxy.create(
'TaskActor',
ActorId(task_id),
TaskActorInterface
)
await proxy.CheckDeadline()

return {"processed": len(task_ids)}

Output:

>>> # Every 5 minutes, Dapr calls /scheduler-cron
Cron triggered, invoking SchedulerActor

>>> # SchedulerActor checks for due tasks
SchedulerActor: Checking for scheduled tasks

>>> # Delegates to individual TaskActors
TaskActor task-001: Checking deadline
TaskActor task-007: Checking deadline

Webhook Binding to Actor

External systems can trigger actors through HTTP bindings:

Binding Component:

# components/webhook-binding.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: external-webhook
namespace: default
spec:
type: bindings.http
version: v1
metadata:
- name: direction
value: "input"

FastAPI Handler:

from fastapi import Request

@app.post("/external-webhook")
async def handle_external_webhook(request: Request):
"""Receive webhook and route to appropriate actor."""
body = await request.json()
print(f"Webhook received: {body}")

# Extract routing information from webhook payload
resource_type = body.get('resource_type')
resource_id = body.get('resource_id')

if resource_type == 'task':
proxy = ActorProxy.create(
'TaskActor',
ActorId(resource_id),
TaskActorInterface
)
await proxy.ProcessWebhook(body)

return {"status": "OK"}

Output:

>>> # External system sends webhook
POST /external-webhook
{"resource_type": "task", "resource_id": "task-456", "action": "external_update"}

>>> # Handler routes to TaskActor
Webhook received: {'resource_type': 'task', 'resource_id': 'task-456', 'action': 'external_update'}

>>> # TaskActor processes webhook data
TaskActor task-456: Processing webhook external_update

Output Bindings from Actors

Actors can invoke external systems using output bindings---sending HTTP requests, writing to storage, or triggering notifications.

HTTP Output Binding from Actor

Binding Component:

# components/notification-http.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: notification-http
namespace: default
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: "https://hooks.slack.com/services/xxx/yyy/zzz"
- name: direction
value: "output"

Actor Using Output Binding:

class TaskActor(Actor, TaskActorInterface):
async def send_notification(self, message: str) -> None:
"""Send notification via output binding."""
actor_id = self.id.id

payload = json.dumps({
"text": f"Task {actor_id}: {message}",
"channel": "#task-notifications"
})

with DaprClient() as client:
await client.invoke_binding(
binding_name='notification-http',
operation='post',
data=payload
)

print(f"TaskActor {actor_id}: Notification sent")

async def complete_task(self) -> dict:
"""Complete task and notify via output binding."""
# Update state
state = await self._state_manager.get_state("task_data")
state["status"] = "completed"
await self._state_manager.set_state("task_data", state)

# Send external notification
await self.send_notification("Task completed!")

# Also publish internal event
await self._publish_event("task.completed", state)

return {"status": "completed"}

Output:

>>> # Task completion triggers both internal and external notifications
await proxy.CompleteTask()

>>> # Output binding sends to Slack
TaskActor task-123: Notification sent

>>> # Pub/sub notifies internal services
TaskActor task-123: Published task.completed event

Complete Event-Driven Actor Example

Here's a complete example integrating all patterns---an event-driven task management system:

actors.py:

from dapr.actor import Actor, ActorInterface, actormethod, ActorProxy, ActorId
from dapr.clients import DaprClient
from datetime import datetime
import json


class TaskActorInterface(ActorInterface):
@actormethod(name="ProcessEvent")
async def process_event(self, event_data: dict) -> None: ...

@actormethod(name="CompleteTask")
async def complete_task(self) -> dict: ...

@actormethod(name="CheckDeadline")
async def check_deadline(self) -> None: ...


class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)

async def _on_activate(self) -> None:
"""Initialize state on activation."""
found, _ = await self._state_manager.try_get_state("task_data")
if not found:
await self._state_manager.set_state("task_data", {
"status": "pending",
"created_at": datetime.utcnow().isoformat()
})
print(f"TaskActor {self.id.id} activated")

async def process_event(self, event_data: dict) -> None:
"""Process incoming events."""
event_type = event_data.get('event_type', 'unknown')
print(f"TaskActor {self.id.id}: Processing {event_type}")

state = await self._state_manager.get_state("task_data")

if event_type == 'task.assigned':
state['assignee'] = event_data.get('assignee')
state['status'] = 'assigned'
elif event_type == 'task.updated':
state.update(event_data.get('updates', {}))

state['last_event'] = event_type
state['last_event_at'] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", state)

async def complete_task(self) -> dict:
"""Mark task complete, notify externally and internally."""
actor_id = self.id.id
state = await self._state_manager.get_state("task_data")

state["status"] = "completed"
state["completed_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", state)

# External notification via output binding
with DaprClient() as client:
await client.invoke_binding(
binding_name='notification-http',
operation='post',
data=json.dumps({
"text": f"Task {actor_id} completed by {state.get('assignee', 'unknown')}"
})
)

# Internal event via pub/sub
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': actor_id,
'event_type': 'task.completed',
'completed_at': state['completed_at']
}),
data_content_type='application/json'
)

print(f"TaskActor {actor_id}: Completed and notified")
return {"status": "completed", "task_id": actor_id}

async def check_deadline(self) -> None:
"""Check if task is past deadline, mark overdue if so."""
state = await self._state_manager.get_state("task_data")
deadline = state.get('deadline')

if deadline and datetime.fromisoformat(deadline) < datetime.utcnow():
state['status'] = 'overdue'
await self._state_manager.set_state("task_data", state)

# Publish overdue event
with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': self.id.id,
'event_type': 'task.overdue'
}),
data_content_type='application/json'
)

print(f"TaskActor {self.id.id}: Marked overdue")

main.py:

from fastapi import FastAPI, Request
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.actor import ActorProxy, ActorId
from actors import TaskActor, TaskActorInterface

app = FastAPI()
dapr_app = DaprApp(app)
actor_extension = DaprActor(app)


@app.on_event("startup")
async def startup():
await actor_extension.register_actor(TaskActor)
print("Registered TaskActor")


# Pub/Sub: Route task events to actors
@dapr_app.subscribe(pubsub='pubsub', topic='task-events', route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
"""Route pub/sub events to TaskActor instances."""
data = event_data.get('data', {})
task_id = data.get('task_id')

if not task_id:
return {"status": "SKIPPED"}

proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.ProcessEvent(data)

return {"status": "SUCCESS"}


# Input Binding: Cron triggers scheduler
@app.post("/scheduler-cron")
async def handle_scheduler_cron():
"""Cron triggers deadline checks across active tasks."""
print("Scheduler cron triggered")

# In production, you'd query a list of active task IDs
active_tasks = ["task-001", "task-002", "task-003"]

for task_id in active_tasks:
proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.CheckDeadline()

return {"status": "OK", "checked": len(active_tasks)}


# Input Binding: External webhook
@app.post("/external-webhook")
async def handle_webhook(request: Request):
"""External systems trigger actor updates."""
body = await request.json()
task_id = body.get('task_id')

if task_id:
proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.ProcessEvent({
'event_type': 'external.update',
**body
})

return {"status": "OK"}


# Health check
@app.get("/health")
async def health():
return {"status": "OK"}

Output:

>>> # Pub/sub event arrives
Received task event: {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}
TaskActor task-123 activated
TaskActor task-123: Processing task.assigned

>>> # Cron triggers deadline checks
Scheduler cron triggered
TaskActor task-001: Checking deadline
TaskActor task-002: Checking deadline
TaskActor task-002: Marked overdue

>>> # External webhook arrives
Webhook: {'task_id': 'task-456', 'source': 'calendar-api'}
TaskActor task-456: Processing external.update

>>> # Task completion triggers both pub/sub and output binding
TaskActor task-123: Completed and notified

When to Use Each Pattern

PatternUse WhenExample
Pub/Sub to ActorInternal services notify actorsOrder Service publishes order.created, TaskActor processes
Actor PublishesActor state changes need broadcastingTaskActor publishes task.completed for analytics
Cron to ActorScheduled work across actorsDaily cleanup, deadline checks
Webhook to ActorExternal systems trigger actorsCalendar API notifies task updates
Actor to Output BindingActors notify external systemsSend Slack message when task overdue

Reflect on Your Skill

You built a dapr-deployment skill earlier in this chapter. Test and improve it based on event-driven actor patterns.

Test Your Skill

Using my dapr-deployment skill, create a subscription handler that routes pub/sub events to TaskActor instances. Show me the subscription YAML and the FastAPI handler.

Does your skill understand the routing pattern---events to handlers to ActorProxy to actors?

Identify Gaps

Ask yourself:

  • Does my skill explain that Dapr delivers events to FastAPI, not directly to actors?
  • Can it show both pub/sub subscription handlers and binding handlers?
  • Does it include the ActorProxy pattern for routing?

Improve Your Skill

If you found gaps:

My dapr-deployment skill needs event-driven actor patterns.
Add these concepts:
1. Pub/sub events route through FastAPI handlers to ActorProxy
2. Binding triggers (cron, webhook) invoke actors via handlers
3. Actors publish events using DaprClient.publish_event
4. Actors invoke external systems via output bindings

Try With AI

Apply event-driven actor patterns to your domain.

Setup: Open Claude Code or your preferred AI assistant in your Dapr project directory.


Prompt 1: Pub/Sub to Actor Routing

Create a Dapr subscription that routes task events to TaskActor instances.
Show me:
1. The subscription YAML with correct routing
2. The FastAPI handler that extracts task_id and creates ActorProxy
3. The actor method that processes the event
4. How to test this with curl

I'm using Dapr 1.14 with dapr-ext-fastapi.

What you're learning: The subscription handler is the bridge between topic-based pub/sub and ID-based actors. You extract the actor ID from the event payload and route to the correct instance. This pattern enables actors to react to events without knowing about pub/sub directly.


Prompt 2: Cron Binding to Actor

Create a cron binding that triggers a SchedulerActor every 5 minutes.
Show me:
1. The cron binding component YAML
2. The FastAPI handler at the matching endpoint
3. How the handler invokes SchedulerActor
4. The SchedulerActor implementation that delegates to TaskActors

The scheduler should check deadlines across all active tasks.

What you're learning: Cron bindings replace actor timers when you need application-level scheduling rather than per-actor scheduling. The handler pattern is the same---binding delivers to FastAPI, handler routes to actor via proxy.


Prompt 3: Actor Publishing and Output Bindings

I have a TaskActor that needs to:
1. Publish a task.completed event via pub/sub when completed
2. Send a Slack notification via HTTP output binding
3. Both actions should happen in the complete_task method

Show me:
1. The output binding component YAML
2. The actor method that does both
3. How to structure the event payload for pub/sub
4. Error handling if the external notification fails

Include the full actor code.

What you're learning: Actors can be both event consumers and producers. Using DaprClient within actor methods lets you publish events (internal communication) and invoke bindings (external communication). This creates truly reactive actors that participate in broader event flows.


Safety Note: When processing events from pub/sub or bindings, always validate the payload structure before routing to actors. Malformed events should be logged and dropped (return SKIPPED), not crash your handler. Consider implementing a dead-letter pattern for events that fail processing.