Running Tasks Concurrently

If you're new to programming

asyncio.gather() runs multiple async functions at the same time and waits for all of them to finish. Instead of waiting for each export to complete before starting the next, gather starts them all together. This is where async pays off.

If you've coded before

asyncio.gather for fan-out, create_task for background work. This lesson is where five sequential 1-second exports (5 seconds total) drop to about 1 second. gather returns results in the same order as the input coroutines; create_task schedules a coroutine on the event loop and returns a Task handle.

In Lesson 3, James wrote async def export_note() and ran it with asyncio.run(). It worked. But it exported one note at a time. Five notes, five waits, five seconds.

"I have a coroutine that handles one note," James tells Emma. "I need to run five of them at once."

"That is exactly what asyncio.gather does," Emma says. "You pass it your coroutines. It starts all of them, waits for all of them, and returns all the results. The total time is the slowest single task, not the sum."


asyncio.gather: Run Many at Once

asyncio.gather() takes multiple coroutines and runs them concurrently. Here is the pattern with the simulated export from Lesson 3:

import asyncio
import time


async def export_note(note_id: int) -> str:
    """Simulate exporting a note (1 second of I/O)."""
    await asyncio.sleep(1)  # Simulates file write
    return f"note_{note_id}.md"


async def main() -> None:
    start = time.perf_counter()

    # Sequential: one at a time
    results_seq: list[str] = []
    for i in range(5):
        result = await export_note(i)
        results_seq.append(result)

    seq_time = time.perf_counter() - start

    start = time.perf_counter()

    # Concurrent: all at once
    results_conc: list[str] = await asyncio.gather(
        export_note(0),
        export_note(1),
        export_note(2),
        export_note(3),
        export_note(4),
    )

    conc_time = time.perf_counter() - start

    print(f"Sequential: {seq_time:.2f}s for {len(results_seq)} notes")
    print(f"Concurrent: {conc_time:.2f}s for {len(results_conc)} notes")


asyncio.run(main())

Output:

Sequential: 5.01s for 5 notes
Concurrent: 1.00s for 5 notes

Five sequential exports take 5 seconds (1 second each, back to back). Five concurrent exports take 1 second (all sleeping at the same time). The event loop starts all five coroutines, suspends each at await asyncio.sleep(1), and resumes them when the sleep finishes. Since they all sleep simultaneously, the total wait is just one sleep duration.

gather returns results in the same order you passed the coroutines. results_conc[0] is always from export_note(0), regardless of which coroutine finished first.

When you have a list of items to process, use the unpacking operator:

async def export_many(note_ids: list[int]) -> list[str]:
    """Export multiple notes concurrently."""
    coroutines = [export_note(nid) for nid in note_ids]
    results: list[str] = await asyncio.gather(*coroutines)
    return results

The *coroutines unpacks the list into separate arguments for gather.


asyncio.create_task: Background Work

asyncio.gather waits for everything to finish before continuing. Sometimes you want to start a task in the background, do other work, and collect the result later. That is asyncio.create_task.

import asyncio


async def slow_backup() -> str:
    """Simulate a slow backup operation."""
    await asyncio.sleep(3)
    return "backup_complete"


async def quick_status() -> str:
    """Simulate a quick status check."""
    await asyncio.sleep(0.5)
    return "all_systems_ok"


async def main() -> None:
    # Start backup in the background
    backup_task: asyncio.Task[str] = asyncio.create_task(slow_backup())

    # Do other work while backup runs
    status = await quick_status()
    print(f"Status: {status}")

    # Now wait for the backup to finish
    backup_result = await backup_task
    print(f"Backup: {backup_result}")


asyncio.run(main())

Output:

Status: all_systems_ok
Backup: backup_complete

create_task schedules the coroutine on the event loop right away and returns a Task handle. The backup begins running the first time main() suspends at an await, which here happens inside await quick_status(). While the backup sleeps for 3 seconds, quick_status() runs and finishes in 0.5 seconds. When you await backup_task, the backup has already been running for about 0.5 seconds, so only about 2.5 seconds remain.
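To see the overlap in numbers, the same pattern can be timed. This is a minimal sketch with delays shortened to 0.3 and 0.2 seconds so it runs quickly; the timed_main name and the shortened delays are assumptions made for illustration, not part of the lesson's code.

```python
import asyncio
import time


async def slow_backup() -> str:
    """Simulate a slow backup (shortened to 0.3 seconds for this sketch)."""
    await asyncio.sleep(0.3)
    return "backup_complete"


async def quick_status() -> str:
    """Simulate a quick status check (0.2 seconds in this sketch)."""
    await asyncio.sleep(0.2)
    return "all_systems_ok"


async def timed_main() -> tuple[str, str, float]:
    start = time.perf_counter()

    # Schedule the backup; it starts once timed_main suspends at an await
    backup_task = asyncio.create_task(slow_backup())

    # The status check runs while the backup is sleeping
    status = await quick_status()

    # Only the remainder of the backup is left to wait for
    backup_result = await backup_task

    elapsed = time.perf_counter() - start
    return status, backup_result, elapsed


status, backup_result, elapsed = asyncio.run(timed_main())
print(f"{status}, {backup_result}, total {elapsed:.2f}s")
```

The total should land near 0.3 seconds (the longer of the two delays) rather than 0.5 seconds (their sum), because the two waits overlap.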

The key difference:

| Pattern | Behavior |
| --- | --- |
| await coroutine() | Runs and waits. The calling coroutine does not continue until it finishes. |
| asyncio.gather(a(), b()) | Runs all concurrently, waits for all to finish. |
| asyncio.create_task(a()) | Schedules it to start right away. You await the result when you need it. |

SmartNotes Concurrent Export

Time to combine these tools. Build async_export_all using gather to export all notes concurrently:

import asyncio
import time
from pathlib import Path
from dataclasses import dataclass


@dataclass
class Note:
    title: str
    body: str
    tags: list[str]


async def export_note(note: Note, directory: Path) -> Path:
    """Export a single note to a Markdown file."""
    slug = note.title.lower().replace(" ", "-")
    filepath = directory / f"{slug}.md"
    content = f"# {note.title}\n\n{note.body}\n\nTags: {', '.join(note.tags)}\n"
    filepath.write_text(content)
    await asyncio.sleep(0.2)  # Simulates slow I/O
    return filepath


async def async_export_all(
    notes: list[Note], directory: Path
) -> int:
    """Export all notes concurrently. Returns count of exported files."""
    directory.mkdir(parents=True, exist_ok=True)
    coroutines = [export_note(note, directory) for note in notes]
    results: list[Path] = await asyncio.gather(*coroutines)
    return len(results)


def sequential_export_all(
    notes: list[Note], directory: Path
) -> int:
    """Export all notes sequentially (for timing comparison)."""
    directory.mkdir(parents=True, exist_ok=True)
    count = 0
    for note in notes:
        slug = note.title.lower().replace(" ", "-")
        filepath = directory / f"{slug}.md"
        content = f"# {note.title}\n\n{note.body}\n\nTags: {', '.join(note.tags)}\n"
        filepath.write_text(content)
        time.sleep(0.2)  # Same simulated delay, but blocking
        count += 1
    return count


async def main() -> None:
    notes = [
        Note("Python Basics", "Variables and types", ["python"]),
        Note("Async Intro", "Coroutines and await", ["python", "async"]),
        Note("CLI Tools", "Building with argparse", ["python", "cli"]),
        Note("File IO", "Reading and writing files", ["python"]),
        Note("Testing", "Pytest fundamentals", ["python", "testing"]),
    ]

    directory = Path("export_output")

    # Sequential timing
    start = time.perf_counter()
    seq_count = sequential_export_all(notes, directory)
    seq_time = time.perf_counter() - start

    # Concurrent timing
    start = time.perf_counter()
    conc_count = await async_export_all(notes, directory)
    conc_time = time.perf_counter() - start

    print(f"Sequential: {seq_time:.2f}s ({seq_count} notes)")
    print(f"Concurrent: {conc_time:.2f}s ({conc_count} notes)")
    print(f"Speedup: {seq_time / conc_time:.1f}x")


asyncio.run(main())

Output:

Sequential: 1.01s (5 notes)
Concurrent: 0.20s (5 notes)
Speedup: 5.0x

Five notes at 0.2 seconds each: sequential takes 1 second, concurrent takes 0.2 seconds. In this simulation, the speedup scales with the number of notes. Ten notes would be 2 seconds vs about 0.2 seconds. One hundred notes would be 20 seconds vs about 0.2 seconds. For I/O-bound work, asyncio.gather turns total time from the sum of all durations into approximately the duration of the slowest single task.


PRIMM-AI+ Practice: Predict the Gather

Predict [AI-FREE]

Press Shift+Tab to enter Plan Mode.

Given this code:

import asyncio
import time


async def fetch(label: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{label} done"


async def main() -> None:
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch("A", 2.0),
        fetch("B", 1.0),
        fetch("C", 3.0),
    )
    elapsed = time.perf_counter() - start
    print(results)
    print(f"{elapsed:.1f}s")


asyncio.run(main())

  1. How long does the program take? (Not 6 seconds. Why?)
  2. What is the order of items in results?
  3. Which coroutine finishes first? Does that change the order in results?

Write your answers before running.

Check your prediction
  1. About 3.0 seconds. All three coroutines sleep concurrently. The total time equals the longest sleep (C at 3.0s), not the sum (6.0s).
  2. ['A done', 'B done', 'C done']. Results follow input order, not completion order.
  3. B finishes first (1.0s), then A (2.0s), then C (3.0s). But results always matches the order you passed to gather.

Run

Press Shift+Tab to exit Plan Mode.

Create the file and run it. Verify the timing matches your prediction.

Investigate

Change fetch("C", 3.0) to fetch("C", 0.5). Before running, predict the new total time and the result order. Run it and check.

Then ask Claude Code:

What happens if one of the coroutines in asyncio.gather raises
an exception? Does it cancel the others? Show me with an example.

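The AI demonstrates that by default, if one coroutine fails, gather raises the first exception to the caller right away. The remaining coroutines are not cancelled; they keep running, but their results are no longer delivered through gather. It shows the return_exceptions=True parameter that collects exceptions as results instead.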
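A small experiment can make both behaviors visible. The sketch below is illustrative only: the work helper, the finished list, and the short delays are assumptions invented for this example. It records which coroutines run to completion after a sibling has raised, then repeats the call with return_exceptions=True.

```python
import asyncio

finished: list[str] = []  # Labels of coroutines that ran to completion


async def work(label: str, delay: float, fail: bool = False) -> str:
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{label} failed")
    finished.append(label)
    return f"{label} done"


async def default_behavior() -> str:
    """Default gather: the first exception propagates to the caller."""
    caught = ""
    try:
        await asyncio.gather(
            work("A", 0.2),
            work("B", 0.05, fail=True),
            work("C", 0.1),
        )
    except ValueError as exc:
        caught = str(exc)
    # A and C are still running; give them time to finish
    await asyncio.sleep(0.3)
    return caught


async def collect_errors() -> list[object]:
    """return_exceptions=True: exceptions come back as ordinary results."""
    return await asyncio.gather(
        work("A", 0.2),
        work("B", 0.05, fail=True),
        work("C", 0.1),
        return_exceptions=True,
    )


async def main() -> tuple[str, list[str], list[object]]:
    caught = await default_behavior()
    still_finished = sorted(finished)
    results = await collect_errors()
    return caught, still_finished, results


caught, still_finished, results = asyncio.run(main())
print(f"Raised: {caught}; finished anyway: {still_finished}")
print(results)
```

In the first call, A and C still complete after B raises, but their return values never reach the caller. In the second call, the result list keeps input order and holds the ValueError in B's slot.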

Modify

Change the async_export_all function to accept a max_concurrent parameter. Use asyncio.Semaphore to limit how many exports run at the same time. Test with max_concurrent=2 and verify that 5 notes take about 0.6 seconds instead of 0.2 seconds.

Make [Mastery Gate]

Write async_export_all for your SmartNotes project. In Claude Code, type /tdg to guide you:

  1. Write the stub with typed signature: async def async_export_all(notes: list[Note], directory: Path) -> int
  2. Write tests: verify all files exist after export, verify timing is faster than sequential threshold, verify empty list returns 0
  3. Prompt AI to implement
  4. Run uv run ruff check, uv run pyright, uv run pytest

Try With AI

Opening Claude Code

If Claude Code is not already running, open your terminal, navigate to your SmartNotes project folder, and type claude. If you need a refresher, Chapter 44 covers the setup.

Prompt 1: Compare Gather vs Create Task

Show me a side-by-side comparison of using asyncio.gather
versus asyncio.create_task for running 3 async functions.

When should I use gather? When should I use create_task?
Give me a decision rule with examples.

What you're learning: gather and create_task solve different problems. The AI clarifies when to use each: gather for "run these and wait for all," create_task for "start this now, I will check later." Understanding the distinction prevents misuse.

Prompt 2: Error Handling in Gather

My asyncio.gather runs 10 export coroutines. If export
number 7 fails with a FileNotFoundError, what happens
to the other 9?

Show me three strategies:
1. Default behavior (first exception propagates)
2. return_exceptions=True (collect errors)
3. A wrapper that logs errors and continues

Which strategy should I use for SmartNotes exports?

What you're learning: Concurrent error handling differs from sequential. In a for loop, you add try/except and keep going. In gather, the first failure propagates to the caller, and the results of the other coroutines are lost unless you plan for it. The AI teaches three strategies and helps you pick the right one for your use case.
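The third strategy in the prompt, a wrapper that logs errors and continues, might look something like the sketch below. The safe_export and export_all names, the logger name, and the simulated failure on note 7 are all assumptions made for illustration; your own export function and error types will differ.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("smartnotes.export")  # Hypothetical logger name


async def export_note(note_id: int) -> str:
    """Simulated export; note 7 fails to mimic the prompt's scenario."""
    await asyncio.sleep(0.05)
    if note_id == 7:
        raise FileNotFoundError(f"note {note_id} is missing")
    return f"note_{note_id}.md"


async def safe_export(note_id: int) -> Optional[str]:
    """Wrap one export so a failure is logged instead of raised."""
    try:
        return await export_note(note_id)
    except OSError as exc:  # FileNotFoundError is a subclass of OSError
        logger.warning("Export of note %s failed: %s", note_id, exc)
        return None


async def export_all(note_ids: list[int]) -> list[str]:
    """Run every export concurrently and keep only the successes."""
    results = await asyncio.gather(*(safe_export(nid) for nid in note_ids))
    return [name for name in results if name is not None]


exported = asyncio.run(export_all(list(range(10))))
print(f"Exported {len(exported)} of 10 notes")
```

Because each coroutine handles its own exception, gather never sees a failure, and the nine successful exports come back as usual.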

Prompt 3: Real-World Concurrency Pattern

I want to export 100 SmartNotes to three formats
(markdown, json, csv) concurrently. That is 300 file
writes. Is it safe to gather all 300 at once? What
problems could happen with too many concurrent operations?

Show me a batched approach using asyncio.Semaphore
that limits concurrency to 20 simultaneous writes.

What you're learning: Unbounded concurrency can exhaust file handles or memory. The AI introduces asyncio.Semaphore as a throttle, a pattern used in production code for rate limiting API calls, database connections, and file operations.
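As a rough illustration of the throttle idea, the sketch below gathers 300 simulated writes while a semaphore allows at most 20 to be in flight at once. The write_file and write_all names, the active and peak counters, and the 0.01-second delay are hypothetical stand-ins, not SmartNotes code.

```python
import asyncio

MAX_CONCURRENT = 20  # Limit taken from the prompt above

active = 0  # Writes currently inside the semaphore
peak = 0    # Highest value of active observed


async def write_file(index: int, semaphore: asyncio.Semaphore) -> int:
    """Simulated write that holds a semaphore slot while it runs."""
    global active, peak
    async with semaphore:  # Waits here if 20 writes are already running
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # Simulates the file write
        active -= 1
    return index


async def write_all(total: int) -> list[int]:
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    coroutines = [write_file(i, semaphore) for i in range(total)]
    return await asyncio.gather(*coroutines)


results = asyncio.run(write_all(300))
print(f"{len(results)} writes completed, peak concurrency {peak}")
```

All 300 coroutines are still handed to gather in one call; the semaphore only controls how many are past the async with line at any moment, so results keep their input order.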


James runs the timing comparison. Sequential: 1.01 seconds. Concurrent: 0.20 seconds. Five times faster for five notes.

"In the warehouse," he says, "we had six loading docks. If we loaded trucks one at a time, it took all morning. When we opened all six docks at once, the same work finished before lunch."

"Same principle," Emma says. "Each dock is a coroutine. The warehouse is the event loop. The trucks are I/O operations. Opening all docks at once is asyncio.gather."

"What about errors? If one dock jams, do the other five stop?"

Emma pauses. "Not exactly. By default, the other docks keep working, but gather reports the first failure to you immediately, and you never receive the results from the rest. You can tell gather to collect errors instead of raising." She thinks for a moment. "I actually got this wrong on a real project once. I used the default behavior and one failed API call cost me the results of 200 successful ones. The return_exceptions=True flag would have saved me an hour of debugging."

"So gather is the fan-out tool. But the coroutines are still reading and writing files the old way, with regular open(). Is there an async version of file I/O?"

"There is. That is Lesson 5."