Frame-Based Pipeline Architecture

Pipecat's power comes from a simple idea: everything is a frame. Audio, text, control signals—all frames flowing through processors. This lesson teaches you to think in frames and build voice pipelines through composition.


The Frame Abstraction

In LiveKit, you thought about jobs—discrete work units distributed across workers. In Pipecat, you think about frames—data units flowing through transformations.

What Is a Frame?

A frame is a typed data container that flows through the pipeline:

from pipecat.frames.frames import Frame, AudioRawFrame, TextFrame

# A frame carries data of a specific type.
# Simplified view of AudioRawFrame's fields:
class AudioRawFrame(Frame):
    audio: bytes          # Raw audio samples
    sample_rate: int      # e.g., 16000
    num_channels: int     # e.g., 1 (mono)

Frames are:

  • Typed: The type tells processors what to do with them
  • Immutable: Processors create new frames, not modify existing ones
  • Chainable: Output frames become input frames for the next processor

Why Frames?

The frame abstraction solves a key problem: how do you connect diverse AI services that speak different "languages"?

Without Frames:
Deepgram outputs JSON → Parse JSON → Extract text → Format for OpenAI
OpenAI outputs JSON → Parse JSON → Extract response → Format for Cartesia

With Frames:
Deepgram → TextFrame → OpenAI → TextFrame → Cartesia
(Each service knows how to handle TextFrame)

Frames create a uniform interface. Any processor that outputs TextFrame can connect to any processor that accepts TextFrame.


Frame Types

Pipecat defines several frame types. The most important:

AudioRawFrame

Raw audio data moving through the pipeline:

from pipecat.frames.frames import AudioRawFrame

# Audio from microphone or transport
audio_frame = AudioRawFrame(
    audio=raw_bytes,       # PCM audio samples
    sample_rate=16000,     # Samples per second
    num_channels=1         # Mono audio
)

When you see it: Transport input, STT input, TTS output, transport output.
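The fields on an AudioRawFrame are enough to reason about timing. A small sketch, assuming 16-bit PCM (2 bytes per sample), which is common but not guaranteed by every transport:

# Sketch: derive the duration of an AudioRawFrame.
# Assumes 16-bit PCM (2 bytes per sample); check what your transport produces.
BYTES_PER_SAMPLE = 2

def frame_duration_seconds(frame: AudioRawFrame) -> float:
    samples = len(frame.audio) / (BYTES_PER_SAMPLE * frame.num_channels)
    return samples / frame.sample_rate

# 10,240 bytes of 16 kHz mono audio -> 0.32 seconds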

TextFrame

Text content (transcriptions, LLM responses, user input):

from pipecat.frames.frames import TextFrame

# Transcribed speech or LLM response
text_frame = TextFrame(text="Hello, how can I help you today?")

When you see it: STT output, LLM input, LLM output, TTS input.

EndFrame

Signals the end of a conversation or stream:

from pipecat.frames.frames import EndFrame

# Pipeline should shut down gracefully
end_frame = EndFrame()

When you see it: User hangs up, timeout, explicit termination.
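A common way an EndFrame enters the pipeline is from a transport event handler when the user hangs up. A minimal sketch, assuming a DailyTransport "on_participant_left" event and the PipelineTask.queue_frame method; event names and handler signatures vary by transport and Pipecat version:

# Sketch: shut down gracefully when the user leaves the call.
# `transport` and `task` are the transport and PipelineTask from your pipeline setup.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
    # EndFrame flows downstream so each processor can flush and stop cleanly.
    await task.queue_frame(EndFrame())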

Control Frames

Control pipeline behavior without carrying content:

from pipecat.frames.frames import (
    StartInterruptionFrame,  # User started speaking (barge-in)
    StopInterruptionFrame,   # User stopped speaking
    CancelFrame,             # Cancel current processing
)

When you see it: Interruption handling, flow control, error recovery.

Frame Flow Diagram

User Speaks        STT Processor      LLM Processor       TTS Processor       User Hears
     │                   │                  │                   │                  │
     ▼                   ▼                  ▼                   ▼                  ▼
AudioRawFrame ──▶    TextFrame    ──▶   TextFrame    ──▶   AudioRawFrame ──▶  Audio Output
  "Hello"             "Hello"          "Hi! How can        [audio bytes]      "Hi! How..."
                                        I help?"

Processors: The Building Blocks

Processors are transformations. They receive frames, process them, and emit new frames.

Processor Interface

Every processor implements the same interface:

from pipecat.frames.frames import Frame, TextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection

class MyProcessor(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Let the base class see the frame first (handles system frames)
        await super().process_frame(frame, direction)

        # 1. Check if this processor handles this frame type
        if isinstance(frame, TextFrame):
            # 2. Transform the frame
            processed = await self.transform(frame)
            # 3. Push the result downstream
            await self.push_frame(processed, direction)
        else:
            # 4. Pass through frames we don't handle
            await self.push_frame(frame, direction)

Key pattern: Processors either transform frames (STT transforms audio to text) or pass them through (STT passes control frames unchanged).
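Here is a concrete sketch of that pattern: a hypothetical TranscriptLogger that collects transcribed text, resets when the user barges in, and forwards every frame unchanged (dropping control frames here is a common bug):

from pipecat.frames.frames import Frame, StartInterruptionFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

# Sketch: a hypothetical custom processor, not part of Pipecat itself.
class TranscriptLogger(FrameProcessor):
    def __init__(self):
        super().__init__()
        self._lines: list[str] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TextFrame):
            self._lines.append(frame.text)   # observe text frames
        elif isinstance(frame, StartInterruptionFrame):
            self._lines.clear()              # react to a control frame (barge-in)

        # Always forward the frame, control frames included.
        await self.push_frame(frame, direction)

Because it accepts and emits TextFrame and passes everything else through, it can sit anywhere between STT and TTS in a pipeline.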

Built-In Processors

Pipecat provides processors for common services:

from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService

# Each service is a processor
stt = DeepgramSTTService(api_key="...") # Audio → Text
llm = OpenAILLMService(api_key="...") # Text → Text
tts = CartesiaTTSService(api_key="...") # Text → Audio

Processor Chain

Processors chain together—output of one becomes input of the next:

# Frame flow: Audio → Text → Text → Audio
pipeline = Pipeline([
    transport.input(),   # Emits AudioRawFrame
    stt,                 # AudioRawFrame → TextFrame
    llm,                 # TextFrame → TextFrame
    tts,                 # TextFrame → AudioRawFrame
    transport.output()   # Consumes AudioRawFrame
])

Pipelines: Composing Processors

A pipeline is an ordered list of processors. Frames flow from first to last.

Basic Pipeline

import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

# Create processors
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
llm = OpenAILLMService(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="gpt-4o-mini"
)
tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))

# Compose into pipeline
pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    tts,
    transport.output()
])

# Run the pipeline
async def main():
    runner = PipelineRunner()
    task = PipelineTask(pipeline)
    await runner.run(task)

Pipeline Execution

When you run a pipeline:

  1. Transport receives audio → Creates AudioRawFrame
  2. STT processes audio → Creates TextFrame with transcription
  3. LLM processes text → Creates TextFrame with response
  4. TTS processes response → Creates AudioRawFrame with speech
  5. Transport sends audio → User hears response

All of this happens asynchronously; frames queue whenever a downstream processor is busy.

Error Handling

Pipelines handle interruptions and failures gracefully; configure this behavior with PipelineParams:

from pipecat.pipeline.task import PipelineParams

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        allow_interruptions=True,  # Handle barge-in
        enable_metrics=True,       # Track latency
    )
)

Transport Abstraction

Transports connect pipelines to the outside world. The same pipeline runs on different transports without code changes.

Daily WebRTC Transport

For browser-based voice interaction:

from pipecat.transports.services.daily import DailyTransport, DailyParams
from pipecat.vad.silero import SileroVADAnalyzer

transport = DailyTransport(
    room_url="https://your-domain.daily.co/room-name",
    token="your-meeting-token",
    bot_name="VoiceAgent",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_enabled=True,                # Voice Activity Detection
        vad_analyzer=SileroVADAnalyzer()
    )
)

When to use: Production voice agents, browser integration, multi-user rooms.
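DailyTransport also emits room events you can hook into. A common pattern is greeting the user as soon as they join — a minimal sketch, assuming the "on_first_participant_joined" event, PipelineTask.queue_frames, and that TTSSpeakFrame exists in your Pipecat version:

from pipecat.frames.frames import TTSSpeakFrame  # assumption: available in your version

# Sketch: greet the user when they join the Daily room.
# `task` is the PipelineTask created when you run the pipeline.
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # The frame enters upstream and flows down the pipeline to TTS.
    await task.queue_frames([TTSSpeakFrame("Hi! How can I help you today?")])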

FastAPI WebSocket Transport

For custom backend integration:

from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)

transport = FastAPIWebsocketTransport(
    websocket=websocket,  # From FastAPI endpoint
    params=FastAPIWebsocketParams(
        audio_in_sample_rate=16000,
        audio_out_sample_rate=24000,
    )
)

When to use: Existing FastAPI backends, custom protocols, server-side processing.
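In practice the transport is created inside a FastAPI WebSocket endpoint and handed the live connection. A minimal sketch, where run_bot is a hypothetical helper that wraps the socket in FastAPIWebsocketTransport, composes the pipeline, and runs it:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # run_bot (hypothetical) builds the transport around `websocket`,
    # composes the pipeline, and runs it until the socket closes.
    await run_bot(websocket)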

Local Audio Transport

For development and testing:

from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioParams

transport = LocalAudioTransport(
    params=LocalAudioParams(
        audio_in_enabled=True,   # Use microphone
        audio_out_enabled=True,  # Use speakers
    )
)

When to use: Local development, CLI tools, testing without browser.

Transport Comparison

Transport           Latency        Setup Complexity          Best For
Daily WebRTC        Low (~100ms)   Medium (Daily account)    Production browser apps
FastAPI WebSocket   Low-Medium     Low (existing backend)    Custom integrations
Local Audio         Lowest         Lowest                    Development, testing
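Because the pipeline only sees frames, switching between these transports is a configuration decision, not a code change. A sketch of a startup factory; the PIPECAT_TRANSPORT variable and parameter values are illustrative assumptions:

import os

from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioParams
from pipecat.transports.services.daily import DailyTransport, DailyParams

# Sketch: pick a transport at startup; the pipeline composition stays identical.
def build_transport():
    if os.getenv("PIPECAT_TRANSPORT", "local") == "daily":
        return DailyTransport(
            room_url=os.getenv("DAILY_ROOM_URL"),
            token=os.getenv("DAILY_TOKEN"),
            bot_name="VoiceAgent",
            params=DailyParams(audio_in_enabled=True, audio_out_enabled=True),
        )
    return LocalAudioTransport(
        params=LocalAudioParams(audio_in_enabled=True, audio_out_enabled=True)
    )

Either return value plugs into Pipeline([transport.input(), ..., transport.output()]) unchanged.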

Complete Voice Pipeline

Putting it all together:

import asyncio
import os
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyTransport, DailyParams
from pipecat.vad.silero import SileroVADAnalyzer

async def main():
    # 1. Configure transport
    transport = DailyTransport(
        room_url=os.getenv("DAILY_ROOM_URL"),
        token=os.getenv("DAILY_TOKEN"),
        bot_name="TaskAgent",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer()
        )
    )

    # 2. Configure processors
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY")
    )

    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini",
        system_prompt="You are a helpful task management assistant."
    )

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="a0e99841-438c-4a64-b679-ae501e7d6091"  # Sonic English
    )

    # 3. Compose pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,
        llm,
        tts,
        transport.output()
    ])

    # 4. Run with parameters
    runner = PipelineRunner()
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            enable_metrics=True
        )
    )

    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

Improve Your Skill

Based on what you learned, update your pipecat skill:

I learned about Pipecat's frame architecture. Update my skill to include:

1. Frame type guidance: When to use AudioRawFrame vs TextFrame vs EndFrame
2. Processor patterns: How to chain processors correctly
3. Pipeline composition: Best practices for building pipelines
4. Transport selection: When to use Daily vs WebSocket vs Local

Also add common pitfalls I should avoid:
- Forgetting to pass through control frames in custom processors
- Not handling EndFrame for graceful shutdown
- Mismatched audio sample rates between services

Test your updated skill:

Using the pipecat skill, create a voice pipeline for my Task Manager
that uses WebSocket transport instead of Daily. The backend is FastAPI.

Try With AI

Prompt 1: Understand Frame Flow

I'm learning Pipecat's frame-based architecture. Help me trace
the frame flow for this scenario:

1. User says "Create a task called review proposal"
2. Agent responds "I've created your task. Anything else?"

For each step, tell me:
- What frame type is created
- Which processor creates it
- Which processor consumes it
- What data is in the frame

What you're learning: Tracing data flow—essential for debugging voice pipelines.

Prompt 2: Build a Processor Chain

Help me build a complete voice pipeline using my pipecat skill:

Requirements:
- Transport: Local audio (for testing)
- STT: Deepgram Nova-3
- LLM: GPT-4o-mini with Task Manager system prompt
- TTS: Cartesia Sonic

Include:
- VAD for detecting when user stops speaking
- Interruption handling (user can interrupt agent)
- Proper error handling

Walk me through each processor and how frames flow.

What you're learning: Processor composition—building production voice systems from modular components.

Prompt 3: Compare Transports

I need to choose the right transport for three deployment scenarios:

Scenario A: Customer support agent on company website
Scenario B: Voice interface for CLI tool during development
Scenario C: Voice agent integrated with existing Next.js + FastAPI app

Use my pipecat skill to recommend transports for each. For each:
1. Which transport and why?
2. What's the setup complexity?
3. What latency should I expect?
4. Any gotchas to watch out for?

What you're learning: Transport selection—matching infrastructure to deployment requirements.