# Streaming Response Patterns
Your agent works, but it feels sluggish. Users see nothing until the full response arrives—5, 10, sometimes 30 seconds of blank screen. They wonder if it's broken. This isn't a ChatKit problem; it's a streaming problem.
Without streaming, your ChatKit server waits for the entire agent response, then dumps it all at once. With streaming, tokens appear as they're generated—just like ChatGPT's familiar progressive response. The difference is night and day: engaged users instead of frustrated ones.
ChatKit's respond() method returns an AsyncIterator[ThreadStreamEvent]. This lesson teaches you how to implement that iterator correctly—token-by-token delivery, interruption handling, and debugging async issues that cause streams to stall.
## Understanding AsyncIterator Streaming

### The Problem with Blocking

Without streaming:

```python
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Any

from agents import Agent, Runner
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)

async def respond(
    self,
    thread: ThreadMetadata,
    input: UserMessageItem | None,
    context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
    # BAD: blocks until the agent's full response is ready
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = await Runner.run(agent, input.content)
    full_response = result.final_output

    # Yield the complete message all at once
    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=full_response)],
        ),
    )
```
User experience:
- 0-30 seconds: Blank screen (waiting)
- 30 seconds: Full response appears instantly
With streaming:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(
    self,
    thread: ThreadMetadata,
    input: UserMessageItem | None,
    context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
    # GOOD: yields tokens as they are generated
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # Helper automatically handles token-by-token streaming
    async for event in stream_agent_response(context, result):
        yield event
```
User experience:
- 0-100ms: First token appears
- 100-200ms: Second token
- Progressive display until complete
## ThreadStreamEvent Types

ChatKit defines multiple event types you can yield. The most common:

### 1. ThreadItemDoneEvent (Complete Messages)

Delivers complete assistant messages:
```python
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # Generate unique message ID
    msg_id = self.store.generate_item_id("message", thread, context)

    # Yield complete assistant message
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Hello! How can I help you?")],
        ),
    )
```
Key components:

- ThreadItemDoneEvent: marks the message complete and persists it
- AssistantMessageItem: container with metadata (id, thread_id, created_at)
- AssistantMessageContent: wraps the actual text
### 2. ProgressUpdateEvent

Show transient status updates during processing:

```python
from chatkit.types import ProgressUpdateEvent

# Within a tool or respond() method:
async def respond(self, thread, input, context):
    yield ProgressUpdateEvent(icon="search", text="Searching 10,000 records...")
    # ... processing happens ...
    yield ProgressUpdateEvent(icon="check", text="Found 3 matching results")
```

Available icons: "upload", "search", "check", "clock", "document"

Key difference from messages: progress events are transient (not persisted to thread history).
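To see the difference in one place, here is a minimal sketch that emits a transient progress update followed by a persisted message. lookup() is a hypothetical async helper standing in for your own work:

```python
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ProgressUpdateEvent,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # Transient: rendered while we work, never saved to the thread
    yield ProgressUpdateEvent(icon="search", text="Looking that up...")

    answer = await lookup(input.content)  # hypothetical async helper

    # Persisted: becomes part of the thread history
    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=answer)],
        ),
    )
```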
## Implementing Token-by-Token Streaming

### Pattern 1: Using the stream_agent_response() Helper (Recommended)

The easiest way to get streaming is ChatKit's stream_agent_response() helper with the OpenAI Agents SDK:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response
from chatkit.server import ChatKitServer

class MyServer(ChatKitServer):
    async def respond(self, thread, input, context):
        agent = Agent(
            name="Assistant",
            instructions="You are a helpful assistant",
            tools=[search_tool, calculate_tool],  # your own tool definitions
        )

        # Run agent with streaming enabled
        result = Runner.run_streamed(agent, input.content)

        # Helper automatically converts agent events to ThreadStreamEvents
        async for event in stream_agent_response(context, result):
            yield event
```
Output (progressive):

```
Token 1: "I"
Token 2: " can"
Token 3: " help"
Token 4: " with"
Token 5: " that"
Token 6: "."
```
Why this works: stream_agent_response() bridges OpenAI Agents SDK streaming to ChatKit's event system. It handles token-by-token delivery, tool status updates, and completion markers automatically.
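If you want to verify progressive delivery yourself, a small timing wrapper (standard library only, nothing ChatKit-specific) logs when each event arrives:

```python
import time

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are a helpful assistant")
    result = Runner.run_streamed(agent, input.content)

    start = time.monotonic()
    async for event in stream_agent_response(context, result):
        # Elapsed time per event; a healthy stream prints steadily
        # increasing timestamps rather than one burst at the end.
        print(f"+{time.monotonic() - start:.3f}s {type(event).__name__}")
        yield event
```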
### Pattern 2: Complete Messages (Non-Streaming)

If your agent returns complete responses (not streaming), use ThreadItemDoneEvent:

```python
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # Get complete response from agent
    full_response = await some_agent_call(input.content)

    # Generate message ID
    msg_id = self.store.generate_item_id("message", thread, context)

    # Yield complete message
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=full_response)],
        ),
    )
```
Output (all at once):

```
[0-5s wait]
Full response: "I can help with that. Here's what you need..."
```
## Handling Stream Interruptions

### The Problem

A user sends a message while the agent is still streaming. What should happen?

Bad approach: finish the old stream, then start the new one

- User sees the stale response continue for seconds
- Wastes compute and tokens
- Confusing UX

Good approach: detect the interruption, cancel the stream, and start a new response (a cooperative-cancellation sketch follows this list)

- Agent stops immediately
- New response starts fresh
- Clean user experience
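At the Python level, cancellation usually surfaces inside your generator as asyncio.CancelledError. A minimal sketch, assuming your server integration cancels the respond() task when a new message arrives (verify this against your ChatKit version):

```python
import asyncio

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)
    try:
        async for event in stream_agent_response(context, result):
            yield event
    except asyncio.CancelledError:
        # Assumption: the integration cancels this task on interruption.
        # Release per-request resources here, then re-raise so the event
        # loop records the task as cancelled.
        raise
```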
### Detecting Interruptions

ChatKit automatically handles stream cancellation when users send a new message. Override handle_stream_cancelled for custom cleanup:

```python
from datetime import datetime

from chatkit.server import ChatKitServer
from chatkit.types import (
    AssistantMessageItem,
    HiddenContextItem,
    ThreadItem,
    ThreadMetadata,
)

class MyServer(ChatKitServer):
    async def handle_stream_cancelled(
        self,
        thread: ThreadMetadata,
        pending_items: list[ThreadItem],
        context: dict,
    ):
        """Custom cleanup when user cancels stream"""
        # Save partial assistant messages that have content
        for item in pending_items:
            if isinstance(item, AssistantMessageItem):
                if any(content.text.strip() for content in item.content):
                    await self.store.add_thread_item(thread.id, item, context=context)

        # Add context note for next turn
        await self.store.add_thread_item(
            thread.id,
            HiddenContextItem(
                id=self.store.generate_item_id("sdk_hidden_context", thread, context),
                thread_id=thread.id,
                created_at=datetime.now(),
                content="User cancelled the previous response.",
            ),
            context=context,
        )
```
### Enabling Cancellation

By default, ChatKit shows a stop button during streaming. Configure this behavior:

```python
from chatkit.server import ChatKitServer
from chatkit.types import StreamOptions

class MyServer(ChatKitServer):
    def get_stream_options(self, thread, context):
        """Allow users to cancel streams"""
        return StreamOptions(allow_cancel=True)  # Default behavior
```
## Progress Indicators

Show a "thinking" state while the agent hasn't generated tokens yet:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response
from chatkit.types import ProgressUpdateEvent

async def respond(self, thread, input, context):
    # Show thinking indicator
    yield ProgressUpdateEvent(icon="clock", text="Thinking...")

    # Start agent (may take 1-2 seconds before first token)
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # Clear thinking indicator
    yield ProgressUpdateEvent(icon="check", text="")

    # Stream response
    async for event in stream_agent_response(context, result):
        yield event
```
User sees:

- 0-2s: "Thinking..." with clock icon
- 2s: First token appears, indicator clears
- 2-10s: Progressive response
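One caveat in the example above: the indicator is cleared before the stream actually produces anything, so a slow first token still leaves a gap. A small variant, using only the APIs already shown, keeps "Thinking..." up until the first event arrives:

```python
async def respond(self, thread, input, context):
    yield ProgressUpdateEvent(icon="clock", text="Thinking...")

    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    first_event = True
    async for event in stream_agent_response(context, result):
        if first_event:
            # Clear the indicator only once tokens are actually flowing
            yield ProgressUpdateEvent(icon="check", text="")
            first_event = False
        yield event
```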
## Common Async Pitfalls

### Pitfall 1: Blocking Operation in Async Context

❌ WRONG:

```python
import time
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # time.sleep() BLOCKS the entire event loop
    time.sleep(5)  # BAD!

    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Done")],
        ),
    )
```
Problem: time.sleep() freezes all concurrent streams.
✅ CORRECT:

```python
import asyncio
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # asyncio.sleep() yields control to the event loop
    await asyncio.sleep(5)  # GOOD!

    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Done")],
        ),
    )
```
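Real handlers rarely sleep, but the same rule applies to any synchronous call: a blocking database query, file parse, or HTTP request will freeze every concurrent stream. A minimal sketch, assuming a hypothetical blocking function fetch_records(), offloads it with the standard-library asyncio.to_thread():

```python
import asyncio

from chatkit.types import ProgressUpdateEvent

async def respond(self, thread, input, context):
    # fetch_records() is a hypothetical blocking function (a sync DB client,
    # a CPU-heavy parser, ...). asyncio.to_thread() runs it in a worker
    # thread so the event loop, and every other in-flight stream, keeps going.
    records = await asyncio.to_thread(fetch_records, input.content)
    yield ProgressUpdateEvent(icon="check", text=f"Loaded {len(records)} records")
```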
### Pitfall 2: Forgetting to Yield

❌ WRONG:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # Forgot to iterate and yield!
    stream_agent_response(context, result)
```

Problem: No events are sent to the client, so the stream appears frozen.

✅ CORRECT:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # Use 'async for' and 'yield'
    async for event in stream_agent_response(context, result):
        yield event
```
### Pitfall 3: Using for Instead of async for

❌ WRONG:

```python
async def respond(self, thread, input, context):
    result = Runner.run_streamed(agent, input.content)

    # Missing 'async' before 'for'
    for event in stream_agent_response(context, result):  # BAD!
        yield event
```

Problem: stream_agent_response() returns an async iterator, so a plain for loop raises a TypeError instead of iterating.

✅ CORRECT:

```python
async def respond(self, thread, input, context):
    result = Runner.run_streamed(agent, input.content)

    # Use 'async for' with async iterators
    async for event in stream_agent_response(context, result):
        yield event
```
## Debugging Stalled Streams

### Symptom: Stream Starts But Never Completes

Check 1: Are you using the stream_agent_response() helper?

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

# GOOD: Helper handles completion automatically
async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)
    async for event in stream_agent_response(context, result):
        yield event
    # No manual end marker needed - the helper handles it
```
If you are not using the helper, ensure you yield a complete ThreadItemDoneEvent:

```python
from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # Accumulate tokens (note: this forgoes progressive display;
    # the client sees nothing until the message is complete)
    full_text = ""
    async for token in some_stream:  # your own token source
        full_text += token

    # Yield complete message when done
    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=full_text)],
        ),
    )
```
Check 2: Is there a blocking operation?

```python
# Add logging to find the blocking code
async def respond(self, thread, input, context):
    print("Starting respond()")
    agent = Agent(name="Assistant", instructions="You are helpful")
    print("Agent created")
    result = Runner.run_streamed(agent, input.content)
    print("Stream started")
    async for event in stream_agent_response(context, result):
        print(f"Yielding event: {event}")
        yield event
    print("Stream complete")
```
If logs stop at "Stream started" without "Yielding event", the agent stream is stalled.
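To catch stalls automatically rather than by reading logs, you can wrap any event stream in a watchdog. A debugging sketch using plain asyncio (Python 3.11+ for asyncio.timeout(); stream_with_watchdog is not a ChatKit API):

```python
import asyncio

async def stream_with_watchdog(events, timeout_s: float = 15.0):
    """Re-yield events, raising TimeoutError if the source stalls."""
    iterator = events.__aiter__()
    while True:
        try:
            async with asyncio.timeout(timeout_s):
                event = await anext(iterator)
        except StopAsyncIteration:
            return  # Source finished normally
        yield event

# Usage inside respond():
#   async for event in stream_with_watchdog(stream_agent_response(context, result)):
#       yield event
```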
### Symptom: Tokens Come in Bursts Instead of Smoothly

Problem: Network buffering or batching.

Solution: Use the stream_agent_response() helper, which handles buffering correctly:

```python
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # Helper handles proper token delivery timing
    async for event in stream_agent_response(context, result):
        yield event
```
## Comparison Table
| Aspect | Blocking Response | Streaming Response |
|---|---|---|
| First token | 5-30 seconds | 50-200ms |
| User experience | Waiting, then sudden dump | Progressive, feels responsive |
| Interruptions | Must wait for completion | Immediate cancellation |
| Progress visibility | None | Token-by-token + indicators |
| Implementation | return response | async for ... yield |
## Safety Note
Network failures during streaming: If client disconnects mid-stream, ChatKit automatically handles cleanup. Override handle_stream_cancelled() to save partial responses or perform custom cleanup when streams are interrupted.
## Try With AI

You've seen streaming patterns. Now practice implementation with AI assistance.

### Setup

- ChatKit server from Lesson 2
- Claude Code or similar AI IDE
- Python 3.11+ with asyncio
### Exercise 1: Diagnose Stream Stall

Your agent stream starts but stops halfway. Prompt your AI assistant to help debug:

```
My ChatKit server starts streaming tokens but freezes after 5-10 tokens.
No errors in logs. Stream never completes. User sees partial response forever.

Here's my respond() method:
[paste your code]

What's causing the stall?
```

What you're learning: AI helps trace async execution flow and identify missing awaits or blocking operations.
### Exercise 2: Add Progress Indicators

Add a "thinking" state before the first token:

```
My agent takes 2-3 seconds before generating the first token.
Users think it's broken. Add a progress indicator showing
the agent is thinking.

Current respond():
[paste code]

Show "Thinking..." until the first token arrives.
```

What you're learning: AI suggests ProgressUpdateEvent patterns and placement within async flow.
### Exercise 3: Handle Interruptions

A user sends a new message while the agent is streaming. Handle it:

```
When users send a new message mid-stream, I want to save the partial
response and clean up gracefully. How do I override the stream
cancellation handler?

Current code:
[paste respond() method]
```

What you're learning: AI explains handle_stream_cancelled() override patterns and how to save partial assistant responses.