Capstone: Complete Task API Database Layer

You've learned each piece: engines, sessions, models, CRUD, relationships, transactions, and migrations. Now you'll combine them into a production-ready database layer for the Task API.

This is what you'd build for a real agent backend.

The Assignment

Build a complete database layer for the Task API with:

  1. Database Configuration - Async engine with proper pooling
  2. Models - Task, Project, Worker with all relationships
  3. Service Layer - CRUD operations with eager loading
  4. Transaction Handling - Multi-operation atomic updates
  5. Migrations - Alembic setup for schema management

Project Structure

task-api/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── database.py          # Engine and session
│   ├── models/
│   │   ├── __init__.py
│   │   ├── task.py
│   │   ├── project.py
│   │   └── worker.py
│   ├── services/
│   │   ├── __init__.py
│   │   └── task_service.py
│   └── routers/
│       ├── __init__.py
│       └── tasks.py
├── alembic/
│   ├── versions/
│   └── env.py
├── alembic.ini
├── pyproject.toml
└── tests/
    └── test_database.py

Part 1: Database Configuration

Create app/database.py:

Requirements:

  • Async engine with create_async_engine
  • Connection pooling (pool_pre_ping=True, configurable pool size)
  • URL conversion for PostgreSQL and SQLite
  • Session factory with expire_on_commit=False
  • get_session() dependency
  • create_db_and_tables() for initialization

Reference pattern:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlmodel.ext.asyncio.session import AsyncSession
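
A minimal sketch of what app/database.py could look like; the DATABASE_URL default, environment variable names, and pool sizes below are placeholder assumptions, not requirements:

import os

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession

# Placeholder URL; convert sync URLs (postgresql:// -> postgresql+asyncpg://) as needed.
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql+asyncpg://app:app@localhost/tasks")

engine = create_async_engine(
    DATABASE_URL,
    pool_pre_ping=True,                                  # detect stale connections
    pool_size=int(os.environ.get("DB_POOL_SIZE", "5")),  # configurable pool size
    max_overflow=int(os.environ.get("DB_MAX_OVERFLOW", "10")),
)

# expire_on_commit=False keeps ORM objects usable after commit in async code.
async_session_factory = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_session():
    # FastAPI dependency: one session per request.
    async with async_session_factory() as session:
        yield session

async def create_db_and_tables():
    # Development convenience; production schema changes go through Alembic.
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)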

Part 2: Models

Create three models in app/models/:

Worker Model

Fields:

  • id: Primary key
  • handle: Unique string (e.g., "@john", "@researcher")
  • type: "human" or "agent"
  • email: Optional, for humans
  • created_at: Timestamp

Relationships:

  • assigned_tasks: Tasks assigned to this worker
  • created_tasks: Tasks created by this worker

Project Model

Fields:

  • id: Primary key
  • name: Required string
  • description: Optional text
  • status: Default "active"
  • metadata: JSONB dict (note: SQLAlchemy reserves metadata as an attribute name, so map this column via sa_column under a different attribute name)
  • created_at, updated_at: Timestamps

Relationships:

  • tasks: All tasks in this project

Task Model

Fields:

  • id: Primary key
  • title: Required string (max 500)
  • description: Optional text
  • status: Default "pending"
  • priority: Default "medium"
  • tags: JSONB list
  • project_id: FK to project (required)
  • assignee_id: FK to worker (optional)
  • created_by_id: FK to worker (required)
  • parent_task_id: FK to self (optional)
  • created_at, updated_at, due_date: Timestamps

Relationships:

  • project: The project this task belongs to
  • assignee: Worker assigned to this task
  • created_by: Worker who created this task
  • parent: Parent task (self-referential)
  • subtasks: Child tasks (self-referential)
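
The multi-FK and self-referential relationships are the trickiest part. Here is a partial sketch, under the assumptions above, showing only the relationship-relevant fields:

from typing import Optional

from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, Relationship, SQLModel

class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    title: str = Field(max_length=500)
    tags: list[str] = Field(default_factory=list, sa_column=Column(JSONB))
    project_id: int = Field(foreign_key="project.id", index=True)
    assignee_id: int | None = Field(default=None, foreign_key="worker.id", index=True)
    created_by_id: int = Field(foreign_key="worker.id", index=True)
    parent_task_id: int | None = Field(default=None, foreign_key="task.id", index=True)
    # ... description, status, priority, timestamps as specified above

    # Two FKs point at Worker, so each relationship must name its FK column.
    assignee: Optional["Worker"] = Relationship(
        back_populates="assigned_tasks",
        sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"},
    )
    created_by: Optional["Worker"] = Relationship(
        back_populates="created_tasks",
        sa_relationship_kwargs={"foreign_keys": "[Task.created_by_id]"},
    )

    # Self-referential pair: remote_side marks the "one" (parent) side.
    parent: Optional["Task"] = Relationship(
        back_populates="subtasks",
        sa_relationship_kwargs={"remote_side": "Task.id"},
    )
    subtasks: list["Task"] = Relationship(back_populates="parent")

# Worker's mirror relationships need the same foreign_keys hints, e.g.:
#   assigned_tasks: list["Task"] = Relationship(
#       back_populates="assignee",
#       sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"},
#   )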

Part 3: Service Layer

Create app/services/task_service.py:

Methods:

  • create(data, created_by_id): Create a task, recording its creator for auditing
  • get(task_id): Get single task with assignee loaded
  • list_by_project(project_id, status): List with eager loading
  • update(task, data): Update with timestamp
  • delete(task): Delete task
  • assign(task, worker_id): Assign task to worker

Requirements:

  • Use selectinload for relationships
  • Use the result.unique().all() pattern
  • Handle IntegrityError appropriately
  • Use transactions for multi-step operations
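
As a reference point, here is one way the create() and get() methods could look; the TaskCreate schema and the re-raise policy are assumptions:

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import selectinload
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

class TaskService:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create(self, data: "TaskCreate", created_by_id: int) -> Task:
        task = Task(**data.model_dump(), created_by_id=created_by_id)
        self.session.add(task)
        try:
            await self.session.flush()     # populates task.id before commit
            await self.session.commit()
        except IntegrityError:
            await self.session.rollback()  # e.g., a bad project_id FK
            raise
        return task

    async def get(self, task_id: int) -> Task | None:
        stmt = (
            select(Task)
            .where(Task.id == task_id)
            .options(selectinload(Task.assignee))  # avoid lazy loads in async code
        )
        result = await self.session.exec(stmt)
        return result.first()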

Part 4: Transaction Handling

Implement these transactional operations in the service:

Create Project with Initial Tasks

async def create_project_with_tasks(
    session: AsyncSession,
    project_data: ProjectCreate,
    task_titles: list[str],
    created_by_id: int,
) -> Project:
    """Create project and initial tasks atomically."""
    # Your implementation

Requirements:

  • Create project, flush to get ID
  • Create all tasks with project_id
  • Single commit for everything
  • Rollback on any failure
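
Putting those requirements together, one possible shape (assuming the models above and a Pydantic-style ProjectCreate):

async def create_project_with_tasks(
    session: AsyncSession,
    project_data: ProjectCreate,
    task_titles: list[str],
    created_by_id: int,
) -> Project:
    """Create project and initial tasks atomically."""
    project = Project(**project_data.model_dump())
    session.add(project)
    await session.flush()  # assigns project.id without committing

    for title in task_titles:
        session.add(
            Task(title=title, project_id=project.id, created_by_id=created_by_id)
        )

    try:
        await session.commit()    # single commit covers project and all tasks
    except Exception:
        await session.rollback()  # nothing persists if any insert fails
        raise
    return project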

Bulk Status Update

async def complete_project_tasks(
    session: AsyncSession,
    project_id: int,
) -> int:
    """Mark all project tasks as completed."""
    # Your implementation

Requirements:

  • Update all tasks in single query
  • Update updated_at timestamp
  • Return count of affected rows
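
A sketch using a single UPDATE statement; the timestamp handling is an assumption:

from datetime import datetime, timezone

from sqlalchemy import update

async def complete_project_tasks(
    session: AsyncSession,
    project_id: int,
) -> int:
    """Mark all project tasks as completed."""
    stmt = (
        update(Task)
        .where(Task.project_id == project_id)
        .values(status="completed", updated_at=datetime.now(timezone.utc))
    )
    result = await session.execute(stmt)  # one UPDATE, no per-row round trips
    await session.commit()
    return result.rowcount  # count of affected rows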

Part 5: Alembic Setup

Initialize and configure Alembic:

  1. Run alembic init -t async alembic
  2. Configure alembic.ini with your database URL
  3. Update env.py (sketched after this list):
    • Import all models
    • Set target_metadata = SQLModel.metadata
    • Configure async migrations
  4. Generate initial migration
  5. Apply migration
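
The core of step 3 is making sure Alembic can see your tables. A sketch of the relevant env.py lines, assuming the module paths from the project structure above:

from sqlmodel import SQLModel

# Importing the models registers their tables on SQLModel.metadata,
# which is what autogenerate inspects.
from app.models.task import Task        # noqa: F401
from app.models.project import Project  # noqa: F401
from app.models.worker import Worker    # noqa: F401

target_metadata = SQLModel.metadata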

Verify with:

alembic upgrade head
alembic current

Part 6: Multi-Tenancy (Extension)

Production SaaS applications serve multiple customers (tenants) from shared infrastructure. Here are the main patterns, from lightest to strongest isolation.

Row-Level Multi-Tenancy

Add tenant_id to every model:

class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    tenant_id: int = Field(foreign_key="tenant.id", index=True)  # Required!
    title: str
    # ... other fields

class Tenant(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    subdomain: str = Field(unique=True)  # e.g., "acme" for acme.taskapi.com
    created_at: datetime = Field(default_factory=datetime.utcnow)

Enforce tenant isolation in every query:

class TaskService:
    def __init__(self, session: AsyncSession, tenant_id: int):
        self.session = session
        self.tenant_id = tenant_id  # From authenticated user

    async def list_tasks(self) -> list[Task]:
        stmt = (
            select(Task)
            .where(Task.tenant_id == self.tenant_id)  # Always filter!
            .options(selectinload(Task.assignee))
        )
        result = await self.session.exec(stmt)
        return result.unique().all()

    async def get(self, task_id: int) -> Task | None:
        stmt = (
            select(Task)
            .where(Task.id == task_id)
            .where(Task.tenant_id == self.tenant_id)  # Prevent cross-tenant access!
        )
        result = await self.session.exec(stmt)
        return result.first()

Tenant Context Middleware

Pass tenant through request context:

from contextvars import ContextVar

current_tenant_id: ContextVar[int] = ContextVar("current_tenant_id")

@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    # Extract tenant from subdomain or header
    host = request.headers.get("host", "")
    subdomain = host.split(".")[0]

    async with AsyncSession(engine) as session:
        stmt = select(Tenant).where(Tenant.subdomain == subdomain)
        result = await session.exec(stmt)
        tenant = result.first()

    if not tenant:
        return JSONResponse(status_code=404, content={"error": "Tenant not found"})

    token = current_tenant_id.set(tenant.id)
    try:
        return await call_next(request)
    finally:
        current_tenant_id.reset(token)
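
To make services consume that context, a hypothetical FastAPI dependency could read the ContextVar and build a tenant-scoped service per request (get_session is the dependency from Part 1):

from fastapi import Depends, HTTPException

def get_tenant_id() -> int:
    try:
        return current_tenant_id.get()
    except LookupError:
        # Middleware did not run or no tenant was resolved.
        raise HTTPException(status_code=400, detail="No tenant context")

async def get_task_service(
    session: AsyncSession = Depends(get_session),
    tenant_id: int = Depends(get_tenant_id),
) -> TaskService:
    return TaskService(session, tenant_id)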

Schema-Per-Tenant

For stronger isolation, use PostgreSQL schemas:

# Each tenant gets a schema: tenant_123.task, tenant_123.project
async def create_tenant_schema(engine, tenant_id: int):
schema = f"tenant_{tenant_id}"
async with engine.begin() as conn:
await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
# Create tables in schema
await conn.execute(text(f"SET search_path TO {schema}"))
await conn.run_sync(SQLModel.metadata.create_all)

Pattern               Isolation   Complexity   Use When
Row-level             Logical     Low          Most SaaS apps, shared infrastructure
Schema-per-tenant     Stronger    Medium       Compliance requirements, data separation
Database-per-tenant   Complete    High         Enterprise, regulatory mandates

Security Checklist for Multi-Tenancy

  • Every table has tenant_id column
  • Every query filters by tenant_id
  • Service layer requires tenant context
  • No way to access data without tenant filter
  • Indexes include tenant_id for performance

Validation Checklist

Before submitting, verify:

Configuration

  • Engine uses pool_pre_ping=True
  • Session uses expire_on_commit=False
  • URL conversion handles postgresql and sqlite

Models

  • All models have table=True
  • JSONB columns use sa_column=Column(JSONB, ...)
  • Foreign keys have index=True
  • Self-referential uses remote_side in sa_relationship_kwargs
  • Multiple FKs to same table use foreign_keys in sa_relationship_kwargs

Service Layer

  • All queries use await
  • List queries use selectinload
  • Results use unique().all()
  • Create uses flush() before commit to get ID
  • Update sets updated_at
  • Errors trigger rollback()

Transactions

  • Multi-step operations use single commit
  • Failures rollback everything
  • IntegrityError caught and handled

Migrations

  • All models imported in env.py
  • target_metadata set correctly
  • Initial migration generated
  • Migration applies successfully

Reference Implementation

Study the reference implementation for patterns:

/Users/mjs/Documents/code/mjunaidca/taskforce_agent1/apps/api/src/taskflow_api/
├── database.py      # Engine and session patterns
├── models/
│   └── task.py      # Model with relationships
└── routers/
    └── tasks.py     # CRUD with eager loading

Try With AI

Prompt 1: Generate Complete Model

Using my relational-db-agent skill, generate the complete Task model with:
- All fields (id, title, description, status, priority, tags, project_id,
assignee_id, created_by_id, parent_task_id, timestamps)
- JSONB for tags
- All relationships (project, assignee, created_by, parent, subtasks)
- Proper sa_relationship_kwargs for multi-FK and self-referential

Include all imports.

What you're learning: Model synthesis—combining all relationship patterns in one model.

Prompt 2: Generate Service Layer

Using my relational-db-agent skill, generate TaskService with:
- create() with flush/commit and IntegrityError handling
- get() with selectinload for assignee
- list_by_project() with eager loading and optional status filter
- update() with updated_at timestamp
- delete() with proper error handling

Show complete implementation with imports.

What you're learning: Service synthesis—combining CRUD, relationships, and error handling.

Prompt 3: Review My Implementation

Review my database layer implementation for production readiness.
Check for:
1. Missing awaits
2. N+1 query potential
3. Transaction safety
4. Error handling gaps
5. Pool configuration issues

Here's my code:
[paste your implementation]

What you're learning: Code review—evaluating database code for production issues.

Safety Note

Before deploying, test your database layer with realistic data volumes. What works with 100 rows may fail with 100,000. Profile queries and add indexes based on actual usage patterns.


Reflect on Your Skill

This is the final skill reflection for the chapter. Your relational-db-agent skill should now be production-ready.

Test Your Skill

Using my relational-db-agent skill, I need a complete database layer.
Generate all components:
1. database.py (engine, session, get_session dependency)
2. Task model (full schema with all relationships)
3. TaskService (complete CRUD with transactions)
4. env.py for Alembic (with all imports)

This is the final test - the skill should generate production-quality code.

Evaluate Your Skill

Compare the generated code against this checklist:

  • Correct imports (sqlmodel.ext.asyncio.session, sqlalchemy.ext.asyncio)
  • Pool configuration (pre_ping, size, overflow)
  • JSONB columns with sa_column
  • All relationship types working
  • selectinload patterns
  • unique().all() usage
  • Transaction patterns with rollback
  • Alembic model imports

Final Improvement

If any checklist item is missing:

My relational-db-agent skill is missing [item].
Add this pattern permanently:
[specific code pattern]

This is the production version of my skill - it should generate
complete, correct code for any async SQLModel task.

Congratulations

You've built:

  1. A relational-db-agent skill that generates production database code
  2. A complete Task API database layer
  3. Understanding of async patterns, relationships, transactions, and migrations

Your skill is now a Digital FTE asset—reusable for future projects, sellable as expertise, and maintainable as patterns evolve.