Capstone: Complete Task API Database Layer
You've learned each piece: engines, sessions, models, CRUD, relationships, transactions, and migrations. Now you'll combine them into a production-ready database layer for the Task API.
This is what you'd build for a real agent backend.
The Assignment
Build a complete database layer for the Task API with:
- Database Configuration - Async engine with proper pooling
- Models - Task, Project, Worker with all relationships
- Service Layer - CRUD operations with eager loading
- Transaction Handling - Multi-operation atomic updates
- Migrations - Alembic setup for schema management
Project Structure
task-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── database.py # Engine and session
│ ├── models/
│ │ ├── __init__.py
│ │ ├── task.py
│ │ ├── project.py
│ │ └── worker.py
│ ├── services/
│ │ ├── __init__.py
│ │ └── task_service.py
│ └── routers/
│ ├── __init__.py
│ └── tasks.py
├── alembic/
│ ├── versions/
│ └── env.py
├── alembic.ini
├── pyproject.toml
└── tests/
└── test_database.py
Part 1: Database Configuration
Create app/database.py:
Requirements:
- Async engine with `create_async_engine`
- Connection pooling (`pool_pre_ping=True`, configurable pool size)
- URL conversion for PostgreSQL and SQLite
- Session factory with `expire_on_commit=False`
- `get_session()` dependency
- `create_db_and_tables()` for initialization
Reference pattern:

```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlmodel.ext.asyncio.session import AsyncSession
```
Part 2: Models
Create three models in app/models/:
Worker Model
Fields:
- `id`: Primary key
- `handle`: Unique string (e.g., "@john", "@researcher")
- `type`: "human" or "agent"
- `email`: Optional, for humans
- `created_at`: Timestamp

Relationships:
- `assigned_tasks`: Tasks assigned to this worker
- `created_tasks`: Tasks created by this worker
Project Model
Fields:
- `id`: Primary key
- `name`: Required string
- `description`: Optional text
- `status`: Default "active"
- `metadata`: JSONB dict (note: `metadata` is reserved on SQLAlchemy declarative classes, so map it under another attribute name such as `meta`)
- `created_at`, `updated_at`: Timestamps

Relationships:
- `tasks`: All tasks in this project
Task Model
Fields:
- `id`: Primary key
- `title`: Required string (max 500)
- `description`: Optional text
- `status`: Default "pending"
- `priority`: Default "medium"
- `tags`: JSONB list
- `project_id`: FK to project (required)
- `assignee_id`: FK to worker (optional)
- `created_by_id`: FK to worker (required)
- `parent_task_id`: FK to self (optional)
- `created_at`, `updated_at`, `due_date`: Timestamps

Relationships:
- `project`: The project this task belongs to
- `assignee`: Worker assigned to this task
- `created_by`: Worker who created this task
- `parent`: Parent task (self-referential)
- `subtasks`: Child tasks (self-referential)
Part 3: Service Layer
Create app/services/task_service.py:
Methods:
- `create(data, created_by_id)`: Create task with audit
- `get(task_id)`: Get single task with assignee loaded
- `list_by_project(project_id, status)`: List with eager loading
- `update(task, data)`: Update with timestamp
- `delete(task)`: Delete task
- `assign(task, worker_id)`: Assign task to worker

Requirements:
- Use `selectinload` for relationships
- Use the `result.unique().all()` pattern
- Handle `IntegrityError` appropriately
- Use transactions for multi-step operations
Part 4: Transaction Handling
Implement these transactional operations in the service:
Create Project with Initial Tasks
```python
async def create_project_with_tasks(
    session: AsyncSession,
    project_data: ProjectCreate,
    task_titles: list[str],
    created_by_id: int,
) -> Project:
    """Create project and initial tasks atomically."""
    # Your implementation
```
Requirements:
- Create project, flush to get ID
- Create all tasks with project_id
- Single commit for everything
- Rollback on any failure
Bulk Status Update
```python
async def complete_project_tasks(
    session: AsyncSession,
    project_id: int,
) -> int:
    """Mark all project tasks as completed."""
    # Your implementation
```
Requirements:
- Update all tasks in a single query
- Update the `updated_at` timestamp
- Return the count of affected rows
Part 5: Alembic Setup
Initialize and configure Alembic:
- Run `alembic init -t async alembic`
- Configure `alembic.ini` with your database URL
- Update `env.py`:
  - Import all models
  - Set `target_metadata = SQLModel.metadata`
  - Configure async migrations
- Generate initial migration
- Apply migration
Verify with:

```bash
alembic upgrade head
alembic current
```
Part 6: Multi-Tenancy (Extension)
Production SaaS applications serve multiple customers (tenants) from a single database. Here are the patterns.
Row-Level Multi-Tenancy
Add tenant_id to every model:
```python
from datetime import datetime, timezone

from sqlmodel import Field, SQLModel


class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    tenant_id: int = Field(foreign_key="tenant.id", index=True)  # Required!
    title: str
    # ... other fields


class Tenant(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    subdomain: str = Field(unique=True)  # e.g., "acme" for acme.taskapi.com
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)  # utcnow() is deprecated
    )
```
Enforce tenant isolation in every query:
```python
class TaskService:
    def __init__(self, session: AsyncSession, tenant_id: int):
        self.session = session
        self.tenant_id = tenant_id  # From authenticated user

    async def list_tasks(self) -> list[Task]:
        stmt = (
            select(Task)
            .where(Task.tenant_id == self.tenant_id)  # Always filter!
            .options(selectinload(Task.assignee))
        )
        result = await self.session.exec(stmt)
        return result.unique().all()

    async def get(self, task_id: int) -> Task | None:
        stmt = (
            select(Task)
            .where(Task.id == task_id)
            .where(Task.tenant_id == self.tenant_id)  # Prevent cross-tenant access!
        )
        result = await self.session.exec(stmt)
        return result.first()
```
Tenant Context Middleware
Pass tenant through request context:
```python
from contextvars import ContextVar

from fastapi import Request
from fastapi.responses import JSONResponse

current_tenant_id: ContextVar[int] = ContextVar("current_tenant_id")


@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    # Extract tenant from subdomain or header
    host = request.headers.get("host", "")
    subdomain = host.split(".")[0]

    # Look up the tenant, releasing the session before handling the request
    async with AsyncSession(engine) as session:
        stmt = select(Tenant).where(Tenant.subdomain == subdomain)
        result = await session.exec(stmt)
        tenant = result.first()

    if not tenant:
        return JSONResponse(status_code=404, content={"error": "Tenant not found"})

    token = current_tenant_id.set(tenant.id)
    try:
        return await call_next(request)
    finally:
        current_tenant_id.reset(token)
```
Schema-Per-Tenant
For stronger isolation, use PostgreSQL schemas:
```python
from sqlalchemy import text

# Each tenant gets a schema: tenant_123.task, tenant_123.project
async def create_tenant_schema(engine, tenant_id: int):
    schema = f"tenant_{tenant_id}"
    async with engine.begin() as conn:
        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
        # Create tables in the schema
        await conn.execute(text(f"SET search_path TO {schema}"))
        await conn.run_sync(SQLModel.metadata.create_all)
```
| Pattern | Isolation | Complexity | Use When |
|---|---|---|---|
| Row-level | Logical | Low | Most SaaS apps, shared infrastructure |
| Schema-per-tenant | Stronger | Medium | Compliance requirements, data separation |
| Database-per-tenant | Complete | High | Enterprise, regulatory mandates |
Security Checklist for Multi-Tenancy
- Every table has a `tenant_id` column
- Every query filters by `tenant_id`
- Service layer requires tenant context
- No way to access data without tenant filter
- Indexes include `tenant_id` for performance
Validation Checklist
Before submitting, verify:
Configuration
- Engine uses `pool_pre_ping=True`
- Session uses `expire_on_commit=False`
- URL conversion handles postgresql and sqlite
Models
- All models have `table=True`
- JSONB columns use `sa_column=Column(JSONB, ...)`
- Foreign keys have `index=True`
- Self-referential relationships use `remote_side` in `sa_relationship_kwargs`
- Multiple FKs to the same table use `foreign_keys` in `sa_relationship_kwargs`
Service Layer
- All queries use `await`
- List queries use `selectinload`
- Results use `unique().all()`
- Create uses `flush()` before commit to get the ID
- Update sets `updated_at`
- Errors trigger `rollback()`
Transactions
- Multi-step operations use single commit
- Failures rollback everything
- `IntegrityError` caught and handled
Migrations
- All models imported in env.py
- `target_metadata` set correctly
- Initial migration generated
- Migration applies successfully
Reference Implementation
Study the reference implementation for patterns:
/Users/mjs/Documents/code/mjunaidca/taskforce_agent1/apps/api/src/taskflow_api/
├── database.py # Engine and session patterns
├── models/
│ └── task.py # Model with relationships
└── routers/
└── tasks.py # CRUD with eager loading
Try With AI
Prompt 1: Generate Complete Model
Using my relational-db-agent skill, generate the complete Task model with:
- All fields (id, title, description, status, priority, tags, project_id,
assignee_id, created_by_id, parent_task_id, timestamps)
- JSONB for tags
- All relationships (project, assignee, created_by, parent, subtasks)
- Proper sa_relationship_kwargs for multi-FK and self-referential
Include all imports.
What you're learning: Model synthesis—combining all relationship patterns in one model.
Prompt 2: Generate Service Layer
Using my relational-db-agent skill, generate TaskService with:
- create() with flush/commit and IntegrityError handling
- get() with selectinload for assignee
- list_by_project() with eager loading and optional status filter
- update() with updated_at timestamp
- delete() with proper error handling
Show complete implementation with imports.
What you're learning: Service synthesis—combining CRUD, relationships, and error handling.
Prompt 3: Review My Implementation
Review my database layer implementation for production readiness.
Check for:
1. Missing awaits
2. N+1 query potential
3. Transaction safety
4. Error handling gaps
5. Pool configuration issues
Here's my code:
[paste your implementation]
What you're learning: Code review—evaluating database code for production issues.
Safety Note
Before deploying, test your database layer with realistic data volumes. What works with 100 rows may fail with 100,000. Profile queries and add indexes based on actual usage patterns.
Reflect on Your Skill
This is the final skill reflection for the chapter. Your relational-db-agent skill should now be production-ready.
Test Your Skill
Using my relational-db-agent skill, I need a complete database layer.
Generate all components:
1. database.py (engine, session, get_session dependency)
2. Task model (full schema with all relationships)
3. TaskService (complete CRUD with transactions)
4. env.py for Alembic (with all imports)
This is the final test - the skill should generate production-quality code.
Evaluate Your Skill
Compare the generated code against this checklist:
- Correct imports (sqlmodel.ext.asyncio.session, sqlalchemy.ext.asyncio)
- Pool configuration (pre_ping, size, overflow)
- JSONB columns with sa_column
- All relationship types working
- selectinload patterns
- unique().all() usage
- Transaction patterns with rollback
- Alembic model imports
Final Improvement
If any checklist item is missing:
My relational-db-agent skill is missing [item].
Add this pattern permanently:
[specific code pattern]
This is the production version of my skill - it should generate
complete, correct code for any async SQLModel task.
Congratulations
You've built:
- A `relational-db-agent` skill that generates production database code
- A complete Task API database layer
- Understanding of async patterns, relationships, transactions, and migrations
Your skill is now a Digital FTE asset—reusable for future projects, sellable as expertise, and maintainable as patterns evolve.