Skip to main content

Transactions and Error Handling

Database transactions ensure your agent's operations succeed completely or fail cleanly. When creating a task with audit logs, either both records exist or neither does.

Getting transactions wrong causes data corruption that's hard to debug.

flush() vs commit()

These two operations are often confused:

OperationWhat It DoesWhen to Use
flush()Writes to DB but doesn't commitNeed auto-assigned values (IDs)
commit()Makes changes permanentOperation complete, persist data
async def create_task_with_audit(session: AsyncSession, data: dict):
# Create task
task = Task(**data)
session.add(task)

# flush() writes to DB - task.id is now available
await session.flush()
print(f"Task ID (not yet committed): {task.id}")

# Use task.id to create audit log
audit = AuditLog(
entity_type="task",
entity_id=task.id, # Available after flush
action="created",
)
session.add(audit)

# commit() makes both permanent
await session.commit()
print(f"Transaction committed: Task {task.id} and audit log")

Output:

Task ID (not yet committed): 42
Transaction committed: Task 42 and audit log

If you skip flush():

task = Task(**data)
session.add(task)
# task.id is None here - database hasn't assigned it yet

audit = AuditLog(entity_id=task.id) # None! Wrong!

Multi-Operation Transactions

Group related operations into a single transaction:

async def create_project_with_tasks(
session: AsyncSession,
project_data: dict,
task_titles: list[str],
) -> Project:
# Create project
project = Project(**project_data)
session.add(project)
await session.flush() # Get project.id

# Create all tasks
tasks = [
Task(title=title, project_id=project.id)
for title in task_titles
]
session.add_all(tasks)

# Single commit for everything
await session.commit()
await session.refresh(project)
return project

Atomicity guarantee: If any task fails to insert, the project is also rolled back.

Rollback on Error

When operations fail, rollback to previous state:

async def safe_create_task(session: AsyncSession, data: dict) -> Task:
try:
task = Task(**data)
session.add(task)
await session.commit()
await session.refresh(task)
return task
except Exception:
await session.rollback()
raise

What rollback does:

  • Discards all uncommitted changes in the session
  • Returns connection to pool in clean state
  • Prevents partial data from persisting

Handling IntegrityError

Database constraints raise IntegrityError:

from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException

async def create_project(
session: AsyncSession,
data: ProjectCreate,
) -> Project:
try:
project = Project(**data.model_dump())
session.add(project)
await session.commit()
await session.refresh(project)
return project
except IntegrityError as e:
await session.rollback()
if "unique constraint" in str(e).lower():
raise HTTPException(
status_code=400,
detail="Project with this name already exists"
)
if "foreign key constraint" in str(e).lower():
raise HTTPException(
status_code=400,
detail="Referenced record does not exist"
)
raise HTTPException(status_code=500, detail="Database error")

Common IntegrityError causes:

ConstraintExampleResponse
UniqueDuplicate email400: Already exists
Foreign keyInvalid project_id400: Parent not found
Not nullMissing required field400: Field required

Context Manager Pattern

Use async with session.begin() for automatic commit/rollback:

async def transactional_update(session: AsyncSession):
async with session.begin():
# All operations in this block are in one transaction
task = await session.get(Task, 42)
task.status = "completed"

audit = AuditLog(entity_id=42, action="completed")
session.add(audit)

# Auto-commit on successful exit
# Auto-rollback on exception

Benefits:

  • Commit happens automatically on clean exit
  • Rollback happens automatically on exception
  • No need for try/except/rollback boilerplate

Caution: Don't mix with manual commit() calls:

# WRONG - double commit
async with session.begin():
session.add(task)
await session.commit() # Error! begin() manages the transaction

Nested Operations

When calling functions that might use the session:

async def outer_operation(session: AsyncSession):
# Start transaction
project = Project(name="New Project")
session.add(project)
await session.flush()

# Call inner function - shares the transaction
await create_initial_tasks(session, project.id)

# Single commit for everything
await session.commit()

async def create_initial_tasks(session: AsyncSession, project_id: int):
# These adds are part of the outer transaction
tasks = [
Task(title="Setup", project_id=project_id),
Task(title="Planning", project_id=project_id),
]
session.add_all(tasks)
# Don't commit here - outer function handles it

Complete Error Handling Pattern

Production-ready service with error handling:

from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from fastapi import HTTPException
import logging

logger = logging.getLogger(__name__)

class TaskService:
def __init__(self, session: AsyncSession):
self.session = session

async def create(self, data: TaskCreate, created_by_id: int) -> Task:
try:
task = Task(
**data.model_dump(),
created_by_id=created_by_id,
)
self.session.add(task)
await self.session.flush()

# Create audit log
audit = AuditLog(
entity_type="task",
entity_id=task.id,
action="created",
actor_id=created_by_id,
)
self.session.add(audit)

await self.session.commit()
await self.session.refresh(task)
return task

except IntegrityError as e:
await self.session.rollback()
logger.warning(f"IntegrityError creating task: {e}")

if "project_id" in str(e):
raise HTTPException(400, "Project not found")
if "created_by_id" in str(e):
raise HTTPException(400, "Creator not found")
raise HTTPException(400, "Invalid data")

except SQLAlchemyError as e:
await self.session.rollback()
logger.error(f"Database error creating task: {e}")
raise HTTPException(500, "Database error")

Output (on FK violation):

WARNING: IntegrityError creating task: insert or update on table "task" violates foreign key constraint
HTTP 400: Project not found

Try With AI

Prompt 1: Atomic Bulk Update

I need to update all tasks in a project to "archived" status,
and create a single audit log entry for the bulk action.

If any task update fails, I want the entire operation to rollback
including the audit log.

Show me the transactional pattern for this.

What you're learning: Bulk transaction patterns—ensuring atomicity across multiple records.

Prompt 2: Retry on Deadlock

My agent occasionally gets "deadlock detected" errors when
multiple agents update the same project simultaneously.

Show me a retry pattern that:
1. Catches deadlock errors
2. Retries up to 3 times with exponential backoff
3. Rolls back and re-starts the transaction on each retry

What you're learning: Concurrency handling—recovering from database contention.

Prompt 3: Partial Success Handling

I'm importing tasks from a CSV. Some rows might have invalid data.
I want to:
1. Insert all valid rows
2. Skip invalid rows
3. Return a report of what succeeded and what failed

Should I use one transaction or many? Show me the pattern.

What you're learning: Batch processing—balancing atomicity with partial success requirements.

Safety Note

Never catch exceptions without rollback. A session with uncommitted changes after an error can corrupt subsequent operations. Always rollback in exception handlers.


Reflect on Your Skill

You built a relational-db-agent skill in Lesson 0. Test its transaction knowledge.

Test Your Skill

Using my relational-db-agent skill, generate a function that:
1. Creates a Project
2. Creates 3 initial Tasks for the project
3. Creates an AuditLog entry
4. Uses flush() to get IDs before commit
5. Rolls back everything if any step fails

Identify Gaps

Ask yourself:

  • Did my skill use flush() to get project.id before creating tasks?
  • Did it include try/except with rollback() in the except block?
  • Did it handle IntegrityError specifically?
  • Did it use a single commit() at the end?

Improve Your Skill

If you found gaps:

My relational-db-agent skill doesn't handle IntegrityError.
Add this pattern for all create/update operations:

from sqlalchemy.exc import IntegrityError

try:
await session.commit()
except IntegrityError:
await session.rollback()
raise HTTPException(400, "Constraint violation")

Your skill now generates robust transaction patterns.