Connecting Your First Agent
Chapter 40 built TaskManager as a REST API. But conversations aren't request/response cycles—they're ongoing relationships. ChatKit Server transforms your agents from stateless endpoints into conversational interfaces.
In this lesson, you'll connect the agents you built in Chapters 34-36 to ChatKit's conversation infrastructure. The result: token-by-token streaming, automatic history management, and session persistence—all without writing streaming logic yourself.
The Conversation Architecture Shift
FastAPI Pattern (Chapter 40)
@app.post("/tasks")
def create_task(task: TaskCreate) -> TaskResponse:
# Execute operation
result = process_task(task)
# Return single response
return result
Characteristics:
- One request → One response
- Stateless (no conversation memory)
- Synchronous (wait for completion)
- Client handles all state management
ChatKit Pattern (This Chapter)
class MyAgentServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
# Stream events progressively
async for event in stream_agent_response(...):
yield event
Characteristics:
- Ongoing conversation → Stream of events
- Stateful (ChatKit manages history)
- Asynchronous (progressive updates)
- Server manages conversation state
The key insight: ChatKit handles the conversation mechanics (history, streaming, sessions). You implement one method: respond().
The respond() Method Contract
Every ChatKit server extends ChatKitServer and implements respond():
from chatkit import ChatKitServer, ThreadMetadata, UserMessageItem, ThreadStreamEvent
from typing import AsyncIterator, Any
class TaskAgentServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata, # Conversation container
input: UserMessageItem | None, # User's message
context: Any, # Session/user context
) -> AsyncIterator[ThreadStreamEvent]:
# Your agent logic here
pass
Parameter Breakdown
thread: ThreadMetadata
- Contains conversation metadata (thread ID, timestamps)
- Provides access to conversation history
- Managed by ChatKit automatically
input: UserMessageItem | None
- The user's message content
- Can be None (for example, on conversation initialization)
- Contains text, attachments, metadata
context: Any
- RequestContext with user identity and session information
- Used for multi-tenant isolation
- Typically contains user ID, permissions, session data
Returns: AsyncIterator[ThreadStreamEvent]
- Stream of events (assistant messages, tool status, widgets, tasks)
- Token-by-token output for real-time UI updates
- Yielded progressively, not returned all at once
Connecting an OpenAI Agent
Here's the complete pattern for integrating an OpenAI Agents SDK agent into ChatKit:
from chatkit import ChatKitServer, ThreadMetadata, UserMessageItem, ThreadStreamEvent
from chatkit.agents import stream_agent_response
from agents import Agent, Runner
from typing import AsyncIterator, Any
class TaskAgentServer(ChatKitServer):
def __init__(self):
super().__init__()
# Define your agent once
self.agent = Agent(
name="TaskManager",
instructions="""You are a task management assistant.
Help users create, organize, and track their tasks.""",
tools=[
# Your tools from Chapter 34-36
],
)
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
# Extract user message
user_message = input.content if input else ""
# Run agent with streaming
result = Runner.run_streamed(
self.agent,
user_message,
)
# Stream agent response through ChatKit
async for event in stream_agent_response(context, result):
yield event
Output:
User: Create a task "Review pull requests" for tomorrow
Assistant: I've created the task "Review pull requests" scheduled for tomorrow...
[Token-by-token streaming visible in ChatKit UI]
How It Works
Step 1: User sends message → ChatKit calls respond()
Step 2: respond() extracts message from input parameter
Step 3: Agent processes message using Runner.run_streamed() (streaming mode)
Step 4: stream_agent_response() converts agent output to ChatKit events
Step 5: Events yielded progressively → ChatKit UI updates in real-time
The magic: stream_agent_response() is a helper that bridges agent SDKs to ChatKit's event format. You don't write streaming logic—you call the helper.
Connecting a Google ADK Agent
Google ADK uses the Runner pattern with session services. While Google ADK has different patterns, ChatKit provides stream_agent_response() to handle event conversion:
from chatkit import ChatKitServer, ThreadMetadata, UserMessageItem, ThreadStreamEvent
from chatkit.agents import AgentContext, stream_agent_response
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from typing import AsyncIterator, Any
class TaskAgentServer(ChatKitServer):
def __init__(self, adk_agent):
super().__init__()
# Store your pre-built Google ADK agent from Chapter 35
self.agent = adk_agent
# Initialize ADK session service
self.session_service = InMemorySessionService()
# Initialize ADK runner
self.runner = Runner(
app_name='chatkit_task_agent',
agent=self.agent,
session_service=self.session_service,
)
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
user_message = input.content if input else ""
user_id = getattr(context, 'user_id', 'default-user')
# Create AgentContext for ChatKit integration
agent_context = AgentContext(
thread=thread,
store=self.store,
request_context=context
)
# Create or retrieve session for this thread
session = self.session_service.create_session(
state={},
app_name='chatkit_task_agent',
user_id=user_id
)
# Format message for ADK
message = types.Content(
role='user',
parts=[types.Part.from_text(text=user_message)]
)
# Stream agent response - ChatKit helper handles event conversion
result = self.runner.run_async(
session_id=session.id,
user_id=user_id,
new_message=message
)
# stream_agent_response() converts ADK events to ChatKit format
async for event in stream_agent_response(agent_context, result):
yield event
Output:
User: List my tasks for this week
Assistant: Here are your tasks for this week:
1. Review pull requests (Tomorrow)
2. Team meeting (Wednesday)...
[Streaming in real-time]
Key differences from OpenAI:
- Runner pattern: Google ADK uses
Runnerwithrun_async()instead of direct agent calls - Session management: Requires
InMemorySessionServicefor conversation state - Content objects: Messages wrapped in
types.Contentwithtypes.Part - Event conversion:
stream_agent_response()helper handles ChatKit event formatting automatically
Connecting a Claude Agent SDK Agent
Claude Agent SDK uses the standalone query() function with options. ChatKit's stream_agent_response() handles event conversion:
from chatkit import ChatKitServer, ThreadMetadata, UserMessageItem, ThreadStreamEvent
from chatkit.agents import AgentContext, stream_agent_response
from claude_agent_sdk import query, ClaudeAgentOptions
from typing import AsyncIterator, Any
class TaskAgentServer(ChatKitServer):
def __init__(self):
super().__init__()
# Configure Claude agent options
self.agent_options = ClaudeAgentOptions(
allowed_tools=["Read", "Write", "Bash"],
system_prompt="You are a task management assistant. Help users create, organize, and track their tasks.",
model="claude-sonnet-4-20250514",
max_turns=10,
include_partial_messages=True, # Enable streaming
)
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any,
) -> AsyncIterator[ThreadStreamEvent]:
user_message = input.content if input else ""
# Create AgentContext for ChatKit integration
agent_context = AgentContext(
thread=thread,
store=self.store,
request_context=context
)
# Query Claude Agent SDK with streaming
result = query(prompt=user_message, options=self.agent_options)
# stream_agent_response() handles all event conversion
# Including thinking blocks, tool calls, and text responses
async for event in stream_agent_response(agent_context, result):
yield event
Output:
User: What's the status of my high-priority tasks?
[Hidden: Claude's thinking about task prioritization logic]
Assistant: Let me check your high-priority tasks...
[Tool call executes]
You have 3 high-priority tasks:
- Review security audit (Overdue)...
[Streaming progressively]
Key differences from OpenAI:
- Standalone function: Uses
query()function, notAgentclass or API client - ClaudeAgentOptions: Configure system prompt, tools, and model through options object
- Message types: Handle
AssistantMessagewith typed content blocks - ThinkingBlock handling: Extended thinking tokens visible in stream (can hide as
HiddenContextItem) - No helper needed: Manual type checking instead of
stream_agent_response()
Running Your ChatKit Server
Once you've implemented respond(), run the server:
# server.py
from chatkit import ChatKitServer
class TaskAgentServer(ChatKitServer):
# ... respond() implementation from above
if __name__ == "__main__":
import uvicorn
server = TaskAgentServer()
uvicorn.run(server.app, host="0.0.0.0", port=8000)
Terminal:
python server.py
# Output:
# INFO: Started server process
# INFO: Uvicorn running on http://0.0.0.0:8000
Testing:
Open browser at http://localhost:8000 → ChatKit's built-in UI loads → Start conversing with your agent.
Common Integration Patterns
Pattern 1: Reusing Chapter 34-36 Agents
You already built agents. Don't rebuild—integrate:
# Import your existing agent
from chapter_34_openai_agents.task_agent import create_task_agent
class TaskAgentServer(ChatKitServer):
def __init__(self):
super().__init__()
# Reuse existing agent definition
self.agent = create_task_agent()
async def respond(self, thread, input, context):
result = Runner.run_streamed(self.agent, input.content)
async for event in stream_agent_response(context, result):
yield event
Why this works: Agent logic is separate from conversation infrastructure. ChatKit handles the conversation; your agent handles the domain logic.
Pattern 2: Handling Empty Input
First message might be empty (conversation initialization):
from chatkit.types import AssistantMessageEvent
async def respond(self, thread, input, context):
if not input or not input.content:
# Send welcome message
yield AssistantMessageEvent(
content="Hello! I'm your task management assistant. How can I help?"
)
return
# Normal agent processing
result = Runner.run_streamed(self.agent, input.content)
async for event in stream_agent_response(context, result):
yield event
Output:
[User opens chat - no message sent yet]
Assistant: Hello! I'm your task management assistant. How can I help?
Pattern 3: Error Handling in Stream
Wrap agent execution to catch errors gracefully:
from chatkit.types import AssistantMessageEvent
async def respond(self, thread, input, context):
try:
result = Runner.run_streamed(self.agent, input.content)
async for event in stream_agent_response(context, result):
yield event
except Exception as e:
# Send error message to user
yield AssistantMessageEvent(
content=f"I encountered an error: {str(e)}. Please try again."
)
Output:
User: Create a task with invalid date "tomorrow yesterday"
Assistant: I encountered an error: Invalid date format. Please try again.
Why this matters: Unhandled exceptions break the conversation stream. Users see frozen UI. Error handling keeps conversation flowing.
Safety Note
When testing locally, ChatKit runs without authentication by default. This is great for development but never deploy to production without authentication (covered in Lesson 6). Anyone with the URL can access your agent.
For local testing, this is fine. For production: authentication is mandatory.
Try With AI
Use your AI companion (Claude, ChatGPT, or Gemini).
Prompt 1: Integration Verification
I just connected my [OpenAI/Google/Anthropic] agent to ChatKit using the
respond() method. Help me verify the integration is correct.
My agent from Chapter [34/35/36] does [describe functionality].
Here's my respond() implementation: [paste code]
Ask me:
1. Does stream_agent_response() receive the right agent output format?
2. Am I handling the case where input is None?
3. What happens if my agent throws an error mid-stream?
What you're learning: Critical verification through AI-guided code review—identifying integration gaps before runtime failures.
Prompt 2: Streaming Troubleshooting
My ChatKit server runs, but I don't see token-by-token streaming.
The entire response appears at once.
My setup:
- Agent SDK: [OpenAI/Google/Anthropic]
- Using stream_agent_response(): [yes/no]
- Agent call method: [paste your Runner.run_streamed() or equivalent]
What's wrong with my streaming implementation?
What you're learning: Pattern debugging—AI helps you identify whether the issue is in agent streaming setup or ChatKit integration.
Prompt 3: Multi-Agent Extension
I have multiple agents from Chapters 34-36:
- TaskManager (OpenAI)
- ResearchAgent (Google)
- CodeReviewer (Anthropic)
How would I extend my ChatKitServer to let users choose which agent to talk to
in the same conversation?
Don't write the full code—ask me:
1. Should agent selection happen in respond() or outside it?
2. How should users specify which agent they want?
3. Where should I store the "active agent" state for this conversation?
What you're learning: Architectural exploration—using AI to discover design patterns for multi-agent ChatKit servers before implementing.