Frame-Based Pipeline Architecture
Pipecat's power comes from a simple idea: everything is a frame. Audio, text, control signals—all frames flowing through processors. This lesson teaches you to think in frames and build voice pipelines through composition.
The Frame Abstraction
In LiveKit, you thought about jobs—discrete work units distributed across workers. In Pipecat, you think about frames—data units flowing through transformations.
What Is a Frame?
A frame is a typed data container that flows through the pipeline:
```python
from pipecat.frames.frames import Frame, AudioRawFrame, TextFrame

# A frame carries data of a specific type
class AudioRawFrame(Frame):
    audio: bytes        # Raw audio samples
    sample_rate: int    # e.g., 16000
    num_channels: int   # e.g., 1 (mono)
```
Frames are:
- Typed: The type tells processors what to do with them
- Immutable: Processors create new frames, not modify existing ones
- Chainable: Output frames become input frames for the next processor
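For example, a transform that uppercases text builds a new frame instead of mutating the one it received. A minimal sketch (the `shout` helper is illustrative, not part of Pipecat):

```python
from pipecat.frames.frames import TextFrame

def shout(frame: TextFrame) -> TextFrame:
    # Create a new frame; never mutate the incoming one
    return TextFrame(text=frame.text.upper())

original = TextFrame(text="hello")
louder = shout(original)  # original is unchanged; louder is a new frame
```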
Why Frames?
The frame abstraction solves a key problem: how do you connect diverse AI services that speak different "languages"?
Without Frames:

```
Deepgram outputs JSON → Parse JSON → Extract text → Format for OpenAI
OpenAI outputs JSON → Parse JSON → Extract response → Format for Cartesia
```

With Frames:

```
Deepgram → TextFrame → OpenAI → TextFrame → Cartesia
(Each service knows how to handle TextFrame)
```
Frames create a uniform interface. Any processor that outputs TextFrame can connect to any processor that accepts TextFrame.
Frame Types
Pipecat defines several frame types. The most important:
AudioRawFrame
Raw audio data moving through the pipeline:
```python
from pipecat.frames.frames import AudioRawFrame

# Audio from microphone or transport
audio_frame = AudioRawFrame(
    audio=raw_bytes,     # PCM audio samples
    sample_rate=16000,   # Samples per second
    num_channels=1,      # Mono audio
)
```
When you see it: Transport input, STT input, TTS output, transport output.
TextFrame
Text content (transcriptions, LLM responses, user input):
```python
from pipecat.frames.frames import TextFrame

# Transcribed speech or LLM response
text_frame = TextFrame(text="Hello, how can I help you today?")
```
When you see it: STT output, LLM input, LLM output, TTS input.
EndFrame
Signals the end of a conversation or stream:
```python
from pipecat.frames.frames import EndFrame

# Pipeline should shut down gracefully
end_frame = EndFrame()
```
When you see it: User hangs up, timeout, explicit termination.
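Queuing an EndFrame onto the running task is a common way to trigger that graceful shutdown. A sketch, assuming a `task` like the ones created later in this lesson:

```python
from pipecat.frames.frames import EndFrame

async def shutdown(task):
    # An EndFrame flows through every processor, giving each one
    # a chance to flush and clean up before the task stops.
    await task.queue_frame(EndFrame())
```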
Control Frames
Control pipeline behavior without carrying content:
```python
from pipecat.frames.frames import (
    StartInterruptionFrame,  # User started speaking (barge-in)
    StopInterruptionFrame,   # User stopped speaking
    CancelFrame,             # Cancel current processing
)
```
When you see it: Interruption handling, flow control, error recovery.
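A custom processor typically reacts to these frames and then forwards them so downstream processors see them too. A sketch (the buffering logic is illustrative):

```python
from pipecat.frames.frames import Frame, StartInterruptionFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class BufferingProcessor(FrameProcessor):
    """Buffers text and discards it when the user barges in."""

    def __init__(self):
        super().__init__()
        self._buffer: list[str] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, StartInterruptionFrame):
            self._buffer.clear()  # Stale text is useless after a barge-in
        elif isinstance(frame, TextFrame):
            self._buffer.append(frame.text)
        # Always forward the frame, control frames included
        await self.push_frame(frame, direction)
```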
Frame Flow Diagram
```
User speaks       STT processor    LLM processor     TTS processor     User hears
     │                 │                │                  │                │
     ▼                 ▼                ▼                  ▼                ▼
AudioRawFrame ──▶  TextFrame  ──▶  TextFrame   ──▶  AudioRawFrame ──▶ Audio output
   "Hello"          "Hello"       "Hi! How can      [audio bytes]     "Hi! How..."
                                   I help?"
```
Processors: The Building Blocks
Processors are transformations. They receive frames, process them, and emit new frames.
Processor Interface
Every processor implements the same interface:
```python
from pipecat.frames.frames import Frame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class MyProcessor(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class do its bookkeeping first
        await super().process_frame(frame, direction)

        # 1. Check if this processor handles this frame type
        if isinstance(frame, TextFrame):
            # 2. Transform the frame
            processed = await self.transform(frame)
            # 3. Push the result downstream
            await self.push_frame(processed, direction)
        else:
            # 4. Pass through frames we don't handle
            await self.push_frame(frame, direction)
```
Key pattern: Processors either transform frames (STT transforms audio to text) or pass them through (STT passes control frames unchanged).
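A pure pass-through processor is useful in its own right. This sketch logs every frame's type without changing anything, which makes it handy to drop between any two processors when debugging (illustrative, not a built-in):

```python
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class FrameTracer(FrameProcessor):
    """Logs each frame's type, then passes it along unchanged."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        print(f"{direction}: {type(frame).__name__}")
        await self.push_frame(frame, direction)
```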
Built-In Processors
Pipecat provides processors for common services:
```python
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService

# Each service is a processor
stt = DeepgramSTTService(api_key="...")   # Audio → Text
llm = OpenAILLMService(api_key="...")     # Text → Text
tts = CartesiaTTSService(api_key="...")   # Text → Audio
```
Processor Chain
Processors chain together—output of one becomes input of the next:
```python
# Frame flow: Audio → Text → Text → Audio
pipeline = Pipeline([
    transport.input(),   # Emits AudioRawFrame
    stt,                 # AudioRawFrame → TextFrame
    llm,                 # TextFrame → TextFrame
    tts,                 # TextFrame → AudioRawFrame
    transport.output(),  # Consumes AudioRawFrame
])
```
Pipelines: Composing Processors
A pipeline is an ordered list of processors. Frames flow from first to last.
Basic Pipeline
```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

# Create processors
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
llm = OpenAILLMService(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="gpt-4o-mini",
)
tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))

# Compose into pipeline (transport is created in the Transports section below)
pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    tts,
    transport.output(),
])

# Run the pipeline
async def main():
    runner = PipelineRunner()
    task = PipelineTask(pipeline)
    await runner.run(task)
```
Pipeline Execution
When you run a pipeline:
1. Transport receives audio → creates AudioRawFrame
2. STT processes the audio → creates a TextFrame with the transcription
3. LLM processes the text → creates a TextFrame with the response
4. TTS processes the response → creates an AudioRawFrame with speech
5. Transport sends the audio → the user hears the response
Everything happens asynchronously; frames queue when a processor is busy.
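You can also queue frames into a running task from outside the pipeline. A common pattern is to seed the LLM with an initial message so the bot greets the user first; a sketch, assuming the `task` from above:

```python
from pipecat.frames.frames import LLMMessagesFrame

async def greet_first(task):
    # Queue an OpenAI-style message list so the LLM speaks
    # before the user says anything.
    messages = [{"role": "system", "content": "Briefly greet the user."}]
    await task.queue_frames([LLMMessagesFrame(messages)])
```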
Error Handling
A pipeline task takes parameters that control how interruptions and failures are handled at runtime:
```python
from pipecat.pipeline.task import PipelineParams

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        allow_interruptions=True,  # Handle barge-in
        enable_metrics=True,       # Track latency
    ),
)
```
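There are also two complementary ways to stop a task when something goes wrong. A sketch of the distinction (exact teardown behavior may vary across Pipecat versions):

```python
from pipecat.frames.frames import EndFrame

async def stop(task, immediate: bool = False):
    if immediate:
        await task.cancel()                  # Tear down now, skipping in-flight frames
    else:
        await task.queue_frame(EndFrame())   # Drain the pipeline, then stop
```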
Transport Abstraction
Transports connect pipelines to the outside world. The same pipeline runs on different transports without code changes.
Daily WebRTC Transport
For browser-based voice interaction:
```python
from pipecat.transports.services.daily import DailyTransport, DailyParams
from pipecat.vad.silero import SileroVADAnalyzer

transport = DailyTransport(
    room_url="https://your-domain.daily.co/room-name",
    token="your-meeting-token",
    bot_name="VoiceAgent",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_enabled=True,  # Voice Activity Detection
        vad_analyzer=SileroVADAnalyzer(),
    ),
)
```
When to use: Production voice agents, browser integration, multi-user rooms.
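Daily transports also emit room events you can hook into, for example to end the pipeline when the caller hangs up. A sketch using handler names from Pipecat's Daily examples; `transport` and `task` are assumed from the surrounding setup:

```python
from pipecat.frames.frames import EndFrame

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # The caller has joined; the pipeline starts hearing their audio
    pass

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
    # Shut down gracefully when the caller hangs up
    await task.queue_frame(EndFrame())
```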
FastAPI WebSocket Transport
For custom backend integration:
```python
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)

transport = FastAPIWebsocketTransport(
    websocket=websocket,  # From a FastAPI endpoint
    params=FastAPIWebsocketParams(
        audio_in_sample_rate=16000,
        audio_out_sample_rate=24000,
    ),
)
```
When to use: Existing FastAPI backends, custom protocols, server-side processing.
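On the FastAPI side, an endpoint accepts the socket and hands it to the transport. A minimal sketch (the route path and the elided pipeline wiring are placeholders):

```python
from fastapi import FastAPI, WebSocket

from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)

app = FastAPI()

@app.websocket("/ws")
async def voice_endpoint(websocket: WebSocket):
    await websocket.accept()
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
        ),
    )
    # Build and run the same pipeline you would use with any other transport
    ...
```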
Local Audio Transport
For development and testing:
```python
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioParams

transport = LocalAudioTransport(
    params=LocalAudioParams(
        audio_in_enabled=True,   # Use microphone
        audio_out_enabled=True,  # Use speakers
    ),
)
```
When to use: Local development, CLI tools, testing without browser.
Transport Comparison
| Transport | Latency | Setup Complexity | Best For |
|---|---|---|---|
| Daily WebRTC | Low (~100ms) | Medium (Daily account) | Production browser apps |
| FastAPI WebSocket | Low-Medium | Low (existing backend) | Custom integrations |
| Local Audio | Lowest | Lowest | Development, testing |
Complete Voice Pipeline
Putting it all together:
```python
import asyncio
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyTransport, DailyParams
from pipecat.vad.silero import SileroVADAnalyzer

async def main():
    # 1. Configure transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="TaskAgent",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    # 2. Configure processors
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini",
    )
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="a0e99841-438c-4a64-b679-ae501e7d6091",  # Sonic English
    )

    # The system prompt lives in the LLM context, not the service constructor
    context = OpenAILLMContext(
        [{"role": "system", "content": "You are a helpful task management assistant."}]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # 3. Compose pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),       # Collect user turns into the context
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),  # Record assistant replies into the context
    ])

    # 4. Run with parameters
    runner = PipelineRunner()
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True,
        ),
    )
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())
```
Improve Your Skill
Based on what you learned, update your pipecat skill:
I learned about Pipecat's frame architecture. Update my skill to include:
1. Frame type guidance: When to use AudioRawFrame vs TextFrame vs EndFrame
2. Processor patterns: How to chain processors correctly
3. Pipeline composition: Best practices for building pipelines
4. Transport selection: When to use Daily vs WebSocket vs Local
Also add common pitfalls I should avoid:
- Forgetting to pass through control frames in custom processors
- Not handling EndFrame for graceful shutdown
- Mismatched audio sample rates between services
Test your updated skill:
Using the pipecat skill, create a voice pipeline for my Task Manager
that uses WebSocket transport instead of Daily. The backend is FastAPI.
Try With AI
Prompt 1: Understand Frame Flow
I'm learning Pipecat's frame-based architecture. Help me trace
the frame flow for this scenario:
1. User says "Create a task called review proposal"
2. Agent responds "I've created your task. Anything else?"
For each step, tell me:
- What frame type is created
- Which processor creates it
- Which processor consumes it
- What data is in the frame
What you're learning: Tracing data flow—essential for debugging voice pipelines.
Prompt 2: Build a Processor Chain
Help me build a complete voice pipeline using my pipecat skill:
Requirements:
- Transport: Local audio (for testing)
- STT: Deepgram Nova-3
- LLM: GPT-4o-mini with Task Manager system prompt
- TTS: Cartesia Sonic
Include:
- VAD for detecting when user stops speaking
- Interruption handling (user can interrupt agent)
- Proper error handling
Walk me through each processor and how frames flow.
What you're learning: Processor composition—building production voice systems from modular components.
Prompt 3: Compare Transports
I need to choose the right transport for three deployment scenarios:
Scenario A: Customer support agent on company website
Scenario B: Voice interface for CLI tool during development
Scenario C: Voice agent integrated with existing Next.js + FastAPI app
Use my pipecat skill to recommend transports for each. For each:
1. Which transport and why?
2. What's the setup complexity?
3. What latency should I expect?
4. Any gotchas to watch out for?
What you're learning: Transport selection—matching infrastructure to deployment requirements.