CRUD Operations Pattern
CRUD—Create, Read, Update, Delete—forms the foundation of database operations. Every agent that persists data needs these patterns.
In async SQLModel, each operation uses await and follows specific sequences to work correctly.
Create: Adding New Records
Creating a record involves:
- Instantiate the model
- Add to session
- Flush to get database-assigned ID
- Commit to make permanent
- Refresh to get current state
async def create_task(
session: AsyncSession,
title: str,
project_id: int,
created_by_id: int,
) -> Task:
task = Task(
title=title,
project_id=project_id,
created_by_id=created_by_id,
)
session.add(task)
await session.flush() # Writes to DB, assigns task.id
await session.commit() # Makes permanent
await session.refresh(task) # Reloads from DB
return task
Output:
>>> task = await create_task(session, "Fix auth", 1, 1)
>>> task.id
42 # Database assigned
>>> task.created_at
datetime.datetime(2024, 1, 15, 10, 30, 0)
Why each step:
| Step | Purpose | Skip It? |
|---|---|---|
add() | Mark for insertion | No |
flush() | Get auto-assigned values | Optional if you don't need ID |
commit() | Persist to disk | No |
refresh() | Reload defaults | Optional if using expire_on_commit=False |
Read: Fetching Records
Get by ID
Use session.get() for primary key lookups:
async def get_task(session: AsyncSession, task_id: int) -> Task | None:
return await session.get(Task, task_id)
Output:
>>> task = await get_task(session, 42)
>>> task.title
"Fix auth"
>>> task = await get_task(session, 9999)
>>> task is None
True
Query with Filters
Use select() for filtered queries:
from sqlmodel import select
async def list_tasks_by_status(
session: AsyncSession,
status: str,
) -> list[Task]:
stmt = select(Task).where(Task.status == status)
result = await session.exec(stmt)
return list(result.all())
Output:
>>> pending = await list_tasks_by_status(session, "pending")
>>> len(pending)
15
Multiple Filters
Chain .where() for AND conditions:
async def list_project_tasks(
session: AsyncSession,
project_id: int,
status: str | None = None,
priority: str | None = None,
) -> list[Task]:
stmt = select(Task).where(Task.project_id == project_id)
if status:
stmt = stmt.where(Task.status == status)
if priority:
stmt = stmt.where(Task.priority == priority)
stmt = stmt.order_by(Task.created_at.desc())
result = await session.exec(stmt)
return list(result.all())
Output:
>>> tasks = await list_project_tasks(session, 1, status="pending")
>>> [t.title for t in tasks]
["Fix auth", "Update docs", "Add tests"]
Pagination
Add offset and limit:
async def list_paginated(
session: AsyncSession,
limit: int = 50,
offset: int = 0,
) -> list[Task]:
stmt = (
select(Task)
.order_by(Task.created_at.desc())
.offset(offset)
.limit(limit)
)
result = await session.exec(stmt)
return list(result.all())
Output:
>>> page_1 = await list_paginated(session, limit=10, offset=0)
>>> page_2 = await list_paginated(session, limit=10, offset=10)
Update: Modifying Records
Fetch, modify, commit:
async def update_task(
session: AsyncSession,
task: Task,
title: str | None = None,
status: str | None = None,
priority: str | None = None,
) -> Task:
if title is not None:
task.title = title
if status is not None:
task.status = status
if priority is not None:
task.priority = priority
task.updated_at = datetime.utcnow()
session.add(task)
await session.commit()
await session.refresh(task)
return task
Output:
>>> task = await get_task(session, 42)
>>> task.status
"pending"
>>> task = await update_task(session, task, status="in_progress")
>>> task.status
"in_progress"
>>> task.updated_at # Also updated
datetime.datetime(2024, 1, 15, 11, 45, 0)
Alternative: Update without fetching
For bulk updates:
from sqlalchemy import update
async def mark_completed(session: AsyncSession, task_ids: list[int]) -> int:
stmt = (
update(Task)
.where(Task.id.in_(task_ids))
.values(status="completed", updated_at=datetime.utcnow())
)
result = await session.exec(stmt)
await session.commit()
return result.rowcount
Output:
>>> count = await mark_completed(session, [1, 2, 3])
>>> count
3
Delete: Removing Records
Fetch, delete, commit:
async def delete_task(session: AsyncSession, task: Task) -> None:
await session.delete(task)
await session.commit()
Output:
>>> task = await get_task(session, 42)
>>> await delete_task(session, task)
>>> await get_task(session, 42)
None # Gone
Soft Delete Pattern
Hard deletes lose data permanently. Soft deletes mark records as deleted while preserving them for audit trails, recovery, and compliance.
Model Setup
Add deleted_at timestamp to your model:
class Task(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
status: str = Field(default="pending")
# Soft delete field
deleted_at: datetime | None = Field(default=None, index=True)
Soft Delete Implementation
async def soft_delete_task(session: AsyncSession, task: Task) -> Task:
"""Mark task as deleted without removing from database."""
task.deleted_at = datetime.utcnow()
task.status = "deleted"
session.add(task)
await session.commit()
await session.refresh(task)
return task
Filtering Deleted Records
Always exclude deleted records in normal queries:
async def list_active_tasks(session: AsyncSession) -> list[Task]:
stmt = (
select(Task)
.where(Task.deleted_at.is_(None)) # Only non-deleted
.order_by(Task.created_at.desc())
)
result = await session.exec(stmt)
return list(result.all())
Restore (Undelete)
async def restore_task(session: AsyncSession, task: Task) -> Task:
"""Restore a soft-deleted task."""
task.deleted_at = None
task.status = "pending" # Or previous status if stored
session.add(task)
await session.commit()
await session.refresh(task)
return task
View Deleted Records (Admin)
async def list_deleted_tasks(
session: AsyncSession,
days: int = 30,
) -> list[Task]:
"""List tasks deleted in the last N days."""
cutoff = datetime.utcnow() - timedelta(days=days)
stmt = (
select(Task)
.where(Task.deleted_at.is_not(None))
.where(Task.deleted_at >= cutoff)
.order_by(Task.deleted_at.desc())
)
result = await session.exec(stmt)
return list(result.all())
Hard Delete After Retention
For GDPR compliance or storage cleanup:
async def purge_old_deleted(session: AsyncSession, days: int = 90) -> int:
"""Permanently remove tasks deleted more than N days ago."""
cutoff = datetime.utcnow() - timedelta(days=days)
stmt = (
delete(Task)
.where(Task.deleted_at.is_not(None))
.where(Task.deleted_at < cutoff)
)
result = await session.exec(stmt)
await session.commit()
return result.rowcount
Query Optimization
Slow queries kill agent performance. Learn to identify and fix them.
EXPLAIN ANALYZE
PostgreSQL shows exactly how it executes your query:
EXPLAIN ANALYZE SELECT * FROM task WHERE project_id = 1 AND status = 'pending';
Output:
Index Scan using ix_task_project_status on task (cost=0.29..8.31 rows=1 width=156) (actual time=0.015..0.016 rows=1 loops=1)
Index Cond: ((project_id = 1) AND ((status)::text = 'pending'::text))
Planning Time: 0.089 ms
Execution Time: 0.031 ms
What to look for:
| Term | Good | Bad |
|---|---|---|
| Scan type | Index Scan, Index Only Scan | Seq Scan (on large tables) |
| Rows | Close to actual | Wildly different = stale stats |
| Execution Time | < 10ms for simple queries | > 100ms needs optimization |
Running EXPLAIN from Python
from sqlalchemy import text
async def explain_query(session: AsyncSession, stmt) -> str:
"""Get query execution plan."""
# Convert SQLModel statement to SQL string
compiled = stmt.compile(
dialect=session.bind.dialect,
compile_kwargs={"literal_binds": True},
)
explain = await session.exec(
text(f"EXPLAIN ANALYZE {compiled}")
)
return "\n".join(row[0] for row in explain.all())
Common Query Problems
1. Sequential Scan on Large Table
Seq Scan on task (cost=0.00..12345.00 rows=500000...)
Fix: Add an index on the filtered column.
2. Missing WHERE Clause Index
# Slow: status not indexed
stmt = select(Task).where(Task.priority == "high")
Fix:
class Task(SQLModel, table=True):
priority: str = Field(default="medium", index=True) # Add index
3. Loading Too Many Columns
# Slow: loads all columns when you only need count
tasks = await session.exec(select(Task)).all()
count = len(tasks)
Fix:
# Fast: database does the counting
from sqlalchemy import func
result = await session.exec(select(func.count(Task.id)))
count = result.one()
4. N+1 Query Pattern
Covered in the Relationships lesson—use selectinload().
Query Timing
Measure query execution time:
import time
async def timed_query(session: AsyncSession, stmt):
"""Execute query and return results with timing."""
start = time.perf_counter()
result = await session.exec(stmt)
items = result.all()
elapsed = time.perf_counter() - start
print(f"Query took {elapsed:.3f}s, returned {len(items)} rows")
return items
Complete CRUD Service
Production-ready service layer:
# services/task_service.py
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from datetime import datetime
from models.task import Task
from schemas.task import TaskCreate, TaskUpdate
class TaskService:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, data: TaskCreate, created_by_id: int) -> Task:
task = Task(
**data.model_dump(),
created_by_id=created_by_id,
)
self.session.add(task)
await self.session.flush()
await self.session.commit()
await self.session.refresh(task)
return task
async def get(self, task_id: int) -> Task | None:
return await self.session.get(Task, task_id)
async def list_by_project(
self,
project_id: int,
status: str | None = None,
) -> list[Task]:
stmt = (
select(Task)
.where(Task.project_id == project_id)
.where(Task.deleted_at.is_(None))
)
if status:
stmt = stmt.where(Task.status == status)
stmt = stmt.order_by(Task.created_at.desc())
result = await self.session.exec(stmt)
return list(result.all())
async def update(self, task: Task, data: TaskUpdate) -> Task:
for key, value in data.model_dump(exclude_unset=True).items():
setattr(task, key, value)
task.updated_at = datetime.utcnow()
self.session.add(task)
await self.session.commit()
await self.session.refresh(task)
return task
async def delete(self, task: Task) -> None:
await self.session.delete(task)
await self.session.commit()
Usage in endpoint:
@router.post("/tasks", response_model=TaskResponse)
async def create_task(
data: TaskCreate,
session: AsyncSession = Depends(get_session),
current_user: Worker = Depends(get_current_user),
):
service = TaskService(session)
return await service.create(data, current_user.id)
Try With AI
Prompt 1: Implement Search
I need to search tasks by title or description.
The search should be case-insensitive and match partial text.
Show me the SQLModel query using ilike() and how to combine
it with existing filters (project_id, status).
What you're learning: Text search patterns—combining full-text search with structured filters.
Prompt 2: Bulk Operations
I need to:
1. Create 100 tasks in a single transaction
2. Update all tasks in a project to "archived" status
3. Delete all tasks older than 30 days
Show me efficient patterns for each that don't fetch
individual records.
What you're learning: Bulk operations—handling large datasets efficiently.
Prompt 3: Debug Empty Results
My list_tasks query returns an empty list, but I know there's data:
async def list_tasks(session: AsyncSession):
stmt = select(Task)
result = session.exec(stmt) # Missing something
return result.all()
What's wrong and how do I fix it?
What you're learning: Async debugging—catching missing await patterns.
Safety Note
Always validate user input before database operations. SQLModel with Pydantic provides validation, but verify IDs exist before updates and deletes. Never trust user-provided IDs for authorization—always verify ownership.
Reflect on Your Skill
You built a relational-db-agent skill in Lesson 0. Test its CRUD knowledge.
Test Your Skill
Using my relational-db-agent skill, generate a complete
TaskService class with:
- create() with flush and commit
- get() by ID
- list_by_project() with optional status filter
- update() with updated_at timestamp
- delete()
Identify Gaps
Ask yourself:
- Did my skill include
await session.flush()before commit in create? - Did it use
await session.exec()for select statements? - Did it call
await session.refresh()after commit? - Did it update
updated_atin the update method?
Improve Your Skill
If you found gaps:
My relational-db-agent skill forgets to update updated_at timestamp.
Add this pattern to all update operations:
task.updated_at = datetime.utcnow()
And include the import: from datetime import datetime
Your skill now generates complete CRUD patterns.