Skip to main content
Updated Feb 23, 2026

Capstone: Dapr-Enabled Task API

You've learned Dapr's building blocks individually: state management in Lesson 03, service invocation in Lesson 04, pub/sub in Lesson 05, bindings in Lesson 06, jobs in Lesson 07, and secrets in Lesson 08. Each building block solves a specific distributed systems challenge. Now it's time to compose them into a complete application.

This capstone follows the spec-driven development approach. You'll write a specification first, then refactor the Part 6 Task API to use Dapr for all infrastructure abstraction. The goal is practical: eliminate direct Redis, Kafka, and HTTP client code from your application. Your Task API talks to Dapr; Dapr talks to infrastructure.

The result demonstrates Dapr's core value proposition: infrastructure becomes configuration, not code. Need to swap Redis for PostgreSQL? Change a YAML file. Need to add Kafka alongside Redis pub/sub? Deploy another component. Your application code stays the same.

Phase 1: Write the Specification

Before touching code, define precisely what you're building. A clear specification enables focused implementation and provides acceptance criteria for validation.

The Task API Dapr Migration Specification

# Task API with Dapr Integration

## Intent

Refactor the Part 6 Task API to use Dapr for all infrastructure abstraction.
This eliminates direct infrastructure clients (redis-py, confluent-kafka, httpx)
from application code, making the Task API portable across cloud providers and
infrastructure backends.

**Business Value**: The current Task API has hardcoded connections to Redis,
Kafka, and other services. Changing infrastructure requires code changes,
testing cycles, and deployment risk. Dapr abstraction enables infrastructure
decisions at deployment time, not development time.

## Building Blocks to Use

| Building Block | Replaces | Purpose |
|----------------|----------|---------|
| **State Management** | Direct Redis client | Store tasks as key-value pairs |
| **Pub/Sub** | Direct Kafka producer | Publish task.created, task.completed events |
| **Service Invocation** | Direct HTTP client | Call notification-service via Dapr |
| **Secrets** | Environment variables | Retrieve API keys securely |

## Constraints

### Technical Constraints
- **Dapr Version**: 1.14+ (requires Jobs API support)
- **Python SDK**: dapr-client>=1.13.0, dapr-ext-fastapi>=1.13.0
- **State Store**: Redis (component: statestore)
- **Pub/Sub**: Redis (component: pubsub)
- **Platform**: Docker Desktop Kubernetes

### Operational Constraints
- Pod must show 2/2 containers (app + Dapr sidecar)
- No direct infrastructure imports in application code
- All configuration via Dapr components, not environment variables

## Success Criteria

### SC-1: Sidecar Injection Working
- [ ] Deployment includes Dapr annotations
- [ ] kubectl get pods shows 2/2 READY for task-api
- [ ] Dapr sidecar logs show "dapr initialized"

### SC-2: State Management via Dapr
- [ ] Create task -> state saved to statestore component
- [ ] Get task -> state retrieved from statestore component
- [ ] Update task -> state updated with ETag concurrency
- [ ] Delete task -> state removed from statestore

### SC-3: Events Published via Dapr
- [ ] Create task -> task.created event published to pubsub
- [ ] Complete task -> task.completed event published to pubsub
- [ ] Events received by subscription handler

### SC-4: Service Invocation via Dapr
- [ ] Task completion -> notification-service called via Dapr invoke
- [ ] Service discovery automatic (no hardcoded URLs)

### SC-5: No Direct Infrastructure Clients
- [ ] No redis-py imports in application code
- [ ] No confluent-kafka imports in application code
- [ ] No httpx/requests for service-to-service calls

## Non-Goals (What We're NOT Building)

- Dapr Actors for task state (saved for Chapter 59)
- Dapr Workflows for multi-step operations (saved for Chapter 59)
- Schema Registry integration (JSON sufficient for demonstration)
- Multi-cluster deployment (single Kubernetes cluster)
- Production observability (tracing, metrics beyond basic logging)

## Architecture

```
┌────────────────────────────────────────────────────────────────────┐
│ Docker Desktop Kubernetes │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ task-api Pod (2/2 containers) │ │
│ │ ┌────────────────────────┐ ┌────────────────────────────┐ │ │
│ │ │ FastAPI Application │ │ Dapr Sidecar (daprd) │ │ │
│ │ │ - POST /tasks │ │ - State API :3500 │ │ │
│ │ │ - GET /tasks/{id} │──│ - Pub/Sub API :3500 │ │ │
│ │ │ - PUT /tasks/{id} │ │ - Invoke API :3500 │ │ │
│ │ │ - DELETE /tasks/{id} │ │ - Secrets API :3500 │ │ │
│ │ └────────────────────────┘ └────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ Redis (statestore) │ │ Redis (pubsub) │ │
│ │ component: statestore │ │ component: pubsub │ │
│ └─────────────────────────┘ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ ┌─────────────────────────┐ │
│ │ notification-service │ │ Kubernetes Secrets │ │
│ │ app-id: notification │ │ component: k8s-secrets │ │
│ └─────────────────────────┘ └─────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
```

## Task Model

```python
from pydantic import BaseModel
from datetime import datetime
from enum import Enum

class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"

class Task(BaseModel):
id: str
title: str
description: str | None = None
status: TaskStatus = TaskStatus.PENDING
priority: int = 1
created_at: datetime
updated_at: datetime
```

Why This Specification Matters

The specification maps Part 6's Task API functionality to Dapr building blocks:

Part 6 PatternDapr Building Block
redis.set(key, value)client.save_state(store, key, value)
producer.send(topic, event)client.publish_event(pubsub, topic, data)
httpx.post(url, data)client.invoke_method(app_id, method, data)
os.getenv("API_KEY")client.get_secret(store, key)

When you work with AI to implement this specification, you both understand what "done" looks like.

Phase 2: Implement the Specification

Now refactor the Task API by composing patterns from Lessons 03-08. This phase demonstrates the core skill of spec-driven development: translating clear requirements into working code.

Step 1: Define the Task Model

First, create the Pydantic models for tasks and events.

Create models.py:

from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum

class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"

class TaskCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
description: str | None = None
priority: int = Field(default=1, ge=1, le=5)

class Task(BaseModel):
id: str
title: str
description: str | None = None
status: TaskStatus = TaskStatus.PENDING
priority: int = 1
created_at: datetime
updated_at: datetime

class TaskEvent(BaseModel):
event_type: str
task_id: str
title: str
status: str
timestamp: datetime

Output:

>>> from models import Task, TaskStatus
>>> from datetime import datetime
>>> task = Task(
... id="task-123",
... title="Learn Dapr",
... status=TaskStatus.PENDING,
... priority=2,
... created_at=datetime.utcnow(),
... updated_at=datetime.utcnow()
... )
>>> task.model_dump_json()
'{"id":"task-123","title":"Learn Dapr","description":null,"status":"pending","priority":2,"created_at":"2025-01-15T10:30:00","updated_at":"2025-01-15T10:30:00"}'

Step 2: Implement the Dapr-Enabled Task API

Create the FastAPI application that uses Dapr for all infrastructure.

Create main.py:

from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, HTTPException
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
import json
import uuid

from models import Task, TaskCreate, TaskStatus, TaskEvent

# Configuration - names match component metadata.name
STORE_NAME = "statestore"
PUBSUB_NAME = "pubsub"
TOPIC_NAME = "task-events"
SECRETS_STORE = "kubernetes-secrets"

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Wait for Dapr sidecar readiness."""
with DaprClient() as client:
client.wait(timeout_s=30)
yield

app = FastAPI(
title="Task API with Dapr",
description="Distributed Task Management using Dapr building blocks",
lifespan=lifespan
)
dapr_app = DaprApp(app)

# =============================================================================
# API Endpoints - All infrastructure via Dapr
# =============================================================================

@app.post("/tasks", response_model=Task, status_code=201)
async def create_task(task_create: TaskCreate):
"""Create a new task using Dapr state and pub/sub."""
now = datetime.utcnow()
task = Task(
id=str(uuid.uuid4()),
title=task_create.title,
description=task_create.description,
priority=task_create.priority,
created_at=now,
updated_at=now
)

with DaprClient() as client:
# Save state via Dapr (replaces direct Redis)
client.save_state(
store_name=STORE_NAME,
key=f"task-{task.id}",
value=task.model_dump_json()
)

# Publish event via Dapr (replaces direct Kafka)
event = TaskEvent(
event_type="task.created",
task_id=task.id,
title=task.title,
status=task.status.value,
timestamp=now
)
client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=event.model_dump_json(),
data_content_type="application/json"
)

return task

@app.get("/tasks/{task_id}", response_model=Task)
async def get_task(task_id: str):
"""Retrieve a task from Dapr state store."""
with DaprClient() as client:
state = client.get_state(
store_name=STORE_NAME,
key=f"task-{task_id}"
)

if not state.data:
raise HTTPException(status_code=404, detail="Task not found")

return Task.model_validate_json(state.data)

@app.put("/tasks/{task_id}/status", response_model=Task)
async def update_task_status(task_id: str, status: TaskStatus):
"""Update task status with optimistic concurrency via ETag."""
with DaprClient() as client:
# Get current state with ETag
state = client.get_state(
store_name=STORE_NAME,
key=f"task-{task_id}"
)

if not state.data:
raise HTTPException(status_code=404, detail="Task not found")

task = Task.model_validate_json(state.data)
task.status = status
task.updated_at = datetime.utcnow()

# Save with ETag for concurrency control
client.save_state(
store_name=STORE_NAME,
key=f"task-{task_id}",
value=task.model_dump_json(),
etag=state.etag
)

# Publish status change event
event = TaskEvent(
event_type=f"task.{status.value}",
task_id=task.id,
title=task.title,
status=status.value,
timestamp=task.updated_at
)
client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=event.model_dump_json(),
data_content_type="application/json"
)

return task

@app.delete("/tasks/{task_id}", status_code=204)
async def delete_task(task_id: str):
"""Delete a task from Dapr state store."""
with DaprClient() as client:
# Verify exists
state = client.get_state(
store_name=STORE_NAME,
key=f"task-{task_id}"
)

if not state.data:
raise HTTPException(status_code=404, detail="Task not found")

# Delete state
client.delete_state(
store_name=STORE_NAME,
key=f"task-{task_id}"
)

# =============================================================================
# Event Subscription - Triggers notification service
# =============================================================================

@dapr_app.subscribe(pubsub=PUBSUB_NAME, topic=TOPIC_NAME)
async def handle_task_event(event_data: dict):
"""Handle task events and invoke notification service via Dapr."""
print(f"Received event: {event_data}")

event_type = event_data.get("event_type", "unknown")
task_id = event_data.get("task_id", "unknown")

# Call notification service via Dapr (replaces direct HTTP)
if event_type == "task.completed":
with DaprClient() as client:
try:
client.invoke_method(
app_id="notification-service",
method_name="notify",
data=json.dumps({
"type": "task_completed",
"task_id": task_id,
"message": f"Task {task_id} has been completed"
}),
http_verb="POST"
)
print(f"Notification sent for task {task_id}")
except Exception as e:
print(f"Failed to notify: {e}")

return {"status": "SUCCESS"}

# =============================================================================
# Health Check
# =============================================================================

@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy"}

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Output:

$ curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Learn Dapr building blocks", "priority": 2}'

{"id":"abc-123-def","title":"Learn Dapr building blocks","description":null,"status":"pending","priority":2,"created_at":"2025-01-15T10:30:00","updated_at":"2025-01-15T10:30:00"}

Step 3: Create Dapr Components

Configure the Dapr components that your application uses.

Create components/statestore.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
value: ""

Create components/pubsub.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379

Create components/secrets.yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kubernetes-secrets
namespace: default
spec:
type: secretstores.kubernetes
version: v1
metadata: []

Apply the components:

kubectl apply -f components/

Output:

component.dapr.io/statestore created
component.dapr.io/pubsub created
component.dapr.io/kubernetes-secrets created

Step 4: Deploy with Dapr Annotations

Create the Kubernetes deployment with Dapr sidecar injection.

Create k8s/deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
name: task-api
namespace: default
spec:
replicas: 2
selector:
matchLabels:
app: task-api
template:
metadata:
labels:
app: task-api
annotations:
# Enable Dapr sidecar injection
dapr.io/enabled: "true"
# Unique app identifier for service discovery
dapr.io/app-id: "task-api"
# Port your app listens on
dapr.io/app-port: "8000"
# Enable API logging for debugging
dapr.io/enable-api-logging: "true"
spec:
containers:
- name: task-api
image: task-api:latest
ports:
- containerPort: 8000
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "256Mi"
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 30

Create k8s/service.yaml:

apiVersion: v1
kind: Service
metadata:
name: task-api
namespace: default
spec:
selector:
app: task-api
ports:
- port: 80
targetPort: 8000
type: ClusterIP

Deploy:

kubectl apply -f k8s/

Output:

deployment.apps/task-api created
service/task-api created

Phase 3: Validate Against Specification

The implementation is complete. Now verify each success criterion from the specification.

SC-1: Sidecar Injection Working

# Check pod status - should show 2/2 READY
kubectl get pods -l app=task-api

Output:

NAME                        READY   STATUS    RESTARTS   AGE
task-api-7d4b5c6f8-abc12 2/2 Running 0 2m
task-api-7d4b5c6f8-def34 2/2 Running 0 2m
# Verify Dapr sidecar initialized
kubectl logs task-api-7d4b5c6f8-abc12 -c daprd | grep "dapr initialized"

Output:

time="2025-01-15T10:30:00Z" level=info msg="dapr initialized. Status: Running."
CriterionStatus
Deployment includes Dapr annotationsPASS
kubectl get pods shows 2/2 READYPASS
Dapr sidecar logs show initializedPASS

SC-2: State Management via Dapr

# Port-forward to test
kubectl port-forward service/task-api 8000:80

# Create task
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Test state management"}'

Output:

{"id":"task-789","title":"Test state management","status":"pending",...}
# Get task
curl http://localhost:8000/tasks/task-789

Output:

{"id":"task-789","title":"Test state management","status":"pending",...}
# Update status
curl -X PUT "http://localhost:8000/tasks/task-789/status?status=completed"

Output:

{"id":"task-789","title":"Test state management","status":"completed",...}
CriterionStatus
Create task saves statePASS
Get task retrieves statePASS
Update task with ETagPASS
Delete task removes statePASS

SC-3: Events Published via Dapr

# Check application logs for event publishing
kubectl logs task-api-7d4b5c6f8-abc12 -c task-api | grep "Received event"

Output:

Received event: {'event_type': 'task.created', 'task_id': 'task-789', ...}
Received event: {'event_type': 'task.completed', 'task_id': 'task-789', ...}
CriterionStatus
task.created event publishedPASS
task.completed event publishedPASS
Subscription handler received eventsPASS

SC-4: Service Invocation via Dapr

# Check logs for notification service call
kubectl logs task-api-7d4b5c6f8-abc12 -c task-api | grep "Notification"

Output:

Notification sent for task task-789
CriterionStatus
notification-service called via DaprPASS
Service discovery automaticPASS

SC-5: No Direct Infrastructure Clients

# Verify no direct imports in application code
grep -E "import redis|from redis|import confluent_kafka|import httpx" main.py models.py

Output:

(no output - no matches found)
CriterionStatus
No redis-py importsPASS
No confluent-kafka importsPASS
No httpx for service callsPASS

Validation Summary

All success criteria from the specification have been verified:

Success CriteriaResult
SC-1: Sidecar Injection3/3 PASS
SC-2: State Management4/4 PASS
SC-3: Events Published3/3 PASS
SC-4: Service Invocation2/2 PASS
SC-5: No Direct Clients3/3 PASS

The implementation matches the specification.

What You Built

This capstone demonstrated the spec-driven development pattern for Dapr integration:

PhaseActivityOutcome
SpecificationDefine building blocks, constraints, success criteriaClear requirements document
ImplementationCompose patterns from Lessons 03-08Working Dapr-enabled API
ValidationVerify each success criterionEvidence of correctness

The system you built includes:

  • State management via Dapr state API (replaces Redis client)
  • Pub/sub messaging via Dapr publish/subscribe (replaces Kafka producer)
  • Service invocation via Dapr invoke (replaces HTTP client)
  • Kubernetes deployment with sidecar injection annotations
  • Zero infrastructure imports in application code

The key transformation: your application code no longer knows about Redis, Kafka, or HTTP clients. It only knows about Dapr APIs. Infrastructure decisions happen in YAML configuration, not Python code.


Reflect on Your Skill

You built a dapr-deployment skill in Lesson 0 and improved it throughout Lessons 01-08. This capstone is the ultimate test.

Test Your Skill

Using my dapr-deployment skill, refactor my existing FastAPI service to use Dapr:
1. Replace redis-py state management with Dapr state API
2. Replace Kafka producer with Dapr pub/sub
3. Replace httpx service calls with Dapr service invocation
4. Create Kubernetes deployment with Dapr annotations

Does my skill produce a complete, working Dapr-enabled service?

Identify Gaps

After completing this capstone, ask yourself:

  • Did my skill show the full integration pattern (state + pub/sub + invoke together)?
  • Did it include the Kubernetes deployment with correct annotations?
  • Did it demonstrate event subscription with @dapr_app.subscribe?
  • Did it show the sidecar verification process (2/2 containers)?

Improve Your Skill

If you found gaps:

My dapr-deployment skill is missing the complete integration pattern.
Update it to include:
- Full FastAPI + Dapr integration example (all building blocks)
- Kubernetes deployment YAML with Dapr annotations
- Component configuration for state, pub/sub, secrets
- Verification commands to confirm sidecar injection

Your skill should now generate complete Dapr-enabled FastAPI services. Test it by asking for a new microservice with all building blocks.


Try With AI

Use AI to extend and refine your capstone implementation.

Prompt 1: Migrate State Management

I have a FastAPI Task API that uses direct Redis with redis-py.
I want to migrate to Dapr state management.

Current code uses:
- redis.set(f"task:{task_id}", task_json)
- redis.get(f"task:{task_id}")
- redis.delete(f"task:{task_id}")

Show me:
1. The DaprClient equivalents for each operation
2. How to handle optimistic concurrency with ETags
3. The statestore component YAML for Redis backend
4. How to verify state is being stored (debug commands)

What you're learning: The state migration pattern. You're seeing how to replace direct Redis calls with Dapr's state API while gaining features like automatic concurrency control. The abstraction prepares you for swapping backends without code changes.


Prompt 2: Add Pub/Sub Integration

My Task API saves tasks but doesn't publish events.
I want to add Dapr pub/sub for task.created and task.completed events.

Show me:
1. The publish_event call to add after save_state
2. A subscription handler using dapr-ext-fastapi
3. How to invoke another service when processing events
4. The Redis pub/sub component YAML

What you're learning: Event-driven patterns with Dapr. You're composing pub/sub with state management, seeing how events flow between services through Dapr's abstraction layer rather than direct broker connections.


Prompt 3: Deploy with Sidecar Verification

I have a working Dapr-enabled Task API locally.
Now I need to deploy it to Kubernetes with the Dapr sidecar.

Show me:
1. The complete Deployment YAML with all Dapr annotations
2. Commands to verify 2/2 containers are running
3. How to check Dapr sidecar logs for "initialized"
4. How to test the deployed service with port-forward

What you're learning: Production deployment patterns. The sidecar injection via annotations is the key insight - your application doesn't install Dapr, Kubernetes injects it. Verification confirms the distributed system is functioning correctly.

Safety note: When migrating production services to Dapr, run both implementations in parallel during transition. Direct clients and Dapr can coexist temporarily, allowing gradual migration and rollback if needed. Never migrate all services simultaneously - one service at a time reduces blast radius.