Agent Integration
You've built a complete API: CRUD, authentication, database, streaming. Now the payoff—turning those endpoints into tools that AI agents can use. This is where FastAPI meets AI agents.
The pattern is straightforward:
- Your API endpoints define capabilities
- Wrap them as functions agents can call
- Create an agent that orchestrates the tools
- Expose the agent via an SSE endpoint
After this lesson, natural language requests become API operations.
The Pattern: APIs → Functions → Tools
Your Task API has these operations:
| Endpoint | Operation |
|---|---|
POST /tasks | Create task |
GET /tasks | List tasks |
GET /tasks/{id} | Get task |
PUT /tasks/{id} | Update task |
DELETE /tasks/{id} | Delete task |
Each becomes a function an agent can call. The agent decides WHEN to call them based on natural language requests.
Creating Tool Functions
Start with simple wrappers around your existing code. Create tools.py:
from sqlmodel import Session, select
from models import Task, TaskCreate
from database import engine
def create_task(title: str, description: str | None = None) -> dict:
"""Create a new task with the given title and optional description."""
with Session(engine) as session:
task = Task(title=title, description=description)
session.add(task)
session.commit()
session.refresh(task)
return {"id": task.id, "title": task.title, "status": task.status}
def list_tasks() -> list[dict]:
"""List all tasks."""
with Session(engine) as session:
tasks = session.exec(select(Task)).all()
return [{"id": t.id, "title": t.title, "status": t.status} for t in tasks]
def get_task(task_id: int) -> dict | None:
"""Get a specific task by ID."""
with Session(engine) as session:
task = session.get(Task, task_id)
if task:
return {"id": task.id, "title": task.title, "description": task.description, "status": task.status}
return None
def update_task_status(task_id: int, status: str) -> dict | None:
"""Update a task's status. Status must be: pending, in_progress, or completed."""
with Session(engine) as session:
task = session.get(Task, task_id)
if not task:
return None
task.status = status
session.add(task)
session.commit()
session.refresh(task)
return {"id": task.id, "title": task.title, "status": task.status}
def delete_task(task_id: int) -> bool:
"""Delete a task by ID. Returns True if deleted, False if not found."""
with Session(engine) as session:
task = session.get(Task, task_id)
if not task:
return False
session.delete(task)
session.commit()
return True
Key points:
- Each function has a clear docstring (becomes tool description)
- Parameters have type hints (agents use these)
- Returns simple dicts (JSON-serializable)
- Manages its own session (independent of HTTP request)
Creating the Agent
Using OpenAI Agents SDK:
uv add openai-agents
Create agent.py:
from agents import Agent, Runner, function_tool
from tools import create_task, list_tasks, get_task, update_task_status, delete_task
# Wrap functions as tools
@function_tool
def tool_create_task(title: str, description: str | None = None) -> str:
"""Create a new task with the given title and optional description."""
result = create_task(title, description)
return f"Created task {result['id']}: {result['title']}"
@function_tool
def tool_list_tasks() -> str:
"""List all tasks."""
tasks = list_tasks()
if not tasks:
return "No tasks found."
return "\n".join([f"[{t['id']}] {t['title']} ({t['status']})" for t in tasks])
@function_tool
def tool_get_task(task_id: int) -> str:
"""Get details of a specific task by ID."""
task = get_task(task_id)
if not task:
return f"Task {task_id} not found."
return f"Task {task['id']}: {task['title']}\nDescription: {task['description']}\nStatus: {task['status']}"
@function_tool
def tool_update_status(task_id: int, status: str) -> str:
"""Update a task's status. Status must be: pending, in_progress, or completed."""
result = update_task_status(task_id, status)
if not result:
return f"Task {task_id} not found."
return f"Updated task {result['id']} to {result['status']}"
@function_tool
def tool_delete_task(task_id: int) -> str:
"""Delete a task by ID."""
if delete_task(task_id):
return f"Task {task_id} deleted."
return f"Task {task_id} not found."
# Create the agent
task_agent = Agent(
name="Task Manager",
instructions="""You are a task management assistant. Help users manage their tasks.
You can:
- Create new tasks
- List all tasks
- Get details of specific tasks
- Update task status (pending, in_progress, completed)
- Delete tasks
Be helpful and concise. When creating tasks, confirm what you created.
When listing tasks, format them clearly.""",
tools=[
tool_create_task,
tool_list_tasks,
tool_get_task,
tool_update_status,
tool_delete_task,
],
)
Non-Streaming Agent Endpoint
Start with a simple endpoint that waits for the complete response. Add to main.py:
from fastapi import FastAPI
from pydantic import BaseModel
from agents import Runner
from agent import task_agent
app = FastAPI()
class ChatRequest(BaseModel):
message: str
class ChatResponse(BaseModel):
response: str
@app.post("/agent/chat", response_model=ChatResponse)
async def chat_with_agent(request: ChatRequest):
"""Send a message to the task agent and get a response."""
result = await Runner.run(task_agent, request.message)
return ChatResponse(response=result.final_output)
Testing:
curl -X POST http://localhost:8000/agent/chat \
-H "Content-Type: application/json" \
-d '{"message": "Create a task called Learn FastAPI"}'
{"response": "Created task 1: Learn FastAPI"}
curl -X POST http://localhost:8000/agent/chat \
-H "Content-Type: application/json" \
-d '{"message": "List all my tasks"}'
{"response": "[1] Learn FastAPI (pending)"}
Streaming Agent Endpoint
For better UX, stream the response as it generates. Add to main.py:
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import json
async def agent_stream_generator(message: str):
"""Generate SSE events from agent response."""
result = Runner.run_streamed(task_agent, message)
async for event in result.stream_events():
if event.type == "raw_response_event":
# Extract text from the response
if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'text'):
text = event.data.delta.text
if text:
yield {
"event": "token",
"data": json.dumps({"text": text})
}
# Final event with complete response
yield {
"event": "complete",
"data": json.dumps({"response": result.final_output})
}
@app.post("/agent/chat/stream")
async def chat_with_agent_stream(request: ChatRequest):
"""Stream agent response via SSE."""
return EventSourceResponse(agent_stream_generator(request.message))
Testing in browser console:
const response = await fetch('http://localhost:8000/agent/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({message: 'Create a task called Test streaming'})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(decoder.decode(value));
}
Complete Agent Integration
Here's everything together for reference.
tools.py:
from sqlmodel import Session, select
from models import Task
from database import engine
def create_task(title: str, description: str | None = None) -> dict:
with Session(engine) as session:
task = Task(title=title, description=description)
session.add(task)
session.commit()
session.refresh(task)
return {"id": task.id, "title": task.title, "status": task.status}
def list_tasks() -> list[dict]:
with Session(engine) as session:
tasks = session.exec(select(Task)).all()
return [{"id": t.id, "title": t.title, "status": t.status} for t in tasks]
def get_task(task_id: int) -> dict | None:
with Session(engine) as session:
task = session.get(Task, task_id)
if task:
return {"id": task.id, "title": task.title, "description": task.description, "status": task.status}
return None
def update_task_status(task_id: int, status: str) -> dict | None:
with Session(engine) as session:
task = session.get(Task, task_id)
if not task:
return None
task.status = status
session.add(task)
session.commit()
session.refresh(task)
return {"id": task.id, "title": task.title, "status": task.status}
def delete_task(task_id: int) -> bool:
with Session(engine) as session:
task = session.get(Task, task_id)
if not task:
return False
session.delete(task)
session.commit()
return True
agent.py:
from agents import Agent, function_tool
from tools import create_task, list_tasks, get_task, update_task_status, delete_task
@function_tool
def tool_create_task(title: str, description: str | None = None) -> str:
"""Create a new task."""
result = create_task(title, description)
return f"Created task {result['id']}: {result['title']}"
@function_tool
def tool_list_tasks() -> str:
"""List all tasks."""
tasks = list_tasks()
if not tasks:
return "No tasks found."
return "\n".join([f"[{t['id']}] {t['title']} ({t['status']})" for t in tasks])
@function_tool
def tool_get_task(task_id: int) -> str:
"""Get a specific task."""
task = get_task(task_id)
if not task:
return f"Task {task_id} not found."
return f"Task {task['id']}: {task['title']}\nStatus: {task['status']}"
@function_tool
def tool_update_status(task_id: int, status: str) -> str:
"""Update task status (pending/in_progress/completed)."""
result = update_task_status(task_id, status)
if not result:
return f"Task {task_id} not found."
return f"Updated task {result['id']} to {result['status']}"
@function_tool
def tool_delete_task(task_id: int) -> str:
"""Delete a task."""
if delete_task(task_id):
return f"Task {task_id} deleted."
return f"Task {task_id} not found."
task_agent = Agent(
name="Task Manager",
instructions="You help manage tasks. Create, list, update, and delete tasks as requested.",
tools=[tool_create_task, tool_list_tasks, tool_get_task, tool_update_status, tool_delete_task],
)
main.py (agent routes):
from fastapi import FastAPI
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from agents import Runner
from agent import task_agent
import json
app = FastAPI()
class ChatRequest(BaseModel):
message: str
@app.post("/agent/chat")
async def chat_with_agent(request: ChatRequest):
result = await Runner.run(task_agent, request.message)
return {"response": result.final_output}
async def agent_stream_generator(message: str):
result = Runner.run_streamed(task_agent, message)
async for event in result.stream_events():
if event.type == "raw_response_event":
if hasattr(event.data, 'delta') and hasattr(event.data.delta, 'text'):
text = event.data.delta.text
if text:
yield {"event": "token", "data": json.dumps({"text": text})}
yield {"event": "complete", "data": json.dumps({"response": result.final_output})}
@app.post("/agent/chat/stream")
async def chat_with_agent_stream(request: ChatRequest):
return EventSourceResponse(agent_stream_generator(request.message))
Hands-On Exercise
Step 1: Create tools.py with CRUD wrapper functions
Step 2: Create agent.py with tool decorators and agent definition
Step 3: Add /agent/chat endpoint to main.py
Step 4: Test with natural language:
# Create
curl -X POST http://localhost:8000/agent/chat \
-d '{"message": "Add a task: Review pull request"}'
# List
curl -X POST http://localhost:8000/agent/chat \
-d '{"message": "What tasks do I have?"}'
# Update
curl -X POST http://localhost:8000/agent/chat \
-d '{"message": "Mark task 1 as completed"}'
# Delete
curl -X POST http://localhost:8000/agent/chat \
-d '{"message": "Remove task 1"}'
Step 5: Add the streaming endpoint and test in browser
Common Mistakes
Mistake 1: Forgetting docstrings on tools
# Wrong - agent doesn't know what this does
@function_tool
def tool_create_task(title: str) -> str:
...
# Correct - docstring becomes tool description
@function_tool
def tool_create_task(title: str) -> str:
"""Create a new task with the given title."""
...
Mistake 2: Returning complex objects
# Wrong - returns SQLModel object
def get_task(task_id: int) -> Task:
...
# Correct - returns serializable dict
def get_task(task_id: int) -> dict:
...
Mistake 3: Not handling missing resources
# Wrong - crashes on missing task
def get_task(task_id: int) -> dict:
task = session.get(Task, task_id)
return {"id": task.id} # AttributeError if None!
# Correct - handle not found
def get_task(task_id: int) -> dict | None:
task = session.get(Task, task_id)
if task:
return {"id": task.id}
return None
What You've Achieved
You've completed the core loop:
- L1-L5: Built REST API with CRUD
- L6-L9: Added configuration, database, authentication
- L10: Organized with dependency injection
- L11: Added streaming
- L12: Integrated AI agent
Your API is now both:
- Machine-callable (REST endpoints for direct integration)
- Natural language accessible (agent endpoint for conversational use)
This is the foundation of a Digital FTE service.
Try With AI
After completing the exercise, explore these scenarios.
Prompt 1: Adding Context to Tools
My tools work, but the agent doesn't know about the current user.
How do I pass user context to tools so the agent only manages
THEIR tasks, not everyone's?
What you're learning: Tool functions can receive context beyond their explicit parameters. Understanding this enables multi-user agent systems.
Prompt 2: Error Handling in Tools
When a tool fails (database down, invalid input), what should it
return? Should I raise an exception or return an error string?
How does the agent handle tool failures?
What you're learning: Agents react to tool outputs. Clear error messages help agents recover gracefully.
Prompt 3: Adding More Agents
I want to add a second agent that helps with scheduling tasks
(setting due dates, suggesting priorities). How do I structure
multiple specialized agents that can work together?
What you're learning: Multi-agent architectures enable specialization. This extends to triage agents, handoffs, and agent collaboration—covered in the capstone.
Reflect on Your Skill
You built a fastapi-agent skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my fastapi-agent skill, help me integrate AI agents with my API endpoints.
Does my skill include wrapping API functions as agent tools, creating agents with function_tool,
and streaming agent responses via SSE?
Identify Gaps
Ask yourself:
- Did my skill include converting API endpoints to callable functions for agents?
- Did it handle function_tool decorator and Agent creation with tools?
- Did it cover streaming agent responses using Runner.run_streamed() and EventSourceResponse?
Improve Your Skill
If you found gaps:
My fastapi-agent skill is missing agent integration patterns.
Update it to include API-to-function conversion for tools, function_tool decorator usage,
Agent creation with instructions and tools, Runner.run() for non-streaming,
and Runner.run_streamed() with SSE for streaming agent responses.