Server-Sent Events (SSE) Deep Dive
When you type a prompt into ChatGPT or Claude and watch the response appear word-by-word, you're witnessing SSE in action. Server-Sent Events is the protocol that makes AI feel responsive. Without it, you'd wait 10-30 seconds staring at a loading spinner until the entire response arrived at once.
OpenAI's streaming API, Anthropic's Claude API, Google's Gemini API—they all use SSE to push tokens to your client as the model generates them. Understanding SSE isn't optional for AI engineers; it's the transport layer that connects your users to language models.
This lesson teaches you to parse SSE streams the way production AI clients do. You'll implement the same patterns that power Claude Code's streaming interface and OpenAI's official TypeScript SDK. By the end, you'll have a reusable SSE parser that works with any LLM API.
The SSE Protocol Format
SSE is elegantly simple. The server keeps a single HTTP response open (Content-Type: text/event-stream) and sends plain-text messages separated by blank lines. Each message contains one or more fields.
Message Structure
event: message_start
id: msg_001
data: {"type": "message_start", "message": {"id": "msg_01XfDUDY...", "model": "claude-3-opus"}}

data: {"type": "content_block_delta", "delta": {"text": "Hello"}}

data: {"type": "content_block_delta", "delta": {"text": " world"}}

event: message_stop
data: {"type": "message_stop"}
Output:
Four SSE messages:
1. message_start event with JSON payload
2. content delta (no event type, default is "message")
3. another content delta
4. message_stop event
Field Types
| Field | Purpose | Example |
|---|---|---|
| data: | The payload (usually JSON) | data: {"text": "Hello"} |
| event: | Event type for routing | event: message_start |
| id: | Message ID for reconnection | id: msg_12345 |
| retry: | Reconnection delay (ms) | retry: 3000 |
Every field ends with a newline. Messages end with a blank line (double newline). Lines starting with a colon are comments, which servers often send as keep-alive pings. That's the entire protocol.
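To make the wire format concrete, here's a minimal sketch of the server side: serializing one message. The SSEMessage shape and formatSSEMessage helper are illustrative, not part of any library:

// Sketch: serialize one SSE message (formatSSEMessage is a hypothetical helper).
interface SSEMessage {
  data: string; // payload, usually JSON
  event?: string; // optional event type for routing
  id?: string; // optional ID for reconnection
  retry?: number; // optional reconnection delay in ms
}

function formatSSEMessage(msg: SSEMessage): string {
  const lines: string[] = [];
  if (msg.event) lines.push(`event: ${msg.event}`);
  if (msg.id) lines.push(`id: ${msg.id}`);
  if (msg.retry !== undefined) lines.push(`retry: ${msg.retry}`);
  // Each line of the payload gets its own data: field
  for (const line of msg.data.split("\n")) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the message
  return lines.join("\n") + "\n\n";
}

console.log(formatSSEMessage({ event: "message_stop", data: '{"type": "message_stop"}' }));
// event: message_stop
// data: {"type": "message_stop"}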
Multi-Line Data
Data can span multiple lines by repeating the data: prefix:
data: {"content": "This is a long message
data: that spans multiple lines
data: in the original JSON"}
Output:
When parsed, the data lines are concatenated with newlines:
This is a long message
that spans multiple lines
in the original event

In practice, AI APIs keep each JSON payload on a single data: line, but a robust parser still handles the multi-line case.
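In code, the reassembly rule is just a filter and a join. This small sketch previews what the full parser later in this lesson does inside its message loop:

// Sketch: reassemble a multi-line data payload from one SSE message.
const message = [
  "data: This is a long message",
  "data: that spans multiple lines",
  "data: in the original event",
].join("\n");

const payload = message
  .split("\n")
  .filter((line) => line.startsWith("data: "))
  .map((line) => line.slice(6))
  .join("\n");

console.log(payload);
// This is a long message
// that spans multiple lines
// in the original event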
Browser EventSource API
Browsers have native SSE support through the EventSource API. It handles connection management and reconnection automatically.
Basic Usage
// Browser-native SSE client
const eventSource = new EventSource("https://api.example.com/stream");
// Default "message" events
eventSource.onmessage = (event: MessageEvent) => {
const data = JSON.parse(event.data);
console.log("Received:", data);
};
// Named events (e.g., "update") use addEventListener
eventSource.addEventListener("update", (event) => {
  const message = event as MessageEvent; // custom event names aren't in the typed event map, so cast
  console.log("Update event:", message.data);
});
// Error handling
eventSource.onerror = (error: Event) => {
console.error("SSE connection error:", error);
// EventSource automatically attempts to reconnect
};
// Clean up
eventSource.close();
Output:
Received: {... parsed JSON from data field ...}
Update event: ... raw data string ...
EventSource Limitations
EventSource is convenient but limited:
| Feature | EventSource | fetch + ReadableStream |
|---|---|---|
| HTTP Method | GET only | Any method |
| Headers | No custom headers | Full control |
| Body | None | Can send JSON body |
| Auth | URL params only | Authorization header |
| Environment | Browser only | Browser + Node.js + Bun |
For AI APIs that require POST requests with JSON bodies and Authorization headers, you need manual parsing.
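To see the limitation concretely, the only way to "authenticate" an EventSource is to push credentials into the URL. The endpoint below is hypothetical, and the pattern is shown only as an anti-pattern:

// Anti-pattern sketch: EventSource can't set an Authorization header,
// so the token ends up in the URL (logged by proxies, visible in history).
const url = new URL("https://api.example.com/stream");
url.searchParams.set("prompt", "Hello");
url.searchParams.set("api_key", "sk-..."); // don't do this in production
const source = new EventSource(url.toString());
source.onmessage = (event) => console.log(event.data);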
Parsing SSE with fetch and ReadableStream
Real AI applications use fetch to send POST requests with streaming responses. Here's the production pattern.
The Complete SSE Parser
/**
 * Parse an SSE stream from a fetch Response body.
 * Yields parsed JSON objects from data: lines, or raw strings for non-JSON data.
 */
async function* parseSSE(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<unknown> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // Decode chunk and add to buffer
      buffer += decoder.decode(value, { stream: true });

      // Split on double newlines (SSE message separator)
      const messages = buffer.split("\n\n");

      // Keep incomplete message in buffer
      buffer = messages.pop() ?? "";

      // Process complete messages
      for (const message of messages) {
        // Collect data: lines, skipping empty lines, comments, and other fields
        const dataLines = message
          .split("\n")
          .filter((line) => line.startsWith("data: "))
          .map((line) => line.slice(6)); // Remove "data: " prefix

        if (dataLines.length === 0) continue;

        // Multi-line data is joined with newlines, per the SSE spec
        const data = dataLines.join("\n");

        // Skip [DONE] signal (OpenAI convention)
        if (data === "[DONE]") continue;

        try {
          yield JSON.parse(data);
        } catch {
          // Non-JSON data, yield as string
          yield data;
        }
      }
    }

    // Handle any remaining data in buffer
    if (buffer.trim()) {
      const data = buffer
        .split("\n")
        .filter((line) => line.startsWith("data: "))
        .map((line) => line.slice(6))
        .join("\n");

      if (data && data !== "[DONE]") {
        try {
          yield JSON.parse(data);
        } catch {
          yield data;
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}
Output:
Async generator that yields parsed SSE data objects one at a time.
Handles chunked responses, multi-line data, and [DONE] termination.
How the Parser Works
- **Buffering**: Network chunks don't align with SSE messages. We accumulate text until we see `\n\n` (the message separator).
- **Splitting**: `buffer.split("\n\n")` gives us complete messages. The last element might be incomplete, so we keep it in the buffer.
- **Line processing**: Each message can have multiple lines. We only care about lines starting with `data:`, and multi-line data is joined with newlines.
- **JSON parsing**: Most AI APIs send JSON. We parse it and yield objects. If parsing fails, we yield the raw string.
- **Cleanup**: The `finally` block ensures we release the reader lock even if the caller stops iterating early.
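A quick way to sanity-check the parser is to feed it an in-memory stream. The payloads below are made up for illustration:

// Sketch: drive parseSSE with a hand-built stream (sample payloads are illustrative).
async function demoParseSSE(): Promise<void> {
  const raw =
    'data: {"delta": {"text": "Hello"}}\n\n' +
    'data: {"delta": {"text": " world"}}\n\n' +
    "data: [DONE]\n\n";

  // Response wraps the string in a ReadableStream<Uint8Array>, just like fetch would
  const stream = new Response(raw).body;
  if (!stream) throw new Error("Stream is null");

  for await (const chunk of parseSSE(stream)) {
    console.log(chunk); // { delta: { text: "Hello" } }, then { delta: { text: " world" } }
  }
}

demoParseSSE().catch(console.error);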
OpenAI's Streaming Format
OpenAI's Chat Completions API popularized the streaming conventions that most AI APIs now follow. Here's what their streaming responses look like.
Response Structure
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1677858242,"model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Output:
Five SSE messages:
1. Role announcement (assistant)
2. First token: "Hello"
3. Second token: " world"
4. Empty delta with finish_reason: "stop"
5. [DONE] signal (OpenAI-specific termination)
Extracting Content
The content lives in choices[0].delta.content. Here's a specialized parser:
interface OpenAIStreamChunk {
id: string;
object: string;
created: number;
model: string;
choices: Array<{
index: number;
delta: {
role?: string;
content?: string;
};
finish_reason: string | null;
}>;
}
/**
* Stream tokens from OpenAI-compatible API.
* Yields content strings as they arrive.
*/
async function* streamOpenAITokens(
response: Response
): AsyncGenerator<string> {
if (!response.body) {
throw new Error("Response body is null");
}
for await (const chunk of parseSSE(response.body)) {
const data = chunk as OpenAIStreamChunk;
const content = data.choices?.[0]?.delta?.content;
if (content) {
yield content;
}
}
}
// Usage example
async function chat(prompt: string): Promise<void> {
const response = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4",
messages: [{ role: "user", content: prompt }],
stream: true,
}),
});
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
// Stream tokens directly to stdout
for await (const token of streamOpenAITokens(response)) {
process.stdout.write(token);
}
console.log(); // Final newline
}
// Run it
chat("Explain SSE in one sentence.").catch(console.error);
Output:
SSE (Server-Sent Events) is a web protocol that enables servers to push real-time updates to clients over a single HTTP connection.
The tokens appear character-by-character as the model generates them, creating the familiar "typing" effect.
Anthropic's Streaming Format
Anthropic's Claude API uses a similar approach with different event types.
Response Structure
event: message_start
data: {"type":"message_start","message":{"id":"msg_01...","type":"message","role":"assistant","model":"claude-3-opus-20240229","content":[]}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_stop
data: {"type":"message_stop"}
Output:
Anthropic uses typed events for structure:
- message_start: Begin response
- content_block_delta: Individual tokens
- message_stop: End response
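Before unifying providers, here's a minimal sketch of routing these events directly with the parseSSE generator from earlier. The field names follow the events shown above; production code would also handle ping and error events:

// Sketch: accumulate Claude's streamed text by routing on event type.
interface AnthropicStreamEvent {
  type: string;
  delta?: { type: string; text?: string };
}

async function collectClaudeText(response: Response): Promise<string> {
  if (!response.body) {
    throw new Error("Response body is null");
  }

  let text = "";
  for await (const chunk of parseSSE(response.body)) {
    const event = chunk as AnthropicStreamEvent;
    if (event.type === "content_block_delta") {
      text += event.delta?.text ?? ""; // individual tokens
    } else if (event.type === "message_stop") {
      break; // end of response
    }
  }
  return text;
}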
Unified Token Extractor
Here's a parser that works with both OpenAI and Anthropic formats:
type StreamingProvider = "openai" | "anthropic";
interface TokenExtractor {
extractToken: (chunk: unknown) => string | undefined;
}
const extractors: Record<StreamingProvider, TokenExtractor> = {
openai: {
extractToken: (chunk: unknown): string | undefined => {
const data = chunk as OpenAIStreamChunk;
return data.choices?.[0]?.delta?.content;
},
},
anthropic: {
extractToken: (chunk: unknown): string | undefined => {
const data = chunk as { type: string; delta?: { text: string } };
if (data.type === "content_block_delta") {
return data.delta?.text;
}
return undefined;
},
},
};
/**
* Stream tokens from any major LLM provider.
*/
async function* streamTokens(
response: Response,
provider: StreamingProvider
): AsyncGenerator<string> {
if (!response.body) {
throw new Error("Response body is null");
}
const extractor = extractors[provider];
for await (const chunk of parseSSE(response.body)) {
const token = extractor.extractToken(chunk);
if (token) {
yield token;
}
}
}
Output:
Unified generator that works with OpenAI and Anthropic.
Add new providers by extending the extractors object.
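As a usage sketch, the same loop then serves either provider. The endpoint, headers, and body below follow Anthropic's Messages API as documented at the time of writing; verify them against the current docs:

// Usage sketch: stream Claude tokens through the unified extractor.
async function chatWithClaude(prompt: string): Promise<void> {
  const response = await fetch("https://api.anthropic.com/v1/messages", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "x-api-key": process.env.ANTHROPIC_API_KEY ?? "",
      "anthropic-version": "2023-06-01",
    },
    body: JSON.stringify({
      model: "claude-3-opus-20240229",
      max_tokens: 256,
      messages: [{ role: "user", content: prompt }],
      stream: true,
    }),
  });

  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }

  // Swap "anthropic" for "openai" and this loop is unchanged
  for await (const token of streamTokens(response, "anthropic")) {
    process.stdout.write(token);
  }
  console.log();
}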
Reconnection and Error Recovery
SSE includes built-in support for reconnection through the id field and Last-Event-ID header.
How Reconnection Works
- Server includes an `id:` field in its messages
- Client tracks the last received ID
- On disconnect, client reconnects with a `Last-Event-ID` header
- Server resumes from that point
Implementing Reconnection
interface SSEOptions {
url: string;
method?: string;
headers?: Record<string, string>;
body?: string;
maxRetries?: number;
retryDelay?: number;
}
async function* streamWithReconnection(
options: SSEOptions
): AsyncGenerator<unknown> {
const {
url,
method = "GET",
headers = {},
body,
maxRetries = 3,
retryDelay = 1000,
} = options;
let lastEventId: string | undefined;
let retries = 0;
while (retries <= maxRetries) {
try {
const requestHeaders: Record<string, string> = { ...headers };
// Include last event ID for resume
if (lastEventId) {
requestHeaders["Last-Event-ID"] = lastEventId;
}
const response = await fetch(url, {
method,
headers: requestHeaders,
body,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
if (!response.body) {
throw new Error("Response body is null");
}
// Reset retry count on successful connection
retries = 0;
// Parse stream and track IDs
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const messages = buffer.split("\n\n");
buffer = messages.pop() ?? "";
for (const message of messages) {
let eventData: string | undefined;
let eventId: string | undefined;
for (const line of message.split("\n")) {
if (line.startsWith("data: ")) {
eventData = line.slice(6);
} else if (line.startsWith("id: ")) {
eventId = line.slice(4);
}
}
// Track last ID for reconnection
if (eventId) {
lastEventId = eventId;
}
// Yield parsed data
if (eventData && eventData !== "[DONE]") {
try {
yield JSON.parse(eventData);
} catch {
yield eventData;
}
}
}
}
// Clean exit - stream completed
return;
} catch (error) {
retries++;
if (retries > maxRetries) {
throw new Error(
`SSE connection failed after ${maxRetries} retries: ${error}`
);
}
console.warn(
`SSE connection lost, retrying (${retries}/${maxRetries})...`
);
// Exponential backoff
await new Promise((resolve) =>
setTimeout(resolve, retryDelay * Math.pow(2, retries - 1))
);
}
}
}
Output:
Generator that:
1. Tracks message IDs as they arrive
2. On disconnect, waits with exponential backoff
3. Reconnects with Last-Event-ID header
4. Server can resume from last sent message
Usage with Recovery
async function streamWithRecovery(prompt: string): Promise<void> {
const stream = streamWithReconnection({
url: "https://api.openai.com/v1/chat/completions",
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: "gpt-4",
messages: [{ role: "user", content: prompt }],
stream: true,
}),
maxRetries: 3,
retryDelay: 1000,
});
try {
for await (const chunk of stream) {
const data = chunk as OpenAIStreamChunk;
const content = data.choices?.[0]?.delta?.content;
if (content) {
process.stdout.write(content);
}
}
console.log();
} catch (error) {
console.error("Stream failed:", error);
}
}
Output:
Streaming with automatic reconnection.
If network drops mid-response, it attempts to resume.
Note: OpenAI doesn't support Last-Event-ID resume, but many SSE servers do.
Handling Partial Messages
Network issues can corrupt SSE streams. Here's how to handle edge cases.
Incomplete JSON
function safeParse(data: string): unknown | null {
try {
return JSON.parse(data);
} catch (error) {
// Log for debugging but don't crash
console.warn("Failed to parse SSE data:", data.slice(0, 100));
return null;
}
}
async function* robustSSEParser(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<unknown> {
  for await (const chunk of parseSSE(stream)) {
    // parseSSE yields raw strings when JSON parsing fails; log and drop those
    const value = typeof chunk === "string" ? safeParse(chunk) : chunk;
    if (value !== null && value !== undefined) {
      yield value;
    }
  }
}
Output:
Graceful handling of malformed SSE messages.
Logs warning but continues processing valid messages.
Connection Timeout
async function fetchWithTimeout(
url: string,
options: RequestInit,
timeoutMs: number = 30000
): Promise<Response> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, {
...options,
signal: controller.signal,
});
return response;
} finally {
clearTimeout(timeoutId);
}
}
Output:
Aborts the request if the response headers don't arrive within the timeout.
Prevents hanging on dead connections before the stream starts.
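Here's a usage sketch combining the timeout with the SSE parser; the endpoint is a placeholder. Note that the timer is cleared once response headers arrive, so it bounds connection setup rather than the full stream:

// Usage sketch: bound time-to-first-byte, then stream as usual.
async function streamWithSetupTimeout(): Promise<void> {
  const response = await fetchWithTimeout(
    "https://api.example.com/v1/stream", // placeholder endpoint
    { method: "POST", body: JSON.stringify({ stream: true }) },
    10_000 // abort if headers don't arrive within 10 seconds
  );

  if (!response.body) {
    throw new Error("Response body is null");
  }

  for await (const chunk of parseSSE(response.body)) {
    console.log(chunk);
  }
}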
Complete Working Example
Here's a production-ready streaming client combining all patterns:
import { config } from "dotenv";
config();
interface StreamConfig {
apiKey: string;
model: string;
maxTokens?: number;
temperature?: number;
}
interface ChatMessage {
role: "user" | "assistant" | "system";
content: string;
}
async function* streamChat(
messages: ChatMessage[],
config: StreamConfig
): AsyncGenerator<string> {
const response = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${config.apiKey}`,
},
body: JSON.stringify({
model: config.model,
messages,
max_tokens: config.maxTokens ?? 1024,
temperature: config.temperature ?? 0.7,
stream: true,
}),
});
if (!response.ok) {
const error = await response.text();
throw new Error(`OpenAI API error ${response.status}: ${error}`);
}
if (!response.body) {
throw new Error("Response body is null");
}
// Use our SSE parser
for await (const chunk of parseSSE(response.body)) {
const data = chunk as OpenAIStreamChunk;
const content = data.choices?.[0]?.delta?.content;
if (content) {
yield content;
}
}
}
// Interactive CLI chat
async function main(): Promise<void> {
const config: StreamConfig = {
apiKey: process.env.OPENAI_API_KEY ?? "",
model: "gpt-4",
};
if (!config.apiKey) {
console.error("Set OPENAI_API_KEY environment variable");
process.exit(1);
}
const messages: ChatMessage[] = [
{ role: "system", content: "You are a helpful assistant." },
{ role: "user", content: "Explain server-sent events in 50 words." },
];
process.stdout.write("Assistant: ");
let fullResponse = "";
for await (const token of streamChat(messages, config)) {
process.stdout.write(token);
fullResponse += token;
}
console.log("\n");
console.log(`[Received ${fullResponse.length} characters]`);
}
main().catch(console.error);
Output:
Assistant: Server-Sent Events (SSE) is a web technology enabling servers to push
real-time updates to browsers over HTTP. Unlike WebSockets, SSE is unidirectional
(server to client) and simpler to implement. It's commonly used for live feeds,
notifications, and AI response streaming.
[Received 246 characters]
Try With AI
Prompt 1: Parse a Different Provider
I need to add support for Cohere's streaming API to my SSE parser.
Their format uses:
data: {"event_type":"text-generation","text":"Hello"}
data: {"event_type":"stream-end","finish_reason":"COMPLETE"}
Write a token extractor function that:
1. Extracts text from text-generation events
2. Ignores stream-end events
3. Handles the case where "text" might be missing
Show me how to add this to the extractors object pattern from the lesson.
What you're learning: How to extend the unified streaming pattern to support new providers. This skill is essential as AI APIs proliferate—you'll encounter many variations on the SSE format.
Prompt 2: Handle Streaming Errors
My SSE stream sometimes receives error messages mid-stream:
data: {"type":"error","error":{"type":"overloaded_error","message":"API is temporarily overloaded"}}
Modify the parseSSE function to:
1. Detect error messages by checking for an "error" field
2. Throw a custom StreamError with the error message
3. Include the last successfully received text so we can show partial results
What's the best way to distinguish recoverable errors from fatal ones?
What you're learning: Error handling in streaming contexts requires different patterns than request/response. You're learning to preserve partial results and classify error severity.
Prompt 3: Build a Token Counter
I want to count tokens as they stream for cost estimation. Using the OpenAI
streaming format, help me:
1. Count tokens in real-time as delta.content arrives
2. Track the model name from the first chunk
3. Estimate cost based on model pricing (gpt-4: $0.03/1K tokens)
4. Display running total alongside the streaming text
Show me how to wrap the streamTokens generator to add this metadata tracking.
What you're learning: Real-time metrics during streaming. This pattern extends to latency tracking, rate limiting visualization, and usage dashboards that update as responses stream.
Safety note: SSE connections hold server resources. Always call reader.releaseLock() in a finally block and close connections when users navigate away. Leaked connections degrade both client and server performance.