Barge-In and Custom Turn Detection

Human conversation is messy. We interrupt. We pause mid-thought. We speak over each other. A voice agent that cannot handle these patterns feels robotic and frustrating.

This lesson teaches you to implement natural conversation dynamics: barge-in handling (when users interrupt), turn detection (when users finish speaking), and the decision framework for choosing direct API versus frameworks.


Understanding Barge-In

Barge-in is when a user starts speaking while the agent is still talking. Humans do this constantly:

  • Correcting a mistake before the agent finishes
  • Answering a question mid-sentence
  • Saying "stop" or "wait" to interrupt
  • Jumping in when they know what comes next

What Happens Without Barge-In Handling

Agent: "You have three tasks due today. First, review the proposal which is..."
User: "Stop, I already did that one!"
Agent: "...due at 5pm. Second, send invoices which..."
User: [frustrated, hangs up]

The agent ignores the user and plows through. Terrible experience.

What Happens With Barge-In Handling

Agent: "You have three tasks due today. First, review the proposal which is—"
User: "Stop, I already did that one!"
Agent: [stops immediately] "Got it, I'll mark the proposal review as complete.
You have two remaining tasks..."

The agent detects the interruption, stops speaking, processes the new input, and continues.


How Barge-In Works in Realtime API

The Realtime API handles barge-in through input audio buffer events:

Agent speaking (audio streaming to user)
  │
User starts speaking
  ├── input_audio_buffer.speech_started event
  ├── Agent's audio output interrupted
  ├── User continues speaking
  └── input_audio_buffer.speech_stopped event
  │
Model processes user's interruption
  │
New response generated

Event Sequence

When the user interrupts:

  1. input_audio_buffer.speech_started: User began speaking
  2. response.audio.interrupted: Agent's response was cut off
  3. User's audio is processed
  4. input_audio_buffer.speech_stopped: User finished
  5. response.created: New response starts

Handling Barge-In Events

class BargeInHandler:
    """Handles user interruptions gracefully."""

    def __init__(self, audio_player):
        self.audio_player = audio_player
        self.agent_speaking = False
        self.interrupted = False

    async def handle_event(self, event: dict):
        """Process barge-in related events."""
        event_type = event.get("type")

        if event_type == "response.audio.delta":
            # Agent is producing audio
            self.agent_speaking = True

        elif event_type == "response.audio.done":
            # Agent finished speaking
            self.agent_speaking = False
            self.interrupted = False

        elif event_type == "input_audio_buffer.speech_started":
            # User started speaking
            if self.agent_speaking:
                print("[barge-in] User interrupted agent")
                self.interrupted = True
                # Stop playing agent's audio immediately
                self.audio_player.stop()

        elif event_type == "response.audio.interrupted":
            # Server confirms interruption
            print("[barge-in] Response interrupted by server")
            self.agent_speaking = False

        elif event_type == "input_audio_buffer.speech_stopped":
            if self.interrupted:
                print("[barge-in] User finished interruption, processing...")
            else:
                print("[turn] User finished speaking")

Audio Playback Coordination

The tricky part is coordinating audio playback:

import asyncio


class AudioPlayer:
    """Plays agent audio with interruption support."""

    def __init__(self):
        self.playing = False
        self.buffer = []
        self.play_task = None

    async def play(self, audio_chunk: bytes):
        """Queue audio for playback."""
        self.buffer.append(audio_chunk)

        if not self.playing:
            self.play_task = asyncio.create_task(self._play_loop())

    async def _play_loop(self):
        """Continuous playback loop."""
        self.playing = True

        while self.buffer:
            chunk = self.buffer.pop(0)
            # Play chunk through audio output
            await self._play_chunk(chunk)

        self.playing = False

    def stop(self):
        """Immediately stop playback (for barge-in)."""
        self.buffer.clear()
        if self.play_task:
            self.play_task.cancel()
        self.playing = False
        # Also stop any currently-playing audio
        self._stop_audio_immediately()

    async def _play_chunk(self, chunk: bytes):
        """Platform-specific audio playback."""
        # Implementation depends on audio library
        pass

    def _stop_audio_immediately(self):
        """Cut off audio output immediately."""
        # Implementation depends on audio library
        pass

Turn Detection: Server VAD

Turn detection determines when the user has finished speaking. The Realtime API offers two modes:

| Mode | How It Works | When to Use |
|------|--------------|-------------|
| server_vad | Server detects speech end | Most use cases |
| none | You control turn boundaries | Special scenarios |

Server VAD Configuration

session_config = {
    "type": "session.update",
    "session": {
        "turn_detection": {
            "type": "server_vad",

            # VAD sensitivity (0.0-1.0)
            # Higher = requires louder, clearer speech to trigger
            "threshold": 0.5,

            # Audio to include before detected speech start
            # Prevents cutting off initial sounds
            "prefix_padding_ms": 300,

            # Silence duration before turn ends
            # Lower = faster response, may cut off pauses
            "silence_duration_ms": 500
        }
    }
}

Tuning VAD Parameters

| Parameter | Lower Value | Higher Value |
|-----------|-------------|--------------|
| threshold | Triggers on quiet sounds, background noise | Only triggers on clear speech |
| prefix_padding_ms | May cut off word starts | Captures "um" and hesitation |
| silence_duration_ms | Fast response, may interrupt thought | Waits for complete sentences |

Use Case Profiles

# Fast response profile (customer service, quick Q&A)
# Prioritizes speed over complete utterances
fast_response = {
    "type": "server_vad",
    "threshold": 0.6,
    "prefix_padding_ms": 200,
    "silence_duration_ms": 300  # Responds quickly
}

# Complete utterance profile (therapy, coaching, complex queries)
# Prioritizes hearing full thoughts
complete_utterance = {
    "type": "server_vad",
    "threshold": 0.4,
    "prefix_padding_ms": 400,
    "silence_duration_ms": 800  # Waits for pauses
}

# Noisy environment profile (call centers, public spaces)
# Reduces false triggers from background noise
noisy_environment = {
    "type": "server_vad",
    "threshold": 0.7,  # Higher threshold
    "prefix_padding_ms": 300,
    "silence_duration_ms": 600
}

Dynamic VAD Adjustment

You can update VAD settings mid-session:

def adjust_vad_for_context(self, context: str):
    """Adjust VAD based on conversation context."""

    if context == "user_thinking":
        # User said "let me think..."
        # Increase silence duration
        config = {
            "type": "session.update",
            "session": {
                "turn_detection": {
                    "type": "server_vad",
                    "silence_duration_ms": 2000  # Wait longer
                }
            }
        }
    elif context == "quick_confirmation":
        # Agent asked yes/no question
        # Decrease silence duration
        config = {
            "type": "session.update",
            "session": {
                "turn_detection": {
                    "type": "server_vad",
                    "silence_duration_ms": 300  # Respond quickly
                }
            }
        }
    else:
        # Unknown context: leave current settings in place
        return

    self.data_channel.send(json.dumps(config))
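
As an illustration of when to call this, the sketch below (another method on the same hypothetical class) watches completed user transcripts for "thinking" phrases. The phrase list and the method name `on_user_transcript` are made up for this example.

THINKING_PHRASES = ("let me think", "give me a second", "hold on")

async def on_user_transcript(self, event: dict):
    """Relax VAD when the user signals they need time to think."""
    if event.get("type") != "conversation.item.input_audio_transcription.completed":
        return

    transcript = event.get("transcript", "").lower()
    if any(phrase in transcript for phrase in THINKING_PHRASES):
        # Give the user a longer silence window before their turn ends
        self.adjust_vad_for_context("user_thinking")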

Manual Turn Detection

Sometimes server VAD is not enough. Manual turn detection gives you full control.

When to Use Manual Turn Detection

| Scenario | Why Server VAD Fails |
|----------|----------------------|
| Push-to-talk interface | User presses a button to indicate their turn |
| Specific wake words | Only respond after "Hey Agent" |
| Semantic turn detection | Understand from meaning whether the user is done |
| Multi-party conversation | Multiple speakers, need speaker ID |
| Dictation mode | Pauses do not mean turn end |

Disabling Server VAD

manual_turn_config = {
    "type": "session.update",
    "session": {
        "turn_detection": {
            "type": "none"  # Disable server VAD
        }
    }
}

Manual Turn Control Events

With type: "none", you must explicitly commit audio:

# Audio is buffered but not processed
# Until you send:
{"type": "input_audio_buffer.commit"}

# Or clear the buffer:
{"type": "input_audio_buffer.clear"}

Push-to-Talk Implementation

import json


class PushToTalkController:
    """Manual turn detection via button press."""

    def __init__(self, data_channel):
        self.data_channel = data_channel
        self.button_held = False

    def button_pressed(self):
        """User pressed the talk button."""
        print("[ptt] Talk button pressed")
        self.button_held = True
        # Start recording, audio goes to buffer

    def button_released(self):
        """User released the talk button."""
        print("[ptt] Talk button released, committing audio")
        self.button_held = False

        # Commit the audio buffer for processing
        self.data_channel.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

    def cancel_input(self):
        """User cancelled (e.g., ESC key)."""
        print("[ptt] Input cancelled")

        # Clear the buffer, don't process
        self.data_channel.send(json.dumps({
            "type": "input_audio_buffer.clear"
        }))
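
To exercise the controller without real WebRTC plumbing, you can substitute a stub channel and drive the button methods directly. FakeDataChannel below is a stand-in invented for this sketch, not part of any library.

class FakeDataChannel:
    """Stand-in for a real WebRTC data channel; just prints outgoing messages."""
    def send(self, message: str):
        print(f"[send] {message}")

# Simulate one press-and-release cycle
ptt = PushToTalkController(FakeDataChannel())
ptt.button_pressed()     # user holds the talk button; mic audio streams to the buffer
ptt.button_released()    # releasing commits the buffered audio for processing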

Wake Word Implementation

import json


class WakeWordController:
    """Only respond after wake word detected."""

    def __init__(self, data_channel, wake_word: str = "hey assistant"):
        self.data_channel = data_channel
        self.wake_word = wake_word.lower()
        self.listening_for_wake_word = True
        self.command_buffer = []

    async def handle_transcript(self, transcript: str):
        """Process partial transcripts to detect wake word."""
        text = transcript.lower()

        if self.listening_for_wake_word:
            if self.wake_word in text:
                print("[wake] Wake word detected!")
                self.listening_for_wake_word = False

                # Clear any pre-wake-word audio
                self.data_channel.send(json.dumps({
                    "type": "input_audio_buffer.clear"
                }))

                # Now listening for command
                return

            # Not awake, keep discarding
            return

        # Awake and listening for command
        # Command ends on silence (you implement this logic)
        self.command_buffer.append(transcript)

    def command_complete(self):
        """User finished command after wake word."""
        print("[wake] Command complete, processing")

        self.data_channel.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

        # Reset for next wake word
        self.listening_for_wake_word = True
        self.command_buffer = []
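
One way to feed the controller, assuming input audio transcription is enabled in the session so user transcripts arrive as events, and an async event_stream iterator like the one sketched earlier:

async def route_transcripts(event_stream, controller: WakeWordController):
    """Forward user transcription events to the wake-word controller."""
    async for event in event_stream:
        if event.get("type") == "conversation.item.input_audio_transcription.completed":
            await controller.handle_transcript(event.get("transcript", ""))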

Semantic Turn Detection

For complex scenarios, analyze meaning to detect turns:

import json


class SemanticTurnDetector:
    """Detect turn completion from semantic cues."""

    def __init__(self, data_channel):
        self.data_channel = data_channel
        self.current_transcript = ""

    async def analyze_transcript(self, transcript: str):
        """Analyze if user has completed their thought."""
        self.current_transcript = transcript

        # Simple heuristics
        indicators = {
            "complete": [
                "that's all", "that's it", "done", "finished",
                "go ahead", "please", "thanks", "thank you"
            ],
            "incomplete": [
                "um", "uh", "let me think", "wait",
                "and also", "oh and", "actually"
            ]
        }

        lower = transcript.lower()

        # Check for completion indicators
        for indicator in indicators["complete"]:
            if lower.endswith(indicator):
                print(f"[semantic] Completion indicator: {indicator}")
                self.commit_turn()
                return

        # Check for incompletion indicators
        for indicator in indicators["incomplete"]:
            if indicator in lower[-20:]:  # Last 20 chars
                print(f"[semantic] Holding for incomplete indicator: {indicator}")
                return

        # No clear indicator, rely on silence
        # (You'd combine this with VAD or a timer)

    def commit_turn(self):
        """Commit audio when semantically complete."""
        self.data_channel.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))
        self.current_transcript = ""

Voice + Image Inputs

The Realtime API supports multimodal input—voice combined with images:

Sending Images During Conversation

import base64
import json

def send_image(self, image_path: str, user_message: str = "What's in this image?"):
    """Send image for visual context."""

    # Read and encode image
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode()

    # Determine MIME type
    mime_type = "image/jpeg"  # or image/png, image/gif, image/webp

    # Create conversation item with image
    event = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [
                {
                    "type": "input_image",
                    "image": {
                        "type": "base64",
                        "media_type": mime_type,
                        "data": image_data
                    }
                },
                {
                    "type": "input_text",
                    "text": user_message
                }
            ]
        }
    }

    self.data_channel.send(json.dumps(event))

    # Request response
    self.data_channel.send(json.dumps({"type": "response.create"}))

Use Cases for Voice + Vision

| Scenario | How It Works |
|----------|--------------|
| Describe what I see | User points phone camera, asks question |
| Read this document | User photographs text, agent reads aloud |
| What's wrong with this | User shows error screen, agent diagnoses |
| Help me assemble this | User shows parts, agent provides guidance |

Screen Sharing Context

For desktop applications, capture and send screen context:

import base64
import io

import mss
from PIL import Image

def capture_and_send_screen(self, prompt: str = "What's on my screen?"):
    """Capture screen and send to model."""

    with mss.mss() as sct:
        # Capture primary monitor
        screenshot = sct.grab(sct.monitors[1])

        # Convert to PIL Image
        img = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX")

        # Resize if too large (API has size limits)
        max_size = 1024
        if max(img.size) > max_size:
            img.thumbnail((max_size, max_size))

        # Convert to base64
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=80)
        image_data = base64.b64encode(buffer.getvalue()).decode()

        self.send_image_data(image_data, "image/jpeg", prompt)

When to Use Direct API vs Frameworks

You have now learned the Realtime API directly. But should you use it?

Decision Framework

Do I need voice AI?
└─ Yes → Is OpenAI my only/primary provider?
         ├─ No  → Use a framework (LiveKit, Pipecat)
         │        → Multi-provider flexibility
         └─ Yes → Do I need custom WebRTC or ultra-low latency?
                  ├─ Yes → Direct API
                  │        → Maximum control
                  │        → Most work
                  └─ No  → Use a framework + OpenAI provider
                           → Easier scaling, monitoring
                           → Team velocity
                           → Still OpenAI under the hood

Comparison Matrix

| Factor | Direct API | Framework (LiveKit/Pipecat) |
|--------|------------|------------------------------|
| Setup time | Days | Hours |
| Provider flexibility | OpenAI only | Multiple providers |
| Latency control | Maximum | Good (framework overhead ~10-20ms) |
| WebRTC control | Full | Abstracted |
| Scaling | Build yourself | Built-in patterns |
| Monitoring | Build yourself | Often included |
| Debugging | Full visibility | May need to dig |
| Team knowledge | WebRTC, audio expertise needed | Framework APIs only |
| Maintenance | You maintain | Community maintained |
| Phone integration | Build with Twilio/Telnyx | Native SIP in LiveKit |

Scenarios Favoring Direct API

  1. Research and experimentation: You need to understand exactly what is happening
  2. Custom WebRTC infrastructure: You have existing WebRTC systems to integrate
  3. Extreme latency sensitivity: Every 10ms matters
  4. Specialized turn detection: Server VAD does not work for your use case
  5. Hybrid processing: You need direct access to audio streams

Scenarios Favoring Frameworks

  1. Production deployment: You need scaling, monitoring, reliability
  2. Provider flexibility: You might switch providers or use multiple
  3. Team velocity: You want to ship quickly with less specialized knowledge
  4. Phone integration: Native SIP support in LiveKit
  5. Standard voice agent: Your use case fits framework patterns

The 80/20 Rule

80% of voice agent projects should use frameworks. The abstractions save enormous development time. Only drop to direct API when you hit a specific limitation that frameworks cannot address.


Complete Barge-In Example

Here is a complete implementation with barge-in handling:

import asyncio
import base64
import json

class RealtimeVoiceAgent:
    """Voice agent with full barge-in and turn detection."""

    def __init__(self, data_channel, audio_player):
        self.data_channel = data_channel
        self.audio_player = audio_player
        self.agent_speaking = False
        self.user_interrupted = False
        self.current_response_id = None

    def configure_session(self, vad_profile: str = "balanced"):
        """Configure session with VAD settings."""

        profiles = {
            "fast": {"threshold": 0.6, "prefix_padding_ms": 200, "silence_duration_ms": 300},
            "balanced": {"threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 500},
            "patient": {"threshold": 0.4, "prefix_padding_ms": 400, "silence_duration_ms": 800}
        }

        vad = profiles.get(vad_profile, profiles["balanced"])

        config = {
            "type": "session.update",
            "session": {
                "instructions": """
                You are a voice assistant. When the user interrupts:
                - Acknowledge their interruption briefly
                - Address their new input
                - Don't repeat what you were saying unless asked
                """,
                "turn_detection": {
                    "type": "server_vad",
                    **vad
                },
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16"
            }
        }

        self.data_channel.send(json.dumps(config))

    async def handle_event(self, event: dict):
        """Process all events with barge-in awareness."""
        event_type = event.get("type")

        # Response lifecycle
        if event_type == "response.created":
            self.current_response_id = event["response"]["id"]
            self.agent_speaking = True
            self.user_interrupted = False
            print(f"[agent] Starting response: {self.current_response_id}")

        elif event_type == "response.audio.delta":
            # Stream audio to player
            audio_base64 = event.get("delta", "")
            if audio_base64 and not self.user_interrupted:
                audio_bytes = base64.b64decode(audio_base64)
                await self.audio_player.play(audio_bytes)

        elif event_type == "response.audio.done":
            print("[agent] Audio complete")

        elif event_type == "response.done":
            self.agent_speaking = False
            print("[agent] Response complete")

        # Barge-in detection
        elif event_type == "input_audio_buffer.speech_started":
            if self.agent_speaking:
                print("[barge-in] User interrupted!")
                self.user_interrupted = True
                self.audio_player.stop()

                # Cancel current response
                self.data_channel.send(json.dumps({
                    "type": "response.cancel"
                }))

        elif event_type == "response.cancelled":
            print("[barge-in] Response cancelled, processing interruption")

        elif event_type == "input_audio_buffer.speech_stopped":
            if self.user_interrupted:
                print("[barge-in] User finished, generating new response")
            else:
                print("[turn] User finished speaking")

        # Transcripts
        elif event_type == "response.audio_transcript.delta":
            text = event.get("delta", "")
            print(text, end="", flush=True)

        elif event_type == "response.audio_transcript.done":
            print()  # Newline

        elif event_type == "conversation.item.input_audio_transcription.completed":
            user_text = event.get("transcript", "")
            print(f"[user] {user_text}")


async def main():
    """Run voice agent with barge-in."""

    # ... WebRTC setup code from Lesson 1 ...

    agent = RealtimeVoiceAgent(data_channel, audio_player)
    agent.configure_session(vad_profile="balanced")

    print("\n[ready] Speak now. Interrupt anytime. Ctrl+C to exit.\n")

    # Event loop
    while True:
        event = await receive_event(data_channel)
        await agent.handle_event(event)
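
The loop above relies on a receive_event helper from the Lesson 1 setup that is not shown here. One possible shape, assuming an aiortc-style data channel that emits "message" events (adapt the registration to whatever WebRTC library you used):

import asyncio
import json

def attach_event_queue(data_channel):
    """Push parsed incoming messages onto a queue attached to the channel."""
    data_channel.event_queue = asyncio.Queue()

    @data_channel.on("message")
    def on_message(message):
        data_channel.event_queue.put_nowait(json.loads(message))

async def receive_event(data_channel) -> dict:
    """Await the next parsed server event."""
    return await data_channel.event_queue.get()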

Sample Interaction:

[ready] Speak now. Interrupt anytime. Ctrl+C to exit.

[user] What are my tasks for today?
[agent] Starting response: resp_abc123
You have three tasks due today. First, review the proposal which is due at—
[barge-in] User interrupted!
[barge-in] Response cancelled, processing interruption
[barge-in] User finished, generating new response
[user] Skip the proposal, what else?
[agent] Starting response: resp_def456
Got it, skipping the proposal. You have two other tasks: send invoices by noon and team standup at 3pm.
[agent] Response complete

[user] Thanks, that's all.
[turn] User finished speaking
[agent] Starting response: resp_ghi789
You're welcome. Have a productive day!
[agent] Response complete

Try With AI

Prompt 1: Design Turn Detection Strategy

I'm building a voice agent for medical intake. Patients describe symptoms,
which may involve:
- Long pauses while they think
- Emotional moments requiring patience
- Medical terminology that might sound like silence
- Background medical equipment noise

Help me design a turn detection strategy:
1. Should I use server VAD, manual detection, or hybrid?
2. What VAD parameters would you recommend?
3. How do I handle the "let me think about that" scenario?
4. Should I use semantic detection to know when they're done?

Consider: False turn ends are worse than slight delays (interrupting patients is bad).

What you are learning: Context-specific turn detection. Different domains have different conversation patterns—understanding this shapes your technical choices.

Prompt 2: Implement Barge-In Gracefully

My voice agent sometimes handles barge-in awkwardly:

Scenario 1: User interrupts with "wait" but agent says "Got it, you want to wait?"
Scenario 2: User corrects a word, agent starts over from scratch
Scenario 3: User says "no no no" during list, agent asks "what do you mean by no?"

Help me improve barge-in handling:
1. How do I classify interrupt intent (stop, correct, redirect)?
2. Should I acknowledge interrupts differently based on type?
3. How do I maintain context through interruptions?
4. What instructions help the model handle these cases?

What you are learning: Nuanced interruption handling. Not all interruptions are the same—intelligent classification improves UX.

Prompt 3: Make the Framework Decision

I'm building a voice agent for real estate property tours. Requirements:
- Browser-based (clients view on phones)
- Shows property images while describing features
- Needs to handle 50 concurrent sessions
- Want to switch from OpenAI to Gemini if costs are better
- Launch in 3 months with 2 developers

Analyze: Should I use OpenAI Realtime API directly or a framework?

Consider:
- LiveKit Agents (native SIP, scaling)
- Pipecat (multi-provider, flexibility)
- Direct OpenAI (maximum control)

Provide a reasoned recommendation with trade-offs.

What you are learning: Architectural decision-making. Real projects have constraints—time, team, budget, requirements. Understanding trade-offs is a senior skill.


Safety Note

Barge-in and turn detection are core UX concerns:

  • Too aggressive barge-in: Agent cuts off users mid-thought, feels rude
  • Too slow barge-in: Agent talks over users, feels robotic
  • Too fast turn detection: Agent jumps in during natural pauses
  • Too slow turn detection: Users wait awkwardly for response

Test with real users in realistic conditions. Record sessions (with consent) and review for timing issues. Tune parameters based on observed conversation patterns, not assumptions.
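
One lightweight way to gather that timing data is to measure, client-side, how long the agent takes to start speaking after the user stops. The sketch below is illustrative; because it measures from events as they arrive, the figure includes network and buffering overhead.

import time

class TurnTimingLogger:
    """Track how long the agent takes to start speaking after the user stops."""

    def __init__(self):
        self.user_stopped_at = None
        self.latencies_ms = []

    def observe(self, event: dict):
        event_type = event.get("type")

        if event_type == "input_audio_buffer.speech_stopped":
            self.user_stopped_at = time.monotonic()

        elif event_type == "response.audio.delta" and self.user_stopped_at is not None:
            latency_ms = (time.monotonic() - self.user_stopped_at) * 1000
            self.latencies_ms.append(latency_ms)
            print(f"[metrics] user stop -> first agent audio: {latency_ms:.0f} ms")
            self.user_stopped_at = None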


Chapter Summary

You now understand the OpenAI Realtime API at the protocol level:

Lesson 1: WebRTC connection, audio formats, session configuration
Lesson 2: Voice function calling, filler speech, confirmation patterns
Lesson 3: Barge-in handling, turn detection, API vs framework decisions

Key takeaways:

  1. Native speech-to-speech eliminates transcription latency but locks you to one provider
  2. Function calling in voice requires filler speech and confirmation patterns
  3. Barge-in is essential for natural conversation—users will interrupt
  4. Turn detection should match your domain—fast for Q&A, patient for complex queries
  5. Frameworks handle 80% of use cases—drop to direct API only when needed

Next, Chapter 83 explores Gemini Live API—Google's approach to multimodal voice with affective dialog and proactive responses. You will have direct experience with both major native speech-to-speech providers.