Pub/Sub Messaging
In Chapter 52, you built Kafka producers and consumers directly. You learned about topics, partitions, consumer groups, and offset management. That knowledge is valuable—but it's also tightly coupled to Kafka. If your team decides to use RabbitMQ for one service or Azure Service Bus for cloud deployment, you'd rewrite your messaging code.
Dapr's pub/sub building block gives you event-driven messaging through a single API. Your application publishes to /v1.0/publish/{pubsub}/{topic} and subscribes via HTTP callbacks. The actual broker—Redis, Kafka, RabbitMQ, AWS SNS/SQS—is defined in a YAML component file. Change the YAML, keep your code.
This is the same pattern you learned with state management: infrastructure abstraction through configuration, not code changes.
The Pub/Sub API
Dapr's pub/sub exposes two operations:
| Operation | API | Your Code |
|---|---|---|
| Publish | POST /v1.0/publish/{pubsub}/{topic} | Call Dapr with event data |
| Subscribe | Dapr calls YOUR endpoint | Handle incoming events |
The key insight: publishing is an outbound call to Dapr, but subscribing is Dapr making inbound calls to your application. You register subscription handlers, and Dapr routes events to them.
CloudEvents: Automatic Message Wrapping
When you publish through Dapr, your message gets wrapped in CloudEvents format automatically. CloudEvents is a specification for describing event data in a common way.
What you send:
{
"event_type": "todo.created",
"todo_id": "todo-1",
"title": "Learn Dapr"
}
What Dapr delivers to subscribers:
{
"specversion": "1.0",
"type": "com.dapr.event.sent",
"source": "task-api",
"id": "abc-123-def",
"datacontenttype": "application/json",
"data": {
"event_type": "todo.created",
"todo_id": "todo-1",
"title": "Learn Dapr"
}
}
Why CloudEvents matters:
- Interoperability: Any system that understands CloudEvents can process your events
- Traceability: Built-in
id,source, andtimefields for debugging - Portability: Switch brokers without worrying about message format differences
You don't need to construct CloudEvents yourself—Dapr handles the wrapping. Your subscriber receives the data field with your original payload.
Publishing Events with DaprClient
Here's the async pattern for publishing events from your Todo API:
from dapr.clients import DaprClient
import json
async def publish_todo_created(todo_id: str, title: str):
"""Publish a todo.created event to the pubsub component."""
async with DaprClient() as client:
await client.publish_event(
pubsub_name='pubsub',
topic_name='todo-events',
data=json.dumps({
'event_type': 'todo.created',
'todo_id': todo_id,
'title': title
}),
data_content_type='application/json'
)
Output (Dapr sidecar logs):
INFO[0042] Publishing message to topic todo-events on pubsub pubsub
Key parameters:
| Parameter | Purpose |
|---|---|
pubsub_name | Name of the pub/sub component (matches metadata.name in YAML) |
topic_name | Topic to publish to (created automatically if it doesn't exist) |
data | Your event payload as JSON string |
data_content_type | MIME type for proper deserialization |
The Redis Pub/Sub Component
Before publishing works, you need a pub/sub component configured. Here's Redis:
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
Apply it:
kubectl apply -f components/pubsub.yaml
Output:
component.dapr.io/pubsub created
That's it. Your publish_event() calls now route through Redis. No connection strings in code, no Redis client imports.
Subscribing to Events: Two Approaches
Dapr supports two subscription patterns:
1. Declarative Subscriptions (Kubernetes CRD)
Define subscriptions as Kubernetes resources:
# subscriptions/todo-subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: todo-subscription
namespace: default
spec:
pubsubname: pubsub
topic: todo-events
routes:
default: /events/todo
Apply it:
kubectl apply -f subscriptions/todo-subscription.yaml
Then implement the handler in your FastAPI app:
from fastapi import FastAPI
app = FastAPI()
@app.post("/events/todo")
async def handle_todo_event(event_data: dict):
"""Handle todo events from Dapr pub/sub."""
print(f"Received event: {event_data}")
# Process the event
return {"status": "SUCCESS"}
2. Programmatic Subscriptions (dapr-ext-fastapi)
The dapr-ext-fastapi extension registers subscriptions directly in code—no CRD needed:
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
app = FastAPI()
dapr_app = DaprApp(app)
@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
"""Dapr routes todo-events to this handler automatically."""
print(f"Received: {event_data}")
return {"status": "SUCCESS"}
Output (when event arrives):
Received: {'event_type': 'todo.created', 'todo_id': 'todo-1', 'title': 'Learn Dapr'}
Which approach to use?
| Approach | Best For |
|---|---|
| Declarative (CRD) | GitOps workflows, separation of concerns, ops-managed subscriptions |
| Programmatic | Developer-controlled subscriptions, rapid iteration, simpler deployments |
For this book's learning context, programmatic subscriptions are clearer—the subscription lives with the code that handles it.
Complete Example: Todo Event Publisher and Subscriber
Here's a complete FastAPI service that publishes and subscribes to todo events:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
import json
import uuid
class TodoCreate(BaseModel):
title: str
class TodoEvent(BaseModel):
event_type: str
todo_id: str
title: str
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Dapr sidecar readiness happens automatically."""
yield
app = FastAPI(lifespan=lifespan)
dapr_app = DaprApp(app)
@app.post("/todos")
async def create_todo(todo: TodoCreate):
"""Create a todo and publish a todo.created event."""
todo_id = str(uuid.uuid4())
# Publish event via Dapr pub/sub
async with DaprClient() as client:
await client.publish_event(
pubsub_name='pubsub',
topic_name='todo-events',
data=json.dumps({
'event_type': 'todo.created',
'todo_id': todo_id,
'title': todo.title
}),
data_content_type='application/json'
)
return {"id": todo_id, "title": todo.title, "status": "created"}
@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
"""Process todo events (could trigger notifications, analytics, etc.)."""
event_type = event_data.get('event_type', 'unknown')
todo_id = event_data.get('todo_id', 'unknown')
print(f"Processing {event_type} for todo {todo_id}")
# Your event handling logic here
# - Send notification
# - Update analytics
# - Trigger downstream workflows
return {"status": "SUCCESS"}
Testing the flow:
# Create a todo (triggers event)
curl -X POST http://localhost:8000/todos \
-H "Content-Type: application/json" \
-d '{"title": "Learn Dapr pub/sub"}'
Output:
{"id": "abc-123", "title": "Learn Dapr pub/sub", "status": "created"}
Logs show the subscription handler received the event:
Processing todo.created for todo abc-123
Swapping Brokers: Redis to Kafka
Here's the power of Dapr's abstraction. You learned Kafka in Chapter 52. To use Kafka instead of Redis for pub/sub, change only the component YAML:
# components/kafka-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: task-events-kafka-bootstrap.kafka.svc.cluster.local:9092
- name: consumerGroup
value: todo-service
- name: authType
value: none
Apply the new component:
kubectl apply -f components/kafka-pubsub.yaml
Your application code doesn't change. The same publish_event() and @dapr_app.subscribe() calls now route through Kafka instead of Redis.
When to use which broker:
| Broker | Use Case |
|---|---|
| Redis | Development, simple pub/sub, low-latency local messaging |
| Kafka | Production event streaming, durability, replay capability, high throughput |
| RabbitMQ | Complex routing, message queuing patterns |
| Cloud (SNS/SQS, Pub/Sub) | Managed infrastructure, cloud-native deployments |
The choice is now a deployment decision, not a code decision.
Subscription Response Patterns
Your subscription handler must return a status that tells Dapr how to handle the message:
@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
try:
# Process event
process_event(event_data)
return {"status": "SUCCESS"} # Acknowledge, remove from queue
except TransientError:
return {"status": "RETRY"} # Redelivery requested
except PermanentError:
return {"status": "DROP"} # Discard, don't retry
| Status | Dapr Behavior |
|---|---|
SUCCESS | Message acknowledged, removed from broker |
RETRY | Message redelivered after backoff |
DROP | Message discarded without retry |
For critical events, prefer RETRY over DROP—let the broker's dead-letter handling manage truly unprocessable messages.
Reflect on Your Skill
You built a dapr-deployment skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my dapr-deployment skill, add pub/sub messaging to my Todo API:
1. Create a Redis pub/sub component
2. Publish todo.created events when todos are created
3. Implement a subscription handler using dapr-ext-fastapi
Does my skill show both publish_event and @dapr_app.subscribe patterns?
Identify Gaps
Ask yourself:
- Did my skill explain the CloudEvents wrapping that Dapr applies automatically?
- Did it show how to swap from Redis to Kafka by changing only the component YAML?
- Did it include the subscription response patterns (SUCCESS, RETRY, DROP)?
Improve Your Skill
If you found gaps:
My dapr-deployment skill is missing pub/sub patterns.
Update it to include:
- DaprClient.publish_event() async pattern
- dapr-ext-fastapi @dapr_app.subscribe() decorator
- Redis and Kafka component YAML examples
- Subscription response status meanings
Try With AI
Setup: You have a Todo API using direct Redis pub/sub and want to migrate to Dapr's abstraction.
Prompt 1: Add pub/sub to your Todo API
Add pub/sub to my Todo API: publish todo.created events using async DaprClient and create a subscription handler using dapr-ext-fastapi.
My current code creates todos but doesn't publish events. Show me:
1. The publish_event call in my create endpoint
2. A subscription handler that logs received events
3. The Redis pub/sub component YAML
What you're learning: The pub/sub integration pattern. You're seeing how Dapr's publish and subscribe APIs fit into existing FastAPI code without requiring broker-specific clients. The abstraction keeps your business logic clean.
Prompt 2: Swap brokers without code changes
Show me how to swap from Redis pub/sub to Kafka pub/sub without changing my application code.
I want to see:
1. My current Redis component
2. The Kafka component that replaces it
3. Confirmation that my publish_event and subscribe code stays identical
What you're learning: Infrastructure portability in practice. The component YAML is the only thing that changes—your application remains broker-agnostic. This is why Dapr matters for production systems that may need to evolve their infrastructure.
Prompt 3: Understand CloudEvents format
What's CloudEvents format? How does Dapr handle it automatically?
Show me:
1. What my raw event data looks like
2. What Dapr wraps it into
3. What my subscriber actually receives
What you're learning: CloudEvents as the interoperability standard for event-driven systems. Dapr handles the envelope automatically, so you don't construct CloudEvents manually—but understanding the format helps when debugging or integrating with external systems that expect CloudEvents.
Safety note: When testing pub/sub in production environments, use separate topics for testing. Publishing to production topics during development can trigger real workflows—notifications sent, orders processed, analytics skewed.