Transactions and Error Handling
Database transactions ensure your agent's operations succeed completely or fail cleanly. When creating a task with audit logs, either both records exist or neither does.
Getting transactions wrong causes data corruption that's hard to debug.
flush() vs commit()
These two operations are often confused:
| Operation | What It Does | When to Use |
|---|---|---|
flush() | Writes to DB but doesn't commit | Need auto-assigned values (IDs) |
commit() | Makes changes permanent | Operation complete, persist data |
async def create_task_with_audit(session: AsyncSession, data: dict):
# Create task
task = Task(**data)
session.add(task)
# flush() writes to DB - task.id is now available
await session.flush()
print(f"Task ID (not yet committed): {task.id}")
# Use task.id to create audit log
audit = AuditLog(
entity_type="task",
entity_id=task.id, # Available after flush
action="created",
)
session.add(audit)
# commit() makes both permanent
await session.commit()
print(f"Transaction committed: Task {task.id} and audit log")
Output:
Task ID (not yet committed): 42
Transaction committed: Task 42 and audit log
If you skip flush():
task = Task(**data)
session.add(task)
# task.id is None here - database hasn't assigned it yet
audit = AuditLog(entity_id=task.id) # None! Wrong!
Multi-Operation Transactions
Group related operations into a single transaction:
async def create_project_with_tasks(
session: AsyncSession,
project_data: dict,
task_titles: list[str],
) -> Project:
# Create project
project = Project(**project_data)
session.add(project)
await session.flush() # Get project.id
# Create all tasks
tasks = [
Task(title=title, project_id=project.id)
for title in task_titles
]
session.add_all(tasks)
# Single commit for everything
await session.commit()
await session.refresh(project)
return project
Atomicity guarantee: If any task fails to insert, the project is also rolled back.
Rollback on Error
When operations fail, rollback to previous state:
async def safe_create_task(session: AsyncSession, data: dict) -> Task:
try:
task = Task(**data)
session.add(task)
await session.commit()
await session.refresh(task)
return task
except Exception:
await session.rollback()
raise
What rollback does:
- Discards all uncommitted changes in the session
- Returns connection to pool in clean state
- Prevents partial data from persisting
Handling IntegrityError
Database constraints raise IntegrityError:
from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException
async def create_project(
session: AsyncSession,
data: ProjectCreate,
) -> Project:
try:
project = Project(**data.model_dump())
session.add(project)
await session.commit()
await session.refresh(project)
return project
except IntegrityError as e:
await session.rollback()
if "unique constraint" in str(e).lower():
raise HTTPException(
status_code=400,
detail="Project with this name already exists"
)
if "foreign key constraint" in str(e).lower():
raise HTTPException(
status_code=400,
detail="Referenced record does not exist"
)
raise HTTPException(status_code=500, detail="Database error")
Common IntegrityError causes:
| Constraint | Example | Response |
|---|---|---|
| Unique | Duplicate email | 400: Already exists |
| Foreign key | Invalid project_id | 400: Parent not found |
| Not null | Missing required field | 400: Field required |
Context Manager Pattern
Use async with session.begin() for automatic commit/rollback:
async def transactional_update(session: AsyncSession):
async with session.begin():
# All operations in this block are in one transaction
task = await session.get(Task, 42)
task.status = "completed"
audit = AuditLog(entity_id=42, action="completed")
session.add(audit)
# Auto-commit on successful exit
# Auto-rollback on exception
Benefits:
- Commit happens automatically on clean exit
- Rollback happens automatically on exception
- No need for try/except/rollback boilerplate
Caution: Don't mix with manual commit() calls:
# WRONG - double commit
async with session.begin():
session.add(task)
await session.commit() # Error! begin() manages the transaction
Nested Operations
When calling functions that might use the session:
async def outer_operation(session: AsyncSession):
# Start transaction
project = Project(name="New Project")
session.add(project)
await session.flush()
# Call inner function - shares the transaction
await create_initial_tasks(session, project.id)
# Single commit for everything
await session.commit()
async def create_initial_tasks(session: AsyncSession, project_id: int):
# These adds are part of the outer transaction
tasks = [
Task(title="Setup", project_id=project_id),
Task(title="Planning", project_id=project_id),
]
session.add_all(tasks)
# Don't commit here - outer function handles it
Complete Error Handling Pattern
Production-ready service with error handling:
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from fastapi import HTTPException
import logging
logger = logging.getLogger(__name__)
class TaskService:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, data: TaskCreate, created_by_id: int) -> Task:
try:
task = Task(
**data.model_dump(),
created_by_id=created_by_id,
)
self.session.add(task)
await self.session.flush()
# Create audit log
audit = AuditLog(
entity_type="task",
entity_id=task.id,
action="created",
actor_id=created_by_id,
)
self.session.add(audit)
await self.session.commit()
await self.session.refresh(task)
return task
except IntegrityError as e:
await self.session.rollback()
logger.warning(f"IntegrityError creating task: {e}")
if "project_id" in str(e):
raise HTTPException(400, "Project not found")
if "created_by_id" in str(e):
raise HTTPException(400, "Creator not found")
raise HTTPException(400, "Invalid data")
except SQLAlchemyError as e:
await self.session.rollback()
logger.error(f"Database error creating task: {e}")
raise HTTPException(500, "Database error")
Output (on FK violation):
WARNING: IntegrityError creating task: insert or update on table "task" violates foreign key constraint
HTTP 400: Project not found
Try With AI
Prompt 1: Atomic Bulk Update
I need to update all tasks in a project to "archived" status,
and create a single audit log entry for the bulk action.
If any task update fails, I want the entire operation to rollback
including the audit log.
Show me the transactional pattern for this.
What you're learning: Bulk transaction patterns—ensuring atomicity across multiple records.
Prompt 2: Retry on Deadlock
My agent occasionally gets "deadlock detected" errors when
multiple agents update the same project simultaneously.
Show me a retry pattern that:
1. Catches deadlock errors
2. Retries up to 3 times with exponential backoff
3. Rolls back and re-starts the transaction on each retry
What you're learning: Concurrency handling—recovering from database contention.
Prompt 3: Partial Success Handling
I'm importing tasks from a CSV. Some rows might have invalid data.
I want to:
1. Insert all valid rows
2. Skip invalid rows
3. Return a report of what succeeded and what failed
Should I use one transaction or many? Show me the pattern.
What you're learning: Batch processing—balancing atomicity with partial success requirements.
Safety Note
Never catch exceptions without rollback. A session with uncommitted changes after an error can corrupt subsequent operations. Always rollback in exception handlers.
Reflect on Your Skill
You built a relational-db-agent skill in Lesson 0. Test its transaction knowledge.
Test Your Skill
Using my relational-db-agent skill, generate a function that:
1. Creates a Project
2. Creates 3 initial Tasks for the project
3. Creates an AuditLog entry
4. Uses flush() to get IDs before commit
5. Rolls back everything if any step fails
Identify Gaps
Ask yourself:
- Did my skill use
flush()to get project.id before creating tasks? - Did it include
try/exceptwithrollback()in the except block? - Did it handle
IntegrityErrorspecifically? - Did it use a single
commit()at the end?
Improve Your Skill
If you found gaps:
My relational-db-agent skill doesn't handle IntegrityError.
Add this pattern for all create/update operations:
from sqlalchemy.exc import IntegrityError
try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(400, "Constraint violation")
Your skill now generates robust transaction patterns.