Skip to main content
Updated Feb 23, 2026

Async Producers and Consumers in FastAPI

Your Task API works. Users create tasks via POST requests, the database stores them, and responses return in milliseconds. But now you need to notify other services when tasks are created. The notification service, audit logger, and reminder scheduler all need to know about new tasks---and they shouldn't slow down your API response.

This is where event-driven architecture shines. Instead of your API calling each service directly (and waiting for responses), you publish a task.created event to Kafka. Services consume at their own pace. Your API stays fast. Consumers process independently.

But integrating Kafka with FastAPI raises architectural questions. Kafka's Python libraries use blocking I/O, but FastAPI is async. How do you initialize producers at startup and clean them up at shutdown? How do you consume events in the background without blocking your API? This lesson answers these questions with production patterns you'll use repeatedly.

The Challenge: Async FastAPI vs Blocking Kafka

FastAPI is built on asyncio. When you write async def create_task(), you're telling Python this function can pause and let other requests run while waiting for I/O. This is why FastAPI handles thousands of concurrent requests efficiently.

But here's the problem: confluent-kafka-python, the most robust Kafka client, is not async. It wraps librdkafka, a C library that uses blocking calls internally. When you call producer.produce(), it doesn't block---it queues the message. But producer.flush() blocks until messages are delivered. And consumers are entirely blocking: consumer.poll() waits for messages.

Two approaches to async Kafka in Python:

LibraryNatureStrengthsConsiderations
confluent-kafka-pythonBlocking (C-based)Production-grade, commercial support, full feature setRequires threading for consumers
aiokafkaNative asyncBuilt for asyncio, natural FastAPI fitPure Python, less commercial support

For production systems, we'll use confluent-kafka-python because of its reliability and feature completeness. The threading patterns you'll learn work with any blocking library. We'll show where aiokafka makes sense as an alternative.

Producer Integration with FastAPI Lifespan

FastAPI's lifespan events let you run code at startup and shutdown. This is perfect for Kafka producers: initialize once at startup, share across requests, flush and close at shutdown.

The Lifespan Pattern

from contextlib import asynccontextmanager
from fastapi import FastAPI
from confluent_kafka import Producer
import json
import uuid

# Global producer - shared across all requests
producer: Producer | None = None

def delivery_report(err, msg):
"""Callback for delivery confirmation."""
if err is not None:
print(f"Delivery failed for {msg.key()}: {err}")
else:
print(f"Delivered {msg.key()} to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage Kafka producer lifecycle with FastAPI."""
global producer

# Startup: Initialize producer
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True,
'delivery.timeout.ms': 30000
})
print("Kafka producer initialized")

yield # Application runs here

# Shutdown: Flush pending messages and close
if producer:
remaining = producer.flush(timeout=10)
if remaining > 0:
print(f"Warning: {remaining} messages not delivered on shutdown")
print("Kafka producer shut down")

app = FastAPI(lifespan=lifespan)

Output (on startup):

Kafka producer initialized
INFO: Application startup complete.

Output (on shutdown):

INFO:     Shutting down
Kafka producer shut down
INFO: Application shutdown complete.

Why lifespan instead of startup/shutdown events? FastAPI deprecated @app.on_event("startup") in favor of lifespan context managers. The lifespan pattern guarantees cleanup runs even if startup fails partially.

Publishing Events from Endpoints

With the producer initialized, endpoints can publish events without blocking:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime

class TaskCreate(BaseModel):
title: str
description: str | None = None
priority: int = 1

class TaskResponse(BaseModel):
id: str
title: str
description: str | None
priority: int
created_at: str

@app.post("/tasks", response_model=TaskResponse)
async def create_task(task: TaskCreate):
"""Create a task and publish event to Kafka."""
# Generate task data
task_id = str(uuid.uuid4())
created_at = datetime.utcnow().isoformat() + "Z"

# Simulate database write (in production, use actual DB)
task_data = {
"id": task_id,
"title": task.title,
"description": task.description,
"priority": task.priority,
"created_at": created_at
}

# Publish event to Kafka (non-blocking)
event = {
"event_type": "task.created",
"event_id": str(uuid.uuid4()),
"occurred_at": created_at,
"data": task_data
}

producer.produce(
topic='task-created',
key=task_id,
value=json.dumps(event),
callback=delivery_report
)

# Service callbacks without blocking
producer.poll(0)

return TaskResponse(**task_data)

Output (on request):

$ curl -X POST "http://localhost:8000/tasks" \
-H "Content-Type: application/json" \
-d '{"title": "Review PR", "priority": 2}'

{"id":"a1b2c3d4-e5f6-7890-abcd-ef1234567890","title":"Review PR","description":null,"priority":2,"created_at":"2025-01-15T10:30:00.000000Z"}
Delivered a1b2c3d4-e5f6-7890-abcd-ef1234567890 to task-created [0] @ 157

Critical pattern: producer.poll(0)

The produce() call queues the message but doesn't wait for delivery confirmation. The delivery callback won't fire until you call poll(). By calling poll(0) (zero timeout), you process any pending callbacks without blocking. This pattern:

  1. Queues the message immediately
  2. Returns the HTTP response fast
  3. Processes delivery confirmations in the background

Thread Safety Considerations

The confluent-kafka Producer is thread-safe. Multiple FastAPI workers (via Uvicorn workers) can share the same producer instance. However, each worker process needs its own producer because Python processes don't share memory.

# For multi-worker deployments, each worker initializes its own producer
# This happens automatically because lifespan runs per-process

# Example: uvicorn main:app --workers 4
# Result: 4 separate producers, one per worker process

Background Consumer Pattern

Consuming events is trickier. The consumer poll loop is blocking---it waits for messages. You can't run it in an async function without blocking the event loop. The solution: run the consumer in a background thread.

Threaded Consumer Architecture

from threading import Thread, Event
from confluent_kafka import Consumer, KafkaError
import json

# Consumer state
consumer_thread: Thread | None = None
shutdown_event = Event()

def consume_loop():
"""Background thread that consumes Kafka messages."""
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'task-api-worker',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})

consumer.subscribe(['task-created'])
print("Consumer started, listening for events...")

try:
while not shutdown_event.is_set():
msg = consumer.poll(1.0) # 1 second timeout

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Consumer error: {msg.error()}")
continue

# Process the message
try:
event = json.loads(msg.value().decode())
handle_task_event(event)
consumer.commit(message=msg)
except Exception as e:
print(f"Failed to process message: {e}")
# In production: implement retry or DLQ

finally:
consumer.close()
print("Consumer shut down")

def handle_task_event(event: dict):
"""Process a task event. Customize for your use case."""
event_type = event.get("event_type")
task_data = event.get("data", {})

print(f"Processing {event_type}: {task_data.get('title')}")

# Example: Send notification, update cache, trigger workflow
# In production, this would call external services

Integrating Consumer with Lifespan

Update the lifespan to start and stop the consumer thread:

@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage Kafka producer and consumer lifecycle."""
global producer, consumer_thread

# Startup: Initialize producer
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True
})
print("Kafka producer initialized")

# Startup: Start consumer thread
shutdown_event.clear()
consumer_thread = Thread(target=consume_loop, daemon=True)
consumer_thread.start()
print("Kafka consumer thread started")

yield # Application runs here

# Shutdown: Signal consumer to stop
shutdown_event.set()
if consumer_thread and consumer_thread.is_alive():
consumer_thread.join(timeout=5)
if consumer_thread.is_alive():
print("Warning: Consumer thread did not stop cleanly")
print("Kafka consumer stopped")

# Shutdown: Flush producer
if producer:
remaining = producer.flush(timeout=10)
if remaining > 0:
print(f"Warning: {remaining} messages not delivered")
print("Kafka producer shut down")

app = FastAPI(lifespan=lifespan)

Output (startup):

Kafka producer initialized
Kafka consumer thread started
INFO: Application startup complete.
Consumer started, listening for events...

Output (when event is published and consumed):

Delivered task-123 to task-created [0] @ 158
Processing task.created: Review PR

Output (shutdown):

INFO:     Shutting down
Kafka consumer stopped
Kafka producer shut down
INFO: Application shutdown complete.
Consumer shut down

Why Threading Instead of Asyncio?

You might wonder: "FastAPI is async, shouldn't the consumer be async too?"

The answer involves understanding what confluent-kafka does internally:

ApproachHow it WorksProblem
Async wrapperRun poll() in executorWorks but adds overhead, loses batching benefits
Background threadDedicated thread for consumerClean separation, standard Kafka pattern
aiokafkaNative async consumerRequires switching libraries

The threading approach is recommended by Confluent for asyncio applications. The consumer thread runs independently of the event loop, processing messages at its own pace. This matches how production Kafka consumers typically run---as separate processes or threads from the API.

Alternative: Native Async with aiokafka

If you prefer fully async code and can accept the trade-offs, aiokafka provides native asyncio support:

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import json

producer: AIOKafkaProducer | None = None
consumer: AIOKafkaConsumer | None = None
consumer_task: asyncio.Task | None = None

async def consume_events():
"""Async consumer loop using aiokafka."""
async for msg in consumer:
event = json.loads(msg.value.decode())
print(f"Processing: {event.get('event_type')}")
# Process event...

@asynccontextmanager
async def lifespan(app: FastAPI):
global producer, consumer, consumer_task

# Initialize async producer
producer = AIOKafkaProducer(
bootstrap_servers='localhost:30092',
value_serializer=lambda v: json.dumps(v).encode()
)
await producer.start()

# Initialize async consumer
consumer = AIOKafkaConsumer(
'task-created',
bootstrap_servers='localhost:30092',
group_id='task-api-worker'
)
await consumer.start()

# Start consumer as background task
consumer_task = asyncio.create_task(consume_events())

yield

# Cleanup
consumer_task.cancel()
await consumer.stop()
await producer.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/tasks")
async def create_task(title: str):
task_id = str(uuid.uuid4())
event = {"event_type": "task.created", "data": {"id": task_id, "title": title}}

# Truly async send
await producer.send_and_wait('task-created', key=task_id.encode(), value=event)

return {"id": task_id}

When to choose aiokafka:

  • Your team prefers pure Python solutions
  • You want simpler async code without threading
  • You don't need advanced features (transactions, exactly-once)
  • Development velocity matters more than raw performance

When to choose confluent-kafka + threading:

  • You need maximum reliability and performance
  • You require transactions or exactly-once semantics
  • Commercial support is important
  • You're already using confluent-kafka elsewhere

Complete Production Example

Here's the full pattern combining producer, consumer, and proper lifecycle management:

from contextlib import asynccontextmanager
from threading import Thread, Event
from fastapi import FastAPI, HTTPException
from confluent_kafka import Producer, Consumer, KafkaError
from pydantic import BaseModel
import json
import uuid
import os
from datetime import datetime

# Configuration
KAFKA_BOOTSTRAP = os.environ.get(
'KAFKA_BOOTSTRAP_SERVERS',
'localhost:30092'
)

# Global state
producer: Producer | None = None
consumer_thread: Thread | None = None
shutdown_event = Event()

# Pydantic models
class TaskCreate(BaseModel):
title: str
description: str | None = None
priority: int = 1

class TaskResponse(BaseModel):
id: str
title: str
description: str | None
priority: int
created_at: str

# Delivery callback
def delivery_report(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered: {msg.key().decode()} @ {msg.offset()}")

# Consumer loop
def consume_loop():
consumer = Consumer({
'bootstrap.servers': KAFKA_BOOTSTRAP,
'group.id': f"task-api-{os.environ.get('HOSTNAME', 'local')}",
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'session.timeout.ms': 45000
})
consumer.subscribe(['task-created'])

try:
while not shutdown_event.is_set():
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue

try:
event = json.loads(msg.value().decode())
# Process event (in production: call notification service, etc.)
print(f"Consumed: {event['event_type']} - {event['data']['title']}")
consumer.commit(message=msg)
except Exception as e:
print(f"Processing failed: {e}")
finally:
consumer.close()

# Lifespan manager
@asynccontextmanager
async def lifespan(app: FastAPI):
global producer, consumer_thread

# Initialize producer
producer = Producer({
'bootstrap.servers': KAFKA_BOOTSTRAP,
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True,
'delivery.timeout.ms': 30000
})

# Start consumer thread
shutdown_event.clear()
consumer_thread = Thread(target=consume_loop, daemon=True)
consumer_thread.start()

print(f"Kafka integration ready (bootstrap: {KAFKA_BOOTSTRAP})")

yield

# Shutdown
shutdown_event.set()
if consumer_thread:
consumer_thread.join(timeout=5)
if producer:
producer.flush(timeout=10)
print("Kafka integration shut down")

# FastAPI app
app = FastAPI(
title="Task API",
lifespan=lifespan
)

@app.post("/tasks", response_model=TaskResponse)
async def create_task(task: TaskCreate):
"""Create a task and publish event."""
task_id = str(uuid.uuid4())
created_at = datetime.utcnow().isoformat() + "Z"

task_data = {
"id": task_id,
"title": task.title,
"description": task.description,
"priority": task.priority,
"created_at": created_at
}

# Publish event
event = {
"event_type": "task.created",
"event_id": str(uuid.uuid4()),
"occurred_at": created_at,
"data": task_data
}

producer.produce(
topic='task-created',
key=task_id,
value=json.dumps(event),
callback=delivery_report
)
producer.poll(0)

return TaskResponse(**task_data)

@app.get("/health")
async def health():
"""Health check endpoint."""
return {
"status": "healthy",
"kafka_producer": producer is not None,
"kafka_consumer": consumer_thread is not None and consumer_thread.is_alive()
}

Output (full flow):

$ uvicorn main:app --reload
INFO: Started server process
Kafka integration ready (bootstrap: localhost:30092)
INFO: Application startup complete.

$ curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Write documentation", "priority": 1}'

Delivered: 7f8a9b0c-1234-5678-90ab-cdef12345678 @ 159
Consumed: task.created - Write documentation

$ curl http://localhost:8000/health
{"status":"healthy","kafka_producer":true,"kafka_consumer":true}

Refining the Integration

You've built a working FastAPI + Kafka integration. Let's explore how to refine it for your specific requirements.

Initial design question:

"Should the API wait for Kafka acknowledgment before responding to the client?"

Exploring the trade-offs:

The current pattern returns immediately after produce() without waiting for delivery confirmation. This means:

  • API response is fast (sub-millisecond for Kafka interaction)
  • Client gets response before message is confirmed
  • If Kafka fails, client doesn't know

Alternative: Wait for acknowledgment

# Block until delivery (adds latency, provides guarantee)
producer.produce(topic, key, value)
producer.flush(timeout=5) # Wait up to 5 seconds

This adds ~20-50ms latency but guarantees the client knows if publishing failed.

What emerged from this exploration:

  • For most APIs, async publishing (current pattern) is correct
  • For critical workflows (payments, orders), consider flush or transactions
  • The health endpoint helps operators verify Kafka connectivity
  • Consumer group naming with hostname enables per-pod metrics in Kubernetes

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, integrate Kafka producers and consumers into a FastAPI application.
Does my skill show proper async producer/consumer patterns, connection pooling, and graceful shutdown?

Identify Gaps

Ask yourself:

  • Did my skill explain how to avoid blocking FastAPI's async event loop?
  • Did it cover lifespan events for proper producer/consumer initialization and cleanup?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing FastAPI integration patterns (async producers, background consumers, lifespan events).
Update it to include how to integrate Kafka with FastAPI without blocking the event loop.

Try With AI

Apply what you've learned by designing FastAPI + Kafka integrations for your domain.

Setup: Open your AI assistant with the FastAPI project context.


Prompt 1: Analyze your async architecture

I'm integrating Kafka with FastAPI. Here's my current setup:

- FastAPI app handling 1000 requests/second
- Need to publish events for: order.created, order.updated, order.shipped
- Consumers: notification service, analytics, inventory sync
- Running on Kubernetes with 3 replicas

Questions:
1. Should I use one producer for all event types or separate producers?
2. Should consumers run in the same pods as the API or separately?
3. How do I handle the case where Kafka is temporarily unavailable?

What you're learning: AI helps you think through architectural decisions specific to your scale and deployment model. The answer depends on your failure modes and operational preferences.


Prompt 2: Debug a consumer issue

My FastAPI app starts but the consumer thread dies silently after a few minutes.
The main app keeps running and serving requests.

Current setup:
- Consumer in daemon thread (Thread(target=consume_loop, daemon=True))
- No explicit error handling in consume_loop
- Kubernetes with liveness probe on /health

What could cause this? How do I diagnose and fix it?

What you're learning: AI walks you through common causes (session timeout, max.poll.interval exceeded, unhandled exceptions) and debugging strategies (logging, health endpoint enhancement, thread monitoring).


Prompt 3: Design for your domain

I need to add Kafka to my existing FastAPI app:

- Current: Simple REST API for [your domain, e.g., "expense reports"]
- Goal: Publish events when [specific action, e.g., "expense is submitted for approval"]
- Consumers: [who needs these events, e.g., "manager notification, budget tracking, audit log"]

Design the integration:
1. Event schema for my domain
2. Producer configuration
3. Whether I need a consumer in the same app
4. Error handling strategy

What you're learning: AI collaborates on translating generic patterns to your specific domain, helping you decide what to publish, how to structure events, and where consumers should run.


Safety note: When testing FastAPI + Kafka integration locally, ensure your Kafka cluster is running before starting the app. The lifespan will fail at startup if it can't connect, which is the correct behavior---you want fast failure rather than an app that starts but can't publish events.