Chat Actor - Stateful Conversations
You've created a simple actor that stores greetings. Now consider what makes AI agents useful: they remember context. When a user asks a question, then follows up with "What about the deadline?", the agent understands "the deadline" refers to something from the previous exchange. Without conversation history, every interaction starts from zero.
This is the core use case for actors in AI systems. Each user session needs its own isolated state—their conversation history, preferences, and context. A traditional approach might use Redis keys like chat:user123:history, but you'd manage concurrency, serialization, and cleanup yourself. With Dapr actors, each chat session IS an actor instance. The runtime handles activation, state persistence, and concurrent access. You focus on conversation logic.
In this lesson, you'll build a ChatActor that maintains conversation history and publishes events when conversations update. By the end, you'll have a pattern for any stateful agent that needs to remember context across interactions.
The ChatActor Pattern
A chat session maps naturally to an actor:
- One actor instance per user session —
ActorId("user-alice")andActorId("user-bob")are separate instances - State is conversation history — A list of messages with roles (user, assistant)
- Methods handle interactions —
process_messageadds to history and returns response - Events notify other services — Pub/sub announces conversation updates
User "alice" sends message
|
v
+-------------------+
| ChatActor |
| ID: "alice" |
+-------------------+
| State: |
| - history: [...] | <-- Persisted in Redis
+-------------------+
| Methods: |
| - process_message |
| - get_history |
+-------------------+
|
| publishes "ConversationUpdated"
v
+-------------------+
| Pub/Sub Topic | <-- Other services react
| "user-chat" |
+-------------------+
Why not just store history in a regular state store key? Three reasons:
| Challenge | Regular State | Actor State |
|---|---|---|
| Concurrent updates | Manual locking or ETags | Turn-based concurrency (automatic) |
| Lifecycle management | Manual cleanup | Automatic garbage collection |
| Method invocation | HTTP to your service | Actor method calls routed by Dapr |
Define the Actor Interface
Start with the interface that defines what your ChatActor can do:
from dapr.actor import ActorInterface, actormethod
from pydantic import BaseModel
class Message(BaseModel):
"""A single message in the conversation."""
role: str # "user" or "assistant"
content: str
class ChatAgentInterface(ActorInterface):
"""Interface for the ChatAgent actor."""
@actormethod(name="ProcessMessage")
async def process_message(self, user_input: Message) -> Message | None:
"""Process a user message and return assistant response."""
...
@actormethod(name="GetConversationHistory")
async def get_conversation_history(self) -> list[dict] | None:
"""Retrieve the full conversation history."""
...
Output:
(No runtime output - this defines the interface contract)
The @actormethod decorator with the name parameter is critical. The name you provide (like "ProcessMessage") is the method name used in actor invocation. Python's snake_case method name (process_message) is your local implementation, but external callers use the decorator name.
Implement the ChatActor
Now implement the actor with state management and pub/sub integration:
import logging
import json
from dapr.actor import Actor
from dapr.clients import DaprClient
class ChatAgent(Actor, ChatAgentInterface):
"""Actor that maintains conversation history for a single user."""
def __init__(self, ctx, actor_id):
super().__init__(ctx, actor_id)
self._history_key = f"history-{actor_id.id}"
self._actor_id = actor_id
async def _on_activate(self) -> None:
"""Initialize state when actor activates."""
logging.info(f"Activating ChatAgent for {self._history_key}")
try:
history = await self._state_manager.get_state(self._history_key)
if history is None:
logging.info(f"Initializing empty history for {self._history_key}")
await self._state_manager.set_state(self._history_key, [])
except Exception as e:
logging.warning(f"Activation error for {self._history_key}: {e}")
await self._state_manager.set_state(self._history_key, [])
async def process_message(self, user_input: Message) -> Message:
"""Process user message, update history, publish event, return response."""
# Validate input (important for data from external callers)
user_input = Message.model_validate(user_input)
# Load current history
history = await self._state_manager.get_state(self._history_key)
current_history = history if isinstance(history, list) else []
# Add user message to history
current_history.append({"role": "user", "content": user_input.content})
# Generate response (static for now - you'll add LLM later)
response = Message(
role="assistant",
content=f"Received your message: {user_input.content}"
)
# Add assistant response to history
current_history.append(response.model_dump())
# Limit history to last 10 exchanges (20 messages)
if len(current_history) > 20:
current_history = current_history[-20:]
# Persist updated history
await self._state_manager.set_state(self._history_key, current_history)
logging.info(f"Updated history for {self._history_key}: {len(current_history)} messages")
# Publish conversation event
await self._publish_conversation_event(user_input, response)
return response.model_dump()
async def get_conversation_history(self) -> list[dict]:
"""Return the full conversation history."""
history = await self._state_manager.get_state(self._history_key)
return history if isinstance(history, list) else []
async def _publish_conversation_event(self, user_input: Message, response: Message) -> None:
"""Publish ConversationUpdated event to pub/sub topic."""
event_data = {
"actor_id": self._actor_id.id,
"actor_type": "ChatAgent",
"event_type": "ConversationUpdated",
"input": user_input.model_dump(),
"output": response.model_dump()
}
with DaprClient() as client:
try:
client.publish_event(
pubsub_name="daca-pubsub",
topic_name="user-chat",
data=json.dumps(event_data)
)
logging.info(f"Published ConversationUpdated for {self._actor_id.id}")
except Exception as e:
logging.error(f"Failed to publish event: {e}")
Output:
(No runtime output - this is the actor implementation)
Key implementation details:
| Pattern | Purpose |
|---|---|
_history_key = f"history-{actor_id.id}" | State key includes actor ID for isolation |
Message.model_validate(user_input) | Validates incoming data from ActorProxy |
if len(current_history) > 20 | Prevents unbounded history growth |
DaprClient() context manager | Clean resource management for pub/sub |
Register Actor and Create Endpoints
Wire up the actor with FastAPI:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.fastapi import DaprActor
from dapr.actor import ActorProxy, ActorId
import logging
import json
logging.basicConfig(level=logging.INFO)
app = FastAPI(title="ChatAgentService", description="Chat Actor Demo")
# Add Dapr Actor extension
actor = DaprActor(app)
class Message(BaseModel):
role: str
content: str
# Register actor on startup
@app.on_event("startup")
async def startup():
await actor.register_actor(ChatAgent)
logging.info("Registered ChatAgent actor")
# REST endpoint to send a message
@app.post("/chat/{actor_id}")
async def send_message(actor_id: str, data: Message):
"""Send a message to a user's chat session."""
if not data.content or not isinstance(data.content, str):
raise HTTPException(status_code=400, detail="Invalid message content")
proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
response = await proxy.ProcessMessage(data.model_dump())
return {"response": response}
# REST endpoint to get conversation history
@app.get("/chat/{actor_id}/history")
async def get_history(actor_id: str):
"""Get conversation history for a user's chat session."""
proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
history = await proxy.GetConversationHistory()
return {"history": history}
# Subscription endpoint for conversation events
@app.post("/subscribe")
async def handle_conversation_event(data: dict):
"""Handle ConversationUpdated events from pub/sub."""
try:
logging.info(f"Received event: {data}")
event_data_raw = data.get("data", "{}")
event_data = json.loads(event_data_raw)
user_id = event_data.get("actor_id", "unknown")
input_msg = event_data.get("input", {}).get("content", "no message")
output_msg = event_data.get("output", {}).get("content", "no response")
logging.info(f"User {user_id}: '{input_msg}' -> '{output_msg}'")
return {"status": "processed"}
except json.JSONDecodeError as e:
logging.error(f"Invalid event data: {e}")
return {"status": "error", "message": "Invalid JSON"}
Output:
INFO: Registered ChatAgent actor
Configure Pub/Sub Subscription
Create a subscription that routes conversation events to your endpoint:
# components/message-subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: chat-subscription
spec:
pubsubname: daca-pubsub
topic: user-chat
routes:
default: /subscribe
Apply to your cluster:
kubectl apply -f components/message-subscription.yaml
Output:
subscription.dapr.io/chat-subscription created
Test the ChatActor
With the service running (tilt up or kubectl apply), test the conversation flow:
# Start a conversation as user "alice"
curl -X POST http://localhost:8000/chat/alice \
-H "Content-Type: application/json" \
-d '{"role": "user", "content": "Hello, I need help with my project"}'
Output:
{
"response": {
"role": "assistant",
"content": "Received your message: Hello, I need help with my project"
}
}
# Continue the conversation
curl -X POST http://localhost:8000/chat/alice \
-H "Content-Type: application/json" \
-d '{"role": "user", "content": "What about the deadline?"}'
Output:
{
"response": {
"role": "assistant",
"content": "Received your message: What about the deadline?"
}
}
# Check history (both messages are there)
curl http://localhost:8000/chat/alice/history
Output:
{
"history": [
{"role": "user", "content": "Hello, I need help with my project"},
{"role": "assistant", "content": "Received your message: Hello, I need help with my project"},
{"role": "user", "content": "What about the deadline?"},
{"role": "assistant", "content": "Received your message: What about the deadline?"}
]
}
# Start a separate conversation as user "bob"
curl -X POST http://localhost:8000/chat/bob \
-H "Content-Type: application/json" \
-d '{"role": "user", "content": "Different conversation entirely"}'
# Check bob's history - only his message
curl http://localhost:8000/chat/bob/history
Output:
{
"history": [
{"role": "user", "content": "Different conversation entirely"},
{"role": "assistant", "content": "Received your message: Different conversation entirely"}
]
}
Alice and Bob have completely separate conversation histories. This is the power of actor IDs—natural isolation without explicit partitioning logic.
Actor ID as Session Identifier
The actor ID pattern enables various session strategies:
| Strategy | Actor ID Format | Use Case |
|---|---|---|
| Per-user | user-{user_id} | Long-running user assistant |
| Per-conversation | conv-{uuid} | Multiple conversations per user |
| Per-session | session-{token} | Temporary, anonymous chats |
| Per-channel | channel-{platform}-{id} | Multi-platform support |
For production AI agents, you might combine these:
# Per-user with conversation threading
actor_id = f"user-{user_id}-conv-{conversation_id}"
# Multi-tenant with user isolation
actor_id = f"tenant-{tenant_id}-user-{user_id}"
Event-Driven Extensions
The pub/sub integration opens powerful patterns. Other services can subscribe to user-chat events for:
| Subscriber | Reaction to ConversationUpdated |
|---|---|
| Analytics Service | Track conversation metrics |
| Audit Service | Log all interactions for compliance |
| Notification Service | Alert on specific keywords |
| LLM Router | Forward to different models based on content |
| Context Service | Build user profiles from conversations |
The ChatActor doesn't know these services exist. It publishes facts ("conversation updated"). Subscribers decide what to do with those facts.
Reflect on Your Skill
You extended your dapr-deployment skill in L00. Does it include ChatActor patterns now?
Test Your Skill
Using my dapr-deployment skill, generate a ChatActor that:
1. Stores conversation history with a configurable limit
2. Publishes events on each message
3. Has proper error handling in _on_activate
Does my skill produce correct actor interface and implementation?
Identify Gaps
Ask yourself:
- Does my skill explain why actor IDs provide session isolation?
- Does it include the pub/sub integration pattern from within actors?
- Does it handle the Pydantic validation for incoming messages?
Improve Your Skill
If you found gaps:
Update my dapr-deployment skill to include:
- ChatActor pattern with conversation history
- DaprClient pub/sub from within actor methods
- Actor ID strategies for different session patterns
- History size limiting to prevent unbounded growth
Try With AI
Prompt 1: Design Your Agent's State
I'm building an AI agent that [describe your use case, e.g., "helps users
plan travel itineraries"]. What state should my actor store besides
basic conversation history?
Consider:
- What context persists across messages?
- What preferences should the actor remember?
- What's the right history limit for my use case?
- Should I store summaries instead of raw history?
What you're learning: Conversation history is just the starting point. Real agents need domain-specific state—preferences, intermediate results, user profiles. AI helps you design state that matches your use case.
Prompt 2: Add LLM Integration
Take this ChatActor's process_message method and help me integrate an
actual LLM call. I want to:
1. Send conversation history as context
2. Get a real response from [Claude/GPT/Gemini]
3. Handle rate limits and errors gracefully
4. Keep the response time reasonable
Show me what changes and what stays the same.
What you're learning: The static response in this lesson is a placeholder. Real agents call LLMs. AI shows you how to integrate while preserving the actor pattern—state management and event publishing remain unchanged.
Prompt 3: Debug Actor State Issues
My ChatActor seems to lose history between calls. Help me debug:
1. Is the state store component configured correctly?
2. Am I using the right state key pattern?
3. Could the actor be deactivating and losing in-memory state?
4. How do I verify state is actually persisted in Redis?
Show me diagnostic commands and code to trace the issue.
What you're learning: State issues in actors usually stem from configuration (wrong store name), key patterns (not unique per actor), or misunderstanding lifecycle (memory vs persisted state). AI helps you systematically diagnose.
Safety Note
When building chat actors for production, remember that conversation history may contain sensitive information. Consider encryption at rest, retention policies, and GDPR compliance requirements. Never log full conversation content in production—use sanitized summaries for debugging.