Threading Basics
A thread is like a second worker inside your program. Both work at the same time. This lesson shows you how to create threads for I/O tasks and why they are not the final answer.
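In brief: threading.Thread gives you concurrency for I/O-bound work, but the Global Interpreter Lock (GIL) in the standard CPython build prevents threads from running Python code in parallel on multiple CPU cores. Coverage here is brief because async is the focus of this chapter; this lesson motivates the transition.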
In Lesson 1, James timed his sequential export: 30 files, 0.1 seconds each, 3 seconds total. Every file write blocked the next one.
Emma opens a new file. "Python has threads. Let me show you a quick demo, then I will show you something better."
Your First Thread
Create a file called thread_demo.py:
import threading
import time


def slow_task(name: str, duration: float) -> None:
    """Simulate a slow I/O operation."""
    print(f" {name}: starting")
    time.sleep(duration)
    print(f" {name}: done ({duration}s)")


def main() -> None:
    start: float = time.perf_counter()

    thread_a = threading.Thread(target=slow_task, args=("Task A", 2.0))
    thread_b = threading.Thread(target=slow_task, args=("Task B", 1.0))

    thread_a.start()
    thread_b.start()

    thread_a.join()
    thread_b.join()

    elapsed: float = time.perf_counter() - start
    print(f"\nTotal time: {elapsed:.2f} seconds")


if __name__ == "__main__":
    main()
Run it:
uv run python thread_demo.py
Output:
Task A: starting
Task B: starting
Task B: done (1.0s)
Task A: done (2.0s)
Total time: 2.01 seconds
Two things to notice:
- Both tasks started immediately. start() launches the thread and returns without waiting. Task B started while Task A was still sleeping.
- Total time is 2 seconds, not 3. Task A (2s) and Task B (1s) overlapped. The total equals the duration of the longest task.
Here is the lifecycle of a thread:
| Step | Method | What it does |
|---|---|---|
| Create | threading.Thread(target=func, args=(...)) | Defines the thread but does not run it |
| Start | thread.start() | Begins execution in the background |
| Join | thread.join() | Waits for the thread to finish before continuing |
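The three steps in the table can be observed directly with Thread.is_alive(), which reports whether a thread is currently running. The following is a minimal sketch; the names nap and lifecycle_states are illustrative and not part of the lesson's files.

```python
import threading
import time


def nap(duration: float) -> None:
    """Stand-in for a slow I/O operation (hypothetical helper)."""
    time.sleep(duration)


def lifecycle_states() -> tuple[bool, bool, bool]:
    """Return is_alive() after create, after start, and after join."""
    worker = threading.Thread(target=nap, args=(0.2,))
    after_create = worker.is_alive()  # created but not started yet

    worker.start()
    after_start = worker.is_alive()   # running in the background

    worker.join()
    after_join = worker.is_alive()    # finished

    return after_create, after_start, after_join


if __name__ == "__main__":
    print(lifecycle_states())
```

A thread only counts as alive between start() and the moment its target function returns, which is why the first and last values are False.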
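What happens without join()? The main thread keeps going without waiting: it computes the elapsed time and prints "Total time" while the tasks are still sleeping. Python still waits for ordinary (non-daemon) threads before the process exits, so the tasks do finish, but the measurement no longer means anything. join() tells the main thread: "Wait here until this thread completes."
Try removing both join() calls and running the script. The "Total time" line prints almost immediately, showing close to 0 seconds, and the "done" lines appear after it.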
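To see the effect of a missing join() without editing thread_demo.py, here is a small sketch that takes the measurement before waiting and checks whether the thread is still running at that moment. The function name time_without_join is an assumption made for illustration.

```python
import threading
import time


def slow_task(duration: float) -> None:
    """Simulate a slow I/O operation."""
    time.sleep(duration)


def time_without_join(duration: float) -> tuple[float, bool]:
    """Measure elapsed time right after start(), before any join()."""
    start = time.perf_counter()
    worker = threading.Thread(target=slow_task, args=(duration,))
    worker.start()

    # No join() yet: the clock stops while the thread is still sleeping.
    elapsed = time.perf_counter() - start
    still_running = worker.is_alive()

    # Join afterwards so the helper does not leave a thread behind.
    worker.join()
    return elapsed, still_running


if __name__ == "__main__":
    elapsed, still_running = time_without_join(1.0)
    print(f"Measured {elapsed:.3f}s, thread still running: {still_running}")
```

The measured time is a tiny fraction of the task's duration, and the thread is still alive when the measurement is taken.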
Threading the Export
Apply threads to the SmartNotes export problem. Create threaded_export.py:
import threading
import time


def export_note(note_id: int, fmt: str, delay: float) -> None:
    """Simulate exporting a single note."""
    time.sleep(delay)
    print(f" Wrote note_{note_id}.{fmt}")


def export_sequential(count: int, formats: list[str]) -> float:
    """Export notes one at a time. Return elapsed seconds."""
    start: float = time.perf_counter()
    for fmt in formats:
        for i in range(count):
            export_note(i, fmt, 0.1)
    return time.perf_counter() - start


def export_threaded(count: int, formats: list[str]) -> float:
    """Export notes with threads. Return elapsed seconds."""
    start: float = time.perf_counter()
    threads: list[threading.Thread] = []
    for fmt in formats:
        for i in range(count):
            t = threading.Thread(
                target=export_note, args=(i, fmt, 0.1)
            )
            threads.append(t)
            t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


def main() -> None:
    count: int = 10
    formats: list[str] = ["md", "json", "csv"]
    total_files: int = count * len(formats)

    print(f"--- Sequential ({total_files} files) ---")
    seq_time: float = export_sequential(count, formats)
    print(f"Sequential: {seq_time:.2f}s\n")

    print(f"--- Threaded ({total_files} files) ---")
    thr_time: float = export_threaded(count, formats)
    print(f"Threaded: {thr_time:.2f}s\n")

    print(f"Speedup: {seq_time / thr_time:.1f}x")


if __name__ == "__main__":
    main()
Run it:
uv run python threaded_export.py
Output:
--- Sequential (30 files) ---
Wrote note_0.md
...
Wrote note_9.csv
Sequential: 3.02s
--- Threaded (30 files) ---
Wrote note_0.md
...
Wrote note_9.csv
Threaded: 0.11s
Speedup: 27.5x
Thirty threads, each sleeping 0.1 seconds, all running at the same time. Total time is approximately 0.1 seconds instead of 3 seconds. The overlap is massive for I/O-bound work.
Why Threads Are Not the Answer
Threads fixed the timing problem. So why not stop here? Two reasons.
Reason 1: The GIL Blocks CPU Work
Create a file called cpu_threads.py:
import threading
import time


def cpu_work(label: str) -> None:
    """Simulate CPU-bound work (no I/O, pure computation)."""
    total: int = 0
    for i in range(5_000_000):
        total += i
    print(f" {label}: done (total={total})")


def main() -> None:
    # Sequential
    start: float = time.perf_counter()
    cpu_work("Sequential A")
    cpu_work("Sequential B")
    seq_time: float = time.perf_counter() - start
    print(f"Sequential: {seq_time:.2f}s\n")

    # Threaded
    start = time.perf_counter()
    t1 = threading.Thread(target=cpu_work, args=("Thread A",))
    t2 = threading.Thread(target=cpu_work, args=("Thread B",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    thr_time: float = time.perf_counter() - start
    print(f"Threaded: {thr_time:.2f}s")

    print(f"Speedup: {seq_time / thr_time:.1f}x")


if __name__ == "__main__":
    main()
Run it:
uv run python cpu_threads.py
Output:
Sequential A: done (total=12499997500000)
Sequential B: done (total=12499997500000)
Sequential: 0.58s
Thread A: done (total=12499997500000)
Thread B: done (total=12499997500000)
Threaded: 0.59s
Speedup: 1.0x
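No speedup. The GIL (Global Interpreter Lock) allows only one thread to run Python code at a time. A thread that is waiting in time.sleep() or on file and network I/O releases the GIL, which is why the export sped up. A pure-Python loop never waits, so for CPU-bound work threads take turns rather than running simultaneously. Two threads doing CPU work take about the same time as doing the work one after the other.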
Reason 2: Shared State Risks
When multiple threads modify the same variable without coordination, the result depends on how their steps happen to interleave. This is called a race condition. A brief example:
import threading

counter: int = 0


def increment() -> None:
    global counter
    for _ in range(100_000):
        counter += 1


def main() -> None:
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=increment)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Expected: 200000, Got: {counter}")


if __name__ == "__main__":
    main()
Output (varies each run):
Expected: 200000, Got: 148923
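Both threads read and write counter at the same time. counter += 1 is really three steps (read the value, add 1, write it back), and a thread switch can happen between them. Some increments get lost because both threads read the same value, add 1, and write it back, overwriting each other's work. How often this happens depends on timing and on your Python version, so some runs may even print the correct 200000; the bug is still there. Fixing this requires locks, which add complexity.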
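The lock-based fix mentioned above can be sketched with threading.Lock. This is one minimal way to do it, not the only one; counter_lock, safe_increment, and run_safe are illustrative names introduced here.

```python
import threading

counter: int = 0
counter_lock = threading.Lock()  # guards every update to counter


def safe_increment(times: int) -> None:
    """Increment the shared counter, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread at a time may run this block
            counter += 1


def run_safe(times: int = 100_000) -> int:
    """Run two incrementing threads and return the final counter value."""
    global counter
    counter = 0
    threads = [
        threading.Thread(target=safe_increment, args=(times,))
        for _ in range(2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(f"Expected: 200000, Got: {run_safe()}")
```

The with block makes the read, add, and write steps happen as one unit, so no increment is lost. The cost is the extra bookkeeping: every piece of code that touches counter has to remember to take the same lock.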
For SmartNotes, the threads do not share state (each writes a different file), so race conditions are not a concern here. But in more complex programs, shared state and threads create bugs that are hard to find and harder to fix.
The Verdict
| Consideration | Threads | Async (next lesson) |
|---|---|---|
| I/O speedup | Yes | Yes |
| CPU speedup | No (GIL) | No (single-threaded) |
| Shared state risks | Yes (race conditions) | Fewer (tasks switch only at await) |
| Complexity | Moderate (locks, joins) | Low (await keyword) |
| Python ecosystem | Older, well-supported | Modern, growing rapidly |
Threading works for I/O but carries complexity baggage. async/await gives the same I/O concurrency with far fewer shared state risks, because a task can only be interrupted where it explicitly awaits. For SmartNotes, async is the better tool.
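The next lesson covers async properly. As a rough preview only, here is a sketch of the same simulated export using asyncio from the standard library, with asyncio.sleep standing in for the file write. The function names export_all and timed_export are assumptions for illustration, not SmartNotes code.

```python
import asyncio
import time


async def export_note(note_id: int, fmt: str, delay: float) -> str:
    """Simulate exporting a single note without blocking the thread."""
    await asyncio.sleep(delay)  # yields control while "waiting on I/O"
    return f"note_{note_id}.{fmt}"


async def export_all(count: int, formats: list[str]) -> list[str]:
    """Start every export and wait for all of them together."""
    jobs = [
        export_note(i, fmt, 0.1)
        for fmt in formats
        for i in range(count)
    ]
    return await asyncio.gather(*jobs)


def timed_export() -> tuple[int, float]:
    """Run the async export and return (file count, elapsed seconds)."""
    start = time.perf_counter()
    names = asyncio.run(export_all(10, ["md", "json", "csv"]))
    return len(names), time.perf_counter() - start


if __name__ == "__main__":
    total, elapsed = timed_export()
    print(f"Exported {total} files in {elapsed:.2f}s")
```

All thirty waits overlap on a single thread, so the elapsed time should land close to the 0.1 seconds seen in the threaded version.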
PRIMM-AI+ Practice: Predict Thread Timing
Predict [AI-FREE]
Press Shift+Tab to enter Plan Mode.
You have three threads:
thread_a = threading.Thread(target=slow_task, args=("A", 3.0))
thread_b = threading.Thread(target=slow_task, args=("B", 1.0))
thread_c = threading.Thread(target=slow_task, args=("C", 2.0))
All three start at the same time. Predict on paper:
- Which thread finishes first?
- Which thread finishes last?
- What is the total wall-clock time?
Check your predictions
- Thread B finishes first (1.0 seconds)
- Thread A finishes last (3.0 seconds)
- Total time is approximately 3.0 seconds (the longest thread)
When threads run concurrently, total time equals the duration of the slowest thread, not the sum.
Run
Press Shift+Tab to exit Plan Mode.
Create three_threads.py with the three threads above, start all three, join all three, and print the total time. Compare to your prediction.
Investigate
Run /investigate @three_threads.py in Claude Code and ask: "Why does total time equal the longest thread instead of the sum? What happens inside the operating system when I call start() on three threads?"
Modify
Add a fourth thread with a 0.5-second duration. Predict the new total time before running. Then change thread A's duration to 10 seconds and predict again. Run both and verify.
Make [Mastery Gate]
Write a function run_threaded(durations: list[float]) -> float that takes a list of simulated I/O durations, runs each in a separate thread, and returns the total elapsed time. Use /tdg in Claude Code:
- Write the stub with types and docstring
- Write 3+ tests: empty list returns near-zero, single item returns approximately that duration, multiple items return approximately the maximum duration
- Generate the implementation
- Verify with uv run ruff check, uv run pyright, and uv run pytest
Try With AI
If Claude Code is not already running, open your terminal, navigate to your SmartNotes project folder, and type claude. If you need a refresher, Chapter 44 covers the setup.
Prompt 1: Thread Safety Analysis
Here is my threaded export function:
[paste export_threaded from this lesson]
Is this code thread-safe? Could any race condition occur?
Explain what thread safety means in this context and why
this specific code is safe even without locks.
What you're learning: Thread safety analysis is a critical skill. This export is safe because each thread writes to a different file and no thread touches data that another thread uses. But the moment threads share data (a counter, a list, a dictionary), you need locks. The AI helps you build the judgment of "safe by design" vs "safe by accident."
Prompt 2: ThreadPoolExecutor Comparison
Rewrite my export_threaded function using
concurrent.futures.ThreadPoolExecutor instead of
raw threading.Thread. Show both versions side by side.
Which is simpler? When would I use one vs the other?
What you're learning: ThreadPoolExecutor is a higher-level abstraction over threads. It manages a pool of workers and distributes tasks automatically. Understanding both the low-level (Thread) and high-level (ThreadPoolExecutor) APIs helps you choose the right tool.
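Before running the prompt, it may help to see the general shape of the high-level API. This sketch uses a made-up task (fetch, which sleeps and doubles a number) rather than the export function, so the rewrite itself is still yours to explore; run_pool and the default worker count of 4 are assumptions chosen for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(item: int) -> int:
    """Hypothetical I/O task: wait briefly, then return a result."""
    time.sleep(0.1)
    return item * 2


def run_pool(items: list[int], workers: int = 4) -> list[int]:
    """Run fetch over items using a fixed-size pool of worker threads."""
    # The with block waits for all submitted work before exiting,
    # so no explicit start() or join() calls are needed.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() returns results in the same order as the inputs.
        return list(pool.map(fetch, items))


if __name__ == "__main__":
    print(run_pool([1, 2, 3, 4, 5]))
```

Unlike raw Thread objects, the pool caps how many threads exist at once and hands back return values directly.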
Prompt 3: Threading in My Domain
I work in [your field: logistics/finance/marketing/etc].
Give me one realistic scenario where Python threads would
help with I/O-bound work and one where threads would NOT
help because the work is CPU-bound. For each, show a
simplified code sketch.
What you're learning: Connecting programming patterns to your domain makes them sticky. The I/O-bound example (fetching data from APIs, writing reports to disk) reinforces when threads help. The CPU-bound example (processing large datasets, running calculations) reinforces when they do not.
James looks at the timing comparison: 3 seconds sequential, 0.1 seconds threaded. "Threads are like hiring a second forklift driver. Works, but now you need to coordinate so they do not collide."
"Exactly," Emma says. "And that coordination is where threads get expensive. The race condition example? That was two threads incrementing one number. Imagine fifty threads updating a shared dictionary. I spent two days debugging a thread issue like that in a data pipeline." She shakes her head. "The bug only appeared under heavy load. In testing, everything looked fine."
James raises an eyebrow. "So threads work but carry risks?"
"For I/O, yes. And Python has a better tool for I/O concurrency. async and await. One thread, no shared state risks, same performance benefit for I/O work. Think of it as a single forklift driver who is very good at not waiting."
"One driver who never stands idle?"
"Never stands idle. When one dock says 'I need 30 seconds to strap this pallet,' the driver immediately moves to the next dock. No second driver needed. No coordination problems. That is async."