Relationships and Eager Loading
Your Task Manager has interconnected data: Tasks belong to Projects, Workers are assigned to Tasks, Tasks can have subtasks. SQLModel models these relationships—but accessing them incorrectly triggers the infamous N+1 problem.
One-to-Many Relationships
A Project has many Tasks:
from sqlmodel import SQLModel, Field, Relationship
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .task import Task
class Project(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
# One-to-many: one project has many tasks
tasks: list["Task"] = Relationship(back_populates="project")
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .project import Project
class Task(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
project_id: int = Field(foreign_key="project.id", index=True)
# Many-to-one: many tasks belong to one project
project: "Project" = Relationship(back_populates="tasks")
Key elements:
| Element | Purpose |
|---|---|
foreign_key="project.id" | Creates FK constraint in database |
Relationship() | Defines Python-side navigation |
back_populates="project" | Links both sides of relationship |
TYPE_CHECKING | Prevents circular imports |
Many-to-One with Multiple Foreign Keys
When a model references the same table multiple times, you need sa_relationship_kwargs:
class Task(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
# Two foreign keys to Worker
assignee_id: int | None = Field(foreign_key="worker.id", index=True)
created_by_id: int = Field(foreign_key="worker.id", index=True)
# Must specify which FK each relationship uses
assignee: "Worker" = Relationship(
back_populates="assigned_tasks",
sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"},
)
created_by: "Worker" = Relationship(
back_populates="created_tasks",
sa_relationship_kwargs={"foreign_keys": "[Task.created_by_id]"},
)
class Worker(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
handle: str # @john or @researcher
assigned_tasks: list["Task"] = Relationship(
back_populates="assignee",
sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"},
)
created_tasks: list["Task"] = Relationship(
back_populates="created_by",
sa_relationship_kwargs={"foreign_keys": "[Task.created_by_id]"},
)
Without foreign_keys: SQLAlchemy can't determine which FK to use → error.
Self-Referential Relationships
Tasks can have parent tasks and subtasks:
class Task(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
parent_task_id: int | None = Field(
default=None,
foreign_key="task.id",
index=True,
)
# Parent (many-to-one to self)
parent: "Task" = Relationship(
back_populates="subtasks",
sa_relationship_kwargs={
"remote_side": "Task.id",
"foreign_keys": "[Task.parent_task_id]",
},
)
# Children (one-to-many from self)
subtasks: list["Task"] = Relationship(
back_populates="parent",
sa_relationship_kwargs={"foreign_keys": "[Task.parent_task_id]"},
)
Key addition: "remote_side": "Task.id" tells SQLAlchemy which side is the "one" in the one-to-many.
The N+1 Problem
Watch what happens when you iterate over tasks and access relationships:
# Get all tasks
tasks = (await session.exec(select(Task))).all()
# Access assignee for each task
for task in tasks:
print(f"{task.title}: {task.assignee.handle}")
What actually happens:
SELECT * FROM task; -- 1 query
SELECT * FROM worker WHERE id = 1; -- +1 query
SELECT * FROM worker WHERE id = 2; -- +1 query
SELECT * FROM worker WHERE id = 1; -- +1 query (duplicate!)
SELECT * FROM worker WHERE id = 3; -- +1 query
... (N more queries for N tasks)
For 100 tasks with 50 unique assignees: 101 queries instead of 2.
This is the N+1 problem: 1 query to get tasks, then N queries to get each assignee.
Eager Loading with selectinload
Load relationships upfront with a single additional query:
from sqlalchemy.orm import selectinload
stmt = (
select(Task)
.options(selectinload(Task.assignee))
)
result = await session.exec(stmt)
tasks = result.unique().all() # unique() required!
for task in tasks:
print(f"{task.title}: {task.assignee.handle}") # No extra queries
What actually happens:
SELECT * FROM task; -- 1 query
SELECT * FROM worker WHERE id IN (1, 2, 3, ...); -- 1 query
For 100 tasks with 50 unique assignees: 2 queries total.
Why unique() Is Required
selectinload can create duplicate parent objects in results. Always call unique():
# WRONG - may have duplicates
tasks = (await session.exec(stmt)).all()
# CORRECT - deduplicates
tasks = (await session.exec(stmt)).unique().all()
Loading Multiple Relationships
Chain .options() for multiple relationships:
stmt = (
select(Task)
.options(
selectinload(Task.assignee),
selectinload(Task.project),
selectinload(Task.subtasks),
)
.where(Task.project_id == project_id)
)
result = await session.exec(stmt)
tasks = result.unique().all()
Nested Eager Loading
Load relationships of relationships:
# Load tasks → assignee → assigned_tasks (assignee's other tasks)
stmt = (
select(Task)
.options(
selectinload(Task.assignee).selectinload(Worker.assigned_tasks)
)
)
When to Use Each Strategy
| Relationship Type | Strategy | Why |
|---|---|---|
| Many-to-one (task → project) | selectinload or joinedload | Both efficient |
| One-to-many (project → tasks) | selectinload | Avoids row explosion |
| Many-to-many | selectinload | Single efficient query |
| Self-referential | selectinload | Handles recursion |
joinedload vs selectinload:
joinedload: Uses SQL JOIN, good for single objectsselectinload: Uses separate IN query, better for collections
Complete Query Pattern
Production query with eager loading:
from sqlmodel import select
from sqlalchemy.orm import selectinload
async def get_project_tasks(
session: AsyncSession,
project_id: int,
include_subtasks: bool = False,
) -> list[Task]:
stmt = (
select(Task)
.options(
selectinload(Task.assignee),
selectinload(Task.created_by),
)
.where(Task.project_id == project_id)
.where(Task.deleted_at.is_(None))
.order_by(Task.created_at.desc())
)
if include_subtasks:
stmt = stmt.options(selectinload(Task.subtasks))
result = await session.exec(stmt)
return result.unique().all()
Output (queries logged):
SELECT task.* FROM task WHERE project_id = 1 AND deleted_at IS NULL ORDER BY created_at DESC
SELECT worker.* FROM worker WHERE id IN (1, 2, 3)
SELECT worker.* FROM worker WHERE id IN (1, 4)
SELECT task.* FROM task WHERE parent_task_id IN (10, 11, 12)
4 queries total instead of potentially hundreds.
Common Mistakes
Database code fails in predictable ways. Learn to recognize and fix these patterns.
1. MissingGreenlet Error (Forgetting await)
Symptom:
sqlalchemy.exc.MissingGreenletError: greenlet_spawn has not been called
Cause: Calling sync method in async context, or missing await.
# WRONG - missing await
def get_task(session, task_id):
return session.get(Task, task_id) # MissingGreenletError!
# CORRECT - async function with await
async def get_task(session, task_id):
return await session.get(Task, task_id)
Also common:
# WRONG - forgot await before exec
result = session.exec(stmt) # MissingGreenletError!
# CORRECT
result = await session.exec(stmt)
2. Accessing Relationships Without Loading
Symptom:
sqlalchemy.exc.MissingGreenletError: greenlet_spawn has not been called
Cause: Accessing lazy-loaded relationship in async context.
# WRONG - triggers lazy load (sync) in async context
task = await session.get(Task, task_id)
print(task.assignee.handle) # MissingGreenletError!
# CORRECT - eager load first
stmt = select(Task).where(Task.id == task_id).options(
selectinload(Task.assignee)
)
result = await session.exec(stmt)
task = result.one()
print(task.assignee.handle) # Works!
3. Expired Object After Commit
Symptom:
sqlalchemy.orm.exc.DetachedInstanceError: Instance is not bound to a Session
Cause: Accessing object after session closed or commit without refresh.
# WRONG - object expired after commit
async def create_task(session, data):
task = Task(**data)
session.add(task)
await session.commit()
return task # Object is expired!
# CORRECT - refresh after commit
async def create_task(session, data):
task = Task(**data)
session.add(task)
await session.commit()
await session.refresh(task) # Reload from DB
return task
Or use expire_on_commit=False:
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Objects stay valid after commit
)
4. Connection Leaks
Symptom: Application slows down, then hangs with QueuePool limit reached.
Cause: Sessions not properly closed.
# WRONG - session never closed
async def get_tasks():
session = AsyncSession(engine)
tasks = await session.exec(select(Task))
return tasks.all()
# Session leaked! Connection stays open
# CORRECT - context manager ensures cleanup
async def get_tasks():
async with AsyncSession(engine) as session:
tasks = await session.exec(select(Task))
return tasks.all()
# Session automatically closed
5. Forgetting unique() with selectinload
Symptom: Duplicate items in results.
# WRONG - may return duplicates
stmt = select(Task).options(selectinload(Task.subtasks))
result = await session.exec(stmt)
tasks = result.all() # Could have duplicate Task objects!
# CORRECT - deduplicate results
tasks = result.unique().all()
6. N+1 in List Comprehensions
Symptom: Slow endpoint, many queries in logs.
# WRONG - N+1 hidden in list comprehension
tasks = (await session.exec(select(Task))).all()
return [{"title": t.title, "assignee": t.assignee.handle} for t in tasks]
# Each t.assignee triggers a new query!
# CORRECT - eager load before comprehension
stmt = select(Task).options(selectinload(Task.assignee))
tasks = (await session.exec(stmt)).unique().all()
return [{"title": t.title, "assignee": t.assignee.handle} for t in tasks]
# All assignees loaded in single query
Quick Reference
| Error | Likely Cause | Fix |
|---|---|---|
MissingGreenletError | Missing await | Add await to async operations |
DetachedInstanceError | Expired object | Use refresh() or expire_on_commit=False |
QueuePool limit reached | Connection leak | Use async with for sessions |
| Duplicate results | Missing unique() | Add result.unique().all() |
| Slow queries | N+1 pattern | Use selectinload() |
Try With AI
Prompt 1: Debug Relationship Error
I'm getting this error:
"Could not determine join condition between parent/child tables"
My models:
class Task(SQLModel, table=True):
assignee_id: int | None = Field(foreign_key="worker.id")
created_by_id: int = Field(foreign_key="worker.id")
assignee: "Worker" = Relationship(back_populates="tasks")
class Worker(SQLModel, table=True):
tasks: list["Task"] = Relationship(back_populates="assignee")
What's wrong and how do I fix it?
What you're learning: Multi-FK disambiguation—recognizing when sa_relationship_kwargs is needed.
Prompt 2: Optimize Query
My endpoint is slow. Here's the code:
@router.get("/projects/{project_id}/summary")
async def get_summary(project_id: int, session: AsyncSession = Depends(get_session)):
project = await session.get(Project, project_id)
return {
"name": project.name,
"tasks": [
{"title": t.title, "assignee": t.assignee.handle}
for t in project.tasks
]
}
Profile it and show me the optimized version with proper eager loading.
What you're learning: N+1 optimization—identifying and fixing query performance issues.
Prompt 3: Recursive Loading
My Task model has parent/subtasks for hierarchical tasks.
I need to load a task with all its subtasks, and each subtask's
subtasks (2 levels deep).
Show me the query with nested selectinload and explain
the performance implications of loading deeper levels.
What you're learning: Recursive relationships—handling tree structures in queries.
Safety Note
Eager loading loads data you might not need. Balance N+1 prevention with memory usage. For large collections, consider pagination instead of loading everything.
Reflect on Your Skill
You built a relational-db-agent skill in Lesson 0. Test its relationship knowledge.
Test Your Skill
Using my relational-db-agent skill, generate:
1. A Task model with parent/subtasks self-referential relationship
2. A query that loads tasks with assignee using selectinload
3. Include result.unique().all() pattern
Identify Gaps
Ask yourself:
- Did my skill include
sa_relationship_kwargswithremote_sidefor parent? - Did it use
selectinload()fromsqlalchemy.orm? - Did it call
unique()beforeall()? - Did it add
TYPE_CHECKINGfor type hints?
Improve Your Skill
If you found gaps:
My relational-db-agent skill forgets result.unique().all().
Add this pattern for all selectinload queries:
result = await session.exec(stmt)
items = result.unique().all() # Always use unique() with selectinload
And add to safety notes: "selectinload can create duplicates - always use unique()"
Your skill now prevents N+1 queries correctly.