Streaming with SSE

Some operations take time. When an AI agent generates a response, you don't want users staring at a loading spinner for 30 seconds. Streaming sends data as it becomes available—token by token for LLMs, update by update for long-running tasks.

This is the lesson that connects everything to agents. In Lesson 7, you'll stream actual agent responses. Here, you build the foundation with simulated data.

Why Streaming Changes Everything

Traditional request-response (what you've built so far):

  1. Client sends request
  2. Server processes (30 seconds for agent response)
  3. Client waits with no feedback...
  4. Server sends complete response

Streaming:

  1. Client sends request
  2. Server starts processing
  3. Server sends first token immediately
  4. Server sends more tokens as available
  5. Client sees response forming in real-time

You've experienced this in ChatGPT—words appearing as the model generates them. That's streaming.

For agents, streaming means:

  • Users see responses forming, not waiting
  • Long tool calls show progress
  • Failed operations fail fast, not after timeout
  • Better perceived performance (first byte matters)

How SSE Works (Under the Hood)

Server-Sent Events is a simple protocol. The server sends text in a specific format:

Output (raw SSE stream):

event: task_update
data: {"task_id": 1, "status": "in_progress"}

event: task_update
data: {"task_id": 1, "status": "completed"}

Each event has:

  • event: Event type (optional, defaults to "message")
  • data: The payload (must be a string, usually JSON)
  • Blank line: Separates events
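
To make the wire format concrete, here is a minimal sketch of serializing one event by hand (format_sse is a hypothetical helper for illustration only; sse-starlette does this for you):

import json

def format_sse(data: str, event: str | None = None) -> str:
    """Serialize one event into SSE wire format."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {data}")
    return "\n".join(lines) + "\n\n"  # trailing blank line terminates the event

print(format_sse(json.dumps({"task_id": 1, "status": "completed"}), event="task_update"))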

Why SSE over WebSockets?

  • SSE is simpler—just HTTP with a special content type
  • Works through proxies and load balancers without configuration
  • Browser handles reconnection automatically
  • One-directional (server → client) which is exactly what streaming needs

WebSockets are bidirectional, which adds complexity you don't need for agent responses.

Installing sse-starlette

FastAPI doesn't include SSE by default. Add the package:

uv add sse-starlette

This provides EventSourceResponse, which handles SSE formatting automatically.

Your First Streaming Endpoint

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

app = FastAPI(title="Task API")

async def task_updates_generator():
    """Simulates task status updates over time."""
    for i in range(5):
        yield {
            "event": "task_update",
            "data": json.dumps({
                "task_id": i + 1,
                "status": "processing",
                "progress": (i + 1) * 20
            })
        }
        await asyncio.sleep(1)  # Simulate work

    yield {
        "event": "complete",
        "data": json.dumps({"message": "All tasks processed"})
    }

@app.get("/tasks/stream")
async def stream_task_updates():
    return EventSourceResponse(task_updates_generator())

Breaking this down:

  1. async def with yield creates an async generator—a function that produces values over time
  2. Each yield sends one SSE event to the client
  3. await asyncio.sleep(1) simulates work (in Lesson 7, this is where the agent generates tokens)
  4. EventSourceResponse wraps the generator and handles SSE formatting

The key insight: yield doesn't end the function. It pauses, sends data, then continues. This is fundamentally different from return.
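
You can see that pause-and-resume behavior without FastAPI at all. A minimal runnable sketch (the counter name is just for illustration):

import asyncio

async def counter():
    for i in range(3):
        print(f"producing {i}")
        yield i  # pauses here until the consumer asks for the next value

async def main():
    async for value in counter():
        print(f"received {value}")

asyncio.run(main())
# Output interleaves: producing 0, received 0, producing 1, received 1, ...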

Why Async Generators Matter for Agents

In Lesson 7, you'll stream agent responses like this:

async def agent_response_generator(message: str):
    result = Runner.run_streamed(agent, message)
    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # Extract token text from the response delta
            if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'text'):
                yield {
                    "event": "token",
                    "data": event.data.delta.text
                }

The pattern is identical:

  • Async generator yields data
  • EventSourceResponse sends it
  • Client receives tokens as they generate

Master the pattern here with simulated data. Lesson 7 plugs in the real agent.
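
As a preview, wiring that generator into an endpoint follows the same shape as this lesson's examples (a sketch: the /chat/stream route is an assumption, and the agent and Runner objects arrive in Lesson 7):

@app.post("/chat/stream")
async def stream_chat(message: str):
    # Hypothetical route; agent_response_generator is the Lesson 7 preview above
    return EventSourceResponse(agent_response_generator(message))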

Testing in Browser

Swagger UI doesn't work for SSE—it expects regular responses. Open your browser's console:

const source = new EventSource('http://localhost:8000/tasks/stream');

source.onmessage = (event) => {
    console.log('Message:', event.data);
};

source.addEventListener('task_update', (event) => {
    console.log('Task update:', JSON.parse(event.data));
});

source.addEventListener('complete', (event) => {
    console.log('Complete:', JSON.parse(event.data));
    source.close();
});

source.onerror = (error) => {
    console.error('Error:', error);
    source.close();
};

You'll see events arriving one second apart.

Important: The browser automatically reconnects if the connection drops. That's a built-in feature of EventSource. For agent responses you usually want to prevent a replay after the stream finishes, which is why the handlers above call source.close() on complete and on error.
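
Reconnection works best when events carry identifiers. sse-starlette also accepts optional id and retry keys in the yielded dict; a minimal sketch (resumable_generator is an illustrative name, not part of the task API):

async def resumable_generator():
    for i in range(5):
        yield {
            "event": "tick",
            "id": str(i),    # the browser sends this back as Last-Event-ID on reconnect
            "retry": 3000,   # asks the browser to wait 3 seconds before reconnecting
            "data": json.dumps({"count": i})
        }
        await asyncio.sleep(1)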

Streaming with Context

Let's add streaming that relates to a specific task:

from fastapi import Depends, HTTPException, status
from repository import TaskRepository, get_task_repo

async def task_progress_generator(task_id: int, task_title: str):
    """Streams progress updates for a specific task."""
    steps = [
        "Analyzing task...",
        "Processing requirements...",
        "Generating output...",
        "Validating results...",
        "Finalizing...",
    ]

    for i, step in enumerate(steps, 1):
        yield {
            "event": "progress",
            "data": json.dumps({
                "task_id": task_id,
                "task_title": task_title,
                "step": i,
                "total_steps": len(steps),
                "message": step,
                "percentage": int((i / len(steps)) * 100)
            })
        }
        await asyncio.sleep(0.8)

    yield {
        "event": "complete",
        "data": json.dumps({
            "task_id": task_id,
            "status": "completed"
        })
    }

@app.post("/tasks/{task_id}/execute")
async def execute_task(
    task_id: int,
    repo: TaskRepository = Depends(get_task_repo)
):
    # Verify task exists before streaming
    task = repo.get_by_id(task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task with id {task_id} not found"
        )

    return EventSourceResponse(
        task_progress_generator(task_id, task["title"])
    )

Notice the pattern:

  1. Validate input BEFORE returning the stream
  2. Pass context (task_id, task_title) to the generator
  3. Generator doesn't need to access the repository—it just yields data

This matters for agents: you'll validate the conversation exists, then stream the response.

Error Handling in Streams

What happens when an error occurs mid-stream? The client has already received some data.

async def risky_generator():
    try:
        for i in range(10):
            if i == 5:
                raise ValueError("Something went wrong at step 5!")
            yield {
                "event": "step",
                "data": json.dumps({"step": i})
            }
            await asyncio.sleep(0.5)
    except Exception as e:
        # Send error as an event, don't raise
        yield {
            "event": "error",
            "data": json.dumps({"error": str(e)})
        }

The key insight: Once streaming starts, you can't change the HTTP status code. It's already been sent as 200. So you send an error EVENT, and the client handles it.

For agents, this means:

  • Agent starts generating
  • Tool call fails mid-response
  • Stream an error event
  • Client shows error in the UI
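
One way to apply this consistently is a small wrapper that turns exceptions from any inner generator into error events (a sketch, not part of the lesson's API; with_error_events is an illustrative name):

async def with_error_events(inner):
    """Wraps an async generator so mid-stream failures become SSE error events."""
    try:
        async for event in inner:
            yield event
    except Exception as e:
        yield {
            "event": "error",
            "data": json.dumps({"error": str(e)})
        }

Then pass with_error_events(your_generator()) to EventSourceResponse instead of the bare generator, and none of your generators need their own try/except.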

The Complete Streaming Example

from fastapi import FastAPI, Depends, HTTPException, status
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

from repository import TaskRepository, get_task_repo

app = FastAPI(title="Task API")

# Stream 1: System-wide updates
async def system_updates_generator():
    """Simulates system-wide events."""
    events = [
        ("info", {"message": "System started"}),
        ("task_created", {"task_id": 1}),
        ("task_updated", {"task_id": 1, "status": "in_progress"}),
        ("task_completed", {"task_id": 1}),
        ("info", {"message": "Batch complete"}),
    ]

    for event_type, data in events:
        yield {
            "event": event_type,
            "data": json.dumps(data)
        }
        await asyncio.sleep(1)

@app.get("/stream/system")
async def stream_system_updates():
    return EventSourceResponse(system_updates_generator())

# Stream 2: Task-specific progress
async def task_work_generator(task_id: int, task_title: str):
    """Simulates work on a specific task."""
    steps = [
        "Starting task...",
        "Analyzing requirements...",
        "Processing data...",
        "Generating output...",
        "Finalizing...",
    ]

    for i, step in enumerate(steps, 1):
        yield {
            "event": "step",
            "data": json.dumps({
                "task_id": task_id,
                "task_title": task_title,
                "step": i,
                "message": step,
                "progress": int((i / len(steps)) * 100)
            })
        }
        await asyncio.sleep(0.8)

    yield {
        "event": "done",
        "data": json.dumps({
            "task_id": task_id,
            "message": "Task completed successfully"
        })
    }

@app.post("/tasks/{task_id}/execute")
async def execute_task(
    task_id: int,
    repo: TaskRepository = Depends(get_task_repo)
):
    task = repo.get_by_id(task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task {task_id} not found"
        )

    return EventSourceResponse(
        task_work_generator(task_id, task["title"])
    )

# Stream 3: Countdown (simple demo)
async def countdown_generator(seconds: int):
    """Simple countdown stream."""
    for i in range(seconds, 0, -1):
        yield {
            "event": "tick",
            "data": json.dumps({"remaining": i})
        }
        await asyncio.sleep(1)

    yield {
        "event": "complete",
        "data": json.dumps({"message": "Countdown finished!"})
    }

@app.get("/stream/countdown/{seconds}")
async def stream_countdown(seconds: int):
    if seconds < 1 or seconds > 60:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Seconds must be between 1 and 60"
        )
    return EventSourceResponse(countdown_generator(seconds))
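
To try these endpoints, start the server (assuming the code lives in main.py):

uv run uvicorn main:app --reload

Then hit GET /stream/system or GET /stream/countdown/10 from the browser console as shown earlier.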

Hands-On Exercise

Build a streaming endpoint for task processing:

Step 1: Add sse-starlette to your project

uv add sse-starlette

Step 2: Create a streaming endpoint that validates the task exists first

Step 3: Test in browser console

const source = new EventSource('http://localhost:8000/tasks/1/execute', {
    method: 'POST'  // This won't work: EventSource ignores this option and always uses GET
});

Wait—EventSource only supports GET! For POST, you need a different approach:

// For POST endpoints, use fetch with streaming
async function streamTask(taskId) {
    const response = await fetch(`http://localhost:8000/tasks/${taskId}/execute`, {
        method: 'POST'
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;
        console.log(decoder.decode(value));
    }
}

streamTask(1);
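
If you prefer testing from Python, httpx can consume the same POST stream (a sketch; assumes httpx is installed with uv add httpx):

import asyncio
import httpx

async def stream_task(task_id: int):
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the response body instead of buffering it
        async with client.stream("POST", f"http://localhost:8000/tasks/{task_id}/execute") as response:
            async for line in response.aiter_lines():
                if line:  # blank lines separate SSE events
                    print(line)

asyncio.run(stream_task(1))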

Step 4: Observe events arriving in real-time

Challenge: Build a Progress Tracker

Before looking at any solution, design a streaming endpoint:

The Problem: Build an endpoint that simulates an AI agent "thinking":

  • Starts with "Analyzing request..."
  • Shows 3-5 intermediate "thoughts"
  • Ends with a "conclusion"
  • Takes about 5 seconds total

Think about:

  • What events do you need? (thinking, thought, conclusion?)
  • How do you structure the data for each event?
  • How would a frontend render this progressively?

Implement it. Then compare with AI:

"I built a thinking stream like this: [paste your code]. The frontend will need to render each thought in sequence. Is there a better event structure for progressive rendering?"

Common Mistakes

Mistake 1: Passing a dict as data instead of a JSON string

# Wrong - data must be a string
yield {"data": {"task_id": 1}}

# Correct - serialize to JSON string
yield {"data": json.dumps({"task_id": 1})}

SSE data must be a string. If you pass a dict, the client won't receive valid JSON and JSON.parse will fail.

Mistake 2: Not closing the connection on the client

// Wrong - connection stays open forever
const source = new EventSource('/stream');

// Correct - close when done
source.addEventListener('complete', () => source.close());

Open connections consume server resources. Always close when done.

Mistake 3: Blocking the event loop

# Wrong - blocks other requests
import time
time.sleep(1) # This is synchronous!

# Correct - use async sleep
await asyncio.sleep(1)

Synchronous time.sleep() blocks the entire event loop. Other requests can't be processed. Always use await asyncio.sleep().

Mistake 4: Returning instead of yielding

# Wrong - sends nothing
async def generator():
    return {"data": "hello"}  # Not a generator!

# Correct - yield makes it a generator
async def generator():
    yield {"data": "hello"}

return ends the function. yield makes it a generator that produces values.

Try With AI

Now that you understand SSE streaming, explore advanced patterns for production agent systems.

Prompt 1: Understand the Protocol

Explain the SSE protocol format in detail:
1. What are all the optional fields besides 'event' and 'data'?
2. How does the 'id' field enable automatic reconnection?
3. What is the 'retry' field for?

Then show me how to implement resumable streams—if my server crashes mid-stream and the client reconnects, how do I resume from where I left off?

What you're learning: This prompt reveals SSE's built-in resilience features. You'll discover that id enables Last-Event-ID headers on reconnect, letting your server resume from the right position. The retry field controls reconnection timing. These features are crucial for reliable agent streaming over unreliable networks.

Prompt 2: Handle Client Disconnection

When a client disconnects mid-stream, my async generator might keep running, wasting resources:

async def agent_stream():
    async for token in expensive_agent_call():
        yield {"data": token}
        # What if client disconnected here?

How do I detect disconnection and clean up? What if I have database cursors or API connections that need proper cleanup?

What you're learning: This prompt teaches resource management in async contexts. You'll learn about asyncio.CancelledError, context managers for cleanup, and how to structure generators that release resources properly. Essential for agent systems where each stream might hold LLM API connections.

Prompt 3: Choose the Right Technology

Compare SSE vs WebSockets vs HTTP/2 Server Push for my use case:
- Agent streams token-by-token responses (server → client)
- Users can click "Stop" to interrupt generation (client → server)
- Need to work through corporate proxies

Which technology best fits? Can I handle user interrupts with SSE, or do I need WebSockets?

What you're learning: This prompt develops your architecture judgment. You'll discover that SSE handles most agent streaming needs elegantly—the "stop" button can be a separate HTTP request that cancels the stream server-side. WebSockets add complexity you rarely need for unidirectional agent output.


Reflect on Your Skill

You built a fastapi-agent skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my fastapi-agent skill, help me implement streaming responses with Server-Sent Events.
Does my skill include async generators, EventSourceResponse, and proper SSE event formatting?

Identify Gaps

Ask yourself:

  • Did my skill include async generator patterns with yield?
  • Did it handle EventSourceResponse from sse-starlette?
  • Did it cover SSE event format (event type, data as JSON string, error handling)?

Improve Your Skill

If you found gaps:

My fastapi-agent skill is missing streaming patterns.
Update it to include async generators for streaming data, EventSourceResponse usage,
SSE event format with event/data fields, and error handling mid-stream.