
Implementation & Integration

Your specification defines what you're building. Your accumulated skills from Part 11 know how to build it. Now you orchestrate: AI uses YOUR skills to implement YOUR specification.

This is Layer 4 in action. You wrote the spec in Lesson 1. You're not asking AI to figure out what to build - that's decided. You're directing AI to implement each component, validating against your requirements as you go. If the implementation reveals gaps in your spec, you update the spec. The specification remains the source of truth.

By the end of this lesson, your voice-enabled Task Manager will accept voice commands through browser and phone, see your screen when you share it, and handle natural conversation patterns like interruptions and corrections. The economy stack (Deepgram + GPT-4o-mini + Cartesia) keeps costs at your $0.03-0.07/minute target while maintaining sub-800ms response times.
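
Before writing any code, it's worth a back-of-envelope check that those targets are reachable with the rates and latencies quoted in this lesson. The sketch below simply totals the published figures; real sessions add VAD, turn-detection, and network time on top:

# Back-of-envelope check using the per-minute rates and latencies quoted in this lesson.
ECONOMY_STACK = {
    "Deepgram Nova-3 (STT)":  {"usd_per_min": 0.0077, "latency_ms": 90},
    "GPT-4o-mini (LLM)":      {"usd_per_min": 0.0015, "latency_ms": 300},
    "Cartesia Sonic-3 (TTS)": {"usd_per_min": 0.024,  "latency_ms": 60},
}

cost = sum(s["usd_per_min"] for s in ECONOMY_STACK.values())
latency = sum(s["latency_ms"] for s in ECONOMY_STACK.values())

print(f"Stack cost: ${cost:.3f}/min (target: $0.03-0.07/min)")
print(f"Pipeline latency: {latency}ms, leaving ~{800 - latency}ms "
      "for VAD, turn detection, and network within the 800ms target")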


Implementation Strategy

Your specification from Lesson 1 established the requirements. Now you implement in this order:

Implementation Order (Dependencies Flow Downward)

1. Voice Agent Core
   ├── Economy stack providers (STT, LLM, TTS)
   ├── MCP server connection (Task Manager API)
   └── Base conversation handling
        │
        ▼
2. Browser Channel (LiveKit WebRTC)
   ├── WebRTC room connection
   ├── Audio capture with Silero VAD
   └── UI integration (mute, speaking indicator)
        │
        ▼
3. Phone Channel (Twilio SIP)
   ├── SIP trunk routing
   ├── Inbound call handling
   └── Phone-specific greeting
        │
        ▼
4. Screen Sharing (Gemini Live)
   ├── Screen capture consent flow
   ├── Visual context extraction
   └── Voice + vision multimodal integration
        │
        ▼
5. Conversation Polish
   ├── Semantic turn detection tuning
   ├── Barge-in and interruption handling
   └── Filler speech for async operations

Test each layer before moving to the next. A working browser channel validates your core before adding phone complexity.


Voice Agent Core

Your livekit-agents skill knows the patterns. Use it to scaffold the economy stack:

from livekit.agents import VoicePipelineAgent, AgentContext, function_tool
from livekit.agents.mcp import MCPServerStdio
from livekit.plugins import deepgram, openai, cartesia, silero

# Economy stack: $0.033/min combined
# Deepgram Nova-3: $0.0077/min (90ms latency)
# GPT-4o-mini: $0.0015/min (~300ms latency)
# Cartesia Sonic-3: $0.024/min (60ms latency)

async def create_task_manager_agent(ctx: AgentContext):
    """Voice agent for Task Manager with economy stack."""

    # Configure semantic turn detection for conversational flow
    turn_detector = openai.TurnDetector(
        min_endpointing_delay=0.4,   # Wait at least 400ms
        max_endpointing_delay=2.5,   # Never more than 2.5s
        unlikely_threshold=0.15,     # Patient for thinking pauses
    )

    # Create voice pipeline with economy stack
    agent = VoicePipelineAgent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3"),      # Economy: fast, accurate
        llm=openai.LLM(model="gpt-4o-mini"),   # Economy: capable, cheap
        tts=cartesia.TTS(model="sonic-3"),     # Economy: natural voice
        turn_detector=turn_detector,
        allow_interruptions=True,
        interrupt_speech_duration=0.5,
        interrupt_min_words=2,
    )

    # System prompt establishes Task Manager persona
    agent.system_prompt = """You are a voice assistant for Task Manager.

Your role:
- Help users manage tasks through natural voice conversation
- Create, list, complete, and delete tasks via voice commands
- Keep responses concise (2-3 sentences max for voice)
- Confirm actions briefly: "Done, I've created that task"

Style:
- Professional but conversational
- Proactive: "Would you like to add a due date?"
- Acknowledge corrections: "Got it, let me fix that"

Available tools: list_tasks, create_task, complete_task, delete_task, get_task_details
"""

    return agent

Output:

Voice agent created with economy stack:
- STT: Deepgram Nova-3 ($0.0077/min)
- LLM: GPT-4o-mini ($0.0015/min)
- TTS: Cartesia Sonic-3 ($0.024/min)
- Total: $0.033/min (within $0.03-0.07 target)

Turn detection: 400-2500ms window, 0.15 threshold
Interruptions: Enabled (500ms, 2+ words)

Connecting Task Manager via MCP

Your Task Manager API from Parts 6-7 becomes accessible through MCP. The voice agent calls these tools during conversation:

from livekit.agents.mcp import MCPServerStdio
import httpx

# MCP server connection for Task Manager
task_manager_mcp = MCPServerStdio(
    command="python",
    args=["task_manager_mcp_server.py"],
)

# Define tools with voice-appropriate filler speech
@function_tool(
    description="List tasks by status (all, pending, completed, due_today)",
    filler_speech="Checking your tasks...",
)
async def list_tasks(status: str = "all") -> dict:
    """Query tasks from Task Manager API."""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://localhost:8000/api/tasks",
            params={"status": status}
        )
        return response.json()

@function_tool(
    description="Create a new task with title and optional due date",
    filler_speech="Creating that task...",
)
async def create_task(title: str, due_date: str | None = None) -> dict:
    """Create a task in Task Manager."""
    async with httpx.AsyncClient() as client:
        payload = {"title": title}
        if due_date:
            payload["due_date"] = due_date
        response = await client.post(
            "http://localhost:8000/api/tasks",
            json=payload
        )
        return response.json()

@function_tool(
    description="Mark a task as complete by its ID or title",
    filler_speech="Marking that done...",
)
async def complete_task(task_identifier: str) -> dict:
    """Complete a task (accepts ID or partial title match)."""
    # First find the task
    tasks = await list_tasks(status="pending")
    matching = [t for t in tasks["tasks"]
                if task_identifier.lower() in t["title"].lower()]

    if not matching:
        return {"error": f"No pending task matching '{task_identifier}'"}

    task = matching[0]  # Take first match
    async with httpx.AsyncClient() as client:
        response = await client.patch(
            f"http://localhost:8000/api/tasks/{task['id']}",
            json={"status": "completed"}
        )
        return response.json()

Output:

MCP tools registered:
- list_tasks: "Checking your tasks..."
- create_task: "Creating that task..."
- complete_task: "Marking that done..."

Conversation example:
User: "What tasks do I have due today?"
Agent: "Checking your tasks... You have 3 tasks due today: Review proposal,
Submit report, and Call vendor."
User: "Mark the proposal one as done"
Agent: "Marking that done... Done! I've completed 'Review proposal'."

Browser Channel: LiveKit WebRTC

The browser channel uses your web-audio-capture skill for microphone handling and connects through LiveKit's WebRTC infrastructure.

Agent-Side Room Connection

from livekit.agents import WorkerOptions, cli
from livekit import rtc

async def entrypoint(ctx: AgentContext):
    """Entry point for browser connections via LiveKit."""

    # Create the voice agent
    agent = await create_task_manager_agent(ctx)

    # Connect tools
    agent.tools = [list_tasks, create_task, complete_task]

    # Handle room events
    @ctx.room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        print(f"Browser user connected: {participant.identity}")

    @ctx.room.on("participant_disconnected")
    def on_participant_disconnected(participant: rtc.RemoteParticipant):
        print(f"Browser user disconnected: {participant.identity}")

    # Start the agent
    await agent.start(ctx)

    # Greet the user
    await agent.say("Hi, I'm your Task Manager assistant. How can I help?")

# Worker configuration
if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            worker_type=cli.WorkerType.ROOM,
        )
    )

Output:

LiveKit worker started
Listening for browser connections on room: task-manager-*

[New connection]
Browser user connected: user_abc123
Agent: "Hi, I'm your Task Manager assistant. How can I help?"

Browser Client Integration

Your web application connects to the LiveKit room and streams audio:

import { Room, RoomEvent, Track } from 'livekit-client';

// Keep the room at module scope so UI controls can reach it
let room;

async function connectVoiceAgent() {
  // Create room connection
  room = new Room();

  // Handle agent audio
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio && participant.identity.startsWith('agent')) {
      // Attach agent audio to speaker
      const audioElement = track.attach();
      document.body.appendChild(audioElement);
    }
  });

  // Connect to LiveKit server
  await room.connect(
    'wss://your-livekit-server.com',
    'user-jwt-token'
  );

  // Enable microphone
  await room.localParticipant.setMicrophoneEnabled(true);

  console.log('Voice agent connected. Speak to interact.');
}

// UI controls
function toggleMute() {
  const enabled = room.localParticipant.isMicrophoneEnabled;
  room.localParticipant.setMicrophoneEnabled(!enabled);
  updateMuteButton(!enabled);
}

Output:

Browser console:
Voice agent connected. Speak to interact.

User: "Create a task to review the quarterly report by Friday"
Agent: "Creating that task... Done! I've added 'Review quarterly report'
due this Friday. Anything else?"

Testing Browser Channel

Before adding phone, validate browser works end-to-end:

Test Case                          Expected Result
Voice command: "List my tasks"     Agent lists tasks from API
Voice command: "Create task"       Task created, confirmation spoken
Interrupt during response          Agent stops, listens, responds to new input
Long pause ("Let me think...")     Agent waits, doesn't interrupt
Mute button toggle                 Audio stops/resumes, agent acknowledges silence

Phone Channel: Twilio SIP

Your voice-telephony skill handles Twilio integration. Phone calls route through SIP trunk to your LiveKit agent.

Twilio SIP Configuration

from livekit.agents import WorkerOptions, cli
from livekit.agents.sip import SIPParticipant

# Phone-specific entrypoint
async def phone_entrypoint(ctx: AgentContext):
    """Entry point for phone calls via Twilio SIP."""

    # Create agent with phone-specific configuration
    agent = await create_task_manager_agent(ctx)
    agent.tools = [list_tasks, create_task, complete_task]

    # Detect phone participants
    @ctx.room.on("participant_connected")
    async def on_phone_call(participant):
        if isinstance(participant, SIPParticipant):
            print(f"Phone call from: {participant.phone_number}")

            # Phone-specific greeting (more formal)
            await agent.say(
                "Task Manager, how can I help you today?"
            )

    @ctx.room.on("participant_disconnected")
    async def on_hangup(participant):
        if isinstance(participant, SIPParticipant):
            print(f"Call ended: {participant.phone_number}")

    await agent.start(ctx)

Twilio to LiveKit Routing

Configure your Twilio SIP trunk to route to LiveKit:

# livekit-server configuration
sip:
  inbound:
    # Your Twilio SIP trunk endpoint
    trunks:
      - trunk_id: twilio_main
        domain: your-domain.sip.twilio.com
        username: livekit_user
        password: ${SIP_PASSWORD}

    # Dispatch inbound calls to voice agent
    dispatch_rules:
      - rule_id: task_manager
        rule:
          dispatchRuleDirect:
            roomName: task-manager-phone
            pin: ""  # No PIN required

On the application side, a Twilio webhook receives each incoming call and bridges it to LiveKit over SIP:

# Twilio webhook handler (receives incoming calls)
import os

from fastapi import FastAPI, Request
from twilio.twiml.voice_response import VoiceResponse

app = FastAPI()

@app.post("/twilio/incoming")
async def handle_incoming_call(request: Request):
    """Route incoming Twilio call to LiveKit SIP."""
    response = VoiceResponse()

    # Connect to LiveKit via SIP
    dial = response.dial()
    dial.sip(
        "sip:task-manager-phone@your-livekit-server.com",
        username="livekit_user",
        password=os.environ["SIP_PASSWORD"]
    )

    return str(response)

Output:

Incoming call: +1-555-123-4567
Routing to LiveKit room: task-manager-phone

Phone call from: +1-555-123-4567
Agent: "Task Manager, how can I help you today?"
User: "I need to check my tasks for today"
Agent: "Checking your tasks... You have 3 tasks due today..."

Phone Channel Considerations

Phone audio differs from browser:

Factor             Browser        Phone             Handling
Audio Quality      High (opus)    Variable (G.711)  Deepgram handles both
Latency            ~50ms          100-200ms         Account for in turn detection
Background Noise   Controlled     Variable          Silero VAD filters
Disconnect         Graceful       Sudden (hangup)   Session cleanup required (see sketch below)

# Phone-specific turn detection (more patient for phone latency)
phone_turn_detector = openai.TurnDetector(
    min_endpointing_delay=0.5,   # Slightly longer for phone
    max_endpointing_delay=3.0,   # More patient
    unlikely_threshold=0.12,     # More conservative
)
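
The "session cleanup required" row is worth making concrete: phone callers hang up abruptly, so per-call state and in-flight tool calls should be torn down when the SIP participant disconnects. A minimal sketch, assuming you track pending work per call (pending_operations and cleanup_phone_session are illustrative names, not LiveKit APIs):

import asyncio

# Hypothetical per-call state; adapt to however you track in-flight work.
pending_operations: list[asyncio.Task] = []

async def cleanup_phone_session(participant) -> None:
    """Tear down per-call state when a phone caller hangs up abruptly."""
    # Cancel any tool calls still running for this caller
    for op in pending_operations:
        op.cancel()
    await asyncio.gather(*pending_operations, return_exceptions=True)
    pending_operations.clear()
    print(f"Cleaned up session for {getattr(participant, 'phone_number', 'unknown')}")

# Wire it into the hangup handler from the phone entrypoint above:
# @ctx.room.on("participant_disconnected")
# async def on_hangup(participant):
#     if isinstance(participant, SIPParticipant):
#         await cleanup_phone_session(participant)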

Multimodal Screen Sharing: Gemini Live

Your Gemini Live API knowledge from Chapter 83 enables the "see my screen" capability. When users share their screen, the agent gains visual context for task creation.

Screen Share Integration

import os
import io

import google.generativeai as genai
from PIL import Image

class MultimodalVoiceAgent:
    """Voice agent with screen sharing capability via Gemini Live."""

    def __init__(self, base_agent: VoicePipelineAgent):
        self.base_agent = base_agent
        self.screen_context = None

        # Initialize Gemini for vision
        genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
        self.vision_model = genai.GenerativeModel("gemini-2.0-flash")

    async def handle_screen_share(self, image_data: bytes):
        """Process incoming screen share frame."""
        image = Image.open(io.BytesIO(image_data))

        # Extract visual context using Gemini
        response = await self.vision_model.generate_content_async([
            "Describe what's visible on this screen in 2-3 sentences. "
            "Focus on any text, documents, or actionable items.",
            image
        ])

        self.screen_context = response.text
        return self.screen_context

    async def create_task_from_screen(self, voice_command: str):
        """Create task using visual context from screen share."""
        if not self.screen_context:
            await self.base_agent.say(
                "I don't see your screen. Would you like to share it?"
            )
            return

        # Combine voice command with visual context
        prompt = f"""
        The user said: "{voice_command}"

        They're looking at: {self.screen_context}

        Based on what they said and what's on their screen,
        what task should be created? Respond with just the task title.
        """

        response = await self.vision_model.generate_content_async(prompt)
        task_title = response.text.strip()

        # Create the task
        await create_task(title=task_title)

        await self.base_agent.say(
            f"Created task: {task_title}. Want me to add a due date?"
        )

On the browser side, the consent flow requests screen capture at a low frame rate and publishes the track to the room (JavaScript):

// Browser-side screen share consent
async function requestScreenShare() {
  try {
    // Request screen capture permission
    const stream = await navigator.mediaDevices.getDisplayMedia({
      video: {
        cursor: 'always',
        frameRate: 1  // Low frame rate is sufficient for context
      }
    });

    // Stream to agent for processing
    await room.localParticipant.publishTrack(
      stream.getVideoTracks()[0],
      { source: Track.Source.ScreenShare }
    );

    console.log('Screen sharing active');
    return true;

  } catch (error) {
    console.log('Screen share cancelled or denied');
    return false;
  }
}

Voice + Vision Interaction

User: [sharing screen showing email about project deadline]
User: "Add what I'm looking at to my tasks"

Agent processing:
1. Capture current screen frame
2. Gemini analyzes: "Email showing project proposal due January 15th,
from Sarah Johnson, requesting review and approval"
3. Combine with voice command "add to tasks"
4. Generate task: "Review and approve project proposal from Sarah"

Agent: "Created task: 'Review and approve project proposal from Sarah'.
I noticed it's due January 15th - should I set that as the due date?"

User: "Yes please"
Agent: "Done, I've set the due date to January 15th."

Output:

Screen share started
Captured context: "Email from Sarah Johnson, subject: Project Proposal Review,
mentions deadline of January 15th, 2026"

User voice: "Add what I'm looking at to my tasks"
Gemini analysis: Combining voice intent with visual context...
Task created: "Review and approve project proposal from Sarah"
Due date suggested: 2026-01-15 (extracted from email)

Natural Conversation Patterns

The final polish makes your voice agent feel like a natural conversation partner.

Semantic Turn Detection Tuning

Your spec requires sub-800ms response latency. Configure turn detection to balance speed with patience:

# Production configuration based on testing
production_turn_detector = openai.TurnDetector(
    min_endpointing_delay=0.35,  # Quick response to complete sentences
    max_endpointing_delay=2.5,   # Patient for complex thoughts
    unlikely_threshold=0.15,     # Balanced sensitivity
)

# Domain-specific patterns to recognize:
# - "Let me think..." -> Wait longer
# - "Actually, wait..." -> Expect correction
# - "Thanks, that's all" -> End conversation

Barge-In Handling

When users interrupt, acknowledge and adapt:

class ConversationStateManager:
    """Manage conversation state during interruptions."""

    def __init__(self, agent: VoicePipelineAgent):
        self.agent = agent
        self.pending_operations = []
        self.last_topic = None

    async def handle_interruption(self, event):
        """Called when user interrupts agent speech."""
        # Cancel any pending operations
        for op in self.pending_operations:
            await op.cancel()
        self.pending_operations.clear()

        # Log for debugging
        print(f"Interrupted at: '{event.text_spoken}'")
        print(f"User said: '{event.user_speech}'")

        # Detect correction patterns
        user_words = event.user_speech.lower()
        if any(word in user_words for word in ["wait", "no", "actually", "sorry"]):
            # User is correcting - acknowledge
            self.agent.system_prompt += "\nUser just corrected you. Acknowledge and respond to their new intent."

    async def handle_long_pause(self, duration: float):
        """Handle when user pauses mid-thought."""
        if duration > 3.0:
            # Gentle prompt after 3 seconds
            await self.agent.say("Take your time, I'm listening.")

# Wire up handlers
state_manager = ConversationStateManager(agent)

@agent.on("agent_speech_interrupted")
async def on_interrupt(event):
    await state_manager.handle_interruption(event)

Output:

Conversation flow:
Agent: "To reset your password, first go to settings..."
User: "No wait, I meant my account password, not my app password"
[Interrupted at: "To reset your password, first go to settings"]
Agent: "Got it, for your account password, here's what you need..."

Filler Speech for Operations

Long operations need spoken feedback to avoid awkward silence:

import asyncio

# Filler patterns by operation duration
FILLER_PATTERNS = {
    "quick": ["Here it is...", "Got it..."],
    "medium": ["Let me check that for you...", "Working on that..."],
    "long": ["This might take a moment...", "Still working on it..."],
}

async def with_progressive_filler(operation, agent, operation_name="that"):
    """Execute operation with progressive filler speech."""

    # Start with medium filler
    await agent.say(f"Working on {operation_name}...")

    task = asyncio.create_task(operation)
    elapsed = 0

    while not task.done():
        await asyncio.sleep(2)
        elapsed += 2

        # Speak each escalation once, checking the longer threshold first
        if elapsed == 8 and not task.done():
            await agent.say("Just a moment longer...")
        elif elapsed == 4 and not task.done():
            await agent.say("Almost there...")

    return await task

# Usage
result = await with_progressive_filler(
    list_tasks(status="all"),
    agent,
    operation_name="your task list"
)

Integration Testing

Before declaring implementation complete, validate against your specification:

Automated Test Suite

import pytest
import asyncio
from datetime import datetime

@pytest.mark.asyncio
async def test_browser_voice_command():
    """Test voice command through browser channel."""
    agent = await create_task_manager_agent(test_context)

    # Simulate voice input and time the response
    start = datetime.now()
    response = await agent.process_speech("What tasks do I have today?")
    response_time = (datetime.now() - start).total_seconds()

    assert "tasks" in response.lower()
    assert response_time < 0.8  # Sub-800ms spec requirement

@pytest.mark.asyncio
async def test_task_creation_flow():
    """Test full task creation via voice."""
    agent = await create_task_manager_agent(test_context)

    # Create task
    await agent.process_speech("Create a task to review the proposal")

    # Verify task exists
    tasks = await list_tasks(status="pending")
    assert any("proposal" in t["title"].lower() for t in tasks["tasks"])

@pytest.mark.asyncio
async def test_interruption_handling():
    """Test barge-in interruption recovery."""
    agent = await create_task_manager_agent(test_context)

    # Start a response
    response_task = asyncio.create_task(
        agent.process_speech("What are all my tasks?")
    )

    # Interrupt after 200ms
    await asyncio.sleep(0.2)
    await agent.interrupt_with("Actually, just the ones due today")

    # Should respond to correction
    result = await response_task
    assert "today" in result.lower()

@pytest.mark.asyncio
async def test_latency_target():
    """Verify sub-800ms end-to-end latency."""
    agent = await create_task_manager_agent(test_context)

    latencies = []
    for _ in range(10):
        start = datetime.now()
        await agent.process_speech("List my tasks")
        latencies.append((datetime.now() - start).total_seconds() * 1000)

    p95_latency = sorted(latencies)[8]  # Second-worst of 10 samples as a P95 proxy
    assert p95_latency < 800, f"P95 latency {p95_latency}ms exceeds 800ms target"

Manual Integration Checklist

Validate these scenarios against your spec:

Browser Channel Tests:
├── [ ] Voice command "list my tasks" returns task list
├── [ ] Voice command creates task with spoken confirmation
├── [ ] Interruption stops agent, responds to correction
├── [ ] Long pause ("um, let me think") doesn't trigger premature response
├── [ ] Mute button pauses audio capture
└── [ ] Latency measured: ___ms (target: <800ms)

Phone Channel Tests:
├── [ ] Inbound call connects to agent
├── [ ] Phone greeting plays: "Task Manager, how can I help?"
├── [ ] Voice commands work through phone audio
├── [ ] Call hangup properly disconnects
└── [ ] Phone-specific turn detection feels natural

Screen Share Tests:
├── [ ] Screen share consent flow works
├── [ ] "Add what I'm looking at" creates contextual task
├── [ ] Visual context extraction is accurate
├── [ ] Screen share can be stopped/restarted
└── [ ] Works without screen share (graceful degradation)

Cost Validation:
├── [ ] Per-minute cost calculated: $___/min
├── [ ] Within target range: $0.03-0.07/min
└── [ ] Cost logging enabled for production (see the sketch below)
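
For the cost validation items, a lightweight per-session logger is enough to fill in the blanks above. A minimal sketch, assuming cost scales with call duration at the lesson's published rates (actual provider invoices may differ, and SessionCostLogger is an illustrative name, not a library class):

import time

# Published per-minute rates for the economy stack used in this lesson
RATES_PER_MIN = {"deepgram_nova3": 0.0077, "gpt4o_mini": 0.0015, "cartesia_sonic3": 0.024}
RATE_PER_MIN = sum(RATES_PER_MIN.values())  # ~$0.033/min

class SessionCostLogger:
    """Log estimated session cost for the 'Cost Validation' checklist."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.start = time.monotonic()

    def finish(self) -> float:
        minutes = (time.monotonic() - self.start) / 60
        cost = minutes * RATE_PER_MIN
        print(
            f"[cost] session={self.session_id} "
            f"duration={minutes:.2f}min cost=${cost:.4f} "
            f"rate=${RATE_PER_MIN:.3f}/min (target: $0.03-0.07/min)"
        )
        return cost

# Usage: create at call start, call finish() when the participant disconnects
# logger = SessionCostLogger(session_id="room-abc123")
# ...
# logger.finish()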

Try With AI

Prompt 1: Scaffold the Voice Agent Core

Using my livekit-agents skill, scaffold the voice agent core for my
Task Manager capstone:

From my spec:
- STT: Deepgram Nova-3 ($0.0077/min)
- LLM: GPT-4o-mini ($0.0015/min)
- TTS: Cartesia Sonic-3 ($0.024/min)
- MCP Tools: list_tasks, create_task, complete_task, delete_task

Requirements:
1. Semantic turn detection with 400-2500ms window
2. Barge-in handling enabled
3. System prompt for Task Manager persona
4. Filler speech for tool operations
5. Interruption state management

Generate the complete Python implementation. I'll test each component
against my spec requirements and validate latency.

Follow my spec exactly. Flag anything that seems impossible or conflicting.

What you're learning: Spec-driven implementation - AI implements YOUR specification. You validate. You update the spec if implementation reveals gaps.

Prompt 2: Integrate Phone Channel

Using my voice-telephony skill, integrate Twilio phone support for my
Task Manager voice agent.

My setup:
- LiveKit server running at: [your-livekit-domain]
- Twilio account with SIP trunk
- Voice agent core already working for browser

Requirements from my spec:
- Inbound calls route to LiveKit agent
- Same conversation logic as browser channel
- Phone-specific greeting: "Task Manager, how can I help?"
- Handle phone latency (100-200ms additional)

Walk me through:
1. Twilio SIP trunk configuration
2. LiveKit SIP routing rules
3. Twilio webhook handler for incoming calls
4. Phone-specific turn detection adjustments
5. Call cleanup on hangup

I need working configuration I can deploy, not just concepts.

What you're learning: Channel integration - connecting telephony to your existing voice agent while respecting channel-specific requirements.

Prompt 3: Add Screen Sharing with Gemini

I need to add screen sharing using Gemini Live API for my Task Manager
voice agent.

Use case from my spec:
- User shares screen while talking
- Says: "Add what I'm looking at to my tasks"
- Agent sees the screen, extracts context, creates task with relevant title
- Agent can suggest due dates from visible dates

My current setup:
- Voice agent core working (LiveKit + economy stack)
- Browser channel operational
- Gemini API key configured

Help me implement:
1. Screen share consent and capture in browser
2. Frame extraction for Gemini analysis
3. Visual context integration with voice commands
4. Graceful handling when no screen is shared
5. Privacy considerations (what NOT to capture)

This is the multimodal piece that differentiates my Digital FTE.

What you're learning: Multimodal integration - combining voice and vision creates capabilities neither modality has alone. The implementation follows your spec while AI handles the technical details.


Safety Note

When implementing voice agents with MCP tools, validate all tool outputs before speaking them. A text error from an API is annoying; a spoken error breaks conversational flow entirely. Implement fallback responses for every failure mode: "I couldn't find that task" rather than "Error 404 from Task Manager API."
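
One way to enforce that is to normalize every tool result before it reaches the TTS stage. A minimal sketch, assuming tools return dicts shaped like the ones earlier in this lesson (the mapping and helper are illustrative, not a LiveKit feature):

# Map failure modes to speech-safe phrasings instead of raw API errors.
SPEECH_FALLBACKS = {
    "not_found": "I couldn't find that task.",
    "timeout": "Task Manager is taking too long to respond. Want me to try again?",
    "default": "Something went wrong talking to Task Manager. Let's try that again.",
}

def speakable_result(result: dict) -> str:
    """Convert a tool result into something safe to say out loud."""
    if "error" not in result:
        # Success: let the LLM phrase the confirmation from the data
        return ""
    error_text = str(result["error"]).lower()
    if "no pending task" in error_text or "404" in error_text:
        return SPEECH_FALLBACKS["not_found"]
    if "timeout" in error_text:
        return SPEECH_FALLBACKS["timeout"]
    return SPEECH_FALLBACKS["default"]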

For screen sharing, respect privacy boundaries. Never capture screens without explicit consent. Consider what visual data you're sending to Gemini and ensure it aligns with your data handling policies. The consent flow isn't just UX - it's legal compliance in many jurisdictions.
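
If you want that consent boundary enforced in code as well as UX, a simple gate on the agent side can refuse to forward frames until consent is granted and drop them the moment it is revoked. A sketch under those assumptions (the ScreenShareGate class is illustrative):

# Sketch: gate screen frames behind explicit, revocable consent before any
# pixels are sent to Gemini. Track consent per session in your own state store.
class ScreenShareGate:
    def __init__(self):
        self.consented = False

    def grant(self) -> None:
        self.consented = True

    def revoke(self) -> None:
        self.consented = False

    async def process_frame(self, agent: "MultimodalVoiceAgent", image_data: bytes):
        if not self.consented:
            # Drop the frame entirely; never analyze unconsented captures
            return None
        return await agent.handle_screen_share(image_data)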