
Multi-Provider Integration & Custom Processors

Pipecat's power comes from its plugin ecosystem—40+ AI service integrations that you can mix and match. This lesson teaches you to select providers for your needs, integrate native speech-to-speech models, and build custom processors for domain-specific logic.


The Plugin Ecosystem

Pipecat doesn't lock you into specific providers. Instead, it offers a plugin system where each provider is a separate package:

# Install base Pipecat
pip install pipecat-ai

# Install specific providers
pip install "pipecat-ai[deepgram]" # STT
pip install "pipecat-ai[openai]" # LLM + TTS
pip install "pipecat-ai[cartesia]" # TTS
pip install "pipecat-ai[daily]" # Transport

# Or install multiple at once
pip install "pipecat-ai[deepgram,openai,cartesia,daily]"

Provider Categories

| Category  | Providers                                          | Purpose       |
|-----------|----------------------------------------------------|---------------|
| STT       | Deepgram, Whisper, AssemblyAI, Gladia, Azure       | Audio → Text  |
| LLM       | OpenAI, Anthropic, Google, Together, Groq, local   | Text → Text   |
| TTS       | Cartesia, ElevenLabs, Azure, Deepgram Aura, PlayHT | Text → Audio  |
| Transport | Daily, WebSocket, Local                            | Audio I/O     |
| Vision    | OpenAI GPT-4V, Anthropic Claude, Google            | Image → Text  |
| S2S       | OpenAI Realtime, Gemini Live, AWS Nova Sonic       | Audio → Audio |

Provider Comparison

Choose providers based on your requirements:

STT Providers:

| Provider        | Latency | Cost        | Best For               |
|-----------------|---------|-------------|------------------------|
| Deepgram Nova-3 | ~90ms   | $0.0077/min | Low latency, streaming |
| OpenAI Whisper  | ~300ms  | $0.006/min  | Accuracy, multilingual |
| AssemblyAI      | ~150ms  | $0.0085/min | Accuracy, diarization  |
| Gladia          | ~100ms  | $0.0065/min | EU data residency      |

LLM Providers:

| Provider          | Latency   | Cost            | Best For           |
|-------------------|-----------|-----------------|--------------------|
| GPT-4o-mini       | 200-400ms | $0.15/1M tokens | Cost efficiency    |
| GPT-4o            | 300-500ms | $2.50/1M tokens | Quality, multimodal |
| Claude 3.5 Sonnet | 200-400ms | $3.00/1M tokens | Nuanced responses  |
| Groq Llama 3.3    | 50-100ms  | $0.27/1M tokens | Speed, open source |

TTS Providers:

| Provider       | Latency   | Cost        | Best For              |
|----------------|-----------|-------------|-----------------------|
| Cartesia Sonic | 40-90ms   | ~$0.024/min | Speed, naturalness    |
| ElevenLabs     | 100-200ms | ~$0.18/min  | Voice cloning         |
| Deepgram Aura  | 50-100ms  | $0.015/min  | Cost efficiency       |
| Azure TTS      | 100-150ms | $0.016/min  | Enterprise, languages |

Swapping Providers

The modular advantage: change one processor, keep the pipeline.

Before: Deepgram STT

import os

from pipecat.services.deepgram import DeepgramSTTService

stt = DeepgramSTTService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    model="nova-2"
)

After: Whisper STT

from pipecat.services.openai import OpenAISTTService

stt = OpenAISTTService(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="whisper-1"
)

Everything else stays the same. The pipeline doesn't care which STT you use—it only cares that the processor outputs TextFrame.
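
To make the drop-in nature concrete, here is a minimal sketch of the surrounding pipeline. It assumes `transport`, `llm`, and `tts` have already been constructed as in earlier lessons, so only the `stt` line changes between providers:

from pipecat.pipeline.pipeline import Pipeline

# Only the `stt` object differs between the Deepgram and Whisper variants;
# the pipeline wiring is identical. `transport`, `llm`, and `tts` are assumed
# to be built elsewhere, as in earlier lessons.
pipeline = Pipeline([
    transport.input(),
    stt,                 # Deepgram or Whisper: a drop-in swap
    llm,
    tts,
    transport.output()
])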

Swapping TTS

# Option A: Cartesia (speed-optimized)
from pipecat.services.cartesia import CartesiaTTSService
tts = CartesiaTTSService(
    api_key=os.getenv("CARTESIA_API_KEY"),
    voice_id="a0e99841-438c-4a64-b679-ae501e7d6091"
)

# Option B: ElevenLabs (quality-optimized)
from pipecat.services.elevenlabs import ElevenLabsTTSService
tts = ElevenLabsTTSService(
    api_key=os.getenv("ELEVENLABS_API_KEY"),
    voice_id="21m00Tcm4TlvDq8ikWAM"  # Rachel
)

# Option C: Deepgram Aura (cost-optimized)
from pipecat.services.deepgram import DeepgramTTSService
tts = DeepgramTTSService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    voice="aura-asteria-en"
)

Dynamic Provider Selection

For production systems, you might choose providers dynamically:

def get_tts_service(priority: str):
    """Select TTS based on priority: speed, quality, or cost."""
    if priority == "speed":
        return CartesiaTTSService(...)
    elif priority == "quality":
        return ElevenLabsTTSService(...)
    elif priority == "cost":
        return DeepgramTTSService(...)
    raise ValueError(f"Unknown priority: {priority}")
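
A usage sketch, assuming you expose the choice through an environment variable (`TTS_PRIORITY` is an illustrative name, not a Pipecat convention):

import os

# Hypothetical: read the tier from an environment variable, defaulting to cost.
tts = get_tts_service(os.getenv("TTS_PRIORITY", "cost"))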

Speech-to-Speech Integration

Speech-to-speech (S2S) models process audio directly without the STT → LLM → TTS cascade. Pipecat supports the major S2S providers, each covered below.

Why S2S?

The cascaded approach (STT → LLM → TTS) has inherent latency:

Cascaded: 90ms (STT) + 300ms (LLM) + 80ms (TTS) = ~470ms minimum
S2S: ~200-300ms end-to-end

S2S models also preserve prosody (tone, emphasis, emotion) that's lost in transcription.

When to Use S2S vs Cascaded

| Use S2S When                | Use Cascaded When                     |
|-----------------------------|---------------------------------------|
| Latency critical            | Need specific STT accuracy            |
| Preserving emotion/tone     | Need custom processing between steps  |
| Natural conversation        | Need to log/analyze transcripts       |
| Budget allows (~$0.10+/min) | Cost-sensitive (~$0.03/min)           |
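
If you want to encode these criteria in configuration logic, a toy helper might look like this; the thresholds are illustrative assumptions drawn from the table, not Pipecat APIs.

def choose_architecture(latency_budget_ms: int,
                        budget_per_min_usd: float,
                        need_transcripts: bool) -> str:
    """Toy decision rule based on the comparison above (illustrative thresholds)."""
    if need_transcripts or budget_per_min_usd < 0.10:
        return "cascaded"
    if latency_budget_ms < 400:
        return "s2s"
    return "cascaded"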

OpenAI Realtime via Pipecat

Pipecat wraps OpenAI's Realtime API in a single service that takes the place of the STT, LLM, and TTS stages:

import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.openai import OpenAIRealtimeService
from pipecat.transports.services.daily import DailyTransport

async def main():
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="RealtimeAgent"
    )

    # OpenAI Realtime replaces STT + LLM + TTS
    realtime = OpenAIRealtimeService(
        api_key=os.getenv("OPENAI_API_KEY"),
        voice="alloy",
        system_prompt="You are a helpful assistant.",
        tools=[
            # Function calling still works
            {
                "name": "create_task",
                "description": "Create a new task",
                "parameters": {...}
            }
        ]
    )

    # Simpler pipeline: just transport + S2S
    pipeline = Pipeline([
        transport.input(),
        realtime,
        transport.output()
    ])

    await PipelineRunner().run(PipelineTask(pipeline))

What's different:

  • One service replaces three (STT + LLM + TTS)
  • Lower latency (200-300ms vs 400-500ms)
  • Higher cost (~$0.10/min vs ~$0.03/min)
  • Function calling works through the S2S model
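
If you want a fallback to the cascaded pipeline when the Realtime API is unavailable (explored further in the Try With AI prompts below), a sketch might look like this. `build_cascaded_processors()` is a hypothetical helper, not a Pipecat API:

import os

def build_core_processors(use_s2s: bool) -> list:
    """Return the processors that sit between transport.input() and transport.output()."""
    if use_s2s:
        # A single S2S service replaces STT + LLM + TTS
        return [OpenAIRealtimeService(api_key=os.getenv("OPENAI_API_KEY"), voice="alloy")]
    # Hypothetical helper that returns [stt, llm, tts] built as in earlier lessons
    return build_cascaded_processors()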

Gemini Live via Pipecat

Google's Gemini 2.5 Flash Native Audio:

from pipecat.services.google import GeminiMultimodalLiveService

gemini = GeminiMultimodalLiveService(
    api_key=os.getenv("GOOGLE_API_KEY"),
    voice="Puck",  # 30 voices available
    system_instruction="You are a helpful assistant.",
    generation_config={
        "temperature": 0.7,
        "max_output_tokens": 1024
    }
)

pipeline = Pipeline([
    transport.input(),
    gemini,
    transport.output()
])

Gemini advantages:

  • Multimodal (voice + vision + text in same stream)
  • Affective dialog (adapts tone to user emotion)
  • 30 HD voices in 24 languages
  • Proactive audio (model decides when to respond)

AWS Nova Sonic via Pipecat

Amazon's speech-to-speech model:

from pipecat.services.aws import NovaSonicService

nova = NovaSonicService(
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    region="us-east-1",
    voice_id="matthew"
)

pipeline = Pipeline([
    transport.input(),
    nova,
    transport.output()
])

S2S Provider Comparison

| Provider        | Latency   | Cost        | Unique Feature          |
|-----------------|-----------|-------------|-------------------------|
| OpenAI Realtime | 200-300ms | ~$0.10/min  | Native function calling |
| Gemini Live     | <300ms    | Competitive | Multimodal, affective   |
| Nova Sonic      | ~250ms    | ~$0.08/min  | AWS ecosystem           |

Custom Processors

When built-in processors aren't enough, build your own.

When to Customize

Build custom processors when you need:

  • Domain-specific logic: Sentiment analysis, entity extraction
  • Content filtering: Block inappropriate content
  • Translation: Convert languages mid-pipeline
  • Augmentation: Add context to frames

Processor Base Class

from pipecat.frames.frames import Frame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class MyProcessor(FrameProcessor):
    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class handle system frames (start, interruption, etc.)
        await super().process_frame(frame, direction)

        # Check if we handle this frame type
        if isinstance(frame, TextFrame):
            # Transform the frame
            result = await self._transform(frame)
            # Push downstream
            await self.push_frame(result, direction)
        else:
            # Pass through frames we don't handle
            await self.push_frame(frame, direction)

    async def _transform(self, frame: TextFrame) -> TextFrame:
        # Your transformation logic
        return TextFrame(text=frame.text.upper())

Example: Sentiment Analysis Processor

Detect sentiment and add context:

from pipecat.processors.frame_processor import FrameProcessor
from pipecat.frames.frames import Frame, TextFrame

class SentimentProcessor(FrameProcessor):
    """Analyzes sentiment and adds context for the LLM."""

    def __init__(self):
        super().__init__()
        # You could use a local model or API here
        self.negative_words = ["frustrated", "angry", "upset", "terrible"]

    async def process_frame(self, frame: Frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            sentiment = self._analyze(frame.text)
            if sentiment == "negative":
                # Add context for LLM to respond appropriately
                augmented = TextFrame(
                    text=f"[User seems {sentiment}] {frame.text}"
                )
                await self.push_frame(augmented, direction)
            else:
                await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    def _analyze(self, text: str) -> str:
        text_lower = text.lower()
        if any(word in text_lower for word in self.negative_words):
            return "negative"
        return "neutral"

Usage in pipeline:

pipeline = Pipeline([
    transport.input(),
    stt,
    SentimentProcessor(),  # Add after STT, before LLM
    llm,
    tts,
    transport.output()
])

Example: Content Filter Processor

Block sensitive content before TTS:

class ContentFilterProcessor(FrameProcessor):
    """Filters inappropriate content from LLM responses."""

    def __init__(self, blocked_patterns: list[str]):
        super().__init__()
        self.blocked_patterns = blocked_patterns

    async def process_frame(self, frame: Frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            if self._is_blocked(frame.text):
                # Replace with safe message
                safe_frame = TextFrame(
                    text="I'm not able to respond to that request."
                )
                await self.push_frame(safe_frame, direction)
            else:
                await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    def _is_blocked(self, text: str) -> bool:
        return any(pattern in text.lower() for pattern in self.blocked_patterns)
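
A usage sketch: place the filter between the LLM and TTS so a blocked response is replaced before it is spoken. The pattern list here is illustrative, and `transport`, `stt`, `llm`, and `tts` are assumed to be built as in earlier lessons.

content_filter = ContentFilterProcessor(blocked_patterns=["password", "social security"])

pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    content_filter,  # filter LLM output before speech synthesis
    tts,
    transport.output()
])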

Example: Translation Processor

Add real-time translation:

import os

from openai import AsyncOpenAI

class TranslationProcessor(FrameProcessor):
    """Translates text between languages."""

    def __init__(self, source_lang: str, target_lang: str):
        super().__init__()
        self.source_lang = source_lang
        self.target_lang = target_lang
        # Call the OpenAI API directly for translation rather than routing
        # through a Pipecat LLM service, since this runs outside the main
        # LLM step of the pipeline.
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def process_frame(self, frame: Frame, direction):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            translated = await self._translate(frame.text)
            await self.push_frame(TextFrame(text=translated), direction)
        else:
            await self.push_frame(frame, direction)

    async def _translate(self, text: str) -> str:
        # Use an LLM for translation
        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"Translate from {self.source_lang} to "
                               f"{self.target_lang}. Only output the translation, "
                               f"nothing else.",
                },
                {"role": "user", "content": text},
            ],
        )
        return response.choices[0].message.content
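
Placement determines the direction of translation: put one instance after STT to translate the user's speech for the LLM, and another before TTS to translate replies back. A sketch, assuming a Spanish-speaking user, an English-prompted LLM, and the usual `transport`, `stt`, `llm`, and `tts` objects from earlier lessons:

pipeline = Pipeline([
    transport.input(),
    stt,
    TranslationProcessor("es", "en"),  # user speech -> English for the LLM
    llm,
    TranslationProcessor("en", "es"),  # LLM reply -> Spanish for the user
    tts,
    transport.output()
])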

Finalize Your Skill

Your pipecat skill should now include:

  1. Provider selection guidance: When to use which STT/LLM/TTS
  2. S2S integration patterns: OpenAI Realtime, Gemini Live, Nova Sonic
  3. Custom processor templates: Base class, common patterns
  4. Comparison framework: Cascaded vs S2S decision criteria

Update your skill:

Update my pipecat skill with everything I learned:

1. Provider selection:
- STT: Deepgram for speed, Whisper for accuracy
- LLM: GPT-4o-mini for cost, Claude for nuance
- TTS: Cartesia for speed, ElevenLabs for quality

2. S2S integration:
- OpenAI Realtime for function calling
- Gemini Live for multimodal
- When to choose S2S vs cascaded

3. Custom processors:
- Base class pattern
- Common use cases (sentiment, filtering, translation)
- Frame handling best practices

Test the updated skill by generating a multi-provider voice agent.

Try With AI

Prompt 1: Choose the Right Providers

I need to build a voice agent with these constraints:

- Latency: Under 500ms total response time
- Cost: Under $0.05 per minute
- Quality: Natural voice, good transcription of technical terms
- Region: Data must stay in EU

Use my pipecat skill to recommend:
1. Which STT provider meets these requirements?
2. Which LLM provider?
3. Which TTS provider?
4. Should I consider S2S instead of cascaded?

Explain the tradeoffs for each choice.

What you're learning: Provider selection—balancing competing requirements for real deployments.

Prompt 2: Integrate Speech-to-Speech

I want to try OpenAI's Realtime API through Pipecat. Help me:

1. Configure OpenAIRealtimeService for my Task Manager
2. Set up function calling for create_task and list_tasks
3. Handle the case where Realtime API has an outage (fallback to cascaded)
4. Compare latency between S2S and my current cascaded pipeline

Use my pipecat skill. I'll test both approaches and report numbers.

What you're learning: S2S integration—using native voice models while maintaining fallback options.

Prompt 3: Build a Custom Processor

I need a custom processor for my Task Manager voice agent that:

1. Receives TextFrame from STT (user's speech)
2. Detects if user mentions a priority level ("urgent", "high", "low")
3. If priority detected: Creates structured data and adds to frame
4. If no priority: Passes through unchanged

The LLM should receive: "[Priority: high] Review the proposal"
instead of just "Review the proposal urgently"

Help me implement this using my pipecat skill. Include:
- Processor class structure
- Priority detection logic
- Frame handling
- Testing approach

What you're learning: Custom processor implementation—extending Pipecat for domain-specific needs.


What You Built

By completing this chapter, you now have:

| Asset              | Description                                          |
|--------------------|------------------------------------------------------|
| pipecat skill      | Production-ready skill grounded in official docs     |
| Frame mental model | Understand data flow through pipelines               |
| Provider knowledge | Can select optimal STT/LLM/TTS for requirements      |
| S2S integration    | Can use OpenAI Realtime, Gemini Live through Pipecat |
| Custom processors  | Can extend pipelines for domain needs                |

Comparing Your Skills

| Skill          | Architecture                       | Best For                                  |
|----------------|------------------------------------|-------------------------------------------|
| livekit-agents | Distributed (Workers, Sessions)    | Enterprise scale, semantic turn detection |
| pipecat        | Compositional (Frames, Processors) | Provider flexibility, custom processing   |

You can now choose the right framework for each project—or use both when different parts of your system have different requirements.

Next Steps

| Chapter                 | What You'll Learn                            |
|-------------------------|----------------------------------------------|
| 64: OpenAI Realtime API | Direct access without framework abstraction  |
| 65: Gemini Live API     | Multimodal voice + vision                    |
| 66: Phone & Browser     | Real communication channels                  |
| 67: Capstone            | Production voice-enabled Task Manager        |