Streaming Response Patterns

Your agent works, but it feels sluggish. Users see nothing until the full response arrives—5, 10, sometimes 30 seconds of blank screen. They wonder if it's broken. This isn't a ChatKit problem; it's a streaming problem.

Without streaming, your ChatKit server waits for the entire agent response, then dumps it all at once. With streaming, tokens appear as they're generated—just like ChatGPT's familiar progressive response. The difference is night and day: engaged users instead of frustrated ones.

ChatKit's respond() method returns an AsyncIterator[ThreadStreamEvent]. This lesson teaches you how to implement that iterator correctly—token-by-token delivery, interruption handling, and debugging async issues that cause streams to stall.


Understanding AsyncIterator Streaming

The Problem with Blocking

Without streaming:

from collections.abc import AsyncIterator
from datetime import datetime
from typing import Any

from agents import Agent, Runner
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)

async def respond(
    self,
    thread: ThreadMetadata,
    input: UserMessageItem | None,
    context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
    # BAD: Waits for the full response before yielding anything
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = await Runner.run(agent, input.content)
    full_response = result.final_output

    # Yield the complete message all at once
    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=full_response)],
        ),
    )

User experience:

  • 0-30 seconds: Blank screen (waiting)
  • 30 seconds: Full response appears instantly

With streaming:

from collections.abc import AsyncIterator
from typing import Any

from agents import Agent, Runner
from chatkit.agents import stream_agent_response
from chatkit.types import ThreadMetadata, ThreadStreamEvent, UserMessageItem

async def respond(
    self,
    thread: ThreadMetadata,
    input: UserMessageItem | None,
    context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
    # GOOD: Yields tokens as they are generated
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    # The helper handles token-by-token streaming automatically
    async for event in stream_agent_response(context, result):
        yield event

User experience:

  • 0-100ms: First token appears
  • 100-200ms: Second token
  • Progressive display until complete

ThreadStreamEvent Types

ChatKit defines multiple event types you can yield. The most common:

1. ThreadItemDoneEvent (Complete Messages)

Delivers complete assistant messages:

from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent
from datetime import datetime

async def respond(self, thread, input, context):
    # Generate a unique message ID
    msg_id = self.store.generate_item_id("message", thread, context)

    # Yield the complete assistant message
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Hello! How can I help you?")],
        ),
    )

Key components:

  • ThreadItemDoneEvent: Marks message complete and persists
  • AssistantMessageItem: Container with metadata (id, thread_id, created_at)
  • AssistantMessageContent: Wraps the actual text
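
Since every complete message needs the same three layers of wrapping, you may want to factor the boilerplate into a method on your server. A minimal sketch under the types shown above; make_done_event is a name introduced here for illustration, not a ChatKit API:

from datetime import datetime

from chatkit.server import ChatKitServer
from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent

class MyServer(ChatKitServer):
    def make_done_event(self, thread, context, text: str) -> ThreadItemDoneEvent:
        # Hypothetical helper: wraps plain text in the three-layer event structure
        msg_id = self.store.generate_item_id("message", thread, context)
        return ThreadItemDoneEvent(
            item=AssistantMessageItem(
                id=msg_id,
                thread_id=thread.id,
                created_at=datetime.now(),
                content=[AssistantMessageContent(text=text)],
            ),
        )

With this in place, respond() can finish with a single yield self.make_done_event(thread, context, "Hello!").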

2. ProgressUpdateEvent

Show transient status updates during processing:

from chatkit.types import ProgressUpdateEvent

# Within a tool or respond() method
yield ProgressUpdateEvent(icon="search", text="Searching 10,000 records...")

# ... processing happens ...

yield ProgressUpdateEvent(icon="check", text="Found 3 matching results")

Available icons: "upload", "search", "check", "clock", "document"

Key difference from messages: Progress events are transient (not persisted to thread history)
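
To see both event types in one flow, here is a minimal sketch of a respond() that surfaces transient progress around a persisted final message; run_search is a hypothetical async function standing in for your own work:

from datetime import datetime

from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ProgressUpdateEvent,
    ThreadItemDoneEvent,
)

async def respond(self, thread, input, context):
    # Transient: rendered while working, never saved to the thread
    yield ProgressUpdateEvent(icon="search", text="Searching records...")

    results = await run_search(input)  # hypothetical async search

    # Persisted: becomes a permanent part of the thread history
    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=f"Found {len(results)} results.")],
        ),
    )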


Implementing Token-by-Token Streaming

Pattern 1: Token Streaming with stream_agent_response()

The easiest way to get streaming is ChatKit's stream_agent_response() helper paired with the OpenAI Agents SDK:

from chatkit.agents import stream_agent_response
from chatkit.server import ChatKitServer
from agents import Agent, Runner

class MyServer(ChatKitServer):
    async def respond(self, thread, input, context):
        agent = Agent(
            name="Assistant",
            instructions="You are a helpful assistant",
            tools=[search_tool, calculate_tool],
        )

        # Run agent with streaming enabled
        result = Runner.run_streamed(agent, input.content)

        # Helper automatically converts agent events to ThreadStreamEvents
        async for event in stream_agent_response(context, result):
            yield event

Output (progressive):

Token 1: "I"
Token 2: " can"
Token 3: " help"
Token 4: " with"
Token 5: " that"
Token 6: "."

Why this works: stream_agent_response() bridges OpenAI Agents SDK streaming to ChatKit's event system. It handles token-by-token delivery, tool status updates, and completion markers automatically.

Pattern 2: Complete Messages (Non-Streaming)

If your agent returns complete responses (not streaming), use ThreadItemDoneEvent:

from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent
from datetime import datetime

async def respond(self, thread, input, context):
    # Get complete response from agent
    full_response = await some_agent_call(input.content)

    # Generate message ID
    msg_id = self.store.generate_item_id("message", thread, context)

    # Yield complete message
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=full_response)],
        ),
    )

Output (all at once):

[0-5s wait]
Full response: "I can help with that. Here's what you need..."

Handling Stream Interruptions

The Problem

User sends a message while agent is still streaming. What should happen?

Bad approach: Finish the old stream, then start new one

  • User sees stale response continue for seconds
  • Wastes compute and tokens
  • Confusing UX

Good approach: Detect interruption, cancel stream, start new response

  • Agent stops immediately
  • New response starts fresh
  • Clean user experience

Detecting Interruptions

ChatKit automatically handles stream cancellation when users send a new message. Override handle_stream_cancelled for custom cleanup:

from chatkit.server import ChatKitServer
from chatkit.types import ThreadMetadata, ThreadItem, AssistantMessageItem, HiddenContextItem
from datetime import datetime

class MyServer(ChatKitServer):
    async def handle_stream_cancelled(
        self,
        thread: ThreadMetadata,
        pending_items: list[ThreadItem],
        context: dict,
    ):
        """Custom cleanup when the user cancels a stream."""

        # Save partial assistant messages that have content
        for item in pending_items:
            if isinstance(item, AssistantMessageItem):
                if any(content.text.strip() for content in item.content):
                    await self.store.add_thread_item(thread.id, item, context=context)

        # Add a context note for the next turn
        await self.store.add_thread_item(
            thread.id,
            HiddenContextItem(
                id=self.store.generate_item_id("sdk_hidden_context", thread, context),
                thread_id=thread.id,
                created_at=datetime.now(),
                content="User cancelled the previous response.",
            ),
            context=context,
        )
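
handle_stream_cancelled() is ChatKit's hook, but ordinary asyncio hygiene applies too: if respond() acquires a resource, a try/finally releases it even when the task is cancelled mid-yield. A sketch, with open_analytics_connection standing in for any async resource of your own:

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    db = await open_analytics_connection()  # hypothetical resource
    try:
        agent = Agent(name="Assistant", instructions="You are helpful")
        result = Runner.run_streamed(agent, input.content)
        async for event in stream_agent_response(context, result):
            yield event
    finally:
        # Runs on normal completion AND on cancellation/disconnect
        await db.close()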

Enabling Cancellation

By default, ChatKit shows a stop button during streaming. Configure this behavior:

from chatkit.server import ChatKitServer
from chatkit.types import StreamOptions

class MyServer(ChatKitServer):
    def get_stream_options(self, thread, context):
        """Allow users to cancel streams"""
        return StreamOptions(allow_cancel=True)  # Default behavior

Progress Indicators

Show "thinking" state when agent hasn't generated tokens yet:

from chatkit.types import ProgressUpdateEvent
from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    # Show thinking indicator
    yield ProgressUpdateEvent(icon="clock", text="Thinking...")

    # Start agent (may take 1-2 seconds before first token)
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Clear thinking indicator
    yield ProgressUpdateEvent(icon="check", text="")

    # Stream response
    async for event in stream_agent_response(context, result):
        yield event

User sees:

0-2s: "Thinking..." with clock icon
2s: First token appears, indicator clears
2-10s: Progressive response

Common Async Pitfalls

Pitfall 1: Blocking Operation in Async Context

WRONG:

import time
from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent
from datetime import datetime

async def respond(self, thread, input, context):
    # time.sleep() BLOCKS the entire event loop
    time.sleep(5)  # BAD!

    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Done")],
        ),
    )

Problem: time.sleep() blocks the event loop, freezing every concurrent stream on the server, not just this one.

CORRECT:

import asyncio
from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent
from datetime import datetime

async def respond(self, thread, input, context):
    # asyncio.sleep() yields control to the event loop
    await asyncio.sleep(5)  # GOOD!

    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text="Done")],
        ),
    )
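
If a blocking call is unavoidable (a sync-only SDK, CPU-heavy parsing), push it off the event loop with asyncio.to_thread() instead of calling it inline. A sketch, with slow_sync_lookup standing in for your blocking function:

import asyncio
from datetime import datetime

from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent

async def respond(self, thread, input, context):
    # Runs the sync function in a worker thread; other streams keep flowing
    data = await asyncio.to_thread(slow_sync_lookup, input.content)

    msg_id = self.store.generate_item_id("message", thread, context)
    yield ThreadItemDoneEvent(
        item=AssistantMessageItem(
            id=msg_id,
            thread_id=thread.id,
            created_at=datetime.now(),
            content=[AssistantMessageContent(text=str(data))],
        ),
    )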

Pitfall 2: Forgetting to Yield

WRONG:

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Forgot to iterate and yield!
    stream_agent_response(context, result)

Problem: No events sent to client. Stream appears frozen.

CORRECT:

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Use 'async for' and 'yield'
    async for event in stream_agent_response(context, result):
        yield event

Pitfall 3: Plain for Instead of async for

WRONG:

async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Missing 'async' before 'for'
    for event in stream_agent_response(context, result):  # BAD!
        yield event

Problem: A plain for loop expects a regular iterator, but stream_agent_response() returns an async iterator, so Python raises TypeError: 'async_generator' object is not iterable.

CORRECT:

async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Use 'async for' with async iterators
    async for event in stream_agent_response(context, result):
        yield event

Debugging Stalled Streams

Symptom: Stream Starts But Never Completes

Check 1: Are you using the stream_agent_response() helper?

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

# GOOD: Helper handles completion automatically
async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    async for event in stream_agent_response(context, result):
        yield event
    # No manual end marker needed - helper handles it

If not using helper, ensure you yield a complete ThreadItemDoneEvent:

from chatkit.types import ThreadItemDoneEvent, AssistantMessageItem, AssistantMessageContent
from datetime import datetime

# Inside respond(): accumulate tokens from your own token stream
full_text = ""
async for token in some_stream:
    full_text += token

# Yield complete message when done
msg_id = self.store.generate_item_id("message", thread, context)
yield ThreadItemDoneEvent(
    item=AssistantMessageItem(
        id=msg_id,
        thread_id=thread.id,
        created_at=datetime.now(),
        content=[AssistantMessageContent(text=full_text)],
    ),
)

Check 2: Is there a blocking operation?

# Add logging to find blocking code
async def respond(self, thread, input, context):
    print("Starting respond()")

    agent = Agent(...)
    print("Agent created")

    result = Runner.run_streamed(agent, input.content)
    print("Stream started")

    async for event in stream_agent_response(context, result):
        print(f"Yielding event: {event}")
        yield event

    print("Stream complete")

If logs stop at "Stream started" without "Yielding event", the agent stream is stalled.
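
To turn a silent stall into a loud error while debugging, you can put a timeout around each step of the iteration. A sketch using asyncio.timeout() (Python 3.11+); the 30-second budget is an arbitrary choice:

import asyncio

from agents import Agent, Runner
from chatkit.agents import stream_agent_response

async def respond(self, thread, input, context):
    agent = Agent(name="Assistant", instructions="You are helpful")
    result = Runner.run_streamed(agent, input.content)

    events = stream_agent_response(context, result).__aiter__()
    while True:
        try:
            async with asyncio.timeout(30):  # fail fast instead of hanging forever
                event = await events.__anext__()
        except StopAsyncIteration:
            break  # stream finished normally
        except TimeoutError:
            raise RuntimeError("No event for 30s - upstream agent stream is stalled")
        yield event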

Symptom: Tokens Come in Bursts Instead of Smoothly

Problem: Network buffering or batching

Solution: Use stream_agent_response() helper which handles buffering correctly

from chatkit.agents import stream_agent_response
from agents import Agent, Runner

async def respond(self, thread, input, context):
    agent = Agent(...)
    result = Runner.run_streamed(agent, input.content)

    # Helper handles proper token delivery timing
    async for event in stream_agent_response(context, result):
        yield event

Comparison Table

Aspect              | Blocking Response           | Streaming Response
--------------------|-----------------------------|-------------------------------
First token         | 5-30 seconds                | 50-200ms
User experience     | Waiting, then a sudden dump | Progressive, feels responsive
Interruptions       | Must wait for completion    | Immediate cancellation
Progress visibility | None                        | Token-by-token + indicators
Implementation      | return response             | async for ... yield

Safety Note

Network failures during streaming: If client disconnects mid-stream, ChatKit automatically handles cleanup. Override handle_stream_cancelled() to save partial responses or perform custom cleanup when streams are interrupted.


Try With AI

You've seen streaming patterns. Now practice implementation with AI assistance.

Setup

  • ChatKit server from Lesson 2
  • Claude Code or similar AI IDE
  • Python 3.11+ with asyncio

Exercise 1: Diagnose Stream Stall

Your agent stream starts but stops halfway. Help me debug:

My ChatKit server starts streaming tokens but freezes after 5-10 tokens.
No errors in logs. Stream never completes. User sees partial response forever.

Here's my respond() method:
[paste your code]

What's causing the stall?

What you're learning: AI helps trace async execution flow and identify missing awaits or blocking operations.

Exercise 2: Add Progress Indicators

Add "thinking" state before first token:

My agent takes 2-3 seconds before generating the first token.
Users think it's broken. Add a progress indicator showing
the agent is thinking.

Current respond():
[paste code]

Show "Thinking..." until first token arrives.

What you're learning: AI suggests ProgressUpdateEvent patterns and placement within async flow.

Exercise 3: Handle Interruptions

User sends new message while agent is streaming. Handle it:

When users send a new message mid-stream, I want to save the partial
response and clean up gracefully. How do I override the stream
cancellation handler?

Current code:
[paste respond() method]

What you're learning: AI explains handle_stream_cancelled() override patterns and how to save partial assistant responses.