Running Tasks Concurrently
asyncio.gather() runs multiple async functions at the same time and waits for all of them to finish. Instead of waiting for each export to complete before starting the next, gather starts them all together. This is where async pays off.
asyncio.gather for fan-out, create_task for background work. This lesson is where sequential 5-second exports drop to under 1 second. gather returns results in the same order as the input coroutines; create_task schedules a coroutine on the event loop and returns a Task handle.
In Lesson 3, James wrote async def export_note() and ran it with asyncio.run(). It worked. But it exported one note at a time. Five notes, five waits, five seconds.
"I have a coroutine that handles one note," James tells Emma. "I need to run five of them at once."
"That is exactly what asyncio.gather does," Emma says. "You give it a list of coroutines. It starts all of them, waits for all of them, and returns all the results. The total time is the slowest single task, not the sum."
asyncio.gather: Run Many at Once
asyncio.gather() takes multiple coroutines and runs them concurrently. Here is the pattern with the simulated export from Lesson 3:
import asyncio
import time
async def export_note(note_id: int) -> str:
"""Simulate exporting a note (1 second of I/O)."""
await asyncio.sleep(1) # Simulates file write
return f"note_{note_id}.md"
async def main() -> None:
start = time.perf_counter()
# Sequential: one at a time
results_seq: list[str] = []
for i in range(5):
result = await export_note(i)
results_seq.append(result)
seq_time = time.perf_counter() - start
start = time.perf_counter()
# Concurrent: all at once
results_conc: list[str] = await asyncio.gather(
export_note(0),
export_note(1),
export_note(2),
export_note(3),
export_note(4),
)
conc_time = time.perf_counter() - start
print(f"Sequential: {seq_time:.2f}s for {len(results_seq)} notes")
print(f"Concurrent: {conc_time:.2f}s for {len(results_conc)} notes")
asyncio.run(main())
Output:
Sequential: 5.01s for 5 notes
Concurrent: 1.00s for 5 notes
Five sequential exports take 5 seconds (1 second each, back to back). Five concurrent exports take 1 second (all sleeping at the same time). The event loop starts all five coroutines, suspends each at await asyncio.sleep(1), and resumes them when the sleep finishes. Since they all sleep simultaneously, the total wait is just one sleep duration.
gather returns results in the same order you passed the coroutines. results_conc[0] is always from export_note(0), regardless of which coroutine finished first.
When you have a list of items to process, use the unpacking operator:
async def export_many(note_ids: list[int]) -> list[str]:
"""Export multiple notes concurrently."""
coroutines = [export_note(nid) for nid in note_ids]
results: list[str] = await asyncio.gather(*coroutines)
return results
The *coroutines unpacks the list into separate arguments for gather.
asyncio.create_task: Background Work
asyncio.gather waits for everything to finish before continuing. Sometimes you want to start a task in the background, do other work, and collect the result later. That is asyncio.create_task.
import asyncio
async def slow_backup() -> str:
"""Simulate a slow backup operation."""
await asyncio.sleep(3)
return "backup_complete"
async def quick_status() -> str:
"""Simulate a quick status check."""
await asyncio.sleep(0.5)
return "all_systems_ok"
async def main() -> None:
# Start backup in the background
backup_task: asyncio.Task[str] = asyncio.create_task(slow_backup())
# Do other work while backup runs
status = await quick_status()
print(f"Status: {status}")
# Now wait for the backup to finish
backup_result = await backup_task
print(f"Backup: {backup_result}")
asyncio.run(main())
Output:
Status: all_systems_ok
Backup: backup_complete
create_task schedules the coroutine on the event loop immediately. The backup starts running as soon as you create the task. While the backup sleeps for 3 seconds, quick_status() runs and finishes in 0.5 seconds. When you await backup_task, the backup has already been running for 0.5 seconds, so only 2.5 seconds remain.
The key difference:
| Pattern | Behavior |
|---|---|
await coroutine() | Runs and waits. Nothing else happens until it finishes. |
asyncio.gather(a(), b()) | Runs all concurrently, waits for all to finish. |
asyncio.create_task(a()) | Starts running immediately. You await the result when you need it. |
SmartNotes Concurrent Export
Time to combine these tools. Build async_export_all using gather to export all notes concurrently:
import asyncio
import time
from pathlib import Path
from dataclasses import dataclass
@dataclass
class Note:
title: str
body: str
tags: list[str]
async def export_note(note: Note, directory: Path) -> Path:
"""Export a single note to a Markdown file."""
slug = note.title.lower().replace(" ", "-")
filepath = directory / f"{slug}.md"
content = f"# {note.title}\n\n{note.body}\n\nTags: {', '.join(note.tags)}\n"
filepath.write_text(content)
await asyncio.sleep(0.2) # Simulates slow I/O
return filepath
async def async_export_all(
notes: list[Note], directory: Path
) -> int:
"""Export all notes concurrently. Returns count of exported files."""
directory.mkdir(parents=True, exist_ok=True)
coroutines = [export_note(note, directory) for note in notes]
results: list[Path] = await asyncio.gather(*coroutines)
return len(results)
def sequential_export_all(
notes: list[Note], directory: Path
) -> int:
"""Export all notes sequentially (for timing comparison)."""
import time as sync_time
directory.mkdir(parents=True, exist_ok=True)
count = 0
for note in notes:
slug = note.title.lower().replace(" ", "-")
filepath = directory / f"{slug}.md"
content = f"# {note.title}\n\n{note.body}\n\nTags: {', '.join(note.tags)}\n"
filepath.write_text(content)
sync_time.sleep(0.2) # Same simulated delay
count += 1
return count
async def main() -> None:
notes = [
Note("Python Basics", "Variables and types", ["python"]),
Note("Async Intro", "Coroutines and await", ["python", "async"]),
Note("CLI Tools", "Building with argparse", ["python", "cli"]),
Note("File IO", "Reading and writing files", ["python"]),
Note("Testing", "Pytest fundamentals", ["python", "testing"]),
]
directory = Path("export_output")
# Sequential timing
start = time.perf_counter()
seq_count = sequential_export_all(notes, directory)
seq_time = time.perf_counter() - start
# Concurrent timing
start = time.perf_counter()
conc_count = await async_export_all(notes, directory)
conc_time = time.perf_counter() - start
print(f"Sequential: {seq_time:.2f}s ({seq_count} notes)")
print(f"Concurrent: {conc_time:.2f}s ({conc_count} notes)")
print(f"Speedup: {seq_time / conc_time:.1f}x")
asyncio.run(main())
Output:
Sequential: 1.01s (5 notes)
Concurrent: 0.20s (5 notes)
Speedup: 5.0x
Five notes at 0.2 seconds each: sequential takes 1 second, concurrent takes 0.2 seconds. The speedup scales with the number of notes. Ten notes would be 2 seconds vs 0.2 seconds. One hundred notes would be 20 seconds vs 0.2 seconds. For I/O-bound work, asyncio.gather turns total time from the sum of all durations into approximately the duration of the slowest single task.
PRIMM-AI+ Practice: Predict the Gather
Predict [AI-FREE]
Press Shift+Tab to enter Plan Mode.
Given this code:
import asyncio
import time
async def fetch(label: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{label} done"
async def main() -> None:
start = time.perf_counter()
results = await asyncio.gather(
fetch("A", 2.0),
fetch("B", 1.0),
fetch("C", 3.0),
)
elapsed = time.perf_counter() - start
print(results)
print(f"{elapsed:.1f}s")
asyncio.run(main())
- How long does the program take? (Not 6 seconds. Why?)
- What is the order of items in
results? - Which coroutine finishes first? Does that change the order in
results?
Write your answers before running.
Check your prediction
- About 3.0 seconds. All three coroutines sleep concurrently. The total time equals the longest sleep (C at 3.0s), not the sum (6.0s).
['A done', 'B done', 'C done']. Results follow input order, not completion order.- B finishes first (1.0s), then A (2.0s), then C (3.0s). But
resultsalways matches the order you passed togather.
Run
Press Shift+Tab to exit Plan Mode.
Create the file and run it. Verify the timing matches your prediction.
Investigate
Change fetch("C", 3.0) to fetch("C", 0.5). Before running, predict the new total time and the result order. Run it and check.
Then ask Claude Code:
What happens if one of the coroutines in asyncio.gather raises
an exception? Does it cancel the others? Show me with an example.
The AI demonstrates that by default, if one coroutine fails, gather cancels the remaining coroutines and raises the exception. It shows the return_exceptions=True parameter that collects exceptions as results instead.
Modify
Change the async_export_all function to accept a max_concurrent parameter. Use asyncio.Semaphore to limit how many exports run at the same time. Test with max_concurrent=2 and verify that 5 notes take about 0.6 seconds instead of 0.2 seconds.
Make [Mastery Gate]
Write async_export_all for your SmartNotes project. In Claude Code, type /tdg to guide you:
- Write the stub with typed signature:
async def async_export_all(notes: list[Note], directory: Path) -> int - Write tests: verify all files exist after export, verify timing is faster than sequential threshold, verify empty list returns 0
- Prompt AI to implement
- Run
uv run ruff check,uv run pyright,uv run pytest
Try With AI
If Claude Code is not already running, open your terminal, navigate to your SmartNotes project folder, and type claude. If you need a refresher, Chapter 44 covers the setup.
Prompt 1: Compare Gather vs Create Task
Show me a side-by-side comparison of using asyncio.gather
versus asyncio.create_task for running 3 async functions.
When should I use gather? When should I use create_task?
Give me a decision rule with examples.
What you're learning: gather and create_task solve different problems. The AI clarifies when to use each: gather for "run these and wait for all," create_task for "start this now, I will check later." Understanding the distinction prevents misuse.
Prompt 2: Error Handling in Gather
My asyncio.gather runs 10 export coroutines. If export
number 7 fails with a FileNotFoundError, what happens
to the other 9?
Show me three strategies:
1. Default behavior (cancel all)
2. return_exceptions=True (collect errors)
3. A wrapper that logs errors and continues
Which strategy should I use for SmartNotes exports?
What you're learning: Concurrent error handling differs from sequential. In a for loop, you add try/except and keep going. In gather, one failure can cancel everything. The AI teaches three strategies and helps you pick the right one for your use case.
Prompt 3: Real-World Concurrency Pattern
I want to export 100 SmartNotes to three formats
(markdown, json, csv) concurrently. That is 300 file
writes. Is it safe to gather all 300 at once? What
problems could happen with too many concurrent operations?
Show me a batched approach using asyncio.Semaphore
that limits concurrency to 20 simultaneous writes.
What you're learning: Unbounded concurrency can exhaust file handles or memory. The AI introduces asyncio.Semaphore as a throttle, a pattern used in production code for rate limiting API calls, database connections, and file operations.
James runs the timing comparison. Sequential: 1.01 seconds. Concurrent: 0.20 seconds. Five times faster for five notes.
"In the warehouse," he says, "we had six loading docks. If we loaded trucks one at a time, it took all morning. When we opened all six docks at once, the same work finished before lunch."
"Same principle," Emma says. "Each dock is a coroutine. The warehouse is the event loop. The trucks are I/O operations. Opening all docks at once is asyncio.gather."
"What about errors? If one dock jams, do the other five stop?"
Emma pauses. "That depends on how you configure it. By default, yes, one failure cancels the rest. But you can tell gather to collect errors instead of canceling." She thinks for a moment. "I actually got this wrong on a real project once. I used the default behavior and one failed API call canceled 200 successful ones. The return_exceptions=True flag would have saved me an hour of debugging."
"So gather is the fan-out tool. But the coroutines are still reading and writing files the old way, with regular open(). Is there an async version of file I/O?"
"There is. That is Lesson 5."