
Migrations with Alembic

Your models define schema in Python. Alembic translates model changes into database migrations—versioned scripts that modify your database schema safely.

For async SQLModel, you use Alembic's async template.

Initializing Alembic

Create the migrations directory:

alembic init -t async alembic

Output:

Creating directory /project/alembic ... done
Creating directory /project/alembic/versions ... done
Generating /project/alembic.ini ... done
Generating /project/alembic/env.py ... done
Generating /project/alembic/README ... done
Generating /project/alembic/script.py.mako ... done

What -t async does: Creates an env.py configured for async database operations instead of sync.

Directory Structure

project/
├── alembic/
│   ├── versions/          # Migration scripts go here
│   ├── env.py             # Configuration (you'll edit this)
│   ├── script.py.mako     # Template for new migrations
│   └── README
├── alembic.ini            # Alembic settings
└── app/
    └── models/            # Your SQLModel classes

Configure alembic.ini

Set your database URL:

# alembic.ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql+asyncpg://user:password@localhost:5432/taskdb

For environment variables:

# alembic.ini - use placeholder
sqlalchemy.url = driver://user:pass@localhost/dbname

Then override in env.py (shown below).

Configure env.py

This is the critical file. Replace the default with:

# alembic/env.py
import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import create_async_engine

from alembic import context

# CRITICAL: Import all your models here!
from app.models.task import Task
from app.models.project import Project
from app.models.worker import Worker
from app.database import get_async_database_url

from sqlmodel import SQLModel
import os

# Alembic Config object
config = context.config

# Set up logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Use SQLModel's metadata
target_metadata = SQLModel.metadata


def get_url():
    """Get async database URL from environment."""
    url = os.getenv("DATABASE_URL", config.get_main_option("sqlalchemy.url"))
    return get_async_database_url(url)


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = get_url()
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run migrations with connection."""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine."""
    connectable = create_async_engine(
        get_url(),
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Entry point for online migrations."""
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Critical line:

from app.models.task import Task  # Must import ALL models

Why import models? Alembic detects schema by comparing SQLModel.metadata to the database. If models aren't imported, they're not in metadata, and Alembic won't see them.
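env.py also imports get_async_database_url from app.database. If your project doesn't define that helper yet, a minimal sketch (assuming you only need to swap in the asyncpg driver):

# app/database.py (sketch -- adapt to your setup)
def get_async_database_url(url: str) -> str:
    """Rewrite a sync postgresql:// URL to use the asyncpg driver."""
    if url.startswith("postgresql://"):
        return url.replace("postgresql://", "postgresql+asyncpg://", 1)
    return url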

Generating Migrations

After changing models, generate a migration:

alembic revision --autogenerate -m "Add tasks table"

Output:

INFO  [alembic.autogenerate.compare] Detected added table 'task'
INFO  [alembic.autogenerate.compare] Detected added index 'ix_task_project_id' on '['project_id']'
Generating /project/alembic/versions/abc123_add_tasks_table.py ... done

The generated file:

# alembic/versions/abc123_add_tasks_table.py
"""Add tasks table

Revision ID: abc123
Revises:
Create Date: 2024-01-15 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        'task',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=500), nullable=False),
        sa.Column('status', sa.String(length=50), nullable=False),
        sa.Column('tags', postgresql.JSONB(), server_default='[]', nullable=False),
        sa.Column('project_id', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.ForeignKeyConstraint(['project_id'], ['project.id']),
    )
    op.create_index('ix_task_project_id', 'task', ['project_id'])


def downgrade() -> None:
    op.drop_index('ix_task_project_id', table_name='task')
    op.drop_table('task')

Running Migrations

Apply pending migrations:

alembic upgrade head

Output:

INFO  [alembic.runtime.migration] Running upgrade  -> abc123, Add tasks table

Other commands:

# Upgrade to specific revision
alembic upgrade abc123

# Downgrade one revision
alembic downgrade -1

# Downgrade to specific revision
alembic downgrade abc123

# Show current revision
alembic current

# Show migration history
alembic history
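These commands also have a Python API, which is useful in tests and deploy scripts. A sketch using Alembic's command module (call it from sync code; the async env.py invokes asyncio.run() internally):

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # path to your alembic.ini

command.upgrade(cfg, "head")   # same as: alembic upgrade head
command.downgrade(cfg, "-1")   # same as: alembic downgrade -1
command.current(cfg)           # same as: alembic current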

Adding Columns

When you add a field to a model:

class Task(SQLModel, table=True):
    # ... existing fields ...
    priority: str = Field(default="medium")  # NEW

Generate and apply:

alembic revision --autogenerate -m "Add priority to tasks"
alembic upgrade head

Generated migration:

def upgrade() -> None:
    op.add_column(
        'task',
        sa.Column('priority', sa.String(length=20), nullable=False, server_default='medium')
    )

def downgrade() -> None:
    op.drop_column('task', 'priority')
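One caveat: autogenerate does not translate a Python-side default= into a server_default, so for a NOT NULL column on a table with existing rows you may need to add server_default to the generated migration by hand, or declare it on the model. A sketch using SQLModel's sa_column_kwargs:

from sqlmodel import Field, SQLModel

class Task(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    # default= applies to new Python objects; server_default is what
    # lands in the migration and backfills existing rows
    priority: str = Field(
        default="medium",
        max_length=20,
        sa_column_kwargs={"server_default": "medium"},
    )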

Data Migrations

Sometimes you need to transform existing data:

"""Add priority and set existing tasks to medium

Revision ID: def456
"""
from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    # 1. Add column (nullable first)
    op.add_column(
        'task',
        sa.Column('priority', sa.String(length=20), nullable=True)
    )

    # 2. Update existing rows
    conn = op.get_bind()
    conn.execute(sa.text("UPDATE task SET priority = 'medium' WHERE priority IS NULL"))

    # 3. Make column non-nullable
    op.alter_column('task', 'priority', nullable=False, server_default='medium')


def downgrade() -> None:
    op.drop_column('task', 'priority')

Note: Data migrations use sync operations with op.get_bind(), not async. Even with an async engine, env.py runs migrations through connection.run_sync(), so the connection inside a migration function is a regular synchronous connection.
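If you prefer not to write raw SQL, the same UPDATE can be expressed with a lightweight SQLAlchemy Core table stub; a sketch:

import sqlalchemy as sa
from alembic import op

# Declare only the columns the statement needs; this is not the full model
task = sa.table("task", sa.column("priority", sa.String))


def upgrade() -> None:
    op.execute(
        task.update()
        .where(task.c.priority.is_(None))
        .values(priority="medium")
    )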

JSONB Column Migration

Adding JSONB columns:

from sqlalchemy.dialects.postgresql import JSONB

def upgrade() -> None:
    op.add_column(
        'task',
        sa.Column(
            'metadata',
            JSONB(),
            nullable=False,
            server_default='{}',
        )
    )

def downgrade() -> None:
    op.drop_column('task', 'metadata')
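If you query the JSONB column with containment operators such as @>, a GIN index speeds those lookups considerably. A sketch (index name assumed):

def upgrade() -> None:
    # postgresql_using picks the index method; GIN suits JSONB containment
    op.create_index(
        'ix_task_metadata_gin',
        'task',
        ['metadata'],
        postgresql_using='gin',
    )

def downgrade() -> None:
    op.drop_index('ix_task_metadata_gin', table_name='task')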

Index Migrations

Add indexes for performance:

def upgrade() -> None:
    op.create_index(
        'ix_task_status_priority',
        'task',
        ['status', 'priority'],
    )

def downgrade() -> None:
    op.drop_index('ix_task_status_priority', table_name='task')
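On a large production table, a plain CREATE INDEX blocks writes while it builds. PostgreSQL's CONCURRENTLY option avoids that, but it cannot run inside a transaction, so the migration must step outside Alembic's transaction first. A sketch:

def upgrade() -> None:
    # CREATE INDEX CONCURRENTLY is not allowed inside a transaction block
    with op.get_context().autocommit_block():
        op.create_index(
            'ix_task_status_priority',
            'task',
            ['status', 'priority'],
            postgresql_concurrently=True,
        )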

Backup and Recovery

Before any migration, back up your data. Database disasters happen—hardware fails, migrations break, developers make mistakes.

Pre-Migration Backup

Always back up before running migrations:

# PostgreSQL backup
pg_dump -h localhost -U user -d taskdb -F c -f backup_before_migration.dump

# With timestamp
pg_dump -h localhost -U user -d taskdb -F c -f "backup_$(date +%Y%m%d_%H%M%S).dump"

Options explained:

Flag    Purpose
-F c    Custom format (compressed, most flexible)
-F p    Plain SQL (human-readable)
-F t    Tar format
-f      Output file

Automated Backup Script

Create a script for pre-migration backups:

#!/bin/bash
# scripts/backup-before-migration.sh

set -e # Exit on error

BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/pre_migration_$TIMESTAMP.dump"

mkdir -p "$BACKUP_DIR"

echo "Creating backup: $BACKUP_FILE"
pg_dump "$DATABASE_URL" -F c -f "$BACKUP_FILE"

echo "Backup complete. Size: $(du -h $BACKUP_FILE | cut -f1)"
echo "Running migrations..."
alembic upgrade head

echo "Migration complete. Keeping backup for 7 days."
find "$BACKUP_DIR" -name "*.dump" -mtime +7 -delete

Restore from Backup

If a migration fails:

# Restore entire database
pg_restore -h localhost -U user -d taskdb -c backup_before_migration.dump

# -c drops objects before recreating them

Restore specific tables:

pg_restore -h localhost -U user -d taskdb -t task backup.dump

Point-in-Time Recovery

For production, enable continuous archiving:

# postgresql.conf
archive_mode = on
archive_command = 'cp %p /path/to/archive/%f'

This allows recovery to any point in time, not just backup snapshots.

Cloud Database Backups

Provider    Backup Method
AWS RDS     Automated snapshots + manual snapshots
Neon        Branch databases for testing migrations
Supabase    Daily backups + point-in-time recovery
Railway     Automatic daily backups

Neon branching for safe migrations:

# Create branch from production
neon branches create --name migration-test --parent main

# Test migration on branch
DATABASE_URL="$BRANCH_URL" alembic upgrade head

# If successful, merge or run on main

Backup Verification

Test your backups regularly:

# Create test database
createdb taskdb_test

# Restore backup
pg_restore -h localhost -U user -d taskdb_test backup.dump

# Verify data
psql taskdb_test -c "SELECT COUNT(*) FROM task;"

# Cleanup
dropdb taskdb_test

Golden rule: An untested backup is not a backup.

Troubleshooting

"Target database is not up to date"

alembic stamp head  # Mark current DB as up to date (only if its schema already matches your latest revision)

"Can't locate revision"

Your versions folder is missing files. Check git history or restore from backup.

Autogenerate misses changes

  1. Verify model imports in env.py (a quick check follows this list)
  2. Check target_metadata = SQLModel.metadata
  3. Ensure model has table=True
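To verify the imports are doing their job, inspect the metadata directly. A quick check, assuming the module paths used in this lesson:

# Run from the project root
from sqlmodel import SQLModel

import app.models.task  # noqa: F401  (importing registers the table)

print(SQLModel.metadata.tables.keys())  # should include 'task'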

PYTHONPATH issues

If env.py can't import your app modules (ModuleNotFoundError), run Alembic from the project root with the path set explicitly:

PYTHONPATH=. alembic upgrade head

Try With AI

Prompt 1: Add New Table

I added a Comment model to my app:

class Comment(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    content: str
    task_id: int = Field(foreign_key="task.id")
    created_by_id: int = Field(foreign_key="worker.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)

Walk me through:
1. What to add to env.py
2. Generating the migration
3. Reviewing and applying it

What you're learning: New table workflow—the complete process for adding models.

Prompt 2: Rename Column

I need to rename the "title" column in Task to "name".
This is a production database with existing data.

Show me the migration that:
1. Renames the column (preserving data)
2. Can be safely rolled back

What you're learning: Column modification—handling schema changes that affect existing data.

Prompt 3: Production Deployment

I have 10 pending migrations to deploy to production.
What's the safest process to:
1. Back up the database
2. Run migrations with minimal downtime
3. Verify success
4. Roll back if something fails

Include the actual commands.

What you're learning: Deployment process—applying migrations safely in production.

Safety Note

Always back up production databases before migrations. Test migrations on a copy of production data first. Never run untested migrations directly on production.


Reflect on Your Skill

You built a relational-db-agent skill in Lesson 0. Test its migration knowledge.

Test Your Skill

Using my relational-db-agent skill, show me:
1. How to initialize Alembic for async
2. What imports are needed in env.py
3. Commands to generate and apply a migration

Identify Gaps

Ask yourself:

  • Did my skill mention alembic init -t async?
  • Did it emphasize importing all models in env.py?
  • Did it use SQLModel.metadata as target_metadata?
  • Did it show upgrade head command?

Improve Your Skill

If you found gaps:

My relational-db-agent skill doesn't explain model imports.
Add this critical note:

## Alembic Model Imports (CRITICAL)
In env.py, you MUST import every SQLModel class:
from app.models.task import Task
from app.models.project import Project
# ... all models

Without these imports, Alembic won't detect your tables!

Your skill now generates a correct migration setup.