Tracing, Hooks and Observability
Your Customer Support Digital FTE handles 500 conversations daily. Yesterday, three users reported the agent gave incorrect refund amounts. Without visibility into what happened during those conversations, you're debugging blind. You need to know: Which tools were called? What did the LLM decide at each step? Where did the reasoning go wrong?
This is the production reality of AI agents. Unlike traditional software where logs show a clear execution path, agent behavior emerges from LLM reasoning---opaque decisions that can vary between runs. Observability transforms that opacity into transparency. You'll see exactly what your agent thinks, does, and decides.
In previous lessons, you built agents with tools, handoffs, and guardrails. Now you'll add the instrumentation that makes them production-ready. By the end of this lesson, you'll have implemented lifecycle hooks that log every agent action, traces you can view in OpenAI's dashboard, and token monitoring that tracks costs in real time.
Understanding Traces and Spans
The OpenAI Agents SDK uses a trace-and-span model for observability:
- Traces represent end-to-end operations of a workflow. They contain spans and have properties like workflow_name, trace_id, group_id, and optional metadata.
- Spans are individual operations within a trace (agent execution, LLM call, tool call). Each span has a trace_id, parent_id, start/end times, and span_data.
Trace: "Customer Support Workflow"
├── Span: Agent "Triage" execution
│ ├── Span: LLM generation
│ ├── Span: Tool call "lookup_order"
│ └── Span: LLM generation
├── Span: Handoff to "Billing"
└── Span: Agent "Billing" execution
    ├── Span: LLM generation
    └── Span: Tool call "process_refund"
Default Tracing Behavior
Tracing is enabled by default. The SDK automatically traces:
- Entire runner operations
- Agent executions
- LLM generations
- Function tool calls
- Guardrails
- Handoffs
You don't need to do anything to get basic tracing---just run your agent and view traces in the OpenAI dashboard.
from agents import Agent, Runner
agent = Agent(
name="SupportAgent",
instructions="Help customers with their orders."
)
# This run is automatically traced
result = await Runner.run(agent, "What's the status of order #12345?")
# View at: https://platform.openai.com/traces
Creating Higher-Level Traces with trace()
When you need to group multiple agent runs into a single logical workflow, use the trace() context manager:
from agents import Agent, Runner, trace
async def handle_customer_inquiry(query: str):
"""Process a customer inquiry through multiple agents."""
research_agent = Agent(
name="Researcher",
instructions="Gather relevant information about the query."
)
response_agent = Agent(
name="Responder",
instructions="Formulate a helpful response based on research."
)
# Wrap multiple runs in a single trace
with trace("Customer Inquiry Workflow"):
# First agent gathers information
research_result = await Runner.run(
research_agent,
f"Research: {query}"
)
# Second agent formulates response
response_result = await Runner.run(
response_agent,
f"Based on this research: {research_result.final_output}\n\nRespond to: {query}"
)
return response_result.final_output
The trace() function accepts these parameters:
| Parameter | Type | Description |
|---|---|---|
| workflow_name | str | Logical name for the workflow (e.g., "Customer Support") |
| trace_id | str or None | Custom trace ID (auto-generated if not provided) |
| group_id | str or None | Links related traces (e.g., conversation session ID) |
| metadata | dict or None | Additional data for filtering/analysis |
| disabled | bool | Set True to disable this trace |
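As a quick illustration, here is a trace() call that sets most of these parameters explicitly. The workflow name, group ID, and metadata values are placeholders, and trace_id is omitted so the SDK generates it:

```python
from agents import Agent, Runner, trace

agent = Agent(name="SupportAgent", instructions="Help customers with their orders.")

# Illustrative parameter values; trace_id is left out so the SDK auto-generates it
with trace(
    workflow_name="Customer Support",
    group_id="session_abc123",                        # links related traces
    metadata={"channel": "web", "plan": "enterprise"},
    disabled=False,                                    # True would skip recording
):
    result = await Runner.run(agent, "Where is my order #12345?")
```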
Linking Conversations with group_id
Multi-turn conversations span multiple traces. Use group_id to link them:
from agents import Agent, Runner, trace
import uuid
# Generate session ID for this conversation
session_id = f"session_{uuid.uuid4().hex[:8]}"
agent = Agent(
name="ConversationAgent",
instructions="Have a helpful conversation."
)
# Turn 1
with trace("Conversation Turn", group_id=session_id):
result1 = await Runner.run(agent, "Hi, I'm planning a trip to Japan.")
# Turn 2 - same group_id links to Turn 1
with trace("Conversation Turn", group_id=session_id):
result2 = await Runner.run(agent, "What's the best time to visit?")
# Turn 3
with trace("Conversation Turn", group_id=session_id):
result3 = await Runner.run(agent, "Tell me about cherry blossom season.")
print(f"All turns grouped under: {session_id}")
In the dashboard, filter by group_id to see the entire conversation.
Adding Metadata for Analysis
Attach metadata for filtering and debugging:
with trace(
"Customer Service",
group_id="chat_123",
metadata={
"customer_id": "user_456",
"plan": "enterprise",
"region": "us-west"
}
):
result = await Runner.run(support_agent, query)
Custom Spans for Sub-Operations
Use custom_span() to create spans for operations you want to track separately within a trace:
from agents import Agent, Runner, trace
from agents.tracing import custom_span # or: from agents import custom_span
async def research_and_write(topic: str):
"""Research a topic and write a report."""
researcher = Agent(name="Researcher", instructions="Gather facts.")
writer = Agent(name="Writer", instructions="Write clearly.")
reviewer = Agent(name="Reviewer", instructions="Check accuracy.")
with trace("Research Report Workflow"):
# Custom span for research phase
with custom_span("research_phase", data={"topic": topic}):
research = await Runner.run(researcher, f"Research: {topic}")
facts = research.final_output
# Custom span for writing phase
with custom_span("writing_phase"):
draft = await Runner.run(writer, f"Write about: {facts}")
content = draft.final_output
# Custom span for review phase
with custom_span("review_phase"):
review = await Runner.run(reviewer, f"Review: {content}")
return review.final_output
The dashboard shows the hierarchy:
Research Report Workflow (trace)
├── research_phase (custom span)
│ └── Researcher agent execution
├── writing_phase (custom span)
│ └── Writer agent execution
└── review_phase (custom span)
    └── Reviewer agent execution
Disabling Tracing
Disable tracing when needed for performance or privacy:
Method 1: Environment Variable
export OPENAI_AGENTS_DISABLE_TRACING=1
Method 2: RunConfig for Individual Runs
from agents import Agent, Runner, RunConfig
config = RunConfig(tracing_disabled=True)
result = await Runner.run(
agent,
"Process this without tracing",
run_config=config
)
Method 3: Global Disable
from agents import set_tracing_disabled
set_tracing_disabled(True) # Disable all tracing
# ... agent runs not traced ...
set_tracing_disabled(False) # Re-enable
Method 4: Disable Specific Trace
with trace("Sensitive Workflow", disabled=True):
# This trace won't be recorded
result = await Runner.run(agent, sensitive_input)
Controlling Sensitive Data
By default, traces include LLM inputs/outputs and tool call data. Control this with RunConfig:
from agents import Agent, Runner, RunConfig
# Exclude sensitive data from traces
config = RunConfig(trace_include_sensitive_data=False)
result = await Runner.run(
agent,
"Process my credit card ending in 4242",
run_config=config
)
# Trace spans are created, but sensitive content is omitted
Note: Organizations with Zero Data Retention (ZDR) policies cannot use tracing.
Implementing RunHooks
The RunHooks class provides callbacks for lifecycle events. Create a subclass and implement the methods you need:
from agents import Agent, Runner, RunHooks, RunContextWrapper
from datetime import datetime
import time
class ObservabilityHooks(RunHooks):
"""Lifecycle hooks for observing agent execution."""
def __init__(self):
self.start_time = None
self.events = []
async def on_agent_start(
self,
context: RunContextWrapper,
agent: Agent
) -> None:
"""Called when an agent begins processing."""
self.start_time = time.time()
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
self.events.append(f"[{timestamp}] AGENT_START: {agent.name}")
print(f"[{timestamp}] Agent '{agent.name}' started")
async def on_agent_end(
self,
context: RunContextWrapper,
agent: Agent,
output
) -> None:
"""Called when an agent completes."""
elapsed = time.time() - self.start_time if self.start_time else 0
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
self.events.append(f"[{timestamp}] AGENT_END: {agent.name} ({elapsed:.2f}s)")
print(f"[{timestamp}] Agent '{agent.name}' completed in {elapsed:.2f}s")
async def on_tool_start(
self,
context: RunContextWrapper,
agent: Agent,
tool
) -> None:
"""Called before a tool executes."""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
self.events.append(f"[{timestamp}] TOOL_START: {tool_name}")
print(f"[{timestamp}] Tool '{tool_name}' called")
async def on_tool_end(
self,
context: RunContextWrapper,
agent: Agent,
tool,
result: str
) -> None:
"""Called after a tool completes."""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
self.events.append(f"[{timestamp}] TOOL_END: {tool_name}")
print(f"[{timestamp}] Tool '{tool_name}' returned: {result[:50]}...")
async def on_handoff(
self,
context: RunContextWrapper,
from_agent: Agent,
to_agent: Agent
) -> None:
"""Called when control transfers between agents."""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
self.events.append(f"[{timestamp}] HANDOFF: {from_agent.name} -> {to_agent.name}")
print(f"[{timestamp}] Handoff: {from_agent.name} -> {to_agent.name}")
# Use hooks with Runner
agent = Agent(name="SupportAgent", instructions="Help customers.")
hooks = ObservabilityHooks()
result = await Runner.run(
agent,
"What's my order status?",
hooks=hooks
)
print(f"\n--- Events ---")
for event in hooks.events:
print(event)
Available Hook Methods
| Method | When Called | Parameters |
|---|---|---|
| on_agent_start | Agent begins processing | context, agent |
| on_agent_end | Agent completes | context, agent, output |
| on_llm_start | Before LLM call | context, agent, system_prompt, input_items |
| on_llm_end | After LLM call | context, agent, response |
| on_tool_start | Before tool execution | context, agent, tool |
| on_tool_end | After tool execution | context, agent, tool, result |
| on_handoff | Agent handoff occurs | context, from_agent, to_agent |
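The ObservabilityHooks class above skips the LLM-level hooks. If your SDK version exposes on_llm_start and on_llm_end with the signatures listed in this table, a minimal sketch for measuring model latency looks like this (the attribute names are our own):

```python
import time
from agents import Agent, RunHooks, RunContextWrapper

class LLMTimingHooks(RunHooks):
    """Records how long each LLM generation takes."""

    def __init__(self):
        self.llm_started_at = None
        self.llm_durations: list[float] = []

    async def on_llm_start(self, context: RunContextWrapper, agent: Agent, system_prompt, input_items) -> None:
        # Mark the start of the model call
        self.llm_started_at = time.time()

    async def on_llm_end(self, context: RunContextWrapper, agent: Agent, response) -> None:
        # Record elapsed time for this generation
        if self.llm_started_at is not None:
            self.llm_durations.append(time.time() - self.llm_started_at)
            self.llm_started_at = None
```

Pass an instance via Runner.run(..., hooks=LLMTimingHooks()) and inspect llm_durations afterward to separate model latency from tool execution time.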
Trace Processors for External Integrations
The SDK supports 20+ observability platforms. Use trace processors to send data to external systems.
Adding a Processor (Keeps Default)
from agents import add_trace_processor
from agents.tracing import TracingProcessor

# Your custom processor implementing the TracingProcessor interface
class MyProcessor(TracingProcessor):
    def on_trace_start(self, trace): ...
    def on_trace_end(self, trace): ...
    def on_span_start(self, span): ...
    def on_span_end(self, span): ...
    def shutdown(self): ...
    def force_flush(self): ...
add_trace_processor(MyProcessor())
# Now both OpenAI backend AND your processor receive traces
Replacing All Processors
from agents import set_trace_processors
# Replace default with only your processors
set_trace_processors([MyProcessor()])
# OpenAI backend no longer receives traces
Tracing Non-OpenAI Models
When using LiteLLM or other providers, set an API key for trace export:
import os
from agents import set_tracing_export_api_key, Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
# Use OpenAI API key for tracing even with non-OpenAI models
set_tracing_export_api_key(os.environ["OPENAI_API_KEY"])
# Use Claude via LiteLLM
model = LitellmModel(
model="anthropic/claude-3-sonnet",
api_key=os.environ["ANTHROPIC_API_KEY"]
)
agent = Agent(name="ClaudeAgent", model=model)
result = await Runner.run(agent, "Hello!")
# Traces still sent to OpenAI dashboard
Token Usage Monitoring
Track token usage for cost management:
from agents import Agent, Runner, trace
# Pricing per 1M tokens (GPT-4o as of late 2024)
INPUT_COST_PER_M = 2.50
OUTPUT_COST_PER_M = 10.00
agent = Agent(
name="CostTrackedAgent",
instructions="Answer questions helpfully."
)
with trace("Cost Tracked Workflow") as t:
result = await Runner.run(agent, "Explain quantum computing briefly.")
# Access usage from result
if result.raw_responses:
total_input = 0
total_output = 0
for response in result.raw_responses:
if hasattr(response, 'usage') and response.usage:
total_input += response.usage.input_tokens
total_output += response.usage.output_tokens
# Calculate cost
input_cost = (total_input / 1_000_000) * INPUT_COST_PER_M
output_cost = (total_output / 1_000_000) * OUTPUT_COST_PER_M
total_cost = input_cost + output_cost
print(f"Input tokens: {total_input:,}")
print(f"Output tokens: {total_output:,}")
print(f"Estimated cost: ${total_cost:.6f}")
Complete Observability System
Combine all patterns into a production-ready monitoring system:
from agents import Agent, Runner, RunHooks, RunContextWrapper, trace, function_tool
from pydantic import BaseModel
from datetime import datetime
from typing import List
import time
import json
# Metrics model
class SessionMetrics(BaseModel):
session_id: str
start_time: str = ""
turn_count: int = 0
total_input_tokens: int = 0
total_output_tokens: int = 0
tool_calls: List[str] = []
handoffs: List[str] = []
# Production hooks
class ProductionHooks(RunHooks):
def __init__(self, metrics: SessionMetrics):
self.metrics = metrics
self.turn_start = None
async def on_agent_start(self, context: RunContextWrapper, agent: Agent) -> None:
self.turn_start = time.time()
self.metrics.turn_count += 1
log = {
"event": "agent_start",
"agent": agent.name,
"session": self.metrics.session_id,
"turn": self.metrics.turn_count,
"timestamp": datetime.now().isoformat()
}
print(f"[LOG] {json.dumps(log)}")
async def on_agent_end(self, context: RunContextWrapper, agent: Agent, output) -> None:
elapsed = time.time() - self.turn_start if self.turn_start else 0
log = {
"event": "agent_end",
"agent": agent.name,
"session": self.metrics.session_id,
"elapsed_seconds": round(elapsed, 3),
"timestamp": datetime.now().isoformat()
}
print(f"[LOG] {json.dumps(log)}")
async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool) -> None:
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
self.metrics.tool_calls.append(tool_name)
log = {
"event": "tool_start",
"tool": tool_name,
"session": self.metrics.session_id,
"timestamp": datetime.now().isoformat()
}
print(f"[LOG] {json.dumps(log)}")
async def on_handoff(self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent) -> None:
handoff = f"{from_agent.name}->{to_agent.name}"
self.metrics.handoffs.append(handoff)
log = {
"event": "handoff",
"from": from_agent.name,
"to": to_agent.name,
"session": self.metrics.session_id,
"timestamp": datetime.now().isoformat()
}
print(f"[LOG] {json.dumps(log)}")
# Tools
@function_tool
def lookup_order(order_id: str) -> str:
"""Look up order details."""
return f"Order {order_id}: 2x Widget Pro, shipped Dec 27"
@function_tool
def check_refund_eligibility(order_id: str) -> str:
"""Check refund eligibility."""
return f"Order {order_id}: Eligible for refund (within 30-day window)"
# Agent
support_agent = Agent(
name="CustomerSupport",
instructions="Help customers with orders and refunds.",
tools=[lookup_order, check_refund_eligibility]
)
# Run with full observability
async def handle_support_request(query: str, session_id: str):
metrics = SessionMetrics(
session_id=session_id,
start_time=datetime.now().isoformat()
)
hooks = ProductionHooks(metrics)
with trace(
"Customer Support",
group_id=session_id,
metadata={"channel": "web"}
):
result = await Runner.run(
support_agent,
query,
hooks=hooks
)
# Track tokens
if result.raw_responses:
for response in result.raw_responses:
if hasattr(response, 'usage') and response.usage:
metrics.total_input_tokens += response.usage.input_tokens
metrics.total_output_tokens += response.usage.output_tokens
print(f"\n=== Session Metrics ===")
print(f"Session: {metrics.session_id}")
print(f"Turns: {metrics.turn_count}")
print(f"Tokens: {metrics.total_input_tokens} in / {metrics.total_output_tokens} out")
print(f"Tools: {metrics.tool_calls}")
return result.final_output
Progressive Project: Support Desk Assistant
Your Support Desk runs in production. Yesterday, a customer complained the agent gave wrong refund information. How do you investigate?
What You're Building
Add observability to debug production issues:
| Capability | What It Shows |
|---|---|
| trace() wrapper | Group related operations |
| Lifecycle hooks | Every agent/tool/handoff event |
| Token tracking | Cost per conversation |
| group_id | Multi-turn conversation traces |
Adding Observability
Extend your Support Desk from Lesson 6 with tracing and monitoring.
Step 1: Create a metrics model
Create a SupportMetrics class tracking session ID, turn count, tool calls, and token usage.
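A minimal sketch of such a model, mirroring the SessionMetrics class shown earlier (the exact field names are up to you; estimated_cost is included here for Step 4 and the extension challenge):

```python
from typing import List
from pydantic import BaseModel

class SupportMetrics(BaseModel):
    session_id: str
    turn_count: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    estimated_cost: float = 0.0      # filled in during Step 4
    tool_calls: List[str] = []
    handoffs: List[str] = []
```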
Step 2: Create observability hooks
Implement ProductionHooks with on_agent_start, on_agent_end, on_tool_start, and on_handoff methods that log JSON entries.
Step 3: Wrap runs in traces
Use trace() with group_id to link conversation turns:
with trace("Support Session", group_id=session_id):
result = await Runner.run(agent, message, hooks=hooks)
Step 4: Track token usage
After each run, accumulate tokens from result.raw_responses and calculate costs.
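A sketch of that accumulation, reusing the per-million-token rates from the token-monitoring example above (swap in your model's actual pricing):

```python
INPUT_COST_PER_M = 2.50    # USD per 1M input tokens (illustrative GPT-4o rate)
OUTPUT_COST_PER_M = 10.00  # USD per 1M output tokens (illustrative GPT-4o rate)

for response in result.raw_responses:
    if hasattr(response, "usage") and response.usage:
        metrics.total_input_tokens += response.usage.input_tokens
        metrics.total_output_tokens += response.usage.output_tokens

metrics.estimated_cost = (
    (metrics.total_input_tokens / 1_000_000) * INPUT_COST_PER_M
    + (metrics.total_output_tokens / 1_000_000) * OUTPUT_COST_PER_M
)
```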
Step 5: Demo scenario
Simulate a 3-turn conversation:
- Customer identifies themselves
- Customer asks about an order
- Customer requests escalation
Print metrics showing tool calls, tokens, and costs.
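One way to drive the demo, assuming a handle_support_request-style helper like the one in the Complete Observability System section (the messages and session ID format are placeholders):

```python
import asyncio
import uuid

async def demo():
    # One session ID links all three turns via group_id
    session_id = f"session_{uuid.uuid4().hex[:8]}"
    turns = [
        "Hi, I'm Dana and I need help with a recent purchase.",
        "Can you check the status of order #78910?",
        "Something still looks wrong -- please escalate this to a human agent.",
    ]
    for message in turns:
        await handle_support_request(message, session_id)

asyncio.run(demo())
```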
Extension Challenge
Add cost alerts when spending exceeds thresholds:
if metrics.estimated_cost > 0.10:
print(f"ALERT: Session cost ${metrics.estimated_cost:.4f} exceeds $0.10")
What's Next
Your Support Desk is observable. In Lesson 8, you'll add MCP integration to connect agents to external documentation and APIs.
Try With AI
Prompt 1: Design a Monitoring Architecture
I'm building a production agent handling 1000+ conversations daily.
Help me design observability:
1. What metrics should I track? (latency, tokens, errors)
2. How should I structure log entries for querying?
3. What alerts should I set up?
4. How do I use trace() and group_id for multi-turn conversations?
Show me the RunHooks implementation and trace configuration.
What you're learning: System design for production observability where traditional debugging doesn't work.
Prompt 2: Debug with Traces
My agent responds slowly (3+ seconds per turn). I have tracing enabled.
Help me:
1. What to look for in traces to find bottlenecks
2. Write hooks that measure time in each phase
3. Create a diagnostic pinpointing LLM latency vs tool execution
Show diagnostic code and explain patterns for each bottleneck type.
What you're learning: Performance debugging for AI agents---understanding where time is spent in the agent loop.
Prompt 3: External Integration
I want to send my traces to [Langfuse/Weights & Biases/custom system].
Help me:
1. Create a custom TracingProcessor
2. Use add_trace_processor() to add it
3. Format trace data for my target platform
Show the complete processor implementation.
What you're learning: Integrating agent observability with your existing monitoring infrastructure.
Safety Note
Tracing data contains complete conversation history. For production:
- Data retention: OpenAI retains traces for 30 days by default
- Access control: Restrict trace access for PII-containing conversations
- Sensitive data: Use RunConfig.trace_include_sensitive_data=False for sensitive content
- Compliance: Ensure trace storage meets GDPR, HIPAA, etc. requirements
- Costs: Tracing adds overhead; consider disabling or sampling in high-volume scenarios (see the sampling sketch after this list)
- ZDR: Organizations with Zero Data Retention policies cannot use tracing
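For the cost point above, one simple mitigation is per-run sampling, sketched here with an arbitrary 10% rate using the trace() disabled parameter:

```python
import random
from agents import Runner, trace

SAMPLE_RATE = 0.10  # record roughly 10% of workflows

# Disable this particular trace for ~90% of runs
with trace("Support Session", disabled=random.random() > SAMPLE_RATE):
    result = await Runner.run(agent, message)
```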