Multi-Provider Integration & Custom Processors
Pipecat's power comes from its plugin ecosystem—40+ AI service integrations that you can mix and match. This lesson teaches you to select providers for your needs, integrate native speech-to-speech models, and build custom processors for domain-specific logic.
The Plugin Ecosystem
Pipecat doesn't lock you into specific providers. Instead, it offers a plugin system where each provider is a separate package:
# Install base Pipecat
pip install pipecat-ai
# Install specific providers
pip install "pipecat-ai[deepgram]" # STT
pip install "pipecat-ai[openai]" # LLM + TTS
pip install "pipecat-ai[cartesia]" # TTS
pip install "pipecat-ai[daily]" # Transport
# Or install multiple at once
pip install "pipecat-ai[deepgram,openai,cartesia,daily]"
Provider Categories
| Category | Providers | Purpose |
|---|---|---|
| STT | Deepgram, Whisper, AssemblyAI, Gladia, Azure | Audio → Text |
| LLM | OpenAI, Anthropic, Google, Together, Groq, local | Text → Text |
| TTS | Cartesia, ElevenLabs, Azure, Deepgram Aura, PlayHT | Text → Audio |
| Transport | Daily, WebSocket, Local | Audio I/O |
| Vision | OpenAI GPT-4V, Anthropic Claude, Google | Image → Text |
| S2S | OpenAI Realtime, Gemini Live, AWS Nova Sonic | Audio → Audio |
Provider Comparison
Choose providers based on your requirements:
STT Providers:
| Provider | Latency | Cost | Best For |
|---|---|---|---|
| Deepgram Nova-3 | ~90ms | $0.0077/min | Low latency, streaming |
| OpenAI Whisper | ~300ms | $0.006/min | Accuracy, multilingual |
| AssemblyAI | ~150ms | $0.0085/min | Accuracy, diarization |
| Gladia | ~100ms | $0.0065/min | EU data residency |
LLM Providers:
| Provider | Latency | Cost | Best For |
|---|---|---|---|
| GPT-4o-mini | 200-400ms | $0.15/1M tokens | Cost efficiency |
| GPT-4o | 300-500ms | $2.50/1M tokens | Quality, multimodal |
| Claude 3.5 Sonnet | 200-400ms | $3.00/1M tokens | Nuanced responses |
| Groq Llama 3.3 | 50-100ms | $0.27/1M tokens | Speed, open source |
TTS Providers:
| Provider | Latency | Cost | Best For |
|---|---|---|---|
| Cartesia Sonic | 40-90ms | ~$0.024/min | Speed, naturalness |
| ElevenLabs | 100-200ms | ~$0.18/min | Voice cloning |
| Deepgram Aura | 50-100ms | $0.015/min | Cost efficiency |
| Azure TTS | 100-150ms | $0.016/min | Enterprise, languages |
Swapping Providers
The modular advantage: change one processor, keep the pipeline.
Before: Deepgram STT
from pipecat.services.deepgram import DeepgramSTTService
stt = DeepgramSTTService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    model="nova-2"
)
After: Whisper STT
from pipecat.services.openai import OpenAISTTService
stt = OpenAISTTService(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="whisper-1"
)
Everything else stays the same. The pipeline doesn't care which STT you use—it only cares that the processor outputs TextFrame.
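For context, here is a minimal sketch of the surrounding pipeline, assuming the transport, llm, and tts services you configured in earlier lessons are still in scope. Swapping STT means changing only the one binding passed into the Pipeline:
from pipecat.pipeline.pipeline import Pipeline

# Only the `stt` variable changes between Deepgram and Whisper;
# the pipeline composition itself is untouched.
pipeline = Pipeline([
    transport.input(),   # audio in from the transport
    stt,                 # DeepgramSTTService or OpenAISTTService
    llm,                 # unchanged
    tts,                 # unchanged
    transport.output()   # audio out to the transport
])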
Swapping TTS
# Option A: Cartesia (speed-optimized)
from pipecat.services.cartesia import CartesiaTTSService

tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="a0e99841-438c-4a64-b679-ae501e7d6091"
)

# Option B: ElevenLabs (quality-optimized)
from pipecat.services.elevenlabs import ElevenLabsTTSService

tts = ElevenLabsTTSService(
    api_key=os.getenv("ELEVENLABS_API_KEY"),
    voice_id="21m00Tcm4TlvDq8ikWAM"  # Rachel
)

# Option C: Deepgram Aura (cost-optimized)
from pipecat.services.deepgram import DeepgramTTSService

tts = DeepgramTTSService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    voice="aura-asteria-en"
)
Dynamic Provider Selection
For production systems, you might choose providers dynamically:
def get_tts_service(priority: str):
    """Select TTS based on priority: speed, quality, or cost."""
    if priority == "speed":
        return CartesiaTTSService(...)
    elif priority == "quality":
        return ElevenLabsTTSService(...)
    elif priority == "cost":
        return DeepgramTTSService(...)
    raise ValueError(f"Unknown priority: {priority}")
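One way to wire this in at startup (the TTS_PRIORITY environment variable is a hypothetical name used purely for illustration):
# Read the priority from configuration and build the matching service.
# Falls back to the cost-optimized option if nothing is set.
priority = os.getenv("TTS_PRIORITY", "cost")
tts = get_tts_service(priority)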
Speech-to-Speech Integration
Speech-to-speech (S2S) models process audio directly without the STT → LLM → TTS cascade. Pipecat supports major S2S providers:
Why S2S?
The cascaded approach (STT → LLM → TTS) has inherent latency:
Cascaded: 90ms (STT) + 300ms (LLM) + 80ms (TTS) = ~470ms minimum
S2S: ~200-300ms end-to-end
S2S models also preserve prosody (tone, emphasis, emotion) that's lost in transcription.
When to Use S2S vs Cascaded
| Use S2S When | Use Cascaded When |
|---|---|
| Latency critical | Need specific STT accuracy |
| Preserving emotion/tone | Need custom processing between steps |
| Natural conversation | Need to log/analyze transcripts |
| Budget allows (~$0.10+/min) | Cost-sensitive (~$0.03/min) |
OpenAI Realtime via Pipecat
Pipecat integrates OpenAI's Realtime API as a single service that replaces the STT, LLM, and TTS stages:
from pipecat.services.openai import OpenAIRealtimeService
from pipecat.transports.services.daily import DailyTransport

async def main():
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="RealtimeAgent"
    )

    # OpenAI Realtime replaces STT + LLM + TTS
    realtime = OpenAIRealtimeService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
        system_prompt="You are a helpful assistant.",
        tools=[
            # Function calling still works
            {
                "name": "create_task",
                "description": "Create a new task",
                "parameters": {...}
            }
        ]
    )

    # Simpler pipeline: just transport + S2S
    pipeline = Pipeline([
        transport.input(),
        realtime,
        transport.output()
    ])

    await PipelineRunner().run(PipelineTask(pipeline))
What's different:
- One service replaces three (STT + LLM + TTS)
- Lower latency (200-300ms vs 400-500ms)
- Higher cost (~$0.10/min vs ~$0.03/min)
- Function calling works through the S2S model
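Because the S2S service occupies the same slot that the STT + LLM + TTS trio normally fills, one way to keep a cascaded fallback is to choose the processor list at startup. A sketch, assuming the stt, llm, tts, and realtime services from earlier are configured, and using a hypothetical USE_REALTIME flag:
def build_processors(use_realtime: bool) -> list:
    """Return the processor chain for either an S2S or a cascaded pipeline."""
    if use_realtime:
        # One service replaces STT + LLM + TTS
        return [transport.input(), realtime, transport.output()]
    # Cascaded fallback: same transport, three separate services
    return [transport.input(), stt, llm, tts, transport.output()]

use_realtime = os.getenv("USE_REALTIME", "true") == "true"
pipeline = Pipeline(build_processors(use_realtime))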
Gemini Live via Pipecat
Google's Gemini 2.5 Flash Native Audio:
from pipecat.services.google import GeminiMultimodalLiveService

gemini = GeminiMultimodalLiveService(
    api_key=os.getenv("GOOGLE_API_KEY"),
    voice="Puck",  # 30 voices available
    system_instruction="You are a helpful assistant.",
    generation_config={
        "temperature": 0.7,
        "max_output_tokens": 1024
    }
)

pipeline = Pipeline([
    transport.input(),
    gemini,
    transport.output()
])
Gemini advantages:
- Multimodal (voice + vision + text in same stream)
- Affective dialog (adapts tone to user emotion)
- 30 HD voices in 24 languages
- Proactive audio (model decides when to respond)
AWS Nova Sonic via Pipecat
Amazon's speech-to-speech model:
from pipecat.services.aws import NovaSonicService

nova = NovaSonicService(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region="us-east-1",
    voice_id="matthew"
)

pipeline = Pipeline([
    transport.input(),
    nova,
    transport.output()
])
S2S Provider Comparison
| Provider | Latency | Cost | Unique Feature |
|---|---|---|---|
| OpenAI Realtime | 200-300ms | ~$0.10/min | Native function calling |
| Gemini Live | <300ms | Competitive | Multimodal, affective |
| Nova Sonic | ~250ms | ~$0.08/min | AWS ecosystem |
Custom Processors
When built-in processors aren't enough, build your own.
When to Customize
Build custom processors when you need:
- Domain-specific logic: Sentiment analysis, entity extraction
- Content filtering: Block inappropriate content
- Translation: Convert languages mid-pipeline
- Augmentation: Add context to frames
Processor Base Class
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame

class MyProcessor(FrameProcessor):
    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class handle system frames first
        await super().process_frame(frame, direction)

        # Check if we handle this frame type
        if isinstance(frame, TextFrame):
            # Transform the frame
            result = await self._transform(frame)
            # Push downstream
            await self.push_frame(result, direction)
        else:
            # Pass through frames we don't handle
            await self.push_frame(frame, direction)

    async def _transform(self, frame: TextFrame) -> TextFrame:
        # Your transformation logic
        return TextFrame(text=frame.text.upper())
Example: Sentiment Analysis Processor
Detect sentiment and add context:
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TextFrame

class SentimentProcessor(FrameProcessor):
    """Analyzes sentiment and adds context for the LLM."""

    def __init__(self):
        super().__init__()
        # You could use a local model or API here
        self.negative_words = ["frustrated", "angry", "upset", "terrible"]

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            sentiment = self._analyze(frame.text)
            if sentiment == "negative":
                # Add context for LLM to respond appropriately
                augmented = TextFrame(
                    text=f"[User seems {sentiment}] {frame.text}"
                )
                await self.push_frame(augmented, direction)
            else:
                await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    def _analyze(self, text: str) -> str:
        text_lower = text.lower()
        if any(word in text_lower for word in self.negative_words):
            return "negative"
        return "neutral"
Usage in pipeline:
pipeline = Pipeline([
    transport.input(),
    stt,
    SentimentProcessor(),  # Add after STT, before LLM
    llm,
    tts,
    transport.output()
])
Example: Content Filter Processor
Block sensitive content before TTS:
class ContentFilterProcessor(FrameProcessor):
    """Filters inappropriate content from LLM responses."""

    def __init__(self, blocked_patterns: list[str]):
        super().__init__()
        self.blocked_patterns = blocked_patterns

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            if self._is_blocked(frame.text):
                # Replace with safe message
                safe_frame = TextFrame(
                    text="I'm not able to respond to that request."
                )
                await self.push_frame(safe_frame, direction)
            else:
                await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    def _is_blocked(self, text: str) -> bool:
        return any(pattern in text.lower() for pattern in self.blocked_patterns)
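Since this filter targets LLM output, it sits between the LLM and the TTS. A sketch reusing the pipeline shape from earlier; the blocked_patterns list here is purely illustrative:
pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    ContentFilterProcessor(blocked_patterns=["ssn", "credit card"]),  # filter before speech
    tts,
    transport.output()
])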
Example: Translation Processor
Add real-time translation. The processor below calls the OpenAI API directly through the openai SDK for each translation (a pragmatic sketch; any translation API would work in its place):
from openai import AsyncOpenAI

class TranslationProcessor(FrameProcessor):
    """Translates text between languages."""

    def __init__(self, source_lang: str, target_lang: str):
        super().__init__()
        self.source_lang = source_lang
        self.target_lang = target_lang
        # Direct OpenAI client for one-off translation calls
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.system_prompt = (
            f"Translate from {source_lang} to {target_lang}. "
            f"Only output the translation, nothing else."
        )

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            translated = await self._translate(frame.text)
            await self.push_frame(TextFrame(text=translated), direction)
        else:
            await self.push_frame(frame, direction)

    async def _translate(self, text: str) -> str:
        # Use an LLM for translation
        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": text},
            ],
        )
        return response.choices[0].message.content
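Placement depends on the direction you translate: after STT to normalize user speech into the LLM's working language, or before TTS to speak responses in the user's language. A sketch of the first option:
pipeline = Pipeline([
    transport.input(),
    stt,
    TranslationProcessor(source_lang="Spanish", target_lang="English"),  # normalize user speech
    llm,
    tts,
    transport.output()
])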
Finalize Your Skill
Your pipecat skill should now include:
- Provider selection guidance: When to use which STT/LLM/TTS
- S2S integration patterns: OpenAI Realtime, Gemini Live, Nova Sonic
- Custom processor templates: Base class, common patterns
- Comparison framework: Cascaded vs S2S decision criteria
Update your skill:
Update my pipecat skill with everything I learned:
1. Provider selection:
- STT: Deepgram for speed, Whisper for accuracy
- LLM: GPT-4o-mini for cost, Claude for nuance
- TTS: Cartesia for speed, ElevenLabs for quality
2. S2S integration:
- OpenAI Realtime for function calling
- Gemini Live for multimodal
- When to choose S2S vs cascaded
3. Custom processors:
- Base class pattern
- Common use cases (sentiment, filtering, translation)
- Frame handling best practices
Test the updated skill by generating a multi-provider voice agent.
Try With AI
Prompt 1: Choose the Right Providers
I need to build a voice agent with these constraints:
- Latency: Under 500ms total response time
- Cost: Under $0.05 per minute
- Quality: Natural voice, good transcription of technical terms
- Region: Data must stay in EU
Use my pipecat skill to recommend:
1. Which STT provider meets these requirements?
2. Which LLM provider?
3. Which TTS provider?
4. Should I consider S2S instead of cascaded?
Explain the tradeoffs for each choice.
What you're learning: Provider selection—balancing competing requirements for real deployments.
Prompt 2: Integrate Speech-to-Speech
I want to try OpenAI's Realtime API through Pipecat. Help me:
1. Configure OpenAIRealtimeService for my Task Manager
2. Set up function calling for create_task and list_tasks
3. Handle the case where Realtime API has an outage (fallback to cascaded)
4. Compare latency between S2S and my current cascaded pipeline
Use my pipecat skill. I'll test both approaches and report numbers.
What you're learning: S2S integration—using native voice models while maintaining fallback options.
Prompt 3: Build a Custom Processor
I need a custom processor for my Task Manager voice agent that:
1. Receives TextFrame from STT (user's speech)
2. Detects if user mentions a priority level ("urgent", "high", "low")
3. If priority detected: Creates structured data and adds to frame
4. If no priority: Passes through unchanged
The LLM should receive: "[Priority: high] Review the proposal"
instead of just "Review the proposal urgently"
Help me implement this using my pipecat skill. Include:
- Processor class structure
- Priority detection logic
- Frame handling
- Testing approach
What you're learning: Custom processor implementation—extending Pipecat for domain-specific needs.
What You Built
By completing this chapter, you now have:
| Asset | Description |
|---|---|
| pipecat skill | Production-ready skill grounded in official docs |
| Frame mental model | Understand data flow through pipelines |
| Provider knowledge | Can select optimal STT/LLM/TTS for requirements |
| S2S integration | Can use OpenAI Realtime, Gemini Live through Pipecat |
| Custom processors | Can extend pipelines for domain needs |
Comparing Your Skills
| Skill | Architecture | Best For |
|---|---|---|
| livekit-agents | Distributed (Workers, Sessions) | Enterprise scale, semantic turn detection |
| pipecat | Compositional (Frames, Processors) | Provider flexibility, custom processing |
You can now choose the right framework for each project—or use both when different parts of your system have different requirements.
Next Steps
| Chapter | What You'll Learn |
|---|---|
| 64: OpenAI Realtime API | Direct access without framework abstraction |
| 65: Gemini Live API | Multimodal voice + vision |
| 66: Phone & Browser | Real communication channels |
| 67: Capstone | Production voice-enabled Task Manager |