Capstone: Stateful Task Agent with Workflows
You've traveled through 18 lessons of actors and workflows. You understand turn-based concurrency, durable execution, reminders that survive restarts, and saga patterns that compensate on failure. Now it's time to combine everything into a complete system.
This capstone isn't just an exercise. It's a Digital FTE blueprint—the kind of stateful agent system you could package and sell to clients who need intelligent task management with conversation capabilities.
By the end of this lesson, you'll have a working system where:
- TaskActor manages individual task state with deadline reminders
- ChatActor maintains conversation context per user
- TaskProcessingWorkflow orchestrates task lifecycle with saga compensation
- All components deploy to Docker Desktop Kubernetes with Dapr sidecars
This is the culmination of Chapter 59. Everything you've learned converges here.
System Architecture
Before writing code, let's understand what we're building:
┌────────────────────────────────────────────────────────────────────────┐
│ Stateful Task Agent System │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskActor │ │ ChatActor │ │ Workflow │ │
│ │ (per task) │ │ (per user) │ │ Runtime │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ - status │ │ - history[] │ │ validate_task │ │
│ │ - assignee │ │ - context │ │ assign_task │ │
│ │ - deadline │ │ - preferences │ │ notify_assignee │ │
│ │ - reminder │ │ │ │ (saga compen.) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └───────────────────────┼────────────────────────┘ │
│ │ │
│ ┌───────────────────────┴────────────────────────┐ │
│ │ Dapr Building Blocks │ │
│ ├─────────────────────────────────────────────────┤ │
│ │ State Store (Redis) │ Pub/Sub │ Placement │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
Component Responsibilities
| Component | Responsibility | Building Block |
|---|---|---|
| TaskActor | Stateful task entity with deadline reminders | Actors + Reminders |
| ChatActor | Conversation state per user with event publishing | Actors + Pub/Sub |
| TaskProcessingWorkflow | Orchestrate task lifecycle with rollback | Workflows + Saga |
| Redis | Persist actor state and workflow history | State Store |
The Specification (Spec-First)
Before implementation, write the specification. This is Layer 4—spec-driven development:
# TaskAgent System Specification
## Intent
Build a stateful task management system that combines:
- Per-task entity state (TaskActor) with deadline reminders
- Per-user conversation context (ChatActor) for AI agent integration
- Durable task processing workflow with saga compensation
## Success Criteria
1. TaskActor persists state across deactivation/reactivation cycles
2. Deadline reminder fires and updates task status to "overdue"
3. ChatActor maintains conversation history per user
4. TaskProcessingWorkflow survives pod restart and resumes correctly
5. Saga compensation rolls back assignment on notification failure
6. System deploys to Docker Desktop Kubernetes with Dapr sidecars
## Non-Goals
- Production-grade security (covered in L18, not repeated here)
- Multi-tenant isolation (extension exercise)
- Cross-app workflows (extension exercise)
## Architecture Constraints
- Use Redis for state store (actorStateStore: "true")
- Workflow activities call actors through ActorProxy
- Single namespace (default) for capstone simplicity
TaskActor Implementation
The TaskActor manages individual task state with deadline reminders.
Create task_actor_service/actors/task_actor.py:
from datetime import datetime, timedelta
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
class TaskActorInterface(ActorInterface):
@actormethod(name="GetTask")
async def get_task(self) -> dict | None: ...
@actormethod(name="CreateTask")
async def create_task(self, task_data: dict) -> dict: ...
@actormethod(name="UpdateStatus")
async def update_status(self, status: str) -> dict: ...
@actormethod(name="AssignTask")
async def assign_task(self, assignee: str) -> dict: ...
@actormethod(name="SetDeadlineReminder")
async def set_deadline_reminder(self, deadline_seconds: int) -> None: ...
class TaskActor(Actor, TaskActorInterface):
"""Stateful task entity with deadline reminder support."""
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
self._state_key = f"task-{actor_id}"
async def _on_activate(self) -> None:
"""Called when actor is activated."""
print(f"TaskActor {self.id.id} activated")
found, _ = await self._state_manager.try_get_state(self._state_key)
if not found:
# Initialize empty state on first activation
await self._state_manager.set_state(self._state_key, {
"task_id": self.id.id,
"status": "uninitialized",
"created_at": None,
"updated_at": None
})
async def _on_deactivate(self) -> None:
"""Called when actor is deactivated (garbage collected)."""
print(f"TaskActor {self.id.id} deactivated")
async def get_task(self) -> dict | None:
"""Get current task state."""
found, state = await self._state_manager.try_get_state(self._state_key)
if not found or state.get("status") == "uninitialized":
return None
return state
async def create_task(self, task_data: dict) -> dict:
"""Initialize task with provided data."""
now = datetime.utcnow().isoformat()
state = {
"task_id": self.id.id,
"title": task_data.get("title", "Untitled"),
"description": task_data.get("description", ""),
"status": "pending",
"assignee": None,
"deadline": task_data.get("deadline"),
"created_at": now,
"updated_at": now
}
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Created task '{state['title']}'")
return state
async def update_status(self, status: str) -> dict:
"""Update task status with turn-based concurrency guarantee."""
state = await self._state_manager.get_state(self._state_key)
state["status"] = status
state["updated_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Status updated to '{status}'")
return state
async def assign_task(self, assignee: str) -> dict:
"""Assign task to user."""
state = await self._state_manager.get_state(self._state_key)
state["assignee"] = assignee
state["status"] = "assigned"
state["updated_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Assigned to '{assignee}'")
return state
async def set_deadline_reminder(self, deadline_seconds: int) -> None:
"""Register deadline reminder that survives restarts."""
await self.register_reminder(
reminder_name="deadline_reminder",
state=b'{"action": "mark_overdue"}',
due_time=timedelta(seconds=deadline_seconds),
period=timedelta(seconds=0) # One-time reminder
)
print(f"TaskActor {self.id.id}: Deadline reminder set for {deadline_seconds}s")
async def receive_reminder(
self,
name: str,
state: bytes,
due_time: timedelta,
period: timedelta
) -> None:
"""Callback when reminder fires - marks task overdue."""
if name == "deadline_reminder":
print(f"TaskActor {self.id.id}: Deadline reminder fired!")
await self.update_status("overdue")
Output:
TaskActor task-001 activated
TaskActor task-001: Created task 'Review PR #42'
TaskActor task-001: Assigned to 'alice@example.com'
TaskActor task-001: Deadline reminder set for 30s
# ... 30 seconds later ...
TaskActor task-001: Deadline reminder fired!
TaskActor task-001: Status updated to 'overdue'
ChatActor Implementation
The ChatActor maintains conversation history per user.
Create task_actor_service/actors/chat_actor.py:
from datetime import datetime
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.clients import DaprClient
import json
class ChatActorInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, message: dict) -> dict: ...
@actormethod(name="GetHistory")
async def get_history(self) -> list[dict]: ...
@actormethod(name="ClearHistory")
async def clear_history(self) -> None: ...
class ChatActor(Actor, ChatActorInterface):
"""Per-user conversation context for AI agent integration."""
MAX_HISTORY = 10 # Keep last 10 exchanges
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
self._history_key = f"chat-history-{actor_id}"
async def _on_activate(self) -> None:
"""Initialize empty history on first activation."""
print(f"ChatActor {self.id.id} activated")
found, _ = await self._state_manager.try_get_state(self._history_key)
if not found:
await self._state_manager.set_state(self._history_key, [])
async def process_message(self, message: dict) -> dict:
"""Process user message and generate response."""
history = await self._state_manager.get_state(self._history_key)
# Add user message to history
user_entry = {
"role": "user",
"content": message.get("content", ""),
"timestamp": datetime.utcnow().isoformat()
}
history.append(user_entry)
# Generate response (in production, call AI service)
response_content = self._generate_response(message.get("content", ""), history)
assistant_entry = {
"role": "assistant",
"content": response_content,
"timestamp": datetime.utcnow().isoformat()
}
history.append(assistant_entry)
# Trim to max history
if len(history) > self.MAX_HISTORY * 2:
history = history[-(self.MAX_HISTORY * 2):]
await self._state_manager.set_state(self._history_key, history)
# Publish conversation event
await self._publish_conversation_event(user_entry, assistant_entry)
print(f"ChatActor {self.id.id}: Processed message, history size: {len(history)}")
return assistant_entry
async def get_history(self) -> list[dict]:
"""Return conversation history."""
return await self._state_manager.get_state(self._history_key)
async def clear_history(self) -> None:
"""Clear conversation history."""
await self._state_manager.set_state(self._history_key, [])
print(f"ChatActor {self.id.id}: History cleared")
def _generate_response(self, content: str, history: list) -> str:
"""Generate response based on message and context.
In production, this would call Claude/GPT/Gemini API with history context.
"""
if "task" in content.lower():
return f"I can help with tasks. You have {len(history)//2} messages in our conversation."
elif "help" in content.lower():
return "I'm your task management assistant. Ask me about creating, assigning, or tracking tasks."
else:
return f"Understood. I'm tracking our conversation (message #{len(history)//2 + 1})."
async def _publish_conversation_event(self, user_msg: dict, assistant_msg: dict) -> None:
"""Publish conversation event for downstream processing."""
with DaprClient() as client:
client.publish_event(
pubsub_name="pubsub",
topic_name="conversation-events",
data=json.dumps({
"event_type": "conversation.updated",
"user_id": self.id.id,
"user_message": user_msg,
"assistant_message": assistant_msg,
"timestamp": datetime.utcnow().isoformat()
}),
data_content_type="application/json"
)
Output:
ChatActor user-alice activated
ChatActor user-alice: Processed message, history size: 2
ChatActor user-alice: Processed message, history size: 4
Event published: {"event_type": "conversation.updated", "user_id": "user-alice", ...}
TaskProcessingWorkflow with Saga
The workflow orchestrates task processing with compensation.
Create task_workflow_service/workflows/task_workflow.py:
import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import timedelta
@dataclass
class TaskProcessingInput:
task_id: str
title: str
description: str
assignee: str
deadline_seconds: int = 3600 # Default 1 hour
@dataclass
class TaskProcessingResult:
task_id: str
status: str
message: str
def task_processing_workflow(
ctx: wf.DaprWorkflowContext,
input_data: TaskProcessingInput
) -> TaskProcessingResult:
"""
Orchestrate task lifecycle with saga compensation.
Steps:
1. Validate task data
2. Create task in TaskActor
3. Assign task to user
4. Set deadline reminder
5. Notify assignee
On failure at step 5: Rollback assignment (compensation)
"""
compensations = []
try:
# Step 1: Validate task data
validation = yield ctx.call_activity(
validate_task_activity,
input={
"task_id": input_data.task_id,
"title": input_data.title,
"description": input_data.description
}
)
if not validation["is_valid"]:
return TaskProcessingResult(
task_id=input_data.task_id,
status="rejected",
message=f"Validation failed: {validation['issues']}"
)
# Step 2: Create task in TaskActor
create_result = yield ctx.call_activity(
create_task_activity,
input={
"task_id": input_data.task_id,
"title": input_data.title,
"description": input_data.description,
"deadline_seconds": input_data.deadline_seconds
}
)
# No compensation needed - task just gets abandoned if we fail later
# Step 3: Assign task to user
assign_result = yield ctx.call_activity(
assign_task_activity,
input={
"task_id": input_data.task_id,
"assignee": input_data.assignee
}
)
# Register compensation: unassign if notification fails
compensations.append(("unassign_task_activity", {
"task_id": input_data.task_id
}))
# Step 4: Set deadline reminder
yield ctx.call_activity(
set_reminder_activity,
input={
"task_id": input_data.task_id,
"deadline_seconds": input_data.deadline_seconds
}
)
# Step 5: Notify assignee (might fail - triggers saga compensation)
yield ctx.call_activity(
notify_assignee_activity,
input={
"task_id": input_data.task_id,
"assignee": input_data.assignee,
"title": input_data.title
},
retry_policy=wf.RetryPolicy(
max_attempts=3,
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0
)
)
return TaskProcessingResult(
task_id=input_data.task_id,
status="completed",
message=f"Task assigned to {input_data.assignee} with deadline reminder"
)
except Exception as e:
# Saga compensation: rollback in reverse order
for comp_name, comp_data in reversed(compensations):
try:
yield ctx.call_activity(comp_name, input=comp_data)
except Exception as comp_error:
print(f"Compensation failed: {comp_error}")
# Log but continue - best effort compensation
return TaskProcessingResult(
task_id=input_data.task_id,
status="failed",
message=f"Processing failed, compensation applied: {str(e)}"
)
Activity Implementations
Create task_workflow_service/activities/task_activities.py:
from dapr.actor import ActorProxy, ActorId
from datetime import datetime
def validate_task_activity(ctx, data: dict) -> dict:
"""Validate task data meets requirements."""
issues = []
if len(data.get("title", "")) < 5:
issues.append("Title must be at least 5 characters")
if len(data.get("description", "")) < 10:
issues.append("Description must be at least 10 characters")
is_valid = len(issues) == 0
print(f"Activity validate_task: task={data['task_id']}, valid={is_valid}")
return {
"is_valid": is_valid,
"issues": issues,
"validated_at": datetime.utcnow().isoformat()
}
async def create_task_activity(ctx, data: dict) -> dict:
"""Create task in TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.CreateTask({
"title": data["title"],
"description": data["description"],
"deadline": data.get("deadline_seconds")
})
print(f"Activity create_task: task={data['task_id']} created")
return result
async def assign_task_activity(ctx, data: dict) -> dict:
"""Assign task to user via TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.AssignTask(data["assignee"])
print(f"Activity assign_task: task={data['task_id']} -> {data['assignee']}")
return result
async def set_reminder_activity(ctx, data: dict) -> dict:
"""Set deadline reminder on TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
await proxy.SetDeadlineReminder(data["deadline_seconds"])
print(f"Activity set_reminder: task={data['task_id']}, {data['deadline_seconds']}s")
return {"reminder_set": True}
def notify_assignee_activity(ctx, data: dict) -> dict:
"""Send notification to assignee.
This might fail (email service down, invalid address, etc.)
Failure triggers saga compensation.
"""
# Simulate occasional failure for demonstration
import random
if random.random() < 0.2: # 20% failure rate
raise Exception(f"Notification service unavailable for {data['assignee']}")
print(f"Activity notify_assignee: Notified {data['assignee']} about '{data['title']}'")
return {
"notified": True,
"assignee": data["assignee"],
"notified_at": datetime.utcnow().isoformat()
}
async def unassign_task_activity(ctx, data: dict) -> dict:
"""Compensation: Remove assignment from task."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.UpdateStatus("pending") # Reset to pending
print(f"Activity unassign_task (COMPENSATION): task={data['task_id']} reset to pending")
return result
Output (successful flow):
Workflow started: task-processing-task-001
Activity validate_task: task=task-001, valid=True
Activity create_task: task=task-001 created
Activity assign_task: task=task-001 -> alice@example.com
Activity set_reminder: task=task-001, 3600s
Activity notify_assignee: Notified alice@example.com about 'Review PR #42'
Workflow completed: {status: "completed", message: "Task assigned to alice@example.com..."}
Output (failure with compensation):
Workflow started: task-processing-task-002
Activity validate_task: task=task-002, valid=True
Activity create_task: task=task-002 created
Activity assign_task: task=task-002 -> bob@example.com
Activity set_reminder: task=task-002, 3600s
Activity notify_assignee: FAILED - Notification service unavailable
Retry 1 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Retry 2 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Executing compensation...
Activity unassign_task (COMPENSATION): task=task-002 reset to pending
Workflow completed: {status: "failed", message: "Processing failed, compensation applied..."}
FastAPI Integration
Wire actors and workflows into a FastAPI application.
Create task_actor_service/main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.actor import ActorProxy, ActorId
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient
from pydantic import BaseModel
from actors.task_actor import TaskActor, TaskActorInterface
from actors.chat_actor import ChatActor, ChatActorInterface
from workflows.task_workflow import (
task_processing_workflow,
TaskProcessingInput
)
from activities.task_activities import (
validate_task_activity,
create_task_activity,
assign_task_activity,
set_reminder_activity,
notify_assignee_activity,
unassign_task_activity
)
# Pydantic models for API
class TaskCreate(BaseModel):
task_id: str
title: str
description: str
assignee: str
deadline_seconds: int = 3600
class ChatMessage(BaseModel):
content: str
# Workflow runtime
wfr = WorkflowRuntime()
wfr.register_workflow(task_processing_workflow)
wfr.register_activity(validate_task_activity)
wfr.register_activity(create_task_activity)
wfr.register_activity(assign_task_activity)
wfr.register_activity(set_reminder_activity)
wfr.register_activity(notify_assignee_activity)
wfr.register_activity(unassign_task_activity)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle."""
# Register actors
await dapr_actor.register_actor(TaskActor)
await dapr_actor.register_actor(ChatActor)
# Start workflow runtime
wfr.start()
print("Task Agent System started")
yield
wfr.shutdown()
print("Task Agent System stopped")
app = FastAPI(title="Task Agent System", lifespan=lifespan)
dapr_actor = DaprActor(app)
dapr_app = DaprApp(app)
# Task API endpoints
@app.post("/tasks/process")
async def process_task(task: TaskCreate):
"""Start task processing workflow."""
client = DaprWorkflowClient()
instance_id = f"task-workflow-{task.task_id}"
client.schedule_new_workflow(
workflow=task_processing_workflow,
input=TaskProcessingInput(
task_id=task.task_id,
title=task.title,
description=task.description,
assignee=task.assignee,
deadline_seconds=task.deadline_seconds
),
instance_id=instance_id
)
return {"workflow_id": instance_id, "status": "started"}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""Get task state from TaskActor."""
proxy = ActorProxy.create("TaskActor", ActorId(task_id), TaskActorInterface)
task = await proxy.GetTask()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@app.get("/workflows/{workflow_id}")
async def get_workflow_status(workflow_id: str):
"""Get workflow status."""
client = DaprWorkflowClient()
state = client.get_workflow_state(workflow_id, fetch_payloads=True)
return {
"workflow_id": workflow_id,
"status": str(state.runtime_status),
"output": state.serialized_output
}
# Chat API endpoints
@app.post("/chat/{user_id}")
async def send_message(user_id: str, message: ChatMessage):
"""Send message to ChatActor."""
proxy = ActorProxy.create("ChatActor", ActorId(user_id), ChatActorInterface)
response = await proxy.ProcessMessage({"content": message.content})
return response
@app.get("/chat/{user_id}/history")
async def get_chat_history(user_id: str):
"""Get conversation history from ChatActor."""
proxy = ActorProxy.create("ChatActor", ActorId(user_id), ChatActorInterface)
history = await proxy.GetHistory()
return {"user_id": user_id, "messages": history}
# Pub/Sub subscription
@dapr_app.subscribe(pubsub="pubsub", topic="conversation-events")
async def handle_conversation_event(event_data: dict):
"""Process conversation events from ChatActors."""
print(f"Received conversation event: {event_data.get('event_type')}")
# In production: log to analytics, trigger follow-up workflows, etc.
return {"status": "SUCCESS"}
Kubernetes Deployment
Dapr Components
# k8s/components/statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true" # Required for actors
# k8s/components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
Deployment Manifest
# k8s/task-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-agent-system
namespace: default
spec:
replicas: 2
selector:
matchLabels:
app: task-agent-system
template:
metadata:
labels:
app: task-agent-system
annotations:
# Enable Dapr sidecar
dapr.io/enabled: "true"
dapr.io/app-id: "task-agent-system"
dapr.io/app-port: "8000"
# Enable actors
dapr.io/enable-actors: "true"
# Enable API logging for debugging
dapr.io/enable-api-logging: "true"
# Resource limits for sidecar
dapr.io/sidecar-cpu-limit: "500m"
dapr.io/sidecar-memory-limit: "256Mi"
spec:
containers:
- name: task-agent
image: task-agent-system:latest
ports:
- containerPort: 8000
env:
- name: DAPR_HTTP_PORT
value: "3500"
resources:
limits:
cpu: "1"
memory: "512Mi"
requests:
cpu: "250m"
memory: "256Mi"
---
apiVersion: v1
kind: Service
metadata:
name: task-agent-system
namespace: default
spec:
selector:
app: task-agent-system
ports:
- port: 80
targetPort: 8000
type: ClusterIP
Deployment Commands
# 1. Install Redis (if not already installed)
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install redis bitnami/redis --set auth.enabled=false
# 2. Apply Dapr components
kubectl apply -f k8s/components/
# 3. Build and deploy application
docker build -t task-agent-system:latest .
kubectl apply -f k8s/task-agent-deployment.yaml
# 4. Verify pods are running with sidecars
kubectl get pods
# Expected: task-agent-system-xxx 2/2 Running (2 containers = app + daprd)
# 5. Check Dapr dashboard
kubectl port-forward service/dapr-dashboard 8080:8080 -n dapr-system
# Visit http://localhost:8080
Testing the Complete System
Test 1: Task Processing Workflow
# Start task processing
curl -X POST http://localhost:8000/tasks/process \
-H "Content-Type: application/json" \
-d '{
"task_id": "task-001",
"title": "Review PR #42",
"description": "Review the authentication refactoring pull request",
"assignee": "alice@example.com",
"deadline_seconds": 60
}'
Output:
{"workflow_id": "task-workflow-task-001", "status": "started"}
Test 2: Verify Actor State
# Check task state
curl http://localhost:8000/tasks/task-001
Output:
{
"task_id": "task-001",
"title": "Review PR #42",
"status": "assigned",
"assignee": "alice@example.com",
"created_at": "2025-01-15T10:30:00Z"
}
Test 3: Conversation via ChatActor
# Send message
curl -X POST http://localhost:8000/chat/user-alice \
-H "Content-Type: application/json" \
-d '{"content": "What tasks do I have?"}'
Output:
{
"role": "assistant",
"content": "I can help with tasks. You have 1 messages in our conversation.",
"timestamp": "2025-01-15T10:31:00Z"
}
Test 4: Verify Deadline Reminder
Wait for the deadline (60 seconds in test), then check task status:
# After deadline passes
curl http://localhost:8000/tasks/task-001
Output:
{
"task_id": "task-001",
"title": "Review PR #42",
"status": "overdue",
"assignee": "alice@example.com"
}
Test 5: Workflow Recovery (Simulate Pod Restart)
# Start a long-running workflow
curl -X POST http://localhost:8000/tasks/process \
-d '{"task_id": "task-002", "title": "Long task", "description": "Testing recovery", "assignee": "bob@example.com", "deadline_seconds": 300}'
# Kill the pod while workflow is running
kubectl delete pod -l app=task-agent-system
# Wait for pod to restart
kubectl get pods -w
# Check workflow status - it should resume
curl http://localhost:8000/workflows/task-workflow-task-002
Output:
{
"workflow_id": "task-workflow-task-002",
"status": "COMPLETED",
"output": "{\"task_id\": \"task-002\", \"status\": \"completed\"}"
}
Production Readiness Checklist
Before deploying this system to production, verify:
| Category | Requirement | Status |
|---|---|---|
| State | Actor state persists across pod restarts | Verify with Redis persistence |
| Durability | Workflow survives node failure | Test with pod deletion |
| Compensation | Saga rollback executes on failure | Simulate notification failures |
| Observability | Traces visible in Zipkin/Jaeger | Configure tracing component |
| Security | mTLS enabled between sidecars | Verify Dapr Sentry is running |
| Resources | Sidecar limits configured | Check dapr.io annotations |
| Health | Readiness probes configured | Add /healthz endpoint |
Reflect on Your Skill: Comprehensive Review
You've built a complete Digital FTE blueprint. This is the moment to assess what your dapr-deployment skill has learned throughout Chapter 59.
Core Actor Patterns
Test your skill with these prompts:
Using my dapr-deployment skill, generate a TaskActor with:
- State management using _state_manager
- Deadline reminder that marks task as overdue
- Turn-based concurrency guarantees
Does your skill generate code with:
- Proper
ActorInterfacewith@actormethoddecorators? - Lifecycle hooks (
_on_activate,_on_deactivate)? - Reminder registration and
receive_remindercallback?
Workflow Patterns
Using my dapr-deployment skill, generate a TaskProcessingWorkflow with:
- 4 chained activities (validate, create, assign, notify)
- Saga compensation that unassigns on notification failure
- Retry policy on notification activity
Does your skill generate code with:
- Proper
yield ctx.call_activity()for durability? - Compensation list tracking with reverse execution?
wf.RetryPolicyconfiguration?
Integration Patterns
Using my dapr-deployment skill, design a system that combines:
- TaskActor for per-task state
- TaskProcessingWorkflow for orchestration
- Activities that call actors via ActorProxy
Does your skill explain:
- Why activities wrap actor calls (not workflows directly)?
- How ActorProxy enables activity-to-actor communication?
- When to use actors vs workflows?
Kubernetes Deployment
Using my dapr-deployment skill, generate Kubernetes manifests for:
- Deployment with Dapr sidecar annotations
- State store component with actorStateStore: "true"
- Pub/sub component for event publishing
Does your skill include:
- All required Dapr annotations?
- Proper resource limits for sidecar?
- Component configuration for actors?
Document Your Skill's Growth
Through Chapter 59, your skill should now include:
Actors (L01-L08):
- Actor interface + implementation pattern
- State management with _state_manager
- Timers vs reminders decision framework
- Actor communication via ActorProxy
- Event-driven actors with pub/sub
Workflows (L09-L14):
- Workflow + activity pattern
- Determinism rules (what to avoid)
- Task chaining and fan-out/fan-in
- Saga compensation pattern
- Monitor pattern with continue_as_new
Production (L15-L18):
- Actors vs workflows decision framework
- Multi-app workflow configuration
- Namespaced actors for multi-tenancy
- Security essentials (mTLS, state encryption)
If any of these areas show gaps, return to the relevant lesson and update your skill with the missing patterns.
Try With AI
Open your AI companion and explore these capstone scenarios.
Prompt 1: Extend the System
I've built a TaskActor + TaskProcessingWorkflow system for task management.
Now I want to add a ProjectActor that manages a collection of tasks.
Help me design:
1. ProjectActor that tracks multiple TaskActor IDs
2. A workflow that creates a project with 3 initial tasks
3. An activity that calls TaskActor for each task creation
Show me how the ProjectActor would delegate to TaskActors using ActorProxy,
and how the workflow orchestrates the multi-task creation.
What you're learning: Scaling the pattern to hierarchical actor relationships. The AI helps you understand parent-child actor patterns and how workflows orchestrate multiple actor interactions.
Prompt 2: Debug a Production Issue
My TaskProcessingWorkflow is failing intermittently in production. The logs show:
- Workflow starts successfully
- Activities 1-3 complete
- Activity 4 (notify_assignee) fails with timeout
- Compensation runs, but task ends up in inconsistent state
Help me debug. What could cause the inconsistent state after compensation?
What logging should I add? How do I trace the full workflow execution?
What you're learning: Production debugging for actor/workflow systems. The AI guides you through distributed tracing, understanding replay behavior, and identifying race conditions between actors and workflows.
Prompt 3: Design for Scale
My capstone system needs to handle 10,000 concurrent tasks across 100 users.
Each user has their own ChatActor, and tasks are distributed across TaskActors.
Help me analyze:
1. How many actors will be active simultaneously?
2. What's the impact on the Placement service?
3. Should I batch task creation in workflows?
4. How do I monitor actor activation/deactivation rates?
Design a scaling strategy that keeps the system responsive under this load.
What you're learning: Capacity planning for actor systems. The AI helps you understand placement service behavior, actor garbage collection timing, and workflow batching strategies for high-volume scenarios.
Safety Note
This capstone combines multiple Dapr building blocks. When deploying to production: verify Redis has persistence enabled (actor state must survive Redis restarts), configure appropriate actor idle timeouts based on your usage patterns, and implement proper circuit breakers for external notification services to prevent saga compensation storms.