Testing Streaming Responses
Your chat interface displays tokens as they arrive from the AI. Users see words appearing in real-time, creating the illusion of the AI "thinking aloud." But streaming introduces complexity that non-streaming APIs don't have. What happens if the network drops mid-response? What if the user cancels while tokens are still arriving? What if chunks arrive out of order?
You can't test streaming by calling the real API thousands of times. Each call costs money, takes seconds, and produces non-deterministic output. Instead, you need mocks that simulate streaming behavior precisely: chunks arriving over time, cancellation interrupting the flow, and partial responses that stop unexpectedly.
This lesson teaches testing patterns for streaming AI responses. You'll mock async generators to simulate chunk delivery, test SSE parsing with synthetic event streams, verify cancellation cleans up properly, and design tests that catch ordering bugs.
The Streaming Testing Challenge
Streaming responses differ fundamentally from request-response APIs:
| Aspect | Regular API | Streaming API |
|---|---|---|
| Response | Single JSON object | Many chunks over time |
| Timing | Response time matters | Inter-chunk timing matters |
| Failure | Request fails or succeeds | Can fail mid-stream |
| Cancellation | Cancel before response | Cancel during streaming |
| Testing | Mock returns value | Mock yields sequence |
Traditional mocking with mockResolvedValue doesn't work for streams. You need mocks that yield values over time, respond to abort signals, and can simulate failures at any point in the sequence.
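For contrast, here is a minimal sketch of the two shapes. The chunk contents are placeholders; the point is that a resolved-value mock settles once, while a streaming mock must hand back something you can iterate with for await:
import { vi } from "vitest";
// A resolved-value mock settles once with the whole reply
const mockOnce = vi.fn().mockResolvedValue({ content: "whole reply" });
// A streaming mock returns an async generator, so values arrive one at a time
const mockStreaming = vi.fn(async function* () {
  yield { content: "chunk 1" };
  yield { content: " chunk 2" };
});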
Mocking Async Generators
AI SDK streaming methods return async iterables. The simplest mock is an async generator function:
import { describe, it, expect, vi } from "vitest";
// Type for OpenAI-style streaming chunks
interface ChatChunk {
id: string;
choices: Array<{
delta: { content?: string; role?: string };
finish_reason: string | null;
}>;
}
describe("async generator mocking", () => {
it("mocks a basic streaming response", async () => {
// Create a mock that yields chunks
const mockStream = vi.fn(async function* (): AsyncGenerator<ChatChunk> {
yield {
id: "chunk-1",
choices: [{ delta: { role: "assistant" }, finish_reason: null }],
};
yield {
id: "chunk-2",
choices: [{ delta: { content: "Hello" }, finish_reason: null }],
};
yield {
id: "chunk-3",
choices: [{ delta: { content: " there" }, finish_reason: null }],
};
yield {
id: "chunk-4",
choices: [{ delta: { content: "!" }, finish_reason: "stop" }],
};
});
// Consume the mock stream
const chunks: string[] = [];
for await (const chunk of mockStream()) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
chunks.push(content);
}
}
expect(chunks).toEqual(["Hello", " there", "!"]);
expect(chunks.join("")).toBe("Hello there!");
});
});
Output:
PASS tests/streaming.test.ts
async generator mocking
✓ mocks a basic streaming response (2ms)
The mock yields chunks in sequence. Each chunk contains a delta with partial content. The consuming code extracts content from each delta and assembles the complete message.
Adding Timing Control
Real streams have delays between chunks. Simulate this with setTimeout wrapped in a Promise:
import { describe, it, expect, vi } from "vitest";
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
describe("timed streaming", () => {
it("simulates realistic chunk timing", async () => {
const mockStream = vi.fn(async function* (): AsyncGenerator<{ content: string }> {
yield { content: "The" };
await delay(10); // 10ms between chunks
yield { content: " answer" };
await delay(10);
yield { content: " is" };
await delay(10);
yield { content: " 42" };
});
const startTime = Date.now();
const parts: string[] = [];
for await (const chunk of mockStream()) {
parts.push(chunk.content);
}
const elapsed = Date.now() - startTime;
expect(parts.join("")).toBe("The answer is 42");
expect(elapsed).toBeGreaterThanOrEqual(30); // At least 3 delays
expect(elapsed).toBeLessThan(100); // But not too slow
});
});
Output:
PASS tests/timed-streaming.test.ts
timed streaming
✓ simulates realistic chunk timing (42ms)
Timing control lets you test timeout handling, loading indicators, and debouncing logic.
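As one example of timeout handling, you can race each chunk against a deadline. This is a minimal sketch; nextChunkWithTimeout is a hypothetical helper, not part of any SDK:
import { describe, it, expect } from "vitest";
// Hypothetical helper: resolves with the next chunk, or rejects if it takes too long
async function nextChunkWithTimeout<T>(
  iterator: AsyncIterator<T>,
  timeoutMs: number
): Promise<IteratorResult<T>> {
  return Promise.race([
    iterator.next(),
    new Promise<never>((_, reject) =>
      setTimeout(() => reject(new Error("Chunk timeout")), timeoutMs)
    ),
  ]);
}
describe("per-chunk timeout", () => {
  it("rejects when the next chunk takes too long", async () => {
    async function* stallingStream() {
      yield { content: "first" };
      await new Promise((r) => setTimeout(r, 100)); // stalls past the timeout
      yield { content: "late" };
    }
    const iterator = stallingStream();
    // First chunk arrives immediately, well within the 20ms budget
    await expect(nextChunkWithTimeout(iterator, 20)).resolves.toMatchObject({
      value: { content: "first" },
    });
    // Second chunk stalls for 100ms, so the timeout wins the race
    await expect(nextChunkWithTimeout(iterator, 20)).rejects.toThrow("Chunk timeout");
  });
});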
Reusable Stream Factory
For complex tests, create a factory function that generates mock streams:
import { vi } from "vitest";
interface StreamChunk {
type: "content" | "tool_call" | "done";
content?: string;
tool_call?: { name: string; arguments: string };
usage?: { prompt_tokens: number; completion_tokens: number };
}
interface MockStreamOptions {
chunks: StreamChunk[];
delayBetweenChunks?: number;
errorAfterChunk?: number;
errorMessage?: string;
}
function createMockStream(options: MockStreamOptions) {
const {
chunks,
delayBetweenChunks = 0,
errorAfterChunk,
errorMessage = "Stream error",
} = options;
return async function* (): AsyncGenerator<StreamChunk> {
    for (let i = 0; i < chunks.length; i++) {
      yield chunks[i];
      // Fail once the configured number of chunks has been delivered
      if (errorAfterChunk !== undefined && i + 1 >= errorAfterChunk) {
        throw new Error(errorMessage);
      }
      // Delay before next chunk (not after last)
      if (delayBetweenChunks > 0 && i < chunks.length - 1) {
        await new Promise((r) => setTimeout(r, delayBetweenChunks));
      }
    }
};
}
Usage:
// Usage example - factory creates customizable mocks
const happyPath = createMockStream({
chunks: [
{ type: "content", content: "Hello" },
{ type: "content", content: " world" },
{ type: "done", usage: { prompt_tokens: 10, completion_tokens: 5 } },
],
});
const errorCase = createMockStream({
chunks: [
{ type: "content", content: "Starting..." },
{ type: "content", content: "then..." },
],
errorAfterChunk: 2,
errorMessage: "Connection lost",
});
Now you can create mocks for success, failure, and edge cases without repeating boilerplate.
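Here is a quick test of the error path as a sketch. It assumes the errorCase mock from the usage snippet above is in scope, with errorAfterChunk counting the chunks delivered before the failure:
import { describe, it, expect } from "vitest";
describe("createMockStream error path", () => {
  it("throws the configured error after the listed chunks", async () => {
    // Assumes createMockStream and the errorCase mock defined above are in scope
    const received: string[] = [];
    await expect(async () => {
      for await (const chunk of errorCase()) {
        if (chunk.content) received.push(chunk.content);
      }
    }).rejects.toThrow("Connection lost");
    // Both listed chunks arrived before the simulated connection loss
    expect(received).toEqual(["Starting...", "then..."]);
  });
});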
Testing SSE (Server-Sent Events)
Many AI APIs use Server-Sent Events for streaming. SSE delivers events as text lines in a specific format:
data: {"content": "Hello"}
data: {"content": " world"}
data: [DONE]
Testing SSE requires mocking the event parsing:
import { describe, it, expect, vi, beforeEach } from "vitest";
// SSE parser function under test
function parseSSEChunk(line: string): { content?: string; done?: boolean } | null {
if (!line.startsWith("data: ")) {
return null; // Skip non-data lines
}
const data = line.slice(6); // Remove "data: " prefix
if (data === "[DONE]") {
return { done: true };
}
try {
return JSON.parse(data);
} catch {
return null; // Invalid JSON
}
}
describe("SSE parsing", () => {
it("parses content chunks", () => {
const result = parseSSEChunk('data: {"content": "Hello"}');
expect(result).toEqual({ content: "Hello" });
});
it("recognizes done signal", () => {
const result = parseSSEChunk("data: [DONE]");
expect(result).toEqual({ done: true });
});
it("ignores non-data lines", () => {
expect(parseSSEChunk("")).toBeNull();
expect(parseSSEChunk(": comment")).toBeNull();
expect(parseSSEChunk("event: message")).toBeNull();
});
it("handles malformed JSON gracefully", () => {
const result = parseSSEChunk("data: {broken json");
expect(result).toBeNull();
});
});
Output:
PASS tests/sse-parsing.test.ts
SSE parsing
✓ parses content chunks (1ms)
✓ recognizes done signal
✓ ignores non-data lines
✓ handles malformed JSON gracefully
Mocking a Full SSE Stream
Test the complete SSE consumption flow by mocking fetch to return a streaming response:
import { describe, it, expect, vi, beforeEach } from "vitest";
// Create a mock ReadableStream from SSE lines
function createSSEStream(events: string[]): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
let index = 0;
return new ReadableStream({
pull(controller) {
if (index < events.length) {
// SSE format: "data: <content>\n\n"
const chunk = `data: ${events[index]}\n\n`;
controller.enqueue(encoder.encode(chunk));
index++;
} else {
controller.close();
}
},
});
}
// Function under test
async function* consumeSSEStream(
response: Response
): AsyncGenerator<{ content: string }> {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Process complete lines
const lines = buffer.split("\n\n");
buffer = lines.pop() || ""; // Keep incomplete line
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data !== "[DONE]") {
yield JSON.parse(data);
}
}
}
}
}
describe("SSE stream consumption", () => {
it("yields parsed events from SSE stream", async () => {
const sseEvents = [
'{"content": "Hello"}',
'{"content": " there"}',
'{"content": "!"}',
"[DONE]",
];
const mockResponse = new Response(createSSEStream(sseEvents));
const chunks: string[] = [];
for await (const event of consumeSSEStream(mockResponse)) {
chunks.push(event.content);
}
expect(chunks).toEqual(["Hello", " there", "!"]);
});
it("handles empty stream", async () => {
const mockResponse = new Response(createSSEStream(["[DONE]"]));
const chunks: string[] = [];
for await (const event of consumeSSEStream(mockResponse)) {
chunks.push(event.content);
}
expect(chunks).toEqual([]);
});
});
Output:
PASS tests/sse-stream.test.ts
SSE stream consumption
✓ yields parsed events from SSE stream (3ms)
✓ handles empty stream (1ms)
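One more case worth covering is an event split across network reads. This sketch exercises the buffering logic in consumeSSEStream and slots into the "SSE stream consumption" describe block above:
it("reassembles an event split across two reads", async () => {
  const encoder = new TextEncoder();
  const splitStream = new ReadableStream<Uint8Array>({
    start(controller) {
      // The "\n\n" delimiter only arrives with the second piece
      controller.enqueue(encoder.encode('data: {"conte'));
      controller.enqueue(encoder.encode('nt": "Hello"}\n\n'));
      controller.close();
    },
  });
  const chunks: string[] = [];
  for await (const event of consumeSSEStream(new Response(splitStream))) {
    chunks.push(event.content);
  }
  expect(chunks).toEqual(["Hello"]);
});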
Testing Cancellation with AbortController
Users cancel AI requests constantly. They change their mind, navigate away, or the response takes too long. Your code must handle AbortSignal properly:
import { describe, it, expect, vi } from "vitest";
interface StreamOptions {
signal?: AbortSignal;
}
// Function that respects abort signal
async function* streamWithAbort(
chunks: string[],
options: StreamOptions = {}
): AsyncGenerator<string> {
for (const chunk of chunks) {
// Check abort before each yield
if (options.signal?.aborted) {
throw new DOMException("Aborted", "AbortError");
}
yield chunk;
// Simulate network delay
await new Promise((resolve) => setTimeout(resolve, 20));
// Check abort after delay too
if (options.signal?.aborted) {
throw new DOMException("Aborted", "AbortError");
}
}
}
describe("abort handling", () => {
it("stops streaming when aborted", async () => {
const controller = new AbortController();
const chunks = ["one", "two", "three", "four", "five"];
const received: string[] = [];
// Abort after 50ms (should get 2 or 3 chunks)
setTimeout(() => controller.abort(), 50);
try {
for await (const chunk of streamWithAbort(chunks, {
signal: controller.signal,
})) {
received.push(chunk);
}
// Should not reach here
expect.fail("Expected abort error");
} catch (error) {
expect((error as Error).name).toBe("AbortError");
}
// Should have received some but not all chunks
expect(received.length).toBeGreaterThan(0);
expect(received.length).toBeLessThan(5);
});
it("throws immediately if already aborted", async () => {
const controller = new AbortController();
controller.abort(); // Abort before starting
const stream = streamWithAbort(["chunk"], {
signal: controller.signal,
});
await expect(async () => {
for await (const _ of stream) {
// Should not yield anything
}
}).rejects.toThrow("Aborted");
});
it("completes normally without abort", async () => {
const chunks = ["a", "b", "c"];
const received: string[] = [];
for await (const chunk of streamWithAbort(chunks)) {
received.push(chunk);
}
expect(received).toEqual(["a", "b", "c"]);
});
});
Output:
PASS tests/abort.test.ts
abort handling
✓ stops streaming when aborted (55ms)
✓ throws immediately if already aborted (2ms)
✓ completes normally without abort (65ms)
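Cancellation also needs to reach the network layer. In a real client you typically forward the same signal to fetch so the HTTP request is torn down too. Here is a sketch; startChatStream and the /chat endpoint are hypothetical:
import { describe, it, expect, vi } from "vitest";
// Hypothetical client helper: forwards the caller's signal to fetch
async function startChatStream(body: unknown, signal: AbortSignal): Promise<Response> {
  return fetch("/chat", {
    method: "POST",
    body: JSON.stringify(body),
    signal, // fetch rejects with an AbortError once the controller aborts
  });
}
describe("signal forwarding", () => {
  it("passes the abort signal through to fetch", async () => {
    const fetchSpy = vi.spyOn(globalThis, "fetch").mockResolvedValue(new Response("ok"));
    const controller = new AbortController();
    await startChatStream({ messages: [] }, controller.signal);
    expect(fetchSpy).toHaveBeenCalledWith(
      "/chat",
      expect.objectContaining({ signal: controller.signal })
    );
    fetchSpy.mockRestore();
  });
});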
Testing Cleanup on Cancellation
Cancellation should trigger cleanup—closing connections, resetting state, stopping timers:
import { describe, it, expect, vi } from "vitest";
class StreamConsumer {
private cleanedUp = false;
public chunks: string[] = [];
async consume(
stream: AsyncGenerator<string>,
signal?: AbortSignal
): Promise<void> {
try {
for await (const chunk of stream) {
if (signal?.aborted) {
throw new DOMException("Aborted", "AbortError");
}
this.chunks.push(chunk);
}
} finally {
// Cleanup runs on success, error, or cancellation
this.cleanup();
}
}
private cleanup(): void {
this.cleanedUp = true;
}
get wasCleanedUp(): boolean {
return this.cleanedUp;
}
}
describe("cleanup on cancellation", () => {
it("runs cleanup when stream completes normally", async () => {
const consumer = new StreamConsumer();
async function* stream() {
yield "a";
yield "b";
}
await consumer.consume(stream());
expect(consumer.chunks).toEqual(["a", "b"]);
expect(consumer.wasCleanedUp).toBe(true);
});
it("runs cleanup when stream is cancelled", async () => {
const consumer = new StreamConsumer();
const controller = new AbortController();
async function* stream() {
yield "a";
await new Promise((r) => setTimeout(r, 100));
yield "b"; // Won't reach here
}
// Abort after first chunk
setTimeout(() => controller.abort(), 20);
try {
await consumer.consume(stream(), controller.signal);
} catch {
// Expected abort error
}
expect(consumer.chunks).toEqual(["a"]);
expect(consumer.wasCleanedUp).toBe(true); // Cleanup still ran
});
it("runs cleanup when stream throws error", async () => {
const consumer = new StreamConsumer();
async function* stream() {
yield "a";
throw new Error("Stream failed");
}
try {
await consumer.consume(stream());
} catch {
// Expected error
}
expect(consumer.wasCleanedUp).toBe(true); // Cleanup still ran
});
});
Output:
PASS tests/cleanup.test.ts
cleanup on cancellation
✓ runs cleanup when stream completes normally (1ms)
✓ runs cleanup when stream is cancelled (105ms)
✓ runs cleanup when stream throws error (1ms)
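Cleanup can also live inside the stream itself. When a for await loop exits early, JavaScript calls the generator's return(), which resumes it into any pending finally block, so generator-side teardown still runs. A short sketch, using a hypothetical closeConnection spy to stand in for real teardown:
import { describe, it, expect, vi } from "vitest";
describe("generator-side cleanup", () => {
  it("runs the generator's finally block when the consumer breaks early", async () => {
    const closeConnection = vi.fn(); // stands in for closing a real connection
    async function* stream() {
      try {
        yield "a";
        yield "b";
        yield "c";
      } finally {
        closeConnection(); // runs even though "b" and "c" were never consumed
      }
    }
    for await (const chunk of stream()) {
      if (chunk === "a") break; // stop after the first chunk
    }
    expect(closeConnection).toHaveBeenCalledTimes(1);
  });
});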
Testing Chunk Ordering
Streaming UI must display chunks in order. Race conditions can cause chunks to appear out of sequence. Test that your code preserves ordering:
import { describe, it, expect, vi } from "vitest";
interface OrderedChunk {
index: number;
content: string;
}
// Processor that tracks ordering
class ChunkProcessor {
private processed: OrderedChunk[] = [];
private expectedIndex = 0;
public orderErrors: string[] = [];
async processStream(
stream: AsyncGenerator<OrderedChunk>
): Promise<string> {
for await (const chunk of stream) {
// Verify ordering
if (chunk.index !== this.expectedIndex) {
this.orderErrors.push(
`Expected index ${this.expectedIndex}, got ${chunk.index}`
);
}
this.processed.push(chunk);
this.expectedIndex++;
}
return this.processed.map((c) => c.content).join("");
}
}
describe("chunk ordering", () => {
it("detects correct ordering", async () => {
const processor = new ChunkProcessor();
async function* orderedStream() {
yield { index: 0, content: "A" };
yield { index: 1, content: "B" };
yield { index: 2, content: "C" };
}
const result = await processor.processStream(orderedStream());
expect(result).toBe("ABC");
expect(processor.orderErrors).toHaveLength(0);
});
it("detects out-of-order chunks", async () => {
const processor = new ChunkProcessor();
async function* outOfOrderStream() {
yield { index: 0, content: "A" };
yield { index: 2, content: "C" }; // Skipped index 1
yield { index: 1, content: "B" }; // Late arrival
}
await processor.processStream(outOfOrderStream());
expect(processor.orderErrors.length).toBeGreaterThan(0);
expect(processor.orderErrors[0]).toContain("Expected index 1, got 2");
});
it("detects duplicate indices", async () => {
const processor = new ChunkProcessor();
async function* duplicateStream() {
yield { index: 0, content: "A" };
yield { index: 1, content: "B" };
yield { index: 1, content: "B-duplicate" }; // Duplicate!
}
await processor.processStream(duplicateStream());
expect(processor.orderErrors).toContain("Expected index 2, got 1");
});
});
Output:
PASS tests/ordering.test.ts
chunk ordering
✓ detects correct ordering (1ms)
✓ detects out-of-order chunks (1ms)
✓ detects duplicate indices (1ms)
Testing Concurrent Chunk Processing
If you process chunks concurrently (for performance), test that results still appear in order:
import { describe, it, expect } from "vitest";
// Simulates async processing of chunks
async function processChunkAsync(
content: string,
delayMs: number
): Promise<string> {
await new Promise((r) => setTimeout(r, delayMs));
return content.toUpperCase();
}
// Processor that maintains order despite async processing
async function processStreamInOrder(
chunks: Array<{ content: string; delay: number }>
): Promise<string[]> {
// Start all processing concurrently
const promises = chunks.map(async (chunk, index) => ({
index,
result: await processChunkAsync(chunk.content, chunk.delay),
}));
// Wait for all and sort by original index
const results = await Promise.all(promises);
results.sort((a, b) => a.index - b.index);
return results.map((r) => r.result);
}
describe("concurrent processing with order preservation", () => {
it("maintains order when chunks finish out of order", async () => {
// Chunks with different processing times
// Third chunk is fastest, first is slowest
const chunks = [
{ content: "slow", delay: 30 }, // index 0, finishes last
{ content: "medium", delay: 20 }, // index 1, finishes second
{ content: "fast", delay: 10 }, // index 2, finishes first
];
const results = await processStreamInOrder(chunks);
// Despite finishing in different order, results are sorted
expect(results).toEqual(["SLOW", "MEDIUM", "FAST"]);
});
});
Output:
PASS tests/concurrent-ordering.test.ts
concurrent processing with order preservation
✓ maintains order when chunks finish out of order (35ms)
Testing Partial Responses
Streams can fail mid-response. Network errors, server crashes, and client disconnects all cause partial responses. Test graceful handling:
import { describe, it, expect } from "vitest";
interface PartialResult {
content: string;
complete: boolean;
error?: string;
}
async function consumeWithRecovery(
stream: AsyncGenerator<string>
): Promise<PartialResult> {
const parts: string[] = [];
try {
for await (const chunk of stream) {
parts.push(chunk);
}
return { content: parts.join(""), complete: true };
} catch (error) {
// Return partial content with error info
return {
content: parts.join(""),
complete: false,
error: (error as Error).message,
};
}
}
describe("partial response handling", () => {
it("returns complete response on success", async () => {
async function* successStream() {
yield "Hello";
yield " ";
yield "world";
}
const result = await consumeWithRecovery(successStream());
expect(result.complete).toBe(true);
expect(result.content).toBe("Hello world");
expect(result.error).toBeUndefined();
});
it("returns partial content on mid-stream failure", async () => {
async function* failingStream() {
yield "Started";
yield " processing";
throw new Error("Connection reset");
}
const result = await consumeWithRecovery(failingStream());
expect(result.complete).toBe(false);
expect(result.content).toBe("Started processing");
expect(result.error).toBe("Connection reset");
});
it("returns empty content on immediate failure", async () => {
async function* immediateFailure() {
throw new Error("Server unavailable");
yield "never reached"; // TypeScript knows this is unreachable
}
const result = await consumeWithRecovery(immediateFailure());
expect(result.complete).toBe(false);
expect(result.content).toBe("");
expect(result.error).toBe("Server unavailable");
});
it("handles empty stream", async () => {
async function* emptyStream() {
// Yields nothing
}
const result = await consumeWithRecovery(emptyStream());
expect(result.complete).toBe(true);
expect(result.content).toBe("");
});
});
Output:
PASS tests/partial-response.test.ts
partial response handling
✓ returns complete response on success (1ms)
✓ returns partial content on mid-stream failure (1ms)
✓ returns empty content on immediate failure (1ms)
✓ handles empty stream (1ms)
Testing Retry After Partial Failure
Some applications retry from the last successful point:
import { describe, it, expect, vi } from "vitest";
interface RetryableStream {
createStream: (startIndex: number) => AsyncGenerator<{ index: number; content: string }>;
}
async function consumeWithRetry(
source: RetryableStream,
maxRetries: number = 3
): Promise<string> {
const collected: string[] = [];
let retries = 0;
let lastIndex = -1;
while (retries <= maxRetries) {
try {
const stream = source.createStream(lastIndex + 1);
for await (const chunk of stream) {
collected[chunk.index] = chunk.content;
lastIndex = chunk.index;
}
// Success - stream completed
return collected.join("");
} catch (error) {
retries++;
if (retries > maxRetries) {
throw new Error(`Failed after ${maxRetries} retries: ${(error as Error).message}`);
}
// Continue loop to retry from lastIndex + 1
}
}
return collected.join("");
}
describe("retry from last position", () => {
it("resumes from last successful chunk", async () => {
let attemptCount = 0;
const source: RetryableStream = {
createStream: async function* (startIndex: number) {
attemptCount++;
// First attempt: fail after index 1
// Second attempt: succeed from index 2
if (attemptCount === 1 && startIndex === 0) {
yield { index: 0, content: "A" };
yield { index: 1, content: "B" };
throw new Error("Network error");
}
// Retry starts from index 2
if (startIndex === 2) {
yield { index: 2, content: "C" };
yield { index: 3, content: "D" };
}
},
};
const result = await consumeWithRetry(source);
expect(result).toBe("ABCD");
expect(attemptCount).toBe(2);
});
it("gives up after max retries", async () => {
const source: RetryableStream = {
createStream: async function* () {
yield { index: 0, content: "A" };
throw new Error("Persistent failure");
},
};
await expect(consumeWithRetry(source, 2)).rejects.toThrow(
"Failed after 2 retries"
);
});
});
Output:
PASS tests/retry.test.ts
retry from last position
✓ resumes from last successful chunk (2ms)
✓ gives up after max retries (2ms)
Complete Example: Testing a Chat Stream Consumer
Putting it all together, here's a comprehensive test suite for a streaming chat client:
// src/chat-stream.ts
export interface ChatStreamOptions {
signal?: AbortSignal;
onChunk?: (content: string) => void;
onComplete?: (fullContent: string) => void;
onError?: (error: Error, partialContent: string) => void;
}
export interface ChatStreamResult {
content: string;
complete: boolean;
chunkCount: number;
}
export async function consumeChatStream(
stream: AsyncGenerator<{ delta: string }>,
options: ChatStreamOptions = {}
): Promise<ChatStreamResult> {
const parts: string[] = [];
let chunkCount = 0;
try {
for await (const chunk of stream) {
if (options.signal?.aborted) {
throw new DOMException("Aborted", "AbortError");
}
parts.push(chunk.delta);
chunkCount++;
options.onChunk?.(chunk.delta);
}
const fullContent = parts.join("");
options.onComplete?.(fullContent);
return { content: fullContent, complete: true, chunkCount };
} catch (error) {
const partialContent = parts.join("");
options.onError?.(error as Error, partialContent);
return { content: partialContent, complete: false, chunkCount };
}
}
// tests/chat-stream.test.ts
import { describe, it, expect, vi, beforeEach } from "vitest";
import { consumeChatStream, ChatStreamOptions } from "../src/chat-stream";
describe("consumeChatStream", () => {
describe("successful streaming", () => {
it("assembles complete message from chunks", async () => {
async function* stream() {
yield { delta: "Hello" };
yield { delta: " " };
yield { delta: "world" };
}
const result = await consumeChatStream(stream());
expect(result.content).toBe("Hello world");
expect(result.complete).toBe(true);
expect(result.chunkCount).toBe(3);
});
it("calls onChunk for each chunk", async () => {
const onChunk = vi.fn();
async function* stream() {
yield { delta: "A" };
yield { delta: "B" };
}
await consumeChatStream(stream(), { onChunk });
expect(onChunk).toHaveBeenCalledTimes(2);
expect(onChunk).toHaveBeenNthCalledWith(1, "A");
expect(onChunk).toHaveBeenNthCalledWith(2, "B");
});
it("calls onComplete with full content", async () => {
const onComplete = vi.fn();
async function* stream() {
yield { delta: "Done" };
}
await consumeChatStream(stream(), { onComplete });
expect(onComplete).toHaveBeenCalledWith("Done");
});
});
describe("error handling", () => {
it("returns partial content on error", async () => {
async function* failingStream() {
yield { delta: "Partial" };
throw new Error("Stream failed");
}
const result = await consumeChatStream(failingStream());
expect(result.content).toBe("Partial");
expect(result.complete).toBe(false);
});
it("calls onError with error and partial content", async () => {
const onError = vi.fn();
async function* failingStream() {
yield { delta: "Started" };
throw new Error("Network error");
}
await consumeChatStream(failingStream(), { onError });
expect(onError).toHaveBeenCalledWith(
expect.any(Error),
"Started"
);
expect(onError.mock.calls[0][0].message).toBe("Network error");
});
});
describe("cancellation", () => {
it("stops on abort signal", async () => {
const controller = new AbortController();
async function* slowStream() {
yield { delta: "One" };
await new Promise((r) => setTimeout(r, 50));
yield { delta: "Two" }; // Should not reach
}
setTimeout(() => controller.abort(), 20);
const result = await consumeChatStream(slowStream(), {
signal: controller.signal,
});
expect(result.content).toBe("One");
expect(result.complete).toBe(false);
});
it("reports abort as error", async () => {
const controller = new AbortController();
const onError = vi.fn();
controller.abort(); // Already aborted
async function* stream() {
yield { delta: "Never" };
}
await consumeChatStream(stream(), {
signal: controller.signal,
onError,
});
expect(onError).toHaveBeenCalled();
expect(onError.mock.calls[0][0].name).toBe("AbortError");
});
});
describe("edge cases", () => {
it("handles empty stream", async () => {
async function* emptyStream() {
// No yields
}
const result = await consumeChatStream(emptyStream());
expect(result.content).toBe("");
expect(result.complete).toBe(true);
expect(result.chunkCount).toBe(0);
});
it("handles single empty chunk", async () => {
async function* singleEmpty() {
yield { delta: "" };
}
const result = await consumeChatStream(singleEmpty());
expect(result.content).toBe("");
expect(result.chunkCount).toBe(1);
});
});
});
Output:
PASS tests/chat-stream.test.ts
consumeChatStream
successful streaming
✓ assembles complete message from chunks (1ms)
✓ calls onChunk for each chunk (1ms)
✓ calls onComplete with full content (1ms)
error handling
✓ returns partial content on error (1ms)
✓ calls onError with error and partial content (1ms)
cancellation
✓ stops on abort signal (55ms)
✓ reports abort as error (1ms)
edge cases
✓ handles empty stream (1ms)
✓ handles single empty chunk (1ms)
Test Files 1 passed (1)
Tests 9 passed (9)
Quick Reference
| Pattern | Use Case |
|---|---|
| `async function*` mock | Simulate streaming responses |
| `yield` with delays | Test timing-sensitive code |
| `createSSEStream()` | Mock Server-Sent Events |
| `AbortController` | Test cancellation handling |
| Index tracking | Verify chunk ordering |
| Try/catch in consumer | Handle partial responses |
| `finally` for cleanup | Ensure cleanup always runs |
Try With AI
Prompt 1: Mock Complex Stream Shapes
I need to test an AI SDK that returns tool calls mid-stream:
const stream = client.stream({
messages: [...],
tools: [{ name: "search", ... }]
});
for await (const chunk of stream) {
if (chunk.type === "content") {
// Text content
} else if (chunk.type === "tool_call") {
// Tool invocation
}
}
Create a mock stream factory that can yield both content
chunks and tool call chunks. Include an example test that
verifies tool calls are handled correctly.
What you're learning: How to mock complex, multi-type streaming responses. AI assistants frequently interleave content with tool calls, and your tests need to verify both paths work correctly.
Prompt 2: Test Backpressure Handling
My streaming consumer processes chunks slower than they arrive.
I want to test that:
1. No chunks are dropped
2. The stream doesn't buffer infinitely
3. Slow processing doesn't cause memory issues
Create a test that simulates a fast producer and slow consumer.
How do I verify chunks aren't lost? How do I detect if the
buffer grows too large?
What you're learning: Testing backpressure is critical for real-world streaming. Fast networks and slow UI updates can cause memory issues. This prompt teaches you to think about producer-consumer dynamics.
Prompt 3: Race Condition Detection
I have a bug where sometimes chunks appear out of order in the UI.
It only happens under load. How do I write a test that
reliably reproduces this race condition?
The stream consumer looks like:
async function consume(stream) {
for await (const chunk of stream) {
await updateUI(chunk); // Async operation
}
}
Create a test that intentionally triggers the race condition
and verify my fix prevents it.
What you're learning: Race conditions in async code are notoriously hard to reproduce. This prompt teaches techniques for making race conditions deterministic in tests, letting you verify fixes work.
Safety note: When testing streaming code, be careful with infinite loops. Always include timeouts or maximum iteration counts in your tests. A mock stream that never ends will hang your test suite. Use Vitest's testTimeout setting (or the per-test timeout argument) or add explicit timeouts to async tests.