Actor Communication Patterns
Your ChatAgent works. It stores conversation history, manages state across activations, and publishes events. But now you need something more sophisticated: a multi-agent system where specialized actors collaborate on complex tasks.
Picture an AI customer support system. When a customer sends a message, the ChatAgent shouldn't do everything itself. It should delegate: route the message to a ResponseAgent that generates intelligent replies, while a MemoryAgent tracks conversation context across sessions. Each agent has a single responsibility, isolated state, and clear boundaries.
This is the actor coordination problem. How do actors talk to each other? How do you build hierarchies where parent actors manage child workers? How do peers collaborate without creating tangled dependencies?
The answer is ActorProxy: Dapr's mechanism for actor-to-actor communication.
ActorProxy: The Actor Communication Gateway
When one actor needs to invoke methods on another actor, it doesn't call methods directly. Direct method calls would break actor isolation and turn-based concurrency guarantees. Instead, actors communicate through ActorProxy, which routes requests through the Dapr sidecar.
from dapr.actor import ActorProxy, ActorId
# Create a proxy to invoke another actor
proxy = ActorProxy.create(
actor_type="ResponseAgent", # The type of actor to invoke
actor_id=ActorId("response-user1"), # The specific instance
actor_interface=ResponseAgentInterface # The interface defining available methods
)
# Invoke a method on the target actor
response = await proxy.ProcessMessage({"content": "Hello!"})
Output:
{'role': 'assistant', 'content': 'Memory: user: Hello!. Got your message: Hello! at 2025-01-15T10:30:45+00:00'}
The ActorProxy.create() call doesn't actually create or activate the target actor. It creates a local proxy object that knows how to route requests. The target actor activates on first method invocation, following virtual actor semantics.
Method Naming Convention
When defining actor methods in Python, use snake_case for the implementation but PascalCase for the @actormethod decorator name:
class ResponseAgentInterface(ActorInterface):
@actormethod(name="ProcessMessage") # PascalCase in decorator
async def process_message(self, user_input: dict) -> dict | None: # snake_case implementation
pass
When invoking via proxy, use the PascalCase name from the decorator:
# Correct: Use PascalCase method name
response = await proxy.ProcessMessage(message_dict)
# This won't work:
# response = await proxy.process_message(message_dict)
Parent-Child Actor Pattern
The parent-child pattern establishes a hierarchy where one actor manages and coordinates others. The parent actor creates child actor IDs, delegates tasks to them, and aggregates results.
Consider a ChatAgent that delegates response generation to a ResponseAgent:
class ChatAgentInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: dict) -> dict | None:
pass
@actormethod(name="GetConversationHistory")
async def get_conversation_history(self) -> list[dict] | None:
pass
class ChatAgent(Actor, ChatAgentInterface):
"""Parent actor that manages conversation flow by delegating to child actors."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._history_key = f"history-{actor_id.id}"
self._actor_id = actor_id
async def _on_activate(self) -> None:
"""Initialize conversation history on activation."""
logging.info(f"Activating ChatAgent for {self._history_key}")
try:
history = await self._state_manager.get_state(self._history_key)
if history is None:
await self._state_manager.set_state(self._history_key, [])
except Exception as e:
logging.warning(f"Initializing fresh state: {e}")
await self._state_manager.set_state(self._history_key, [])
async def process_message(self, user_input: dict) -> dict:
"""Delegate message processing to ResponseAgent child."""
logging.info(f"ChatAgent processing: {user_input}")
# Load and update history
history = await self._state_manager.get_state(self._history_key)
current_history = history if isinstance(history, list) else []
current_history.append(user_input)
# Create child actor proxy with predictable ID
response_actor_id = ActorId(f"response-{self._actor_id.id}")
response_proxy = ActorProxy.create(
"ResponseAgent",
response_actor_id,
ResponseAgentInterface
)
# Delegate to child actor
response = await response_proxy.ProcessMessage(user_input)
current_history.append(response)
# Save updated history
await self._state_manager.set_state(self._history_key, current_history)
logging.info(f"ChatAgent delegated successfully")
return response
async def get_conversation_history(self) -> list[dict]:
"""Retrieve full conversation history."""
history = await self._state_manager.get_state(self._history_key)
return history if isinstance(history, list) else []
Output (when invoking ChatAgent):
INFO: Activating ChatAgent for history-user1
INFO: ChatAgent processing: {'role': 'user', 'content': 'Hello!'}
INFO: ChatAgent delegated successfully
{'role': 'assistant', 'content': 'Memory: Message count: 1. Got your message: Hello! at 2025-01-15T10:30:45+00:00'}
The Child Actor
The ResponseAgent handles the actual response generation:
class ResponseAgentInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: dict) -> dict | None:
pass
@actormethod(name="GetMessageCount")
async def get_message_count(self) -> int | None:
pass
class ResponseAgent(Actor, ResponseAgentInterface):
"""Child actor that generates responses and tracks message count."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._count_key = f"response-count-{actor_id.id}"
self._actor_id = actor_id
async def _on_activate(self) -> None:
"""Initialize message counter on activation."""
logging.info(f"Activating ResponseAgent for {self._count_key}")
try:
count = await self._state_manager.get_state(self._count_key)
if count is None:
await self._state_manager.set_state(self._count_key, 0)
except Exception:
await self._state_manager.set_state(self._count_key, 0)
async def process_message(self, user_input: dict) -> dict:
"""Generate response with timestamp and message count."""
logging.info(f"ResponseAgent processing: {user_input}")
# Increment and persist message count
count = await self._state_manager.get_state(self._count_key)
count = count if isinstance(count, int) else 0
count += 1
await self._state_manager.set_state(self._count_key, count)
# Generate timestamped response
timestamp = datetime.now(UTC).isoformat()
response_content = f"Memory: Message count: {count}. Got your message: {user_input['content']} at {timestamp}"
response = {"role": "assistant", "content": response_content}
logging.info(f"ResponseAgent generated response #{count}")
return response
async def get_message_count(self) -> int:
"""Return total messages processed."""
count = await self._state_manager.get_state(self._count_key)
return count if isinstance(count, int) else 0
Output (when ResponseAgent activates):
INFO: Activating ResponseAgent for response-count-response-user1
INFO: ResponseAgent processing: {'role': 'user', 'content': 'Hello!'}
INFO: ResponseAgent generated response #1
Actor ID Conventions
Notice the predictable ID pattern: when ChatAgent with ID user1 needs to delegate, it creates a ResponseAgent with ID response-user1. This convention enables:
- Predictable routing: Any actor can compute the ID of related actors
- State isolation: Each user gets dedicated actor instances
- Debugging clarity: Actor IDs reveal relationships in logs
| Parent ID | Child Type | Child ID Convention |
|---|---|---|
user1 | ResponseAgent | response-user1 |
user1 | MemoryAgent | memory-user1 |
order-123 | PaymentActor | payment-order-123 |
task-456 | WorkerActor | worker-task-456 |
Peer-to-Peer Actor Communication
Not all actor relationships are hierarchical. Sometimes actors communicate as equals, each maintaining their own domain while collaborating on shared workflows.
Consider a ResponseAgent that queries a MemoryAgent for conversation context:
class MemoryAgentInterface(ActorInterface):
@actormethod(name="UpdateMemory")
async def update_memory(self, message: dict) -> None:
pass
@actormethod(name="GetMemory")
async def get_memory(self) -> list[dict] | None:
pass
class MemoryAgent(Actor, MemoryAgentInterface):
"""Peer actor that maintains conversation memory for context-aware responses."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._memory_key = f"memory-{actor_id.id}"
async def _on_activate(self) -> None:
"""Initialize empty memory on activation."""
logging.info(f"Activating MemoryAgent for {self._memory_key}")
try:
memory = await self._state_manager.get_state(self._memory_key)
if memory is None:
await self._state_manager.set_state(self._memory_key, [])
except Exception:
await self._state_manager.set_state(self._memory_key, [])
async def update_memory(self, message: dict) -> None:
"""Add messages to memory, keeping last 6 entries."""
user_message = message.get("user_message")
response_message = message.get("response_message")
logging.info(f"MemoryAgent updating: user={user_message}")
memory = await self._state_manager.get_state(self._memory_key)
current_memory = memory if isinstance(memory, list) else []
if user_message:
current_memory.append(user_message)
if response_message:
current_memory.append(response_message)
# Keep only last 6 messages for context window
if len(current_memory) > 6:
current_memory = current_memory[-6:]
await self._state_manager.set_state(self._memory_key, current_memory)
logging.info(f"MemoryAgent stored {len(current_memory)} messages")
async def get_memory(self) -> list[dict]:
"""Retrieve conversation memory."""
memory = await self._state_manager.get_state(self._memory_key)
return memory if isinstance(memory, list) else []
Now ResponseAgent can query MemoryAgent as a peer:
async def process_message(self, user_input: dict) -> dict:
"""Generate response enriched with memory context from peer actor."""
logging.info(f"ResponseAgent processing: {user_input}")
# Increment message count
count = await self._state_manager.get_state(self._count_key)
count = (count if isinstance(count, int) else 0) + 1
await self._state_manager.set_state(self._count_key, count)
# Query peer MemoryAgent for context
memory_actor_id = ActorId(f"memory-{self._actor_id.id}")
memory_proxy = ActorProxy.create(
"MemoryAgent",
memory_actor_id,
MemoryAgentInterface
)
memory = await memory_proxy.GetMemory()
# Build context from memory
if memory:
memory_context = "; ".join([
f"{m['role']}: {m['content']}" for m in memory
])
else:
memory_context = f"Message count: {count}"
# Generate response with context
timestamp = datetime.now(UTC).isoformat()
response_content = f"Memory: {memory_context}. Got your message: {user_input['content']} at {timestamp}"
logging.info(f"ResponseAgent used memory context: {memory_context[:50]}...")
return {"role": "assistant", "content": response_content}
Output (when ResponseAgent queries MemoryAgent):
INFO: ResponseAgent processing: {'role': 'user', 'content': 'What did I say earlier?'}
INFO: MemoryAgent retrieved 4 messages
INFO: ResponseAgent used memory context: user: Hello!; assistant: Got your...
Complete Multi-Agent System
Here's a complete example showing parent-child and peer-to-peer patterns working together:
import logging
import json
from datetime import datetime, UTC
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.fastapi import DaprActor
from dapr.actor import Actor, ActorInterface, ActorProxy, ActorId, actormethod
from dapr.clients import DaprClient
logging.basicConfig(level=logging.INFO)
app = FastAPI(title="MultiAgentService")
actor = DaprActor(app)
class Message(BaseModel):
role: str
content: str
# ============ INTERFACES ============
class ChatAgentInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: dict) -> dict | None:
pass
@actormethod(name="GetConversationHistory")
async def get_conversation_history(self) -> list[dict] | None:
pass
class ResponseAgentInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: dict) -> dict | None:
pass
class MemoryAgentInterface(ActorInterface):
@actormethod(name="UpdateMemory")
async def update_memory(self, message: dict) -> None:
pass
@actormethod(name="GetMemory")
async def get_memory(self) -> list[dict] | None:
pass
# ============ IMPLEMENTATIONS ============
class ChatAgent(Actor, ChatAgentInterface):
"""Parent: Orchestrates conversation by delegating to child actors."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._history_key = f"history-{actor_id.id}"
self._actor_id = actor_id
async def _on_activate(self) -> None:
has_state = await self._state_manager.try_get_state(self._history_key)
if not has_state[0]:
await self._state_manager.set_state(self._history_key, [])
async def process_message(self, user_input: dict) -> dict:
# Update history
history = await self._state_manager.get_state(self._history_key)
current_history = history if isinstance(history, list) else []
current_history.append(user_input)
# Delegate to ResponseAgent (parent-child)
response_proxy = ActorProxy.create(
"ResponseAgent",
ActorId(f"response-{self._actor_id.id}"),
ResponseAgentInterface
)
response = await response_proxy.ProcessMessage(user_input)
current_history.append(response)
await self._state_manager.set_state(self._history_key, current_history)
# Update MemoryAgent via pub/sub (triggers event-driven update)
await self._publish_conversation_event(user_input, response)
return response
async def _publish_conversation_event(self, user_input: dict, response: dict) -> None:
event_data = {
"actor_id": self._actor_id.id,
"input": user_input,
"output": response
}
with DaprClient() as client:
client.publish_event(
pubsub_name="daca-pubsub",
topic_name="user-chat",
data=json.dumps(event_data)
)
async def get_conversation_history(self) -> list[dict]:
history = await self._state_manager.get_state(self._history_key)
return history if isinstance(history, list) else []
class ResponseAgent(Actor, ResponseAgentInterface):
"""Child: Generates responses using memory from peer MemoryAgent."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._count_key = f"count-{actor_id.id}"
self._actor_id = actor_id
async def _on_activate(self) -> None:
has_state = await self._state_manager.try_get_state(self._count_key)
if not has_state[0]:
await self._state_manager.set_state(self._count_key, 0)
async def process_message(self, user_input: dict) -> dict:
# Increment count
count = await self._state_manager.get_state(self._count_key)
count = (count if isinstance(count, int) else 0) + 1
await self._state_manager.set_state(self._count_key, count)
# Query peer MemoryAgent for context
memory_proxy = ActorProxy.create(
"MemoryAgent",
ActorId(f"memory-{self._actor_id.id}"),
MemoryAgentInterface
)
memory = await memory_proxy.GetMemory()
memory_context = (
"; ".join([f"{m['role']}: {m['content']}" for m in memory])
if memory else f"Message count: {count}"
)
timestamp = datetime.now(UTC).isoformat()
return {
"role": "assistant",
"content": f"Memory: {memory_context}. Got: {user_input['content']} at {timestamp}"
}
class MemoryAgent(Actor, MemoryAgentInterface):
"""Peer: Maintains conversation memory for ResponseAgent."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._memory_key = f"memory-{actor_id.id}"
async def _on_activate(self) -> None:
has_state = await self._state_manager.try_get_state(self._memory_key)
if not has_state[0]:
await self._state_manager.set_state(self._memory_key, [])
async def update_memory(self, message: dict) -> None:
memory = await self._state_manager.get_state(self._memory_key)
current_memory = memory if isinstance(memory, list) else []
if message.get("user_message"):
current_memory.append(message["user_message"])
if message.get("response_message"):
current_memory.append(message["response_message"])
# Keep last 6 for context window
current_memory = current_memory[-6:] if len(current_memory) > 6 else current_memory
await self._state_manager.set_state(self._memory_key, current_memory)
async def get_memory(self) -> list[dict]:
memory = await self._state_manager.get_state(self._memory_key)
return memory if isinstance(memory, list) else []
# ============ REGISTRATION ============
@app.on_event("startup")
async def startup():
await actor.register_actor(ChatAgent)
await actor.register_actor(ResponseAgent)
await actor.register_actor(MemoryAgent)
logging.info("Registered: ChatAgent, ResponseAgent, MemoryAgent")
# ============ ENDPOINTS ============
@app.post("/chat/{actor_id}")
async def chat(actor_id: str, data: Message):
"""Send message to ChatAgent, which delegates to ResponseAgent."""
proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
response = await proxy.ProcessMessage(data.model_dump())
return {"response": response}
@app.get("/chat/{actor_id}/history")
async def get_history(actor_id: str):
"""Get conversation history from ChatAgent."""
proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
history = await proxy.GetConversationHistory()
return {"history": history}
@app.get("/memory/{actor_id}")
async def get_memory(actor_id: str):
"""Get memory from MemoryAgent."""
proxy = ActorProxy.create("MemoryAgent", ActorId(f"memory-response-{actor_id}"), MemoryAgentInterface)
memory = await proxy.GetMemory()
return {"memory": memory}
@app.post("/subscribe")
async def subscribe(data: dict):
"""Handle pub/sub events to update MemoryAgent."""
event_data = data.get("data", {})
if isinstance(event_data, str):
event_data = json.loads(event_data)
user_id = event_data.get("actor_id", "unknown")
memory_proxy = ActorProxy.create(
"MemoryAgent",
ActorId(f"memory-response-{user_id}"),
MemoryAgentInterface
)
await memory_proxy.UpdateMemory({
"user_message": event_data.get("input"),
"response_message": event_data.get("output")
})
return {"status": "processed"}
Output (complete flow):
POST /chat/alice {"role": "user", "content": "Hello!"}
INFO: Registered: ChatAgent, ResponseAgent, MemoryAgent
INFO: Activating ChatAgent for history-alice
INFO: Activating ResponseAgent for count-response-alice
INFO: Activating MemoryAgent for memory-response-alice
{"response": {"role": "assistant", "content": "Memory: Message count: 1. Got: Hello! at 2025-01-15T10:30:45+00:00"}}
POST /chat/alice {"role": "user", "content": "Remember me?"}
{"response": {"role": "assistant", "content": "Memory: user: Hello!; assistant: Memory: Message count: 1... Got: Remember me? at 2025-01-15T10:31:12+00:00"}}
Virtual Actor Semantics: What Happens When You Call a Non-Existent Actor?
A common question: "What if the target actor doesn't exist when I call it?"
The answer reveals a key insight about Dapr's virtual actor model: actors don't need to exist before you call them. When you invoke an actor that hasn't been activated:
- Dapr routes the request to the Placement service
- Placement assigns the actor to an available host
- The actor activates (runs
_on_activate) - The method executes
- Response returns to the caller
This means your ChatAgent can delegate to a ResponseAgent that has never been called before. The first method invocation creates and activates it automatically.
# ResponseAgent with ID "response-alice" doesn't exist yet
proxy = ActorProxy.create("ResponseAgent", ActorId("response-alice"), ResponseAgentInterface)
# This call activates the actor, then executes ProcessMessage
response = await proxy.ProcessMessage({"content": "Hello"})
# ResponseAgent "response-alice" now exists with initialized state
Communication Flow Visualization
User Request: POST /chat/alice {"role": "user", "content": "Hello!"}
|
v
+---------------+
| ChatAgent | (Parent)
| ID: alice |
+---------------+
|
| 1. ActorProxy.create("ResponseAgent", "response-alice")
| 2. await proxy.ProcessMessage(...)
v
+---------------+
| ResponseAgent | (Child)
| ID: response- |
| alice |
+---------------+
|
| 3. ActorProxy.create("MemoryAgent", "memory-response-alice")
| 4. await proxy.GetMemory()
v
+---------------+
| MemoryAgent | (Peer)
| ID: memory- |
| response-alice|
+---------------+
|
| 5. Return memory context
v
ResponseAgent generates response with context
|
| 6. Return response to ChatAgent
v
ChatAgent stores history, publishes event
|
| 7. Return response to user
v
{"response": {"role": "assistant", "content": "..."}}
Reflect on Your Skill
You built a dapr-deployment skill earlier. Does it understand actor communication patterns?
Test Your Skill
Using my dapr-deployment skill, generate code for two actors that communicate:
a TaskManagerActor that delegates to WorkerActors using ActorProxy.
Does your skill produce:
- Correct ActorProxy.create() calls with actor type, ID, and interface?
- Predictable ID conventions (e.g.,
worker-{task_id})? - Proper async/await on proxy method calls?
Identify Gaps
Ask yourself:
- Did my skill use PascalCase method names in proxy calls?
- Did it explain that target actors activate on first invocation?
- Did it show both parent-child and peer-to-peer patterns?
Improve Your Skill
If you found gaps:
My dapr-deployment skill needs actor communication patterns. Update it to include:
- ActorProxy.create() syntax with type, ActorId, and interface
- Parent-child delegation pattern with ID conventions
- Peer-to-peer actor queries
- Virtual actor activation semantics (actors created on first call)
Try With AI
Open your AI companion and explore actor communication patterns.
Prompt 1: Design Parent-Child Hierarchy
Help me design a parent-child actor hierarchy for a task processing system.
I need:
- TaskManagerActor: Receives tasks, delegates to workers, tracks completion
- WorkerActor: Processes individual tasks, reports results back
Show me:
1. The interface definitions for both actors
2. How TaskManagerActor uses ActorProxy to delegate
3. The ID naming convention (TaskManager ID "manager-1" should create Workers with predictable IDs)
4. How the manager tracks which workers are processing which tasks
Use Python with dapr-ext-fastapi.
What you're learning: Hierarchical actor coordination. The AI helps you design ID conventions and delegation patterns that scale to many workers.
Prompt 2: Implement Peer-to-Peer Communication
I have two actors that need to communicate as peers:
- OrderActor: Manages order state
- InventoryActor: Manages stock levels
When OrderActor processes a new order, it should:
1. Query InventoryActor to check stock availability
2. If available, tell InventoryActor to reserve the items
3. Complete the order only after reservation succeeds
Show me the ActorProxy calls and error handling for this peer-to-peer communication.
What happens if InventoryActor isn't available?
What you're learning: Peer coordination patterns. The AI guides you through bidirectional communication without hierarchy, including failure scenarios.
Prompt 3: Debug Actor Communication
My actor communication isn't working. Here's my code:
ChatAgent calls ResponseAgent like this:
proxy = ActorProxy.create("ResponseAgent", ActorId("response-1"), ResponseAgentInterface)
result = await proxy.process_message({"content": "Hello"})
But I get: "Method not found"
My ResponseAgent interface:
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: dict) -> dict: ...
What's wrong and how do I fix it?
What you're learning: Common ActorProxy debugging. The AI identifies the PascalCase vs snake_case issue and explains the method naming convention.
Safety Note
Actor-to-actor communication adds latency (each proxy call goes through the Dapr sidecar) and complexity (more actors to debug). Start with simple hierarchies before building deep actor chains. For synchronous request-response patterns, consider whether actors are the right abstraction or whether direct service invocation would be simpler.