
Capstone: Full Agent Framework Integration

You've built the pieces: a fine-tuned Task API model, a LiteLLM proxy for OpenAI SDK compatibility, tool-calling patterns, and an MCP server. Now you'll combine everything into a production-ready agent system.

This capstone brings together all chapter concepts. You'll create an agent that uses your custom model as its reasoning engine, falls back gracefully when needed, and exposes its capabilities through multiple interfaces—CLI, API, and MCP.

By the end of this lesson, you'll have a complete agent system that demonstrates the full value of custom model integration.

The Complete Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                            TASK API AGENT SYSTEM                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────────┐  ┌─────────────────────┐  ┌──────────────────┐     │
│  │     CLI Client      │  │      REST API       │  │    MCP Server    │     │
│  │                     │  │                     │  │                  │     │
│  │  python cli.py      │  │  POST /tasks        │  │  Claude Desktop  │     │
│  │  "Create a task"    │  │  GET /analyze       │  │  Integration     │     │
│  └──────────┬──────────┘  └──────────┬──────────┘  └────────┬─────────┘     │
│             │                        │                      │               │
│             └────────────────────────┼──────────────────────┘               │
│                                      │                                      │
│                                      ▼                                      │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                             AGENT CORE                              │   │
│   │                                                                     │   │
│   │  ┌─────────────┐   ┌─────────────┐   ┌─────────────────────────┐    │   │
│   │  │   Router    │──▶│  Executor   │──▶│   Response Formatter    │    │   │
│   │  │             │   │             │   │                         │    │   │
│   │  │ - Parse     │   │ - Tools     │   │ - JSON                  │    │   │
│   │  │ - Classify  │   │ - Model     │   │ - Natural Language      │    │   │
│   │  │ - Route     │   │ - Fallback  │   │ - Streaming             │    │   │
│   │  └─────────────┘   └─────────────┘   └─────────────────────────┘    │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                      │
│                                      ▼                                      │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                             MODEL LAYER                             │   │
│   │                                                                     │   │
│   │  Primary:   task-api-model (Ollama via LiteLLM)                     │   │
│   │  Fallback:  gpt-4o-mini (OpenAI direct)                             │   │
│   │  Emergency: Rule-based responses                                    │   │
│   │                                                                     │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Project Structure

Create the following structure:

task-api-agent/
├── agent/
│   ├── __init__.py
│   ├── core.py            # Agent core logic
│   ├── models.py          # Model layer with fallback
│   ├── tools.py           # Tool definitions and execution
│   └── config.py          # Configuration management
├── interfaces/
│   ├── cli.py             # Command-line interface
│   ├── api.py             # REST API (FastAPI)
│   └── mcp_server.py      # MCP server
├── tests/
│   ├── test_core.py
│   ├── test_models.py
│   └── test_tools.py
├── config.yaml            # Configuration file
├── requirements.txt
└── README.md
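
The tree lists a requirements.txt that the lesson never shows. A plausible version, covering only what the code in this capstone imports (versions are assumptions; pin whatever you have actually tested):

# requirements.txt
openai>=1.0
pyyaml
fastapi
uvicorn
pytest   # for tests/
mcp      # only if you build interfaces/mcp_server.py (see the MCP sketch later in this lesson)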

Step 1: Configuration Layer

Create agent/config.py:

from dataclasses import dataclass
from typing import Optional
import os
import yaml

@dataclass
class ModelConfig:
    name: str
    base_url: str
    api_key: str
    timeout: float = 30.0
    max_retries: int = 3

@dataclass
class AgentConfig:
    primary_model: ModelConfig
    fallback_model: Optional[ModelConfig]
    enable_fallback: bool = True
    log_level: str = "INFO"

def load_config(config_path: str = "config.yaml") -> AgentConfig:
    """Load configuration from file and environment."""
    with open(config_path) as f:
        config_data = yaml.safe_load(f)

    primary_data = config_data.get("primary_model", {})
    fallback_data = config_data.get("fallback_model", {})

    # Primary model (local via LiteLLM)
    primary = ModelConfig(
        name=primary_data.get("name", "task-api-model"),
        base_url=os.environ.get(
            "PRIMARY_MODEL_URL",
            primary_data.get("base_url", "http://localhost:4000/v1")
        ),
        api_key=os.environ.get(
            "PRIMARY_MODEL_KEY",
            primary_data.get("api_key", "sk-local-dev-key")
        ),
        timeout=primary_data.get("timeout", 30.0)
    )

    # Fallback model (OpenAI)
    fallback = None
    if fallback_data.get("enabled", True):
        fallback = ModelConfig(
            name=fallback_data.get("name", "gpt-4o-mini"),
            base_url="https://api.openai.com/v1",
            api_key=os.environ.get("OPENAI_API_KEY", ""),
            timeout=fallback_data.get("timeout", 60.0)
        )

    return AgentConfig(
        primary_model=primary,
        fallback_model=fallback,
        enable_fallback=config_data.get("enable_fallback", True),
        log_level=config_data.get("log_level", "INFO")
    )

Create config.yaml:

primary_model:
  name: task-api-model
  base_url: http://localhost:4000/v1
  api_key: sk-local-dev-key
  timeout: 30.0

fallback_model:
  enabled: true
  name: gpt-4o-mini
  timeout: 60.0

enable_fallback: true
log_level: INFO
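
Before wiring the configuration into the agent, it's worth a quick sanity check. A minimal sketch, assuming config.yaml sits in the working directory:

from agent.config import load_config

config = load_config()
print(config.primary_model.name)      # task-api-model
print(config.primary_model.base_url)  # http://localhost:4000/v1 unless PRIMARY_MODEL_URL is set
print(config.enable_fallback)         # True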

Step 2: Model Layer with Fallback

Create agent/models.py:

from openai import OpenAI, APIConnectionError, APIError
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import logging
import time

from .config import AgentConfig, ModelConfig

logger = logging.getLogger(__name__)

@dataclass
class ModelResponse:
    content: str
    model_used: str
    latency_ms: float
    tool_calls: Optional[List[Dict]] = None
    fallback_used: bool = False

class ModelLayer:
    """Model layer with fallback support."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.primary_client = self._create_client(config.primary_model)
        self.fallback_client = None
        if config.fallback_model and config.enable_fallback:
            self.fallback_client = self._create_client(config.fallback_model)

    def _create_client(self, model_config: ModelConfig) -> OpenAI:
        return OpenAI(
            base_url=model_config.base_url,
            api_key=model_config.api_key,
            timeout=model_config.timeout
        )

    def complete(
        self,
        messages: List[Dict],
        tools: Optional[List[Dict]] = None,
        stream: bool = False  # accepted for interface parity; streaming is not implemented here
    ) -> ModelResponse:
        """Complete with automatic fallback."""
        start_time = time.time()

        # Try primary model
        try:
            response = self._call_model(
                self.primary_client,
                self.config.primary_model.name,
                messages,
                tools,
                stream
            )
            latency = (time.time() - start_time) * 1000

            return ModelResponse(
                content=response["content"],
                model_used=self.config.primary_model.name,
                latency_ms=latency,
                tool_calls=response.get("tool_calls"),
                fallback_used=False
            )

        except (APIConnectionError, APIError) as e:
            logger.warning(f"Primary model failed: {e}")

            if not self.fallback_client:
                raise

            # Try fallback
            logger.info("Attempting fallback model...")
            try:
                response = self._call_model(
                    self.fallback_client,
                    self.config.fallback_model.name,
                    messages,
                    tools,
                    stream
                )
                latency = (time.time() - start_time) * 1000

                return ModelResponse(
                    content=response["content"],
                    model_used=self.config.fallback_model.name,
                    latency_ms=latency,
                    tool_calls=response.get("tool_calls"),
                    fallback_used=True
                )

            except Exception as fallback_error:
                logger.error(f"Fallback also failed: {fallback_error}")
                raise

    def _call_model(
        self,
        client: OpenAI,
        model_name: str,
        messages: List[Dict],
        tools: Optional[List[Dict]],
        stream: bool
    ) -> Dict:
        """Call a specific model."""
        kwargs = {
            "model": model_name,
            "messages": messages
        }
        if tools:
            kwargs["tools"] = tools
            kwargs["tool_choice"] = "auto"

        response = client.chat.completions.create(**kwargs)
        message = response.choices[0].message

        result = {"content": message.content or ""}
        if message.tool_calls:
            result["tool_calls"] = [
                {
                    "id": tc.id,
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
                for tc in message.tool_calls
            ]

        return result

    def health_check(self) -> Dict[str, Any]:
        """Check health of all model backends."""
        status = {"primary": "unknown", "fallback": "unknown"}

        # Check primary
        try:
            self.primary_client.models.list()
            status["primary"] = "healthy"
        except Exception as e:
            status["primary"] = f"unhealthy: {str(e)}"

        # Check fallback
        if self.fallback_client:
            try:
                self.fallback_client.models.list()
                status["fallback"] = "healthy"
            except Exception as e:
                status["fallback"] = f"unhealthy: {str(e)}"
        else:
            status["fallback"] = "disabled"

        return status
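
It's worth smoke-testing the model layer on its own before building the agent on top of it. A minimal sketch, assuming the LiteLLM proxy from earlier lessons is running on localhost:4000 and config.yaml is present:

# smoke_test.py — hypothetical helper script, not part of the project tree
from agent.config import load_config
from agent.models import ModelLayer

layer = ModelLayer(load_config())
print(layer.health_check())  # e.g. {'primary': 'healthy', 'fallback': 'healthy'}

response = layer.complete([{"role": "user", "content": "Reply with one word: pong"}])
print(response.model_used, f"{response.latency_ms:.0f}ms")
print(response.content)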

Step 3: Tool Definitions

Create agent/tools.py:

from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ToolResult:
    success: bool
    data: Any
    error: Optional[str] = None

# In-memory task storage (resets on restart; swap for a database in production)
tasks: Dict[str, Dict] = {}
task_counter: int = 0

# Tool implementations
def create_task(title: str, priority: str = "normal", due_date: Optional[str] = None) -> ToolResult:
    global task_counter
    task_counter += 1
    task_id = f"TASK-{task_counter:04d}"

    task = {
        "id": task_id,
        "title": title,
        "priority": priority,
        "due_date": due_date,
        "status": "pending",
        "created_at": datetime.now().isoformat()
    }
    tasks[task_id] = task

    return ToolResult(success=True, data=task)

def list_tasks(status: Optional[str] = None, priority: Optional[str] = None) -> ToolResult:
    result = list(tasks.values())
    if status:
        result = [t for t in result if t["status"] == status]
    if priority:
        result = [t for t in result if t["priority"] == priority]
    return ToolResult(success=True, data=result)

def update_task(task_id: str, **updates) -> ToolResult:
    if task_id not in tasks:
        return ToolResult(success=False, data=None, error=f"Task {task_id} not found")

    task = tasks[task_id]
    for key, value in updates.items():
        if value is not None:
            task[key] = value
    task["updated_at"] = datetime.now().isoformat()

    return ToolResult(success=True, data=task)

def delete_task(task_id: str) -> ToolResult:
    if task_id not in tasks:
        return ToolResult(success=False, data=None, error=f"Task {task_id} not found")

    deleted = tasks.pop(task_id)
    return ToolResult(success=True, data={"deleted": deleted})

# Tool registry
TOOL_REGISTRY: Dict[str, Callable] = {
    "create_task": create_task,
    "list_tasks": list_tasks,
    "update_task": update_task,
    "delete_task": delete_task
}

# Tool schemas for OpenAI format
TOOL_SCHEMAS: List[Dict] = [
    {
        "type": "function",
        "function": {
            "name": "create_task",
            "description": "Create a new task in the task management system",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {"type": "string", "description": "Task title"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "normal", "high", "urgent"],
                        "description": "Priority level"
                    },
                    "due_date": {"type": "string", "description": "Due date (YYYY-MM-DD)"}
                },
                "required": ["title"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "list_tasks",
            "description": "List tasks with optional filters",
            "parameters": {
                "type": "object",
                "properties": {
                    "status": {
                        "type": "string",
                        "enum": ["pending", "in_progress", "completed"]
                    },
                    "priority": {
                        "type": "string",
                        "enum": ["low", "normal", "high", "urgent"]
                    }
                }
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "update_task",
            "description": "Update an existing task",
            "parameters": {
                "type": "object",
                "properties": {
                    "task_id": {"type": "string", "description": "Task ID"},
                    "title": {"type": "string"},
                    "priority": {"type": "string", "enum": ["low", "normal", "high", "urgent"]},
                    "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}
                },
                "required": ["task_id"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "delete_task",
            "description": "Delete a task",
            "parameters": {
                "type": "object",
                "properties": {
                    "task_id": {"type": "string", "description": "Task ID to delete"}
                },
                "required": ["task_id"]
            }
        }
    }
]

def execute_tool(name: str, arguments: Dict) -> ToolResult:
    """Execute a tool by name."""
    if name not in TOOL_REGISTRY:
        return ToolResult(success=False, data=None, error=f"Unknown tool: {name}")

    try:
        return TOOL_REGISTRY[name](**arguments)
    except Exception as e:
        return ToolResult(success=False, data=None, error=str(e))
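
The project tree includes tests/test_tools.py, which the lesson doesn't show. A minimal pytest sketch against the module above might look like this; because the module-level tasks dict persists between tests, each test clears it:

# tests/test_tools.py — minimal sketch; run with `pytest tests/`
import pytest
from agent import tools

@pytest.fixture(autouse=True)
def clean_state():
    tools.tasks.clear()
    yield
    tools.tasks.clear()

def test_create_and_list():
    created = tools.execute_tool("create_task", {"title": "Write tests", "priority": "high"})
    assert created.success
    listed = tools.execute_tool("list_tasks", {"priority": "high"})
    assert [t["title"] for t in listed.data] == ["Write tests"]

def test_unknown_tool():
    result = tools.execute_tool("frobnicate", {})
    assert not result.success
    assert "Unknown tool" in result.error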

Step 4: Agent Core

Create agent/core.py:

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import json
import logging

from .config import load_config, AgentConfig
from .models import ModelLayer
from .tools import TOOL_SCHEMAS, execute_tool

logger = logging.getLogger(__name__)

@dataclass
class AgentResponse:
    message: str
    tool_calls_made: List[Dict]
    model_used: str
    latency_ms: float
    fallback_used: bool

class TaskAgent:
    """Complete task management agent."""

    def __init__(self, config: Optional[AgentConfig] = None):
        self.config = config or load_config()
        self.model_layer = ModelLayer(self.config)
        self.conversation_history: List[Dict] = []

    def reset_conversation(self):
        """Clear conversation history."""
        self.conversation_history = []

    def process(self, user_message: str) -> AgentResponse:
        """Process a user message through the agent."""
        # Build messages
        messages = [
            {
                "role": "system",
                "content": """You are TaskMaster, an intelligent task management assistant.
Use the provided tools to manage tasks. Be helpful and concise.
When creating tasks, infer appropriate priority from context.
Always confirm actions taken."""
            }
        ]
        messages.extend(self.conversation_history)
        messages.append({"role": "user", "content": user_message})

        # Get model response
        response = self.model_layer.complete(messages, tools=TOOL_SCHEMAS)

        tool_calls_made = []

        # Handle tool calls if present
        if response.tool_calls:
            # Execute each tool
            for tc in response.tool_calls:
                arguments = json.loads(tc["arguments"])
                result = execute_tool(tc["name"], arguments)

                tool_calls_made.append({
                    "tool": tc["name"],
                    "arguments": arguments,
                    "result": result.data if result.success else result.error
                })

            # Add tool results to messages and get final response
            messages.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [
                    {
                        "id": tc["id"],
                        "type": "function",
                        "function": {
                            "name": tc["name"],
                            "arguments": tc["arguments"]
                        }
                    }
                    for tc in response.tool_calls
                ]
            })

            for i, tc in enumerate(response.tool_calls):
                result = tool_calls_made[i]["result"]
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": json.dumps(result)
                })

            # Get final response with tool results
            final_response = self.model_layer.complete(messages, tools=TOOL_SCHEMAS)
            response_content = final_response.content
            total_latency = response.latency_ms + final_response.latency_ms
        else:
            response_content = response.content
            total_latency = response.latency_ms

        # Update conversation history
        self.conversation_history.append({"role": "user", "content": user_message})
        self.conversation_history.append({"role": "assistant", "content": response_content})

        return AgentResponse(
            message=response_content,
            tool_calls_made=tool_calls_made,
            model_used=response.model_used,
            latency_ms=total_latency,
            fallback_used=response.fallback_used
        )

    def health(self) -> Dict[str, Any]:
        """Get agent health status."""
        return {
            "status": "healthy",
            "models": self.model_layer.health_check(),
            "conversation_length": len(self.conversation_history)
        }
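
The agent core is also testable without a running model server if you stub the model layer. A sketch for tests/test_core.py, using __new__ to skip __init__ (a test shortcut, so no config file is needed):

# tests/test_core.py — sketch; stubs the model layer so no server is required
from agent.core import TaskAgent
from agent.models import ModelResponse

class StubModelLayer:
    """Returns a canned reply and never touches the network."""
    def complete(self, messages, tools=None, stream=False):
        return ModelResponse(content="Done.", model_used="stub", latency_ms=1.0)

def test_process_without_tool_calls():
    agent = TaskAgent.__new__(TaskAgent)  # bypass __init__ to avoid loading config
    agent.model_layer = StubModelLayer()
    agent.conversation_history = []

    response = agent.process("hello")
    assert response.message == "Done."
    assert response.tool_calls_made == []
    assert len(agent.conversation_history) == 2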

Step 5: CLI Interface

Create interfaces/cli.py:

#!/usr/bin/env python
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import json

from agent.core import TaskAgent

def main():
    agent = TaskAgent()

    print("TaskMaster Agent CLI")
    print("=" * 40)
    print("Type 'quit' to exit, 'reset' to clear history")
    print("Type 'health' to check system status")
    print()

    while True:
        try:
            user_input = input("You: ").strip()

            if not user_input:
                continue

            if user_input.lower() == "quit":
                print("Goodbye!")
                break

            if user_input.lower() == "reset":
                agent.reset_conversation()
                print("Conversation reset.")
                continue

            if user_input.lower() == "health":
                health = agent.health()
                print(json.dumps(health, indent=2))
                continue

            # Process the message
            response = agent.process(user_input)

            print(f"\nTaskMaster: {response.message}")
            print(f"  [Model: {response.model_used}, Latency: {response.latency_ms:.0f}ms]")

            if response.tool_calls_made:
                print(f"  [Tools used: {', '.join(tc['tool'] for tc in response.tool_calls_made)}]")

            if response.fallback_used:
                print("  [Note: Used fallback model]")

            print()

        except KeyboardInterrupt:
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    main()

Output (example session):

TaskMaster Agent CLI
========================================
Type 'quit' to exit, 'reset' to clear history
Type 'health' to check system status

You: Create a high-priority task for quarterly review

TaskMaster: I've created the task for you:

**Task Created:**
- ID: TASK-0001
- Title: Quarterly review
- Priority: high
- Status: pending

Would you like to set a due date?
  [Model: task-api-model, Latency: 245ms]
  [Tools used: create_task]

You: List all my tasks

TaskMaster: Here are your current tasks:

1. **TASK-0001**: Quarterly review
   - Priority: high
   - Status: pending
   - Created: Just now

Total: 1 task
  [Model: task-api-model, Latency: 189ms]
  [Tools used: list_tasks]

You: health
{
  "status": "healthy",
  "models": {
    "primary": "healthy",
    "fallback": "healthy"
  },
  "conversation_length": 4
}

You: quit
Goodbye!

Step 6: REST API

Create interfaces/api.py:

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict, Any
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from agent.core import TaskAgent

app = FastAPI(title="Task API Agent", version="1.0.0")
agent = TaskAgent()

class ChatRequest(BaseModel):
    message: str
    reset_history: bool = False

class ChatResponse(BaseModel):
    message: str
    tool_calls: List[Dict[str, Any]]
    model_used: str
    latency_ms: float
    fallback_used: bool

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Send a message to the agent."""
    if request.reset_history:
        agent.reset_conversation()

    response = agent.process(request.message)

    return ChatResponse(
        message=response.message,
        tool_calls=response.tool_calls_made,
        model_used=response.model_used,
        latency_ms=response.latency_ms,
        fallback_used=response.fallback_used
    )

@app.get("/health")
async def health():
    """Get agent health status."""
    return agent.health()

@app.post("/reset")
async def reset():
    """Reset conversation history."""
    agent.reset_conversation()
    return {"status": "reset"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Output:

$ curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Create a task for code review"}'

{
  "message": "I've created the task:\n\n**TASK-0001**: Code review\n- Priority: normal\n- Status: pending",
  "tool_calls": [{"tool": "create_task", "arguments": {"title": "Code review"}, "result": {...}}],
  "model_used": "task-api-model",
  "latency_ms": 312.5,
  "fallback_used": false
}
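
The MCP server from the architecture diagram and project tree (interfaces/mcp_server.py) isn't built out step by step in this lesson. A minimal sketch using the MCP Python SDK's FastMCP helper, assuming `pip install mcp` (check the SDK docs for the current API):

# interfaces/mcp_server.py — minimal sketch exposing the agent as one MCP tool
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from mcp.server.fastmcp import FastMCP
from agent.core import TaskAgent

mcp = FastMCP("task-api-agent")
agent = TaskAgent()

@mcp.tool()
def ask_taskmaster(message: str) -> str:
    """Send a natural-language request to the TaskMaster agent."""
    return agent.process(message).message

if __name__ == "__main__":
    mcp.run()  # stdio transport by default, suitable for Claude Desktop

Registered in Claude Desktop's MCP configuration as a stdio server, this makes the full agent (tools, fallback, and all) available as a single callable tool.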

Step 7: Production Checklist

Before deploying, verify these requirements:

Health Monitoring

# Add to agent/core.py (as a TaskAgent method; requires
# `from datetime import datetime` at the top of the module)

def comprehensive_health_check(self) -> Dict[str, Any]:
    """Comprehensive health check for production."""
    health = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }

    # Model health
    model_health = self.model_layer.health_check()
    health["checks"]["models"] = model_health

    # Test actual inference
    try:
        test_response = self.model_layer.complete(
            [{"role": "user", "content": "ping"}]
        )
        health["checks"]["inference"] = {
            "status": "ok",
            "latency_ms": test_response.latency_ms
        }
    except Exception as e:
        health["checks"]["inference"] = {
            "status": "error",
            "error": str(e)
        }
        health["status"] = "degraded"

    # If the primary is down, overall status depends on the fallback
    if health["checks"]["models"].get("primary", "").startswith("unhealthy"):
        if health["checks"]["models"].get("fallback") == "healthy":
            health["status"] = "degraded"
        else:
            health["status"] = "unhealthy"

    return health
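
To make this reachable by orchestrator probes, you can expose it from the REST API. A small sketch for interfaces/api.py (the endpoint path is an assumption; it presumes comprehensive_health_check has been added to TaskAgent as above):

# interfaces/api.py — hypothetical deep-health endpoint for readiness probes
@app.get("/health/deep")
async def health_deep():
    return agent.comprehensive_health_check()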

Graceful Degradation

# Emergency fallback when all models fail (add to agent/core.py;
# EMERGENCY_RESPONSES at module level, emergency_response as a TaskAgent method)

EMERGENCY_RESPONSES = {
    "create": "I'm unable to process your request right now. Please try again later or create the task manually.",
    "list": "System temporarily unavailable. Please refresh in a moment.",
    "default": "I'm experiencing technical difficulties. Please try again shortly."
}

def emergency_response(self, user_message: str) -> str:
    """Generate emergency response when models are unavailable."""
    message_lower = user_message.lower()
    if "create" in message_lower or "add" in message_lower:
        return EMERGENCY_RESPONSES["create"]
    if "list" in message_lower or "show" in message_lower:
        return EMERGENCY_RESPONSES["list"]
    return EMERGENCY_RESPONSES["default"]
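
One hedged way to wire this in is to catch the final model-layer failure at the top of process and fall back to the rule-based reply. A sketch, assuming you move the current body of process into a helper (the name _process_with_models is hypothetical):

# Sketch: emergency path around the existing process() logic in agent/core.py
def process(self, user_message: str) -> AgentResponse:
    try:
        return self._process_with_models(user_message)  # hypothetical rename of the current body
    except Exception:
        logger.exception("All model backends failed; using emergency response")
        return AgentResponse(
            message=self.emergency_response(user_message),
            tool_calls_made=[],
            model_used="rule-based",
            latency_ms=0.0,
            fallback_used=True
        )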

Production Configuration

# config.production.yaml
# Note: ${VAR} placeholders are not expanded by yaml.safe_load on its own;
# see the expansion sketch below.
primary_model:
  name: task-api-model
  base_url: ${LITELLM_PROXY_URL}
  api_key: ${LITELLM_API_KEY}
  timeout: 30.0

fallback_model:
  enabled: true
  name: gpt-4o-mini
  timeout: 60.0

enable_fallback: true
log_level: WARNING

# Production settings
rate_limit:
  requests_per_minute: 100
  tokens_per_minute: 50000

monitoring:
  enabled: true
  metrics_port: 9090
Update Your Skill

Finalize your agent-integration skill with:

Complete the skill with a section on "Production Deployment" including:
- Full agent architecture diagram
- Configuration management patterns
- Health check implementation
- Graceful degradation strategies
- CLI, API, and MCP interface templates

Try With AI

Prompt 1: Add Observability

I've built my Task API agent system with CLI, REST API, and MCP interfaces.
Now I need production observability. Help me add:

1. OpenTelemetry tracing for all model calls
2. Prometheus metrics for latency, errors, fallback rate
3. Structured logging with request IDs
4. Dashboard configuration for Grafana

Show me how to integrate these into my agent/core.py and agent/models.py
without breaking existing functionality.

What you're learning: Adding production observability to agent systems.

Prompt 2: Implement Rate Limiting

My Task API agent needs rate limiting before production:
- 100 requests per minute per user
- 50,000 tokens per minute total
- Graceful rejection with retry-after headers

Design a rate limiting system that:
1. Works across CLI, API, and MCP interfaces
2. Tracks usage by user/API key
3. Returns informative error messages
4. Doesn't add significant latency

Implement this as a middleware layer.

What you're learning: Implementing rate limiting for multi-interface systems.

Prompt 3: Build Deployment Pipeline

I need to deploy my Task API agent system. Create a deployment pipeline for:

1. Docker containerization (agent + all interfaces)
2. Docker Compose for local development (with Ollama + LiteLLM)
3. Kubernetes manifests for production
4. Health check endpoints for orchestrator probes
5. Configuration via ConfigMaps and Secrets

Show the complete deployment artifacts and explain the production architecture.

What you're learning: Containerizing and deploying complete agent systems.