Skip to main content
Updated Feb 23, 2026

Pub/Sub Messaging

In Chapter 52, you built Kafka producers and consumers directly. You learned about topics, partitions, consumer groups, and offset management. That knowledge is valuable—but it's also tightly coupled to Kafka. If your team decides to use RabbitMQ for one service or Azure Service Bus for cloud deployment, you'd rewrite your messaging code.

Dapr's pub/sub building block gives you event-driven messaging through a single API. Your application publishes to /v1.0/publish/{pubsub}/{topic} and subscribes via HTTP callbacks. The actual broker—Redis, Kafka, RabbitMQ, AWS SNS/SQS—is defined in a YAML component file. Change the YAML, keep your code.

This is the same pattern you learned with state management: infrastructure abstraction through configuration, not code changes.

The Pub/Sub API

Dapr's pub/sub exposes two operations:

OperationAPIYour Code
PublishPOST /v1.0/publish/{pubsub}/{topic}Call Dapr with event data
SubscribeDapr calls YOUR endpointHandle incoming events

The key insight: publishing is an outbound call to Dapr, but subscribing is Dapr making inbound calls to your application. You register subscription handlers, and Dapr routes events to them.

CloudEvents: Automatic Message Wrapping

When you publish through Dapr, your message gets wrapped in CloudEvents format automatically. CloudEvents is a specification for describing event data in a common way.

What you send:

{
"event_type": "todo.created",
"todo_id": "todo-1",
"title": "Learn Dapr"
}

What Dapr delivers to subscribers:

{
"specversion": "1.0",
"type": "com.dapr.event.sent",
"source": "task-api",
"id": "abc-123-def",
"datacontenttype": "application/json",
"data": {
"event_type": "todo.created",
"todo_id": "todo-1",
"title": "Learn Dapr"
}
}

Why CloudEvents matters:

  • Interoperability: Any system that understands CloudEvents can process your events
  • Traceability: Built-in id, source, and time fields for debugging
  • Portability: Switch brokers without worrying about message format differences

You don't need to construct CloudEvents yourself—Dapr handles the wrapping. Your subscriber receives the data field with your original payload.

Publishing Events with DaprClient

Here's the async pattern for publishing events from your Todo API:

from dapr.clients import DaprClient
import json

async def publish_todo_created(todo_id: str, title: str):
"""Publish a todo.created event to the pubsub component."""
async with DaprClient() as client:
await client.publish_event(
pubsub_name='pubsub',
topic_name='todo-events',
data=json.dumps({
'event_type': 'todo.created',
'todo_id': todo_id,
'title': title
}),
data_content_type='application/json'
)

Output (Dapr sidecar logs):

INFO[0042] Publishing message to topic todo-events on pubsub pubsub

Key parameters:

ParameterPurpose
pubsub_nameName of the pub/sub component (matches metadata.name in YAML)
topic_nameTopic to publish to (created automatically if it doesn't exist)
dataYour event payload as JSON string
data_content_typeMIME type for proper deserialization

The Redis Pub/Sub Component

Before publishing works, you need a pub/sub component configured. Here's Redis:

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379

Apply it:

kubectl apply -f components/pubsub.yaml

Output:

component.dapr.io/pubsub created

That's it. Your publish_event() calls now route through Redis. No connection strings in code, no Redis client imports.

Subscribing to Events: Two Approaches

Dapr supports two subscription patterns:

1. Declarative Subscriptions (Kubernetes CRD)

Define subscriptions as Kubernetes resources:

# subscriptions/todo-subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: todo-subscription
namespace: default
spec:
pubsubname: pubsub
topic: todo-events
routes:
default: /events/todo

Apply it:

kubectl apply -f subscriptions/todo-subscription.yaml

Then implement the handler in your FastAPI app:

from fastapi import FastAPI

app = FastAPI()

@app.post("/events/todo")
async def handle_todo_event(event_data: dict):
"""Handle todo events from Dapr pub/sub."""
print(f"Received event: {event_data}")
# Process the event
return {"status": "SUCCESS"}

2. Programmatic Subscriptions (dapr-ext-fastapi)

The dapr-ext-fastapi extension registers subscriptions directly in code—no CRD needed:

from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp

app = FastAPI()
dapr_app = DaprApp(app)

@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
"""Dapr routes todo-events to this handler automatically."""
print(f"Received: {event_data}")
return {"status": "SUCCESS"}

Output (when event arrives):

Received: {'event_type': 'todo.created', 'todo_id': 'todo-1', 'title': 'Learn Dapr'}

Which approach to use?

ApproachBest For
Declarative (CRD)GitOps workflows, separation of concerns, ops-managed subscriptions
ProgrammaticDeveloper-controlled subscriptions, rapid iteration, simpler deployments

For this book's learning context, programmatic subscriptions are clearer—the subscription lives with the code that handles it.

Complete Example: Todo Event Publisher and Subscriber

Here's a complete FastAPI service that publishes and subscribes to todo events:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
import json
import uuid

class TodoCreate(BaseModel):
title: str

class TodoEvent(BaseModel):
event_type: str
todo_id: str
title: str

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Dapr sidecar readiness happens automatically."""
yield

app = FastAPI(lifespan=lifespan)
dapr_app = DaprApp(app)

@app.post("/todos")
async def create_todo(todo: TodoCreate):
"""Create a todo and publish a todo.created event."""
todo_id = str(uuid.uuid4())

# Publish event via Dapr pub/sub
async with DaprClient() as client:
await client.publish_event(
pubsub_name='pubsub',
topic_name='todo-events',
data=json.dumps({
'event_type': 'todo.created',
'todo_id': todo_id,
'title': todo.title
}),
data_content_type='application/json'
)

return {"id": todo_id, "title": todo.title, "status": "created"}

@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
"""Process todo events (could trigger notifications, analytics, etc.)."""
event_type = event_data.get('event_type', 'unknown')
todo_id = event_data.get('todo_id', 'unknown')

print(f"Processing {event_type} for todo {todo_id}")

# Your event handling logic here
# - Send notification
# - Update analytics
# - Trigger downstream workflows

return {"status": "SUCCESS"}

Testing the flow:

# Create a todo (triggers event)
curl -X POST http://localhost:8000/todos \
-H "Content-Type: application/json" \
-d '{"title": "Learn Dapr pub/sub"}'

Output:

{"id": "abc-123", "title": "Learn Dapr pub/sub", "status": "created"}

Logs show the subscription handler received the event:

Processing todo.created for todo abc-123

Swapping Brokers: Redis to Kafka

Here's the power of Dapr's abstraction. You learned Kafka in Chapter 52. To use Kafka instead of Redis for pub/sub, change only the component YAML:

# components/kafka-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: task-events-kafka-bootstrap.kafka.svc.cluster.local:9092
- name: consumerGroup
value: todo-service
- name: authType
value: none

Apply the new component:

kubectl apply -f components/kafka-pubsub.yaml

Your application code doesn't change. The same publish_event() and @dapr_app.subscribe() calls now route through Kafka instead of Redis.

When to use which broker:

BrokerUse Case
RedisDevelopment, simple pub/sub, low-latency local messaging
KafkaProduction event streaming, durability, replay capability, high throughput
RabbitMQComplex routing, message queuing patterns
Cloud (SNS/SQS, Pub/Sub)Managed infrastructure, cloud-native deployments

The choice is now a deployment decision, not a code decision.

Subscription Response Patterns

Your subscription handler must return a status that tells Dapr how to handle the message:

@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
try:
# Process event
process_event(event_data)
return {"status": "SUCCESS"} # Acknowledge, remove from queue

except TransientError:
return {"status": "RETRY"} # Redelivery requested

except PermanentError:
return {"status": "DROP"} # Discard, don't retry
StatusDapr Behavior
SUCCESSMessage acknowledged, removed from broker
RETRYMessage redelivered after backoff
DROPMessage discarded without retry

For critical events, prefer RETRY over DROP—let the broker's dead-letter handling manage truly unprocessable messages.


Reflect on Your Skill

You built a dapr-deployment skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my dapr-deployment skill, add pub/sub messaging to my Todo API:
1. Create a Redis pub/sub component
2. Publish todo.created events when todos are created
3. Implement a subscription handler using dapr-ext-fastapi

Does my skill show both publish_event and @dapr_app.subscribe patterns?

Identify Gaps

Ask yourself:

  • Did my skill explain the CloudEvents wrapping that Dapr applies automatically?
  • Did it show how to swap from Redis to Kafka by changing only the component YAML?
  • Did it include the subscription response patterns (SUCCESS, RETRY, DROP)?

Improve Your Skill

If you found gaps:

My dapr-deployment skill is missing pub/sub patterns.
Update it to include:
- DaprClient.publish_event() async pattern
- dapr-ext-fastapi @dapr_app.subscribe() decorator
- Redis and Kafka component YAML examples
- Subscription response status meanings

Try With AI

Setup: You have a Todo API using direct Redis pub/sub and want to migrate to Dapr's abstraction.

Prompt 1: Add pub/sub to your Todo API

Add pub/sub to my Todo API: publish todo.created events using async DaprClient and create a subscription handler using dapr-ext-fastapi.

My current code creates todos but doesn't publish events. Show me:
1. The publish_event call in my create endpoint
2. A subscription handler that logs received events
3. The Redis pub/sub component YAML

What you're learning: The pub/sub integration pattern. You're seeing how Dapr's publish and subscribe APIs fit into existing FastAPI code without requiring broker-specific clients. The abstraction keeps your business logic clean.


Prompt 2: Swap brokers without code changes

Show me how to swap from Redis pub/sub to Kafka pub/sub without changing my application code.

I want to see:
1. My current Redis component
2. The Kafka component that replaces it
3. Confirmation that my publish_event and subscribe code stays identical

What you're learning: Infrastructure portability in practice. The component YAML is the only thing that changes—your application remains broker-agnostic. This is why Dapr matters for production systems that may need to evolve their infrastructure.


Prompt 3: Understand CloudEvents format

What's CloudEvents format? How does Dapr handle it automatically?

Show me:
1. What my raw event data looks like
2. What Dapr wraps it into
3. What my subscriber actually receives

What you're learning: CloudEvents as the interoperability standard for event-driven systems. Dapr handles the envelope automatically, so you don't construct CloudEvents manually—but understanding the format helps when debugging or integrating with external systems that expect CloudEvents.

Safety note: When testing pub/sub in production environments, use separate topics for testing. Publishing to production topics during development can trigger real workflows—notifications sent, orders processed, analytics skewed.