LiveKit Architecture Deep Dive

In Lesson 0, you built your livekit-agents skill from official documentation. You now have a knowledge artifact grounded in authoritative sources. But a skill without deep understanding is just a template generator.

This lesson takes you inside LiveKit's architecture. You will understand why OpenAI chose this framework for ChatGPT's Advanced Voice Mode, how the distributed component model enables enterprise scale, and how each piece fits together to create responsive voice agents.

By the end, you will not just generate voice agent code. You will understand what that code does, why it works, and when to deviate from patterns. And you will improve your skill based on what you learn.

The Restaurant Analogy

Before diving into technical components, consider how a restaurant operates:

The Host (Worker): Stands at the entrance. When customers arrive, the host decides which server handles them based on current load. The host does not serve tables directly—they coordinate.

The Server (Agent): Your application logic. A server knows the menu, handles questions, takes orders, and coordinates with the kitchen. Each server can handle multiple tables but has limits.

The Table Conversation (AgentSession): A specific interaction with one customer party. The conversation has context (what they ordered, dietary restrictions mentioned, how the evening is going). When the party leaves, that session ends, but the server continues to other tables.

The Dining Room (Room): The physical space where customers and servers interact. Multiple tables exist in the same room. Customers and servers communicate through the room's audio space.

LiveKit's architecture maps directly:

┌─────────────────────────────────────────────────────────────┐
│                   LiveKit Cloud / Server                    │
│           (Restaurant Building - Infrastructure)            │
└─────────────────────────────────────────────────────────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        │                      │                      │
┌───────▼───────┐      ┌───────▼───────┐      ┌───────▼───────┐
│   Worker 1    │      │   Worker 2    │      │   Worker N    │
│   (Host 1)    │      │   (Host 2)    │      │   (Host N)    │
│  Pod in K8s   │      │  Pod in K8s   │      │  Pod in K8s   │
└───────┬───────┘      └───────────────┘      └───────────────┘
        │
┌───────▼───────┐
│     Agent     │  ◄── Your application logic (Server)
│ (VoiceAgent)  │      What the assistant knows and does
└───────┬───────┘
        │
┌───────▼───────┐
│ AgentSession  │  ◄── Individual conversation (Table)
│  (w/ User A)  │      Stateful, contextual, temporary
└───────┬───────┘
        │
┌───────▼───────────────────────────────────┐
│               LiveKit Room                │
│            (Audio/Video Space)            │
│                                           │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐  │
│  │   STT   │   │   LLM   │   │   TTS   │  │
│  │Deepgram │   │ GPT-4o  │   │Cartesia │  │
│  └─────────┘   └─────────┘   └─────────┘  │
└───────────────────────────────────────────┘

This separation matters for production systems. Workers scale horizontally—add more pods when call volume increases. Agents encapsulate business logic—update reasoning without touching infrastructure. Sessions maintain conversation state—users do not repeat themselves after brief disconnections.

Workers: Process-Level Orchestration

A Worker is a process that connects to LiveKit Cloud and waits for job assignments. When a user initiates a voice session (by joining a Room), LiveKit dispatches a job to an available Worker.

Worker Initialization

Workers start by registering with LiveKit Cloud:

from livekit.agents import cli, JobContext, WorkerOptions

async def entrypoint(ctx: JobContext):
    """Called when a job is assigned to this worker."""
    # Your agent logic here
    pass

if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
        )
    )

Output:

[worker] Connecting to ws://localhost:7880
[worker] Worker registered: worker_8f3a2b1c
[worker] Waiting for jobs...

The cli.run_app helper handles connection management, reconnection on failure, and graceful shutdown. In production, you would configure credentials via environment variables:

import os

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    ws_url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)

Load Reporting

Workers report their current load so LiveKit distributes jobs efficiently:

def calculate_load() -> float:
    """Return 0.0 (idle) to 1.0 (at capacity)."""
    active_sessions = get_active_session_count()  # your own session counter
    max_sessions = 10  # Worker capacity
    return min(active_sessions / max_sessions, 1.0)

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    load_fnc=calculate_load,  # LiveKit queries this periodically
)

Output (during operation):

[worker] Reporting load: 0.3 (3/10 sessions active)
[livekit] Job assigned to worker_8f3a2b1c (lowest load)
[worker] Reporting load: 0.4 (4/10 sessions active)

When a Worker reports high load (approaching 1.0), LiveKit routes new jobs to other Workers. This enables horizontal scaling—add more Worker pods when aggregate load increases.
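
Session count is not the only useful signal. Here is a minimal sketch of a CPU-based load function, assuming the psutil package is installed (an assumption, not part of the LiveKit SDK); the WorkerOptions wiring stays the same:

import psutil

def calculate_cpu_load() -> float:
    """Report normalized CPU utilization as this worker's load (0.0 to 1.0)."""
    # interval=None returns usage since the previous call without blocking
    return min(psutil.cpu_percent(interval=None) / 100.0, 1.0)

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    load_fnc=calculate_cpu_load,
)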

Job Acceptance Filtering

Not every Worker should handle every job. You can filter based on job metadata:

import json

from livekit.agents import JobRequest

def should_accept_job(job_request: JobRequest) -> bool:
    """Decide whether this worker should handle this job."""
    metadata = json.loads(job_request.job.metadata or "{}")

    # Only accept jobs for specific agent types
    agent_type = metadata.get("agent_type", "general")
    return agent_type in ["sales", "support", "general"]

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    request_fnc=should_accept_job,
)

This pattern enables specialized Worker pools. Sales agents run on Workers optimized for CRM integration. Technical support agents run on Workers with access to documentation systems. The routing happens before job assignment.
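
Metadata is one routing signal; room names are another. As a variation, here is a hedged sketch that accepts jobs based on a room-name prefix, which is convenient when your frontend encodes the agent type into the room name (the sales-/support- prefixes are illustrative assumptions):

def accept_by_room_prefix(job_request: JobRequest) -> bool:
    """Accept only rooms whose names mark them for this worker pool."""
    room_name = job_request.job.room.name
    # Hypothetical naming convention: "sales-<id>", "support-<id>"
    return room_name.startswith(("sales-", "support-"))

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    request_fnc=accept_by_room_prefix,
)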

JobContext: The Execution Environment

When a Worker accepts a job, LiveKit creates a JobContext containing everything your agent needs:

from livekit.agents import JobContext, AutoSubscribe
from livekit import rtc

async def entrypoint(ctx: JobContext):
    """The main function for each voice session."""

    # Access job metadata
    job_id = ctx.job.id
    room_name = ctx.job.room.name

    # The room is not connected yet
    room = ctx.room  # rtc.Room instance

    # Connect to the LiveKit room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Now room is connected and ready
    print(f"Connected to room: {room.name}")
    print(f"Local participant: {room.local_participant.identity}")

Output:

[agent] Job received: job_a1b2c3d4
[agent] Connecting to room: meeting-room-42
[agent] Connected to room: meeting-room-42
[agent] Local participant: agent-voice-assistant

Connection Control

The ctx.connect() method controls precisely when and how your agent joins the room:

# Subscribe to audio only (most common for voice agents)
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

# Subscribe to everything (for video agents)
await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

# Manual subscription control
await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_NONE)
# Then subscribe to specific tracks manually

For voice agents, AUDIO_ONLY reduces bandwidth and processing. Your agent does not need video frames unless it is a multimodal agent (covered in Chapter 83).
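
If you start with SUBSCRIBE_NONE, subscribing becomes your responsibility. A minimal sketch of selective subscription, assuming the set_subscribed() method on rtc.RemoteTrackPublication (verify against the SDK version you use):

await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_NONE)

@ctx.room.on("track_published")
def on_track_published(
    publication: rtc.RemoteTrackPublication,
    participant: rtc.RemoteParticipant,
):
    # Subscribe only to audio tracks; ignore video and screen share
    if publication.kind == rtc.TrackKind.KIND_AUDIO:
        publication.set_subscribed(True)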

Waiting for Participants

Users might join before or after your agent connects. The wait_for_participant method handles both cases:

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Wait for a human participant (not another agent)
    participant = await ctx.wait_for_participant(
        kind=rtc.ParticipantKind.STANDARD
    )

    print(f"User joined: {participant.identity}")

    # Access participant attributes (set by your frontend)
    language = participant.attributes.get("language", "en")
    print(f"User language preference: {language}")

Output:

[agent] Waiting for participant...
[agent] User joined: user-john-doe
[agent] User language preference: es

This pattern ensures your agent only starts conversing when a real user is present, not when test bots or monitoring tools connect.
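
A practical refinement: if no human joins within a grace period, end the job instead of holding the Worker slot open. This sketch uses only the standard-library asyncio.wait_for; the 60-second threshold is an assumption:

import asyncio

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    try:
        participant = await asyncio.wait_for(
            ctx.wait_for_participant(kind=rtc.ParticipantKind.STANDARD),
            timeout=60.0,  # assumed grace period
        )
    except asyncio.TimeoutError:
        print("No user joined within 60s, ending this job")
        return

    print(f"User joined: {participant.identity}")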

Shutdown Callbacks

Voice sessions require cleanup—saving conversation history, releasing resources, updating CRM records. Register shutdown callbacks for post-session work:

import time

async def entrypoint(ctx: JobContext):
    session_data = {"transcript": [], "start_time": time.time()}

    async def save_session():
        """Called when the session ends."""
        duration = time.time() - session_data["start_time"]
        await database.save_transcript(  # `database` is your persistence layer
            room=ctx.job.room.name,
            transcript=session_data["transcript"],
            duration=duration,
        )
        print(f"Session saved: {len(session_data['transcript'])} turns, {duration:.1f}s")

    # Register cleanup
    ctx.add_shutdown_callback(save_session)

    # Main agent logic
    await ctx.connect()
    # ... rest of agent implementation

Output (on session end):

[agent] Session ending...
[agent] Running shutdown callback: save_session
[agent] Session saved: 12 turns, 247.3s
[agent] Disconnected from room

Shutdown callbacks have a 60-second timeout by default. Keep cleanup operations fast or run them asynchronously.
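
If cleanup could exceed that window, hand the slow part to something that outlives the session. A minimal sketch using an in-process asyncio.Queue, reusing ctx and session_data from the example above; in production a durable queue (database table, message broker) is the safer choice:

import asyncio

# Filled by shutdown callbacks, drained by a long-lived background task
pending_saves: asyncio.Queue = asyncio.Queue()

async def save_worker():
    """Start once per process, e.g. asyncio.create_task(save_worker())."""
    while True:
        payload = await pending_saves.get()
        # await database.save_transcript(**payload)  # your persistence layer
        pending_saves.task_done()

async def save_session():
    """Shutdown callback: enqueue the work and return immediately."""
    await pending_saves.put({
        "room": ctx.job.room.name,
        "transcript": session_data["transcript"],
    })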

Agents: Your Application Logic

The Agent class encapsulates what your voice assistant does. While JobContext manages the execution environment, Agent manages the conversational behavior:

from livekit.agents import Agent, AgentSession

class TaskManagerAgent(Agent):
    """Voice agent for the Task Manager application."""

    def __init__(self):
        super().__init__()
        self.instructions = """
        You are a voice assistant for Task Manager. You help users:
        - Check their tasks for today
        - Create new tasks with due dates
        - Mark tasks as complete
        - Review upcoming deadlines

        Be concise in voice responses. Confirm actions taken.
        """

    async def on_enter(self):
        """Called when the agent enters a session."""
        print("Agent entered session")
        # Initialize session-specific state
        self.task_context = await self.load_user_tasks()

    async def on_user_turn(self, turn):
        """Called when the user finishes speaking."""
        user_text = turn.text
        print(f"User said: {user_text}")

        # Process through your LLM with tool support
        response = await self.process_with_tools(user_text)
        return response

    async def on_exit(self):
        """Called when the agent exits a session."""
        print("Agent exiting session")
        await self.save_session_state()

Output (during conversation):

[agent] Agent entered session
[agent] Loaded 7 tasks for user
[agent] User said: What tasks do I have today?
[agent] Processing with tools...
[agent] Tool called: list_tasks(status="today")
[agent] Speaking response: You have 3 tasks due today...

Agent vs AgentSession

The distinction matters:

Concept        Purpose                          Lifecycle
Agent          Application logic and behavior   Reusable across sessions
AgentSession   State for one conversation       Created per user, destroyed on disconnect

One Agent class can handle many AgentSessions. The Agent defines behavior; the AgentSession maintains context:

async def entrypoint(ctx: JobContext):
    # Create the agent (reusable logic)
    agent = TaskManagerAgent()

    await ctx.connect()
    participant = await ctx.wait_for_participant()

    # Create a session for this specific conversation
    session = AgentSession(agent)

    # Start the session with the room (this begins listening)
    await session.start(ctx.room, participant)

    # Session runs until participant disconnects
    await session.wait_for_close()

Output:

[agent] Creating TaskManagerAgent
[agent] Waiting for participant...
[agent] Participant joined: user-jane
[agent] Creating AgentSession for user-jane
[agent] Session started, listening for audio...
[session] User speaking...
[session] Turn detected, processing...

WebRTC Audio Integration

LiveKit uses WebRTC for low-latency audio transport. Your agent receives and publishes audio through tracks:

Subscribing to Audio

When users speak, their audio arrives as a track:

import asyncio

from livekit import rtc

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            print(f"Subscribed to audio from {participant.identity}")
            # The track is now receiving audio frames
            asyncio.create_task(process_audio(track))

    @ctx.room.on("track_unsubscribed")
    def on_track_unsubscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        print(f"Unsubscribed from {participant.identity}")

Output:

[room] Track subscribed: audio from user-john
[audio] Processing audio stream...
[audio] Audio frame received: 480 samples, 48000 Hz
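
The process_audio coroutine in the previous example was left as a placeholder. Here is a minimal sketch of what it might look like, assuming the rtc.AudioStream async iterator from the LiveKit Python SDK (field names can differ slightly between SDK versions):

async def process_audio(track: rtc.Track):
    """Consume raw audio frames from a subscribed track."""
    stream = rtc.AudioStream(track)
    async for event in stream:
        frame = event.frame
        print(
            f"Audio frame received: {frame.samples_per_channel} samples, "
            f"{frame.sample_rate} Hz"
        )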

Publishing Audio

Your agent speaks by publishing an audio track:

async def speak(ctx: JobContext, audio_source: rtc.AudioSource):
    """Publish audio to the room."""
    # Create a local audio track
    track = rtc.LocalAudioTrack.create_audio_track("voice", audio_source)

    # Publish to room
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    publication = await ctx.room.local_participant.publish_track(track, options)

    print(f"Published audio track: {publication.sid}")

In practice, you do not manage audio frames directly. The AgentSession handles the STT/LLM/TTS pipeline and publishes responses automatically.

Room Events

Critical events in a voice session:

@ctx.room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
print(f"Participant joined: {participant.identity}")

@ctx.room.on("participant_disconnected")
def on_participant_disconnected(participant: rtc.RemoteParticipant):
print(f"Participant left: {participant.identity}")

@ctx.room.on("connection_state_changed")
def on_connection_state_changed(state: rtc.ConnectionState):
print(f"Connection state: {state}")
if state == rtc.ConnectionState.CONN_DISCONNECTED:
# Handle disconnection (maybe attempt reconnect)
pass

@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
# Receive non-audio data (e.g., typed messages)
print(f"Data received: {data.data.decode()}")

Output (during session):

[room] Participant joined: user-alex
[room] Connection state: CONN_CONNECTED
[room] Track subscribed: audio from user-alex
...
[room] Participant left: user-alex
[room] Connection state: CONN_DISCONNECTED
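
The handlers above only print. In practice you often want a short grace period before tearing the session down, so that brief network drops do not end the conversation. A sketch of that idea using only asyncio; the end_session() helper is hypothetical and stands in for whatever cleanup your agent performs:

import asyncio

RECONNECT_GRACE = 10.0  # seconds to wait for the user to return (assumption)
pending_teardown: dict = {"task": None}

async def end_after_grace():
    await asyncio.sleep(RECONNECT_GRACE)
    await end_session()  # hypothetical: close the AgentSession and clean up

@ctx.room.on("participant_disconnected")
def on_participant_disconnected(participant: rtc.RemoteParticipant):
    pending_teardown["task"] = asyncio.create_task(end_after_grace())

@ctx.room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
    # User returned before the grace period expired: cancel the teardown
    task = pending_teardown["task"]
    if task and not task.done():
        task.cancel()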

Voice Pipeline Configuration

A voice agent needs three AI services working in concert:

Component   Purpose                        Example Providers
STT         Convert user speech to text    Deepgram, Whisper, AssemblyAI
LLM         Generate response content      GPT-4o, Claude, Gemini
TTS         Convert response to speech     Cartesia, ElevenLabs, Deepgram

Configuring the Pipeline

LiveKit provides plugin classes for each provider:

from livekit.agents import VoiceAgent
from livekit.plugins import deepgram, openai, cartesia

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    # Configure the voice pipeline
    agent = VoiceAgent(
        # Speech-to-Text: Fast, accurate transcription
        stt=deepgram.STT(
            model="nova-3",  # Latest Deepgram model
            language="en",
        ),

        # Language Model: Reasoning and response generation
        llm=openai.LLM(
            model="gpt-4o-mini",  # Fast, cost-effective
            temperature=0.7,
        ),

        # Text-to-Speech: Natural voice synthesis
        tts=cartesia.TTS(
            voice="sonic-english-female",  # Choose voice
        ),

        # System instructions
        instructions="""
        You are a helpful voice assistant for Task Manager.
        Keep responses concise - users are listening, not reading.
        Confirm actions taken. Ask clarifying questions when needed.
        """,
    )

    # Wait for user and start session
    participant = await ctx.wait_for_participant()
    session = AgentSession(agent)
    await session.start(ctx.room, participant)

Output:

[agent] Initializing STT: deepgram.nova-3
[agent] Initializing LLM: openai.gpt-4o-mini
[agent] Initializing TTS: cartesia.sonic-english-female
[agent] Pipeline ready, waiting for speech...
[stt] User speaking detected
[stt] Transcription: "What tasks are due today?"
[llm] Generating response...
[tts] Synthesizing speech...
[agent] Speaking: "You have 3 tasks due today..."

Switching Providers

The plugin architecture makes swapping providers trivial:

# Option A: Budget-optimized
stt = deepgram.STT(model="nova-3") # ~$0.0077/min
llm = openai.LLM(model="gpt-4o-mini") # ~$0.0015/min
tts = deepgram.TTS(model="aura-stella") # ~$0.015/min
# Total: ~$0.024/min

# Option B: Quality-optimized
stt = deepgram.STT(model="nova-3") # Same STT
llm = openai.LLM(model="gpt-4o") # Better reasoning
tts = cartesia.TTS(voice="sonic") # Higher quality voice
# Total: ~$0.08/min

# Option C: Local development (no API costs)
stt = whisper.STT(model="base") # Local Whisper
llm = ollama.LLM(model="llama3.2") # Local Ollama
tts = piper.TTS(model="en_US") # Local Piper
# Total: $0/min (but higher latency)

This flexibility lets you prototype cheaply and upgrade for production.
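
One way to make switching operational: select the configuration from an environment variable, so dev, staging, and production differ only in deployment config. A sketch assuming the plugin classes used above and a hypothetical VOICE_TIER variable:

import os

def build_pipeline():
    """Return (stt, llm, tts) for the configured tier."""
    tier = os.getenv("VOICE_TIER", "budget")  # hypothetical env var

    if tier == "quality":
        return (
            deepgram.STT(model="nova-3"),
            openai.LLM(model="gpt-4o"),
            cartesia.TTS(voice="sonic"),
        )
    # Default: budget-optimized
    return (
        deepgram.STT(model="nova-3"),
        openai.LLM(model="gpt-4o-mini"),
        deepgram.TTS(model="aura-stella"),
    )

stt, llm, tts = build_pipeline()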

Putting It All Together

Here is a complete voice agent implementation:

from livekit.agents import (
    cli, JobContext, AutoSubscribe,
    VoiceAgent, AgentSession, WorkerOptions,
)
from livekit.plugins import deepgram, openai, cartesia
from livekit import rtc
import asyncio
import os

async def entrypoint(ctx: JobContext):
    """Main entrypoint for voice agent jobs."""

    # Connect to room, audio only
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Wait for a human participant
    participant = await ctx.wait_for_participant(
        kind=rtc.ParticipantKind.STANDARD
    )
    print(f"User connected: {participant.identity}")

    # Configure the voice pipeline
    agent = VoiceAgent(
        stt=deepgram.STT(model="nova-3"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=cartesia.TTS(voice="sonic-english"),
        instructions="""
        You are a voice assistant for Task Manager.
        Help users manage their tasks efficiently.
        Keep responses under 30 words when possible.
        """,
    )

    # Create and start session
    session = AgentSession(agent)
    await session.start(ctx.room, participant)

    # Register cleanup
    async def save_transcript():
        transcript = session.get_transcript()
        print(f"Saving transcript: {len(transcript)} turns")
        # await database.save(transcript)

    ctx.add_shutdown_callback(save_transcript)

    # Wait for session to end
    await session.wait_for_close()
    print("Session complete")

if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
        )
    )

Output (full session):

$ python voice_agent.py dev

[worker] Starting in development mode
[worker] Connected to ws://localhost:7880
[worker] Waiting for jobs...

[worker] Job received: job_xyz123
[agent] Connecting to room: task-manager-room
[agent] User connected: user-jane
[agent] Pipeline initialized
[agent] Listening for speech...

[stt] User speaking...
[stt] "Add a task to review the proposal by Friday"
[llm] Generating response...
[tts] Synthesizing...
[agent] Speaking: "I've created a task to review the proposal, due Friday. Is there anything else?"

[stt] User speaking...
[stt] "No, that's all. Thanks."
[llm] Generating response...
[agent] Speaking: "You're welcome. Have a productive day!"

[agent] Participant disconnected
[agent] Saving transcript: 2 turns
[agent] Session complete
[worker] Job finished, waiting for next...

Reflect on Your Skill

You now understand the architecture that your livekit-agents skill generates code for. Consider what you learned:

New Knowledge:

  • Workers coordinate job distribution across processes
  • JobContext provides the execution environment for each session
  • Agents encapsulate behavior; AgentSessions encapsulate state
  • WebRTC handles audio through tracks and room events
  • The plugin system makes provider switching trivial

Skill Improvement Questions:

  1. Does your skill explain when to use load_fnc for custom load reporting?
  2. Does it guide users on AutoSubscribe options?
  3. Does it include shutdown callback patterns for cleanup?
  4. Does it show how to switch between provider configurations?

Update your skill based on gaps you discovered. A skill that evolves with your understanding becomes increasingly valuable.

Try With AI

Prompt 1: Explore Agent Lifecycle

I'm learning LiveKit's Agents architecture. Use my livekit-agents skill
to help me understand the agent lifecycle:

1. When a user starts a voice call, what happens first?
2. Walk me through the sequence: Worker → Job → Agent → Session
3. What state exists at each level? What gets created vs reused?
4. If the user's connection drops for 5 seconds, what survives?

Draw a sequence diagram showing the complete lifecycle from user click
to session end. I want to understand the distributed nature of this system.

What you are learning: Distributed systems thinking. Understanding component lifecycles helps you debug issues that span multiple layers—a skill essential for production voice agents.

Prompt 2: Debug Session Management

I implemented a LiveKit voice agent, but I'm seeing this behavior:
- First user connects: Works perfectly
- Second user connects: Works perfectly
- Third user connects: Long delay before agent responds

My Worker is running on a single machine. Help me diagnose:

1. What could cause progressive degradation?
2. How do I check if my Worker is reporting load correctly?
3. Should I be using connection pooling for my LLM/STT/TTS clients?
4. What metrics should I monitor to catch this earlier?

Use my livekit-agents skill to help me think through this systematically.

What you are learning: Production debugging. Voice agents fail in ways that text agents do not—latency accumulation, resource exhaustion, audio pipeline backpressure. Understanding failure modes makes you a better architect.

Prompt 3: Improve Your Skill

Based on what I learned about LiveKit architecture, here's what my
livekit-agents skill is missing:

1. It doesn't explain the Worker/Agent/Session separation clearly
2. It doesn't include JobContext patterns (wait_for_participant, shutdown hooks)
3. It doesn't show how to configure load_fnc for horizontal scaling

Help me update the skill with:
- An architecture decision tree (when to add more Workers vs optimize Agents)
- A JobContext checklist (what every entrypoint should include)
- Provider configuration templates (budget vs quality vs local)

The goal is making the skill MORE USEFUL for production deployments.

What you are learning: Skill evolution. Your livekit-agents skill should grow with your understanding. Each lesson adds patterns that make the skill more valuable for future projects.

Safety Note

Voice agents process audio in real time, which creates unique failure modes. Your agent might hear correctly but respond inappropriately. Always test with:

  • Background noise (office chatter, traffic, music)
  • Diverse accents and speech patterns
  • Edge cases (very long pauses, incomplete sentences, corrections)
  • Network degradation (dropped packets, latency spikes)

Start with internal testing before exposing to users. Monitor transcripts early to catch misunderstandings before they become user complaints.