Actor State Management
Your actors work, but what happens when they go idle? The Dapr runtime garbage-collects inactive actors to save memory. When a user returns hours later, their ChatActor needs to remember the conversation history—not start fresh.
This is where virtual actors shine. Unlike traditional objects that lose state when destroyed, Dapr actors automatically persist state to a configured store. When an actor reactivates, it recovers exactly where it left off. You don't write checkpointing code—Dapr handles it.
In this lesson, you'll master the StateManager API for reading and writing actor state, implement lifecycle hooks that run during activation and deactivation, and understand why turn-based concurrency guarantees your state is always consistent—without a single lock or mutex in your code.
The StateManager API
Every Dapr actor has access to self._state_manager, which provides four core methods for state operations:
| Method | Purpose | Behavior |
|---|---|---|
get_state(key) | Retrieve state | Raises exception if key not found |
try_get_state(key) | Retrieve state safely | Returns tuple (found: bool, value) |
set_state(key, value) | Set state in memory | Does NOT persist immediately |
save_state() | Persist all changes | Flushes to state store |
The distinction between get_state and try_get_state matters for initialization patterns. Let's see both in action.
Basic State Operations
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
from datetime import datetime
class TaskActorInterface(ActorInterface):
@actormethod(name="GetTask")
async def get_task(self) -> dict: ...
@actormethod(name="UpdateStatus")
async def update_status(self, status: str) -> None: ...
class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
async def get_task(self) -> dict:
"""Retrieve task state."""
# get_state raises if key doesn't exist
task_data = await self._state_manager.get_state("task_data")
return {"id": self.id.id, **task_data}
async def update_status(self, status: str) -> None:
"""Update task status."""
# Get current state
task_data = await self._state_manager.get_state("task_data")
# Modify state
task_data["status"] = status
task_data["updated_at"] = datetime.utcnow().isoformat()
# Set state in memory
await self._state_manager.set_state("task_data", task_data)
# Persist to state store
await self._state_manager.save_state()
Output (calling update_status):
# State store now contains:
{
"task_data": {
"status": "in_progress",
"updated_at": "2025-01-15T10:30:00.000000"
}
}
The try_get_state Pattern
Using get_state on a missing key raises an exception. For initialization, use try_get_state which returns a tuple indicating whether the key exists:
async def _on_activate(self) -> None:
"""Called when actor activates. Initialize state if needed."""
# try_get_state returns (found: bool, value)
found, existing_state = await self._state_manager.try_get_state("task_data")
if not found:
# First activation - initialize default state
initial_state = {
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
"history": []
}
await self._state_manager.set_state("task_data", initial_state)
await self._state_manager.save_state()
print(f"TaskActor {self.id.id}: Initialized new state")
else:
print(f"TaskActor {self.id.id}: Recovered existing state")
Output (first activation):
TaskActor task-123: Initialized new state
Output (subsequent activation after deactivation):
TaskActor task-123: Recovered existing state
This pattern ensures you don't accidentally overwrite existing state when an actor reactivates after garbage collection.
Lifecycle Hooks: _on_activate and _on_deactivate
Dapr actors provide two lifecycle hooks that run automatically:
_on_activate: Initialization on Demand
_on_activate runs when an actor receives its first message after being created or garbage-collected. Use it for:
- Initializing default state (if none exists)
- Loading cached data from external services
- Setting up connections or resources the actor needs
async def _on_activate(self) -> None:
"""
Called automatically when:
1. Actor receives first-ever message (new actor)
2. Actor receives message after being garbage-collected (reactivation)
"""
found, state = await self._state_manager.try_get_state("task_data")
if not found:
# New actor - set defaults
await self._state_manager.set_state("task_data", {
"status": "pending",
"created_at": datetime.utcnow().isoformat()
})
await self._state_manager.save_state()
# Log activation for debugging
print(f"[LIFECYCLE] Actor {self.id.id} activated at {datetime.utcnow()}")
Output:
[LIFECYCLE] Actor task-123 activated at 2025-01-15 10:30:00.123456
_on_deactivate: Cleanup Before Garbage Collection
_on_deactivate runs when Dapr is about to garbage-collect an idle actor. Use it for:
- Ensuring final state is saved
- Releasing external resources
- Logging for debugging actor lifecycle
async def _on_deactivate(self) -> None:
"""
Called automatically when actor is about to be garbage-collected.
The actor has been idle longer than the configured timeout.
"""
# Ensure any pending state changes are saved
await self._state_manager.save_state()
# Log deactivation for debugging
print(f"[LIFECYCLE] Actor {self.id.id} deactivating at {datetime.utcnow()}")
Output (after idle timeout):
[LIFECYCLE] Actor task-123 deactivating at 2025-01-15 11:00:00.456789
Complete Actor with Lifecycle Hooks
Here's a complete TaskActor implementation with both hooks:
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
from datetime import datetime
from typing import Optional
class TaskActorInterface(ActorInterface):
@actormethod(name="GetTask")
async def get_task(self) -> dict: ...
@actormethod(name="UpdateStatus")
async def update_status(self, status: str) -> None: ...
@actormethod(name="AddHistoryEntry")
async def add_history_entry(self, entry: str) -> None: ...
class TaskActor(Actor, TaskActorInterface):
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
async def _on_activate(self) -> None:
"""Initialize or recover state on activation."""
found, _ = await self._state_manager.try_get_state("task_data")
if not found:
initial_state = {
"status": "pending",
"created_at": datetime.utcnow().isoformat(),
"updated_at": None,
"history": []
}
await self._state_manager.set_state("task_data", initial_state)
await self._state_manager.save_state()
print(f"[ACTIVATE] {self.id.id}: Created new task")
else:
print(f"[ACTIVATE] {self.id.id}: Recovered existing task")
async def _on_deactivate(self) -> None:
"""Ensure state is persisted before garbage collection."""
await self._state_manager.save_state()
print(f"[DEACTIVATE] {self.id.id}: State saved, going idle")
async def get_task(self) -> dict:
"""Get current task state."""
task_data = await self._state_manager.get_state("task_data")
return {"id": self.id.id, **task_data}
async def update_status(self, status: str) -> None:
"""Update task status with timestamp."""
task_data = await self._state_manager.get_state("task_data")
task_data["status"] = status
task_data["updated_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state("task_data", task_data)
await self._state_manager.save_state()
async def add_history_entry(self, entry: str) -> None:
"""Append to task history."""
task_data = await self._state_manager.get_state("task_data")
task_data["history"].append({
"entry": entry,
"timestamp": datetime.utcnow().isoformat()
})
await self._state_manager.set_state("task_data", task_data)
await self._state_manager.save_state()
Output (full lifecycle):
[ACTIVATE] task-123: Created new task
# ... actor processes messages ...
[DEACTIVATE] task-123: State saved, going idle
# ... time passes, new request arrives ...
[ACTIVATE] task-123: Recovered existing task
Turn-Based Concurrency: Safety Without Locks
Traditional concurrent programming requires locks, mutexes, or semaphores to prevent race conditions. Dapr actors eliminate this complexity through turn-based concurrency—each actor processes exactly one message at a time.
How It Works
Actor: task-123
Incoming Messages Actor Processing
───────────────── ────────────────
┌─────────────────┐
│ UpdateStatus() │ ◄─ Request 1 ┌─────────────┐
└────────┬────────┘ │ Processing │
│ │ Request 1 │
┌────────▼────────┐ └──────┬──────┘
│ Queue Slot 1 │ │
└─────────────────┘ │
┌─────────────────┐ │
│ AddHistory() │ ◄─ Request 2 (waits) │
└────────┬────────┘ │
│ │
┌────────▼────────┐ ┌──────▼──────┐
│ Queue Slot 2 │ │ Processing │
└─────────────────┘ │ Request 2 │
┌─────────────────┐ └──────┬──────┘
│ GetTask() │ ◄─ Request 3 (waits) │
└────────┬────────┘ │
│ │
┌────────▼────────┐ ┌──────▼──────┐
│ Queue Slot 3 │ │ Processing │
└─────────────────┘ │ Request 3 │
└─────────────┘
Each actor instance has its own message queue. The Dapr runtime guarantees:
- Single-threaded execution: Only one method runs at a time per actor
- Complete turns: A method fully completes (including awaits) before the next starts
- Isolated state: No other actor or process can access this actor's state during a turn
Why This Matters for State Safety
Consider this scenario without turn-based concurrency:
Thread 1: read balance → 100
Thread 2: read balance → 100
Thread 1: balance = 100 - 50 → write 50
Thread 2: balance = 100 - 30 → write 70 # Lost update! Should be 20
With Dapr actors, this race condition is impossible:
Request 1: read balance → 100, write 50 → COMPLETE
Request 2: read balance → 50, write 20 → COMPLETE
Requests queue up and execute sequentially. You never worry about concurrent modifications.
Code Example: Concurrent-Safe Counter
class CounterActorInterface(ActorInterface):
@actormethod(name="Increment")
async def increment(self) -> int: ...
@actormethod(name="GetCount")
async def get_count(self) -> int: ...
class CounterActor(Actor, CounterActorInterface):
async def _on_activate(self) -> None:
found, _ = await self._state_manager.try_get_state("count")
if not found:
await self._state_manager.set_state("count", 0)
await self._state_manager.save_state()
async def increment(self) -> int:
"""
This is safe without locks!
Only one increment() runs at a time for this actor.
"""
count = await self._state_manager.get_state("count")
count += 1
await self._state_manager.set_state("count", count)
await self._state_manager.save_state()
return count
async def get_count(self) -> int:
return await self._state_manager.get_state("count")
Output (100 concurrent calls to increment):
Final count: 100 # Always correct, no race conditions
Demonstrating State Persistence
Let's prove that state survives actor deactivation. This test:
- Creates an actor and sets state
- Forces deactivation by waiting for idle timeout
- Reactivates the actor
- Verifies state was recovered
Test Script
import asyncio
from dapr.actor import ActorProxy, ActorId
from task_actor import TaskActorInterface
async def test_state_persistence():
actor_id = ActorId("test-persistence")
# Step 1: Create actor and set initial state
print("Step 1: Creating actor and setting state...")
proxy = ActorProxy.create("TaskActor", actor_id, TaskActorInterface)
await proxy.UpdateStatus("step-1-complete")
await proxy.AddHistoryEntry("Created via test script")
task = await proxy.GetTask()
print(f" Status: {task['status']}")
print(f" History entries: {len(task['history'])}")
# Step 2: Wait for actor to deactivate
# (In production, idle timeout is configurable; for testing, use short timeout)
print("\nStep 2: Waiting for actor to deactivate (idle timeout)...")
await asyncio.sleep(65) # Assuming 60-second idle timeout
# Step 3: Reactivate by sending new request
print("\nStep 3: Reactivating actor...")
proxy2 = ActorProxy.create("TaskActor", actor_id, TaskActorInterface)
recovered_task = await proxy2.GetTask()
# Step 4: Verify state persisted
print("\nStep 4: Verifying state persistence...")
print(f" Status: {recovered_task['status']}")
print(f" History entries: {len(recovered_task['history'])}")
assert recovered_task["status"] == "step-1-complete", "Status lost!"
assert len(recovered_task["history"]) == 1, "History lost!"
print("\n✓ State persistence verified!")
if __name__ == "__main__":
asyncio.run(test_state_persistence())
Output:
Step 1: Creating actor and setting state...
[ACTIVATE] test-persistence: Created new task
Status: step-1-complete
History entries: 1
Step 2: Waiting for actor to deactivate (idle timeout)...
[DEACTIVATE] test-persistence: State saved, going idle
Step 3: Reactivating actor...
[ACTIVATE] test-persistence: Recovered existing task
Step 4: Verifying state persistence...
Status: step-1-complete
History entries: 1
✓ State persistence verified!
The actor was garbage-collected after being idle, but when we invoked it again, Dapr:
- Created a new in-memory actor instance
- Called
_on_activatewhich recovered state from Redis - Resumed processing with all previous state intact
State Key Naming Patterns
For actors with multiple state values, use consistent key naming:
class ChatActor(Actor, ChatActorInterface):
"""Actor storing multiple pieces of state."""
async def _on_activate(self) -> None:
# Use descriptive, predictable key names
keys = ["messages", "preferences", "metadata"]
for key in keys:
found, _ = await self._state_manager.try_get_state(key)
if not found:
default = self._get_default_for_key(key)
await self._state_manager.set_state(key, default)
await self._state_manager.save_state()
def _get_default_for_key(self, key: str) -> dict:
defaults = {
"messages": [],
"preferences": {"language": "en", "timezone": "UTC"},
"metadata": {"created_at": datetime.utcnow().isoformat()}
}
return defaults.get(key, {})
async def get_messages(self) -> list:
return await self._state_manager.get_state("messages")
async def get_preferences(self) -> dict:
return await self._state_manager.get_state("preferences")
| Pattern | When to Use |
|---|---|
Single key (task_data) | Simple actors with unified state |
Multiple keys (messages, preferences) | Complex actors with distinct state domains |
Prefixed keys (chat_messages, chat_prefs) | When state might be inspected in Redis directly |
Reflect on Your Skill
Your dapr-deployment skill from Chapter 53 handles state management. Now extend it with actor-specific patterns.
Test Your Skill
Using my dapr-deployment skill, explain the difference between:
1. Dapr state API (DaprClient.save_state)
2. Actor state API (self._state_manager.set_state)
When should I use each?
Identify Gaps
Ask yourself:
- Does my skill explain the
try_get_statepattern for safe initialization? - Does it include lifecycle hook examples?
- Can it explain turn-based concurrency?
Improve Your Skill
If you found gaps:
Update my dapr-deployment skill to include actor state management:
- StateManager API (get_state, set_state, try_get_state, save_state)
- _on_activate pattern for initializing state only when needed
- _on_deactivate pattern for ensuring state persistence
- Explanation of turn-based concurrency guarantees
Try With AI
Implement State Recovery Pattern
Help me implement a ChatActor with proper state recovery. I need:
1. _on_activate that initializes conversation_history only if it doesn't exist
2. A process_message method that appends to history and saves
3. A get_history method that returns all messages
Show me the complete actor implementation.
What you're learning: The try_get_state pattern is essential for idempotent activation. Without it, you'd overwrite existing state every time an actor reactivates—losing all accumulated conversation history.
Trace a Concurrent Scenario
Walk me through what happens when three requests arrive simultaneously for the same TaskActor:
1. UpdateStatus("in_progress")
2. AddHistoryEntry("Started work")
3. GetTask()
Explain the order of execution and why no locks are needed.
What you're learning: Turn-based concurrency means Dapr queues requests and processes them one at a time. You never have two methods executing simultaneously on the same actor instance—this is fundamentally different from thread-safe programming with locks.
Debug State Loss
My actor state seems to disappear between requests. Help me debug:
1. What could cause state to be lost?
2. How do I verify state is being saved to Redis?
3. What's the difference between set_state and save_state?
I'm not calling save_state explicitly—is that the problem?
What you're learning: set_state modifies in-memory state but doesn't persist. You must call save_state() to flush changes to the state store. Dapr automatically calls save_state() after each method completes, but if your method crashes before returning, unsaved state is lost.
Safety note: Actor state is stored in Redis by default. In production, ensure your state store component uses actorStateStore: "true" and that Redis (or your chosen backend) is configured with appropriate persistence settings. State store failures mean actor state loss—monitor your infrastructure.