Processing Pipelines
A pipeline is a sequence of steps where each step takes the output of the previous step as input. Think of a factory assembly line: raw materials enter, each station transforms them, and a finished product exits. This lesson teaches you to chain file operations (read, filter, export) into the read → transform → write pattern, with try/except for graceful error handling. The pipeline pattern is functional composition applied to data processing: each function takes data in, transforms it, and passes it out.
James has six functions: save and load for Markdown, JSON, and CSV. Each works independently. But his real-world task is not "load a JSON file." It is "load the notebook, find all notes tagged 'python', and export them as Markdown files for a study guide."
"That is three steps," Emma says. "Read. Filter. Write. A pipeline."
"Like the order fulfillment process at the warehouse," James says. "Receive the order, pick the items, pack and ship. Each step feeds the next."
"Exactly. And like the warehouse, if any step fails, you need to know where it failed and why."
The Read → Transform → Write Pattern
Every data processing task follows the same three-stage structure:
┌───────────┐      ┌──────────────┐      ┌──────────┐
│   READ    │ ──→  │  TRANSFORM   │ ──→  │  WRITE   │
│           │      │              │      │          │
│ JSON file │      │ Filter, sort │      │ Markdown │
│ CSV file  │      │ compute      │      │ JSON     │
│ Text file │      │ merge        │      │ CSV      │
└───────────┘      └──────────────┘      └──────────┘
The read stage produces a list of Note objects. The transform stage filters, sorts, or modifies that list. The write stage saves the result to a new file.
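Each stage has a predictable type signature. As a sketch (these aliases are illustrative, not part of SmartNotes; Note is the dataclass defined just below):

from collections.abc import Callable
from pathlib import Path

# Illustrative aliases for the three stage contracts (an assumption,
# not code James writes):
Reader = Callable[[Path], list["Note"]]             # READ: path in, notes out
Transform = Callable[[list["Note"]], list["Note"]]  # TRANSFORM: notes in, notes out
Writer = Callable[[list["Note"], Path], int]        # WRITE: notes in, count out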
Here is a concrete pipeline: read a JSON notebook, filter for notes tagged "python", and export each matching note as a Markdown file:
from pathlib import Path
from dataclasses import dataclass, field, asdict
import json
@dataclass
class Note:
title: str
body: str
word_count: int
author: str = "Anonymous"
is_draft: bool = True
tags: list[str] = field(default_factory=list)
# Stage 1: READ
def load_notebook(file_path: Path) -> list[Note]:
"""Load notes from a JSON file."""
if not file_path.exists():
return []
data = json.loads(file_path.read_text())
notes: list[Note] = []
for note_dict in data:
notes.append(Note(**note_dict))
return notes
# Stage 2: TRANSFORM
def filter_by_tag(notes: list[Note], tag: str) -> list[Note]:
"""Return only notes that have the given tag."""
result: list[Note] = []
for note in notes:
if tag in note.tags:
result.append(note)
return result
# Stage 3: WRITE
def export_as_markdown(notes: list[Note], directory: Path) -> int:
"""Export notes as individual Markdown files. Return count."""
directory.mkdir(parents=True, exist_ok=True)
for note in notes:
filename = note.title.lower().replace(" ", "-") + ".md"
content = f"# {note.title}\n\n{note.body}\n"
(directory / filename).write_text(content)
return len(notes)
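One caveat worth noting: the filename comes from the title, so two notes with the same title would silently overwrite each other. A defensive variant might deduplicate filenames; this is a sketch, not part of James's code (Prompt 1 in the Try With AI section returns to this failure mode):

def unique_filename(title: str, used: set[str]) -> str:
    """Sketch: slugify a title, appending -2, -3, ... on collision."""
    base = title.lower().replace(" ", "-")
    name = base + ".md"
    counter = 2
    while name in used:
        name = f"{base}-{counter}.md"
        counter += 1
    used.add(name)
    return name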
Chain them together:
# The pipeline
notes = load_notebook(Path("data/notebook.json"))
python_notes = filter_by_tag(notes, "python")
count = export_as_markdown(python_notes, Path("exports/python"))
print(f"Exported {count} Python notes")
Output:
Exported 2 Python notes
Three lines of code. Each line is one stage. The output of load_notebook feeds into filter_by_tag, which feeds into export_as_markdown. That is a pipeline.
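Because each stage's output type matches the next stage's input, the same pipeline could also be written as one nested expression, though the staged version reads better:

# Equivalent, as a single nested call (harder to read, same result):
count = export_as_markdown(
    filter_by_tag(load_notebook(Path("data/notebook.json")), "python"),
    Path("exports/python"),
)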
Building Transform Functions
The transform stage is where the interesting work happens. Here are reusable transform functions you can chain in any combination:
def filter_by_tag(notes: list[Note], tag: str) -> list[Note]:
"""Return only notes that have the given tag."""
result: list[Note] = []
for note in notes:
if tag in note.tags:
result.append(note)
return result
def exclude_drafts(notes: list[Note]) -> list[Note]:
"""Return only published notes."""
result: list[Note] = []
for note in notes:
if not note.is_draft:
result.append(note)
return result
def sort_by_word_count(notes: list[Note], descending: bool = True) -> list[Note]:
"""Sort notes by word count.
sorted() can take an optional key= parameter: a function that tells
it what value to compare. Here we define a small helper that returns
the word count of a note. (Chapter 64 covers key functions in depth.)
"""
def get_word_count(note: Note) -> int:
return note.word_count
return sorted(notes, key=get_word_count, reverse=descending)
def filter_by_author(notes: list[Note], author: str) -> list[Note]:
"""Return only notes by the given author."""
result: list[Note] = []
for note in notes:
if note.author == author:
result.append(note)
return result
Chain multiple transforms:
# Pipeline: published Python notes by James, longest first
notes = load_notebook(Path("data/notebook.json"))
result = filter_by_tag(notes, "python")
result = exclude_drafts(result)
result = filter_by_author(result, "James")
result = sort_by_word_count(result)
count = export_as_markdown(result, Path("exports/james-python"))
print(f"Exported {count} notes")
Each transform takes a list[Note] and returns a list[Note]. Because the input and output types match, you can chain them in any order. This composability is what makes the pipeline pattern powerful.
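To make that composability explicit, you could collect the transforms in a list and apply them in a loop. This helper is a sketch of the idea, not something SmartNotes requires:

from collections.abc import Callable

def apply_transforms(
    notes: list[Note],
    transforms: list[Callable[[list[Note]], list[Note]]],
) -> list[Note]:
    """Sketch: apply each list[Note] -> list[Note] transform in order."""
    for transform in transforms:
        notes = transform(notes)
    return notes

# Usage sketch: lambdas bind the extra argument of each transform
result = apply_transforms(notes, [
    lambda ns: filter_by_tag(ns, "python"),
    exclude_drafts,
    lambda ns: filter_by_author(ns, "James"),
    sort_by_word_count,
])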
Error Handling for I/O Operations
File I/O can fail. The file might not exist. The directory might be read-only. The JSON might be malformed. You need to handle these failures without crashing.
One common approach is to return a tuple of two values, the data and an error message, instead of letting the function crash with an exception. If the operation succeeds, the error is None. If it fails, the data is empty and the error explains what went wrong. The caller unpacks the tuple and decides what to do:
# Pattern: (data, error) tuple
notes, error = load_something(path)
if error:
print(f"Failed: {error}")
else:
print(f"Got {len(notes)} notes")
Here is a full example:
import json
from pathlib import Path
def safe_load_notebook(file_path: Path) -> tuple[list[Note], str | None]:
"""Load notes from a JSON file, returning an error message on failure.
Returns a tuple of (notes, error). If loading succeeds, error is None.
If loading fails, notes is an empty list and error describes the problem.
"""
if not file_path.exists():
return [], f"File not found: {file_path}"
try:
text = file_path.read_text(encoding="utf-8")
except PermissionError:
return [], f"Permission denied: {file_path}"
try:
data = json.loads(text)
except json.JSONDecodeError as e:
return [], f"Invalid JSON in {file_path}: {e}"
try:
notes: list[Note] = []
for note_dict in data:
notes.append(Note(**note_dict))
except TypeError as e:
return [], f"Invalid note data in {file_path}: {e}"
return notes, None
Each guard catches one specific kind of failure (the first via the exists() check, the rest via try/except):
| Error | What went wrong | Example |
|---|---|---|
| File not found | File does not exist (checked with exists()) | Wrong path, typo in filename |
| PermissionError | Cannot read the file | System file, locked by another program |
| json.JSONDecodeError | File is not valid JSON | Corrupted file, wrong format |
| TypeError | JSON data does not match Note fields | Missing field, extra field |
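The write stage can fail too: the directory may be read-only, or the disk may be full. A guarded variant of the export function, following the same tuple convention, might look like this sketch:

def safe_export_as_markdown(
    notes: list[Note], directory: Path
) -> tuple[int, str | None]:
    """Sketch: export notes, returning (count, error) instead of raising."""
    try:
        directory.mkdir(parents=True, exist_ok=True)
        for note in notes:
            filename = note.title.lower().replace(" ", "-") + ".md"
            content = f"# {note.title}\n\n{note.body}\n"
            (directory / filename).write_text(content)
    except OSError as e:  # covers PermissionError, full disk, and similar failures
        return 0, f"Could not write to {directory}: {e}"
    return len(notes), None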
Use it in a pipeline:
notes, error = safe_load_notebook(Path("data/notebook.json"))
if error:
print(f"Error: {error}")
else:
result = filter_by_tag(notes, "python")
count = export_as_markdown(result, Path("exports/python"))
print(f"Exported {count} notes")
Output (if the file is missing):
Error: File not found: data/notebook.json
Output (if the file is valid):
Exported 2 notes
The pipeline does not crash. It reports the error clearly and stops processing. This is the difference between a program you run during development and a program you can give to someone else.
A Complete Pipeline Function
Wrap the entire pipeline in a single function:
def export_tagged_notes(
source: Path,
tag: str,
output_dir: Path,
format: str = "markdown",
) -> tuple[int, str | None]:
"""Read notes from a JSON file, filter by tag, and export.
- format: "markdown" or "json"
- Returns (count_exported, error_message_or_none)
"""
notes, error = safe_load_notebook(source)
if error:
return 0, error
filtered = filter_by_tag(notes, tag)
if not filtered:
return 0, None
if format == "markdown":
count = export_as_markdown(filtered, output_dir)
elif format == "json":
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{tag}-notes.json"
data: list[dict] = []
for note in filtered:
data.append(asdict(note))
output_path.write_text(json.dumps(data, indent=2))
count = len(filtered)
else:
return 0, f"Unknown format: {format}"
return count, None
Call it:
count, error = export_tagged_notes(
source=Path("data/notebook.json"),
tag="python",
output_dir=Path("exports/python"),
format="markdown",
)
if error:
print(f"Pipeline failed: {error}")
else:
print(f"Pipeline complete: {count} notes exported")
Output:
Pipeline complete: 2 notes exported
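The same call with format="json" writes a single file instead of one Markdown file per note:

count, error = export_tagged_notes(
    source=Path("data/notebook.json"),
    tag="python",
    output_dir=Path("exports/python"),
    format="json",
)
# On success this writes exports/python/python-notes.json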
PRIMM-AI+ Practice: Predict the Pipeline
Predict [AI-FREE]
Press Shift+Tab to enter Plan Mode.
Given a notebook with these notes:
| Title | Author | Tags | Draft? | Word Count |
|---|---|---|---|---|
| Python Tips | James | python, beginner | True | 50 |
| Cooking Pasta | Emma | cooking | False | 30 |
| Debug Guide | James | python, debug | False | 120 |
| Git Basics | James | git | True | 80 |
What does this pipeline produce?
notes = load_notebook(Path("notebook.json"))
result = filter_by_tag(notes, "python")
result = exclude_drafts(result)
result = sort_by_word_count(result)
How many notes remain? In what order? Write your answer.
Check your prediction
1 note: Debug Guide (word_count=120).
filter_by_tag("python") keeps Python Tips and Debug Guide. exclude_drafts() removes Python Tips (is_draft=True). Only Debug Guide remains. sort_by_word_count has nothing to sort with one item.
Run
Press Shift+Tab to exit Plan Mode.
Create the notebook JSON file with the four notes above, then run the pipeline. Verify the result matches your prediction.
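One way to create that file (a sketch; the body text is a placeholder, since the table does not specify it):

import json
from pathlib import Path

test_notes = [
    {"title": "Python Tips", "body": "placeholder", "word_count": 50,
     "author": "James", "is_draft": True, "tags": ["python", "beginner"]},
    {"title": "Cooking Pasta", "body": "placeholder", "word_count": 30,
     "author": "Emma", "is_draft": False, "tags": ["cooking"]},
    {"title": "Debug Guide", "body": "placeholder", "word_count": 120,
     "author": "James", "is_draft": False, "tags": ["python", "debug"]},
    {"title": "Git Basics", "body": "placeholder", "word_count": 80,
     "author": "James", "is_draft": True, "tags": ["git"]},
]
Path("notebook.json").write_text(json.dumps(test_notes, indent=2))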
Investigate
If you want to go deeper, run /investigate @pipeline_practice.py in Claude Code and ask: "What is the difference between returning an error tuple like (data, error) and raising an exception? When should I use each approach?"
Both approaches are valid. The AI explains the tradeoffs: exceptions are conventional in Python, but return values make pipeline composition easier.
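For comparison, here is a sketch of the exception-raising style (the function name is illustrative):

def load_notebook_or_raise(file_path: Path) -> list[Note]:
    """Sketch: the same loader, but failures propagate as exceptions."""
    text = file_path.read_text(encoding="utf-8")  # may raise FileNotFoundError, PermissionError
    data = json.loads(text)                       # may raise json.JSONDecodeError
    return [Note(**d) for d in data]              # may raise TypeError

# The caller decides where to handle failures:
try:
    notes = load_notebook_or_raise(Path("data/notebook.json"))
except (OSError, json.JSONDecodeError, TypeError) as e:
    print(f"Error: {e}")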
Modify
Add a limit parameter to the pipeline: export_tagged_notes(..., limit: int | None = None). If limit is provided, export only the first N matching notes. Update the function and test it.
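A hint if you get stuck: Python slicing treats a None bound as "no limit", so one line can handle both cases:

# Sketch: inside export_tagged_notes, after filtering
filtered = filtered[:limit]  # limit=None keeps everything; limit=3 keeps the first 3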
Make [Mastery Gate]
Write a function convert_format(source: Path, target: Path) -> tuple[int, str | None] that reads notes from a file (auto-detecting JSON or CSV by extension) and writes them to the target file (auto-detecting format by extension). In Claude Code, type /tdg to guide you through the cycle:
- Write the stub with types and docstring
- Write 4+ tests (JSON→CSV, CSV→JSON, missing file, unknown extension)
- Prompt AI to implement
- Run uv run ruff check, uv run pyright, and uv run pytest
Try With AI
If Claude Code is not already running, open your terminal, navigate to your SmartNotes project folder, and type claude. If you need a refresher, Chapter 44 covers the setup.
Prompt 1: Review My Pipeline
Here is my export_tagged_notes pipeline:
[paste your function]
Review the error handling. Are there any failure modes
I am not catching? What happens if the output directory
is on a full disk? What if two notes produce the same
filename?
What you're learning: Pipeline review goes beyond "does it work." The AI identifies failure modes you did not consider: disk space, filename collisions, race conditions. Production pipelines need these guards.
Prompt 2: Add Logging
Add logging to my pipeline so I can see what it does
at each stage. Use print statements (not the logging
module). Show: how many notes were loaded, how many
passed each filter, how many were exported, and the
total time taken.
What you're learning: Observability makes pipelines debuggable. When a pipeline produces unexpected results, log output at each stage helps you identify where the data went wrong.
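As a sketch of what the instrumented pipeline might look like (the exact messages are up to you and the AI):

import time

start = time.perf_counter()
notes = load_notebook(Path("data/notebook.json"))
print(f"Loaded {len(notes)} notes")
result = filter_by_tag(notes, "python")
print(f"After filter_by_tag('python'): {len(result)} notes")
count = export_as_markdown(result, Path("exports/python"))
print(f"Exported {count} notes in {time.perf_counter() - start:.3f}s")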
Prompt 3: Build a Multi-Format Pipeline
In Claude Code, type:
/tdg
Use the TDG workflow to write and test batch_export(source: Path, output_dir: Path) -> dict[str, int] that reads a JSON notebook and exports notes in all three formats: Markdown files, a JSON backup, and a CSV spreadsheet. Return a dictionary with the count for each format: {"markdown": 3, "json": 3, "csv": 3}.
What you're learning: You are composing all three I/O skills from this chapter into a single pipeline. The TDG cycle ensures correctness across all formats.
James steps back and looks at his code. Three read functions, three write functions, four transform functions, and a pipeline that chains them together. He runs the pipeline: JSON in, filtered Markdown out.
"At the warehouse," he says, "we called this the pick-pack-ship process. The order arrives, items get picked from shelves, packed into boxes, and shipped to the customer. If picking fails, you stop the whole process and report the error. You do not pack an empty box."
Emma nods. "Your error handling does the same thing. If safe_load_notebook fails, the pipeline stops and reports why. No empty exports."
"But the code is getting long," James says. "Six I/O functions, four transforms, the pipeline function, the Note class. All in one file. That worked when SmartNotes was a single function. It does not work at this scale."
"That is exactly the problem Chapter 63 solves," Emma says. "Modules and packages. You split your code into files the same way you split your data into formats: each file handles one responsibility."