Migrations with Alembic
Your models define schema in Python. Alembic translates model changes into database migrations—versioned scripts that modify your database schema safely.
For async SQLModel, you use Alembic's async template.
Initializing Alembic
Create the migrations directory:
alembic init -t async alembic
Output:
Creating directory /project/alembic ... done
Creating directory /project/alembic/versions ... done
Generating /project/alembic.ini ... done
Generating /project/alembic/env.py ... done
Generating /project/alembic/README ... done
Generating /project/alembic/script.py.mako ... done
What -t async does: Creates an env.py configured for async database operations instead of sync.
Directory Structure
project/
├── alembic/
│   ├── versions/          # Migration scripts go here
│   ├── env.py             # Configuration (you'll edit this)
│   ├── script.py.mako     # Template for new migrations
│   └── README
├── alembic.ini            # Alembic settings
└── app/
    └── models/            # Your SQLModel classes
Configure alembic.ini
Set your database URL:
# alembic.ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql+asyncpg://user:password@localhost:5432/taskdb
For environment variables:
# alembic.ini - use placeholder
sqlalchemy.url = driver://user:pass@localhost/dbname
Then override in env.py (shown below).
Configure env.py
This is the critical file. Replace the default with:
# alembic/env.py
import asyncio
import os
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel

from alembic import context

# CRITICAL: Import all your models here!
from app.models.task import Task
from app.models.project import Project
from app.models.worker import Worker
from app.database import get_async_database_url

# Alembic Config object
config = context.config

# Set up logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Use SQLModel's metadata
target_metadata = SQLModel.metadata


def get_url():
    """Get async database URL from environment."""
    url = os.getenv("DATABASE_URL", config.get_main_option("sqlalchemy.url"))
    return get_async_database_url(url)


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = get_url()
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run migrations with connection."""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine."""
    connectable = create_async_engine(
        get_url(),
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Entry point for online migrations."""
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
Critical line:
from app.models.task import Task # Must import ALL models
Why import models? Alembic detects schema by comparing SQLModel.metadata to the database. If models aren't imported, they're not in metadata, and Alembic won't see them.
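The get_async_database_url helper imported above is application code, not part of Alembic. A minimal sketch of what it might look like, assuming its only job is to swap the sync driver for asyncpg:

# app/database.py (sketch; your real helper may do more)
def get_async_database_url(url: str) -> str:
    """Rewrite a plain PostgreSQL URL to use the asyncpg driver."""
    if url.startswith("postgresql://"):
        return url.replace("postgresql://", "postgresql+asyncpg://", 1)
    return url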
Generating Migrations
After changing models, generate a migration:
alembic revision --autogenerate -m "Add tasks table"
Output:
INFO [alembic.autogenerate.compare] Detected added table 'task'
INFO [alembic.autogenerate.compare] Detected added index 'ix_task_project_id' on '['project_id']'
Generating /project/alembic/versions/abc123_add_tasks_table.py ... done
The generated file:
# alembic/versions/abc123_add_tasks_table.py
"""Add tasks table

Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        'task',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=500), nullable=False),
        sa.Column('status', sa.String(length=50), nullable=False),
        sa.Column('tags', postgresql.JSONB(), server_default='[]', nullable=False),
        sa.Column('project_id', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.ForeignKeyConstraint(['project_id'], ['project.id']),
    )
    op.create_index('ix_task_project_id', 'task', ['project_id'])


def downgrade() -> None:
    op.drop_index('ix_task_project_id', table_name='task')
    op.drop_table('task')
Running Migrations
Apply pending migrations:
alembic upgrade head
Output:
INFO [alembic.runtime.migration] Running upgrade -> abc123, Add tasks table
Other commands:
# Upgrade to specific revision
alembic upgrade abc123
# Downgrade one revision
alembic downgrade -1
# Downgrade to specific revision
alembic downgrade abc123
# Show current revision
alembic current
# Show migration history
alembic history
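These commands can also be invoked from Python, which is useful in tests and deployment scripts. A short sketch using Alembic's command API (the alembic.ini path is an assumption about your layout):

# run_migrations.py (sketch)
from alembic import command
from alembic.config import Config

def upgrade_to_head(ini_path: str = "alembic.ini") -> None:
    """Programmatic equivalent of `alembic upgrade head`."""
    command.upgrade(Config(ini_path), "head")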
Adding Columns
When you add a field to a model:
class Task(SQLModel, table=True):
    # ... existing fields ...
    priority: str = Field(default="medium")  # NEW
Generate and apply:
alembic revision --autogenerate -m "Add priority to tasks"
alembic upgrade head
The migration, with one manual edit: autogenerate will not turn the Python-level default="medium" into a database default, so add server_default yourself. Without it, the NOT NULL constraint fails on any table that already contains rows:

def upgrade() -> None:
    op.add_column(
        'task',
        sa.Column('priority', sa.String(length=20), nullable=False, server_default='medium')
    )


def downgrade() -> None:
    op.drop_column('task', 'priority')
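If you would rather not edit generated migrations by hand, declare the server default on the model so autogenerate emits it. A sketch using SQLModel's sa_column_kwargs passthrough to the underlying SQLAlchemy Column:

class Task(SQLModel, table=True):
    # ... existing fields ...
    # server_default reaches the Column, so autogenerate includes it
    priority: str = Field(
        default="medium",
        sa_column_kwargs={"server_default": "medium"},
    )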
Data Migrations
Sometimes you need to transform existing data:
"""Add priority and set existing tasks to medium
Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
# 1. Add column (nullable first)
op.add_column(
'task',
sa.Column('priority', sa.String(length=20), nullable=True)
)
# 2. Update existing rows
conn = op.get_bind()
conn.execute(sa.text("UPDATE task SET priority = 'medium' WHERE priority IS NULL"))
# 3. Make column non-nullable
op.alter_column('task', 'priority', nullable=False, server_default='medium')
def downgrade() -> None:
op.drop_column('task', 'priority')
Note: Migration scripts run synchronously even in an async application. op.get_bind() returns a sync connection because env.py bridges into sync code via run_sync().
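If you prefer to avoid raw SQL strings, a lightweight table construct gives you parameter binding and dialect-aware quoting. A sketch of the same update as step 2 above, declaring only the column the statement touches:

import sqlalchemy as sa
from alembic import op

# Minimal table stub: just the columns the UPDATE needs
task = sa.table("task", sa.column("priority", sa.String))


def upgrade() -> None:
    op.execute(
        task.update()
        .where(task.c.priority.is_(None))
        .values(priority="medium")
    )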
JSONB Column Migration
Adding JSONB columns:
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB


def upgrade() -> None:
    op.add_column(
        'task',
        sa.Column(
            'metadata',
            JSONB(),
            nullable=False,
            server_default='{}',
        )
    )


def downgrade() -> None:
    op.drop_column('task', 'metadata')
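B-tree indexes do not help JSONB containment queries; PostgreSQL uses GIN indexes for those. A hedged sketch of a follow-up migration (the index name is illustrative):

def upgrade() -> None:
    # GIN index accelerates @> containment and ? key-existence queries
    op.create_index(
        'ix_task_metadata_gin',
        'task',
        ['metadata'],
        postgresql_using='gin',
    )


def downgrade() -> None:
    op.drop_index('ix_task_metadata_gin', table_name='task')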
Index Migrations
Add indexes for performance:
def upgrade() -> None:
    op.create_index(
        'ix_task_status_priority',
        'task',
        ['status', 'priority'],
    )


def downgrade() -> None:
    op.drop_index('ix_task_status_priority', table_name='task')
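On a large, busy table, a plain CREATE INDEX blocks writes for its duration. PostgreSQL's CONCURRENTLY option avoids the lock but cannot run inside a transaction, so it needs Alembic's autocommit block. A sketch, assuming Alembic 1.2 or newer:

def upgrade() -> None:
    # CONCURRENTLY must execute outside the migration transaction
    with op.get_context().autocommit_block():
        op.create_index(
            'ix_task_status_priority',
            'task',
            ['status', 'priority'],
            postgresql_concurrently=True,
        )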
Backup and Recovery
Before any migration, back up your data. Database disasters happen—hardware fails, migrations break, developers make mistakes.
Pre-Migration Backup
Always back up before running migrations:
# PostgreSQL backup
pg_dump -h localhost -U user -d taskdb -F c -f backup_before_migration.dump
# With timestamp
pg_dump -h localhost -U user -d taskdb -F c -f "backup_$(date +%Y%m%d_%H%M%S).dump"
Options explained:
| Flag | Purpose |
|---|---|
| -F c | Custom format (compressed, most flexible) |
| -F p | Plain SQL (human-readable) |
| -F t | Tar format |
| -f | Output file |
Automated Backup Script
Create a script for pre-migration backups:
#!/bin/bash
# scripts/backup-before-migration.sh
set -e # Exit on error
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/pre_migration_$TIMESTAMP.dump"
mkdir -p "$BACKUP_DIR"
echo "Creating backup: $BACKUP_FILE"
pg_dump "$DATABASE_URL" -F c -f "$BACKUP_FILE"
echo "Backup complete. Size: $(du -h $BACKUP_FILE | cut -f1)"
echo "Running migrations..."
alembic upgrade head
echo "Migration complete. Keeping backup for 7 days."
find "$BACKUP_DIR" -name "*.dump" -mtime +7 -delete
Restore from Backup
If a migration fails:
# Restore entire database
pg_restore -h localhost -U user -d taskdb -c backup_before_migration.dump
# -c drops objects before recreating them
Restore specific tables:
pg_restore -h localhost -U user -d taskdb -t task backup.dump
Point-in-Time Recovery
For production, enable continuous archiving:
# postgresql.conf
archive_mode = on
archive_command = 'cp %p /path/to/archive/%f'
Combined with regular base backups, this allows recovery to any point in time, not just to backup snapshots.
Cloud Database Backups
| Provider | Backup Method |
|---|---|
| AWS RDS | Automated snapshots + manual snapshots |
| Neon | Branch databases for testing migrations |
| Supabase | Daily backups + point-in-time recovery |
| Railway | Automatic daily backups |
Neon branching for safe migrations:
# Create branch from production
neon branch create --name migration-test --parent main
# Test migration on branch
DATABASE_URL="$BRANCH_URL" alembic upgrade head
# If successful, merge or run on main
Backup Verification
Test your backups regularly:
# Create test database
createdb taskdb_test
# Restore backup
pg_restore -h localhost -U user -d taskdb_test backup.dump
# Verify data
psql taskdb_test -c "SELECT COUNT(*) FROM task;"
# Cleanup
dropdb taskdb_test
Golden rule: An untested backup is not a backup.
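To make that check repeatable, script it and run it on a schedule or in CI. A sketch that shells out to the same tools (assumes the PostgreSQL client utilities are on PATH; database and table names are this chapter's examples):

# scripts/verify_backup.py (sketch)
import subprocess

def verify_backup(dump_path: str, test_db: str = "taskdb_test") -> None:
    """Restore a dump into a scratch database and confirm it has data."""
    subprocess.run(["createdb", test_db], check=True)
    try:
        subprocess.run(["pg_restore", "-d", test_db, dump_path], check=True)
        result = subprocess.run(
            ["psql", test_db, "-tAc", "SELECT COUNT(*) FROM task;"],
            check=True, capture_output=True, text=True,
        )
        assert int(result.stdout.strip()) > 0, "restored database is empty"
    finally:
        subprocess.run(["dropdb", test_db], check=True)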
Troubleshooting
"Target database is not up to date"
alembic stamp head # Mark the current DB as up to date without running migrations (only if the schema already matches)
"Can't locate revision"
Your versions folder is missing files. Check git history or restore from backup.
Autogenerate misses changes
- Verify model imports in env.py
- Check that target_metadata = SQLModel.metadata is set
- Ensure the model has table=True
PYTHONPATH issues
PYTHONPATH=. alembic upgrade head
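Rather than setting PYTHONPATH on every invocation, you can make env.py locate the project itself. A sketch to place near the top of env.py, assuming env.py sits in alembic/ one level below the project root:

import sys
from pathlib import Path

# Make the project root importable so `from app.models...` resolves
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))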
Try With AI
Prompt 1: Add New Table
I added a Comment model to my app:
class Comment(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    content: str
    task_id: int = Field(foreign_key="task.id")
    created_by_id: int = Field(foreign_key="worker.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)
Walk me through:
1. What to add to env.py
2. Generating the migration
3. Reviewing and applying it
What you're learning: New table workflow—the complete process for adding models.
Prompt 2: Rename Column
I need to rename the "title" column in Task to "name".
This is a production database with existing data.
Show me the migration that:
1. Renames the column (preserving data)
2. Can be safely rolled back
What you're learning: Column modification—handling schema changes that affect existing data.
Prompt 3: Production Deployment
I have 10 pending migrations to deploy to production.
What's the safest process to:
1. Back up the database
2. Run migrations with minimal downtime
3. Verify success
4. Roll back if something fails
Include the actual commands.
What you're learning: Deployment process—applying migrations safely in production.
Safety Note
Always back up production databases before migrations. Test migrations on a copy of production data first. Never run untested migrations directly on production.
Reflect on Your Skill
You built a relational-db-agent skill in Lesson 0. Test its migration knowledge.
Test Your Skill
Using my relational-db-agent skill, show me:
1. How to initialize Alembic for async
2. What imports are needed in env.py
3. Commands to generate and apply a migration
Identify Gaps
Ask yourself:
- Did my skill mention alembic init -t async?
- Did it emphasize importing all models in env.py?
- Did it use SQLModel.metadata as target_metadata?
- Did it show the alembic upgrade head command?
Improve Your Skill
If you found gaps:
My relational-db-agent skill doesn't explain model imports.
Add this critical note:
## Alembic Model Imports (CRITICAL)
In env.py, you MUST import every SQLModel class:
from app.models.task import Task
from app.models.project import Project
# ... all models
Without these imports, Alembic won't detect your tables!
Your skill now generates correct migration setup.