Event-Driven Actors
Your TaskActor stores state. It processes method calls. But it sits idle, waiting for someone to invoke it. In real systems, actors don't just wait---they react. A new task arrives in a queue, and the actor springs to life. A cron schedule fires, and the actor performs cleanup. A webhook triggers, and the actor processes external data.
This is the difference between a responsive actor and a reactive one. Responsive actors wait for requests. Reactive actors respond to events from the world around them.
In Lesson 5, you learned about timers and reminders---internal scheduling within an actor. Now you'll connect actors to the broader event ecosystem: Dapr pub/sub for internal service events, and Dapr bindings for external triggers like cron schedules and webhooks.
The Actor Event Architecture
Actors and events work together in a specific pattern. The actor handles state and logic. Events trigger and communicate between actors and services.
Event Sources
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
┌──────▼──────┐ ┌───────▼───────┐ ┌──────▼──────┐
│ Pub/Sub │ │ Bindings │ │ Service │
│ Topics │ │ (cron, http) │ │ Invocation │
└──────┬──────┘ └───────┬───────┘ └──────┬──────┘
│ │ │
└─────────────────────────┼─────────────────────────┘
│
┌──────▼──────┐
│ FastAPI │
│ Handlers │
└──────┬──────┘
│
┌──────▼──────┐
│ ActorProxy │
│ Routing │
└──────┬──────┘
│
┌──────────────────────┼──────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ TaskActor │ │ TaskActor │ │ TaskActor │
│ task-001 │ │ task-002 │ │ task-003 │
└─────────────┘ └─────────────┘ └─────────────┘
The critical insight: Dapr delivers events to your FastAPI application, not directly to actors. You create handlers that receive events and route them to the appropriate actor instance using ActorProxy.
Actors Responding to Pub/Sub Events
When a pub/sub message arrives, Dapr calls a subscription endpoint on your FastAPI application. Your handler extracts the actor ID from the message and invokes the correct actor.
The Subscription Handler Pattern
Subscription YAML:
# components/subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-events-subscription
spec:
topic: task-events
pubsubname: pubsub
routes:
default: /TaskActor/ProcessEvent
scopes:
- task-actor-service
FastAPI Subscription Handler:
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from dapr.actor import ActorProxy, ActorId
app = FastAPI()
dapr_app = DaprApp(app)
@dapr_app.subscribe(pubsub='pubsub', topic='task-events', route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
"""Receive task events and route to the appropriate TaskActor."""
print(f"Received task event: {event_data}")
# Extract actor ID from event payload
data = event_data.get('data', {})
task_id = data.get('task_id')
if not task_id:
print("No task_id in event, skipping")
return {"status": "SKIPPED"}
# Route to the correct actor instance
proxy = ActorProxy.create(
'TaskActor',
ActorId(task_id),
TaskActorInterface
)
# Invoke actor method
await proxy.ProcessEvent(data)
return {"status": "SUCCESS"}
Output:
>>> # Event published to task-events topic
>>> # Dapr calls your subscription handler
Received task event: {'data': {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}}
>>> # Handler routes to TaskActor instance
>>> # TaskActor task-123 processes the event
TaskActor task-123: Processing event task.assigned
Why This Pattern?
You might wonder: "Why can't Dapr deliver messages directly to actors?"
The answer lies in actor semantics:
- Actors are addressed by ID. Pub/sub topics don't know about actor IDs.
- Actors have typed methods. Events need routing logic to determine which method to call.
- Events might create actors. Your handler can activate actors on-demand when events arrive.
The subscription handler is the bridge---it maps event semantics (topic, payload) to actor semantics (ID, method).
Actors Publishing Events
Actors can also publish events outbound. This enables actors to communicate without knowing about each other.
Publishing from an Actor Method
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.clients import DaprClient
import json
class TaskActorInterface(ActorInterface):
@actormethod(name="CompleteTask")
async def complete_task(self) -> dict: ...
class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
async def complete_task(self) -> dict:
"""Mark task complete and publish event."""
actor_id = self.id.id
# Update internal state
state = await self._state_manager.get_state("task_data")
state["status"] = "completed"
await self._state_manager.set_state("task_data", state)
# Publish event for other services/actors
with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': actor_id,
'event_type': 'task.completed',
'completed_at': state.get('completed_at')
}),
data_content_type='application/json'
)
print(f"TaskActor {actor_id}: Published task.completed event")
return {"status": "completed", "task_id": actor_id}
Output:
>>> # Client invokes actor method
await proxy.CompleteTask()
>>> # Actor updates state and publishes event
TaskActor task-123: Published task.completed event
>>> # Event flows to subscribers (other actors, services, etc.)
>>> # NotificationActor, AnalyticsService, etc. receive the event
Event Publishing Best Practices
When publishing events from actors, follow these patterns:
async def _publish_event(self, event_type: str, payload: dict):
"""Reusable event publishing helper."""
event_data = {
"actor_id": self.id.id,
"actor_type": "TaskActor",
"event_type": event_type,
"timestamp": datetime.utcnow().isoformat(),
**payload
}
with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps(event_data),
data_content_type='application/json'
)
Include enough context in events for subscribers to route correctly:
| Field | Purpose |
|---|---|
actor_id | Identifies which actor instance published |
actor_type | Allows routing to same actor type |
event_type | Determines handler logic |
timestamp | Enables ordering and deduplication |
Input Bindings Triggering Actors
Input bindings connect actors to external event sources. A cron schedule fires, and an actor wakes up. A webhook arrives, and an actor processes it.
Cron Binding to Actor
Binding Component:
# components/scheduler-cron.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: scheduler-cron
namespace: default
spec:
type: bindings.cron
version: v1
metadata:
- name: schedule
value: "*/5 * * * *" # Every 5 minutes
- name: direction
value: "input"
FastAPI Handler:
@app.post("/scheduler-cron")
async def handle_scheduler_cron():
"""Triggered by cron, invokes SchedulerActor."""
print("Cron triggered, invoking SchedulerActor")
# Use a well-known actor ID for singleton scheduler
proxy = ActorProxy.create(
'SchedulerActor',
ActorId('global-scheduler'),
SchedulerActorInterface
)
# Trigger scheduled work
result = await proxy.ProcessScheduledTasks()
return {"status": "OK", "result": result}
SchedulerActor Implementation:
class SchedulerActorInterface(ActorInterface):
@actormethod(name="ProcessScheduledTasks")
async def process_scheduled_tasks(self) -> dict: ...
class SchedulerActor(Actor, SchedulerActorInterface):
async def process_scheduled_tasks(self) -> dict:
"""Find and process tasks with passed deadlines."""
print(f"SchedulerActor: Checking for scheduled tasks")
# Get list of tasks to process
task_ids = await self._get_due_tasks()
# Delegate to individual TaskActors
for task_id in task_ids:
proxy = ActorProxy.create(
'TaskActor',
ActorId(task_id),
TaskActorInterface
)
await proxy.CheckDeadline()
return {"processed": len(task_ids)}
Output:
>>> # Every 5 minutes, Dapr calls /scheduler-cron
Cron triggered, invoking SchedulerActor
>>> # SchedulerActor checks for due tasks
SchedulerActor: Checking for scheduled tasks
>>> # Delegates to individual TaskActors
TaskActor task-001: Checking deadline
TaskActor task-007: Checking deadline
Webhook Binding to Actor
External systems can trigger actors through HTTP bindings:
Binding Component:
# components/webhook-binding.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: external-webhook
namespace: default
spec:
type: bindings.http
version: v1
metadata:
- name: direction
value: "input"
FastAPI Handler:
from fastapi import Request
@app.post("/external-webhook")
async def handle_external_webhook(request: Request):
"""Receive webhook and route to appropriate actor."""
body = await request.json()
print(f"Webhook received: {body}")
# Extract routing information from webhook payload
resource_type = body.get('resource_type')
resource_id = body.get('resource_id')
if resource_type == 'task':
proxy = ActorProxy.create(
'TaskActor',
ActorId(resource_id),
TaskActorInterface
)
await proxy.ProcessWebhook(body)
return {"status": "OK"}
Output:
>>> # External system sends webhook
POST /external-webhook
{"resource_type": "task", "resource_id": "task-456", "action": "external_update"}
>>> # Handler routes to TaskActor
Webhook received: {'resource_type': 'task', 'resource_id': 'task-456', 'action': 'external_update'}
>>> # TaskActor processes webhook data
TaskActor task-456: Processing webhook external_update
Output Bindings from Actors
Actors can invoke external systems using output bindings---sending HTTP requests, writing to storage, or triggering notifications.
HTTP Output Binding from Actor
Binding Component:
# components/notification-http.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: notification-http
namespace: default
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: "https://hooks.slack.com/services/xxx/yyy/zzz"
- name: direction
value: "output"
Actor Using Output Binding:
class TaskActor(Actor, TaskActorInterface):
async def send_notification(self, message: str) -> None:
"""Send notification via output binding."""
actor_id = self.id.id
payload = json.dumps({
"text": f"Task {actor_id}: {message}",
"channel": "#task-notifications"
})
with DaprClient() as client:
await client.invoke_binding(
binding_name='notification-http',
operation='post',
data=payload
)
print(f"TaskActor {actor_id}: Notification sent")
async def complete_task(self) -> dict:
"""Complete task and notify via output binding."""
# Update state
state = await self._state_manager.get_state("task_data")
state["status"] = "completed"
await self._state_manager.set_state("task_data", state)
# Send external notification
await self.send_notification("Task completed!")
# Also publish internal event
await self._publish_event("task.completed", state)
return {"status": "completed"}
Output:
>>> # Task completion triggers both internal and external notifications
await proxy.CompleteTask()
>>> # Output binding sends to Slack
TaskActor task-123: Notification sent
>>> # Pub/sub notifies internal services
TaskActor task-123: Published task.completed event
Complete Event-Driven Actor Example
Here's a complete example integrating all patterns---an event-driven task management system:
actors.py:
from dapr.actor import Actor, ActorInterface, actormethod, ActorProxy, ActorId
from dapr.clients import DaprClient
from datetime import datetime
import json
class TaskActorInterface(ActorInterface):
@actormethod(name="ProcessEvent")
async def process_event(self, event_data: dict) -> None: ...
@actormethod(name="CompleteTask")
async def complete_task(self) -> dict: ...
@actormethod(name="CheckDeadline")
async def check_deadline(self) -> None: ...
class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
async def _on_activate(self) -> None:
"""Initialize state on activation."""
found, _ = await self._state_manager.try_get_state("task_data")
if not found:
await self._state_manager.set_state("task_data", {
"status": "pending",
"created_at": datetime.utcnow().isoformat()
})
print(f"TaskActor {self.id.id} activated")
async def process_event(self, event_data: dict) -> None:
"""Process incoming events."""
event_type = event_data.get('event_type', 'unknown')
print(f"TaskActor {self.id.id}: Processing {event_type}")
state = await self._state_manager.get_state("task_data")
if event_type == 'task.assigned':
state['assignee'] = event_data.get('assignee')
state['status'] = 'assigned'
elif event_type == 'task.updated':
state.update(event_data.get('updates', {}))
state['last_event'] = event_type
state['last_event_at'] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", state)
async def complete_task(self) -> dict:
"""Mark task complete, notify externally and internally."""
actor_id = self.id.id
state = await self._state_manager.get_state("task_data")
state["status"] = "completed"
state["completed_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", state)
# External notification via output binding
with DaprClient() as client:
await client.invoke_binding(
binding_name='notification-http',
operation='post',
data=json.dumps({
"text": f"Task {actor_id} completed by {state.get('assignee', 'unknown')}"
})
)
# Internal event via pub/sub
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': actor_id,
'event_type': 'task.completed',
'completed_at': state['completed_at']
}),
data_content_type='application/json'
)
print(f"TaskActor {actor_id}: Completed and notified")
return {"status": "completed", "task_id": actor_id}
async def check_deadline(self) -> None:
"""Check if task is past deadline, mark overdue if so."""
state = await self._state_manager.get_state("task_data")
deadline = state.get('deadline')
if deadline and datetime.fromisoformat(deadline) < datetime.utcnow():
state['status'] = 'overdue'
await self._state_manager.set_state("task_data", state)
# Publish overdue event
with DaprClient() as client:
client.publish_event(
pubsub_name='pubsub',
topic_name='task-events',
data=json.dumps({
'task_id': self.id.id,
'event_type': 'task.overdue'
}),
data_content_type='application/json'
)
print(f"TaskActor {self.id.id}: Marked overdue")
main.py:
from fastapi import FastAPI, Request
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.actor import ActorProxy, ActorId
from actors import TaskActor, TaskActorInterface
app = FastAPI()
dapr_app = DaprApp(app)
actor_extension = DaprActor(app)
@app.on_event("startup")
async def startup():
await actor_extension.register_actor(TaskActor)
print("Registered TaskActor")
# Pub/Sub: Route task events to actors
@dapr_app.subscribe(pubsub='pubsub', topic='task-events', route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
"""Route pub/sub events to TaskActor instances."""
data = event_data.get('data', {})
task_id = data.get('task_id')
if not task_id:
return {"status": "SKIPPED"}
proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.ProcessEvent(data)
return {"status": "SUCCESS"}
# Input Binding: Cron triggers scheduler
@app.post("/scheduler-cron")
async def handle_scheduler_cron():
"""Cron triggers deadline checks across active tasks."""
print("Scheduler cron triggered")
# In production, you'd query a list of active task IDs
active_tasks = ["task-001", "task-002", "task-003"]
for task_id in active_tasks:
proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.CheckDeadline()
return {"status": "OK", "checked": len(active_tasks)}
# Input Binding: External webhook
@app.post("/external-webhook")
async def handle_webhook(request: Request):
"""External systems trigger actor updates."""
body = await request.json()
task_id = body.get('task_id')
if task_id:
proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
await proxy.ProcessEvent({
'event_type': 'external.update',
**body
})
return {"status": "OK"}
# Health check
@app.get("/health")
async def health():
return {"status": "OK"}
Output:
>>> # Pub/sub event arrives
Received task event: {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}
TaskActor task-123 activated
TaskActor task-123: Processing task.assigned
>>> # Cron triggers deadline checks
Scheduler cron triggered
TaskActor task-001: Checking deadline
TaskActor task-002: Checking deadline
TaskActor task-002: Marked overdue
>>> # External webhook arrives
Webhook: {'task_id': 'task-456', 'source': 'calendar-api'}
TaskActor task-456: Processing external.update
>>> # Task completion triggers both pub/sub and output binding
TaskActor task-123: Completed and notified
When to Use Each Pattern
| Pattern | Use When | Example |
|---|---|---|
| Pub/Sub to Actor | Internal services notify actors | Order Service publishes order.created, TaskActor processes |
| Actor Publishes | Actor state changes need broadcasting | TaskActor publishes task.completed for analytics |
| Cron to Actor | Scheduled work across actors | Daily cleanup, deadline checks |
| Webhook to Actor | External systems trigger actors | Calendar API notifies task updates |
| Actor to Output Binding | Actors notify external systems | Send Slack message when task overdue |
Reflect on Your Skill
You built a dapr-deployment skill earlier in this chapter. Test and improve it based on event-driven actor patterns.
Test Your Skill
Using my dapr-deployment skill, create a subscription handler that routes pub/sub events to TaskActor instances. Show me the subscription YAML and the FastAPI handler.
Does your skill understand the routing pattern---events to handlers to ActorProxy to actors?
Identify Gaps
Ask yourself:
- Does my skill explain that Dapr delivers events to FastAPI, not directly to actors?
- Can it show both pub/sub subscription handlers and binding handlers?
- Does it include the ActorProxy pattern for routing?
Improve Your Skill
If you found gaps:
My dapr-deployment skill needs event-driven actor patterns.
Add these concepts:
1. Pub/sub events route through FastAPI handlers to ActorProxy
2. Binding triggers (cron, webhook) invoke actors via handlers
3. Actors publish events using DaprClient.publish_event
4. Actors invoke external systems via output bindings
Try With AI
Apply event-driven actor patterns to your domain.
Setup: Open Claude Code or your preferred AI assistant in your Dapr project directory.
Prompt 1: Pub/Sub to Actor Routing
Create a Dapr subscription that routes task events to TaskActor instances.
Show me:
1. The subscription YAML with correct routing
2. The FastAPI handler that extracts task_id and creates ActorProxy
3. The actor method that processes the event
4. How to test this with curl
I'm using Dapr 1.14 with dapr-ext-fastapi.
What you're learning: The subscription handler is the bridge between topic-based pub/sub and ID-based actors. You extract the actor ID from the event payload and route to the correct instance. This pattern enables actors to react to events without knowing about pub/sub directly.
Prompt 2: Cron Binding to Actor
Create a cron binding that triggers a SchedulerActor every 5 minutes.
Show me:
1. The cron binding component YAML
2. The FastAPI handler at the matching endpoint
3. How the handler invokes SchedulerActor
4. The SchedulerActor implementation that delegates to TaskActors
The scheduler should check deadlines across all active tasks.
What you're learning: Cron bindings replace actor timers when you need application-level scheduling rather than per-actor scheduling. The handler pattern is the same---binding delivers to FastAPI, handler routes to actor via proxy.
Prompt 3: Actor Publishing and Output Bindings
I have a TaskActor that needs to:
1. Publish a task.completed event via pub/sub when completed
2. Send a Slack notification via HTTP output binding
3. Both actions should happen in the complete_task method
Show me:
1. The output binding component YAML
2. The actor method that does both
3. How to structure the event payload for pub/sub
4. Error handling if the external notification fails
Include the full actor code.
What you're learning: Actors can be both event consumers and producers. Using DaprClient within actor methods lets you publish events (internal communication) and invoke bindings (external communication). This creates truly reactive actors that participate in broader event flows.
Safety Note: When processing events from pub/sub or bindings, always validate the payload structure before routing to actors. Malformed events should be logged and dropped (return SKIPPED), not crash your handler. Consider implementing a dead-letter pattern for events that fail processing.