Testing Database Code
Untested database code breaks in production. Tests catch bugs before users do.
Async database testing requires specific patterns. pytest-asyncio handles the event loop, fixtures manage state, and isolation prevents test pollution.
Test Setup
Installation
pip install pytest pytest-asyncio aiosqlite
Output:
Successfully installed pytest-8.0.0 pytest-asyncio-0.23.0 aiosqlite-0.19.0
Configuration
Create pytest.ini or add to pyproject.toml:
# pytest.ini
[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
Or in pyproject.toml:
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
What asyncio_mode = auto does: Automatically marks async tests without needing @pytest.mark.asyncio on every test.
Test Database Engine
Never test against your production database. Use SQLite for fast, isolated tests.
# tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
# Test database - in-memory SQLite
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="function")
async def engine():
"""Create a fresh test database for each test."""
engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False},
)
# Create all tables
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
yield engine
# Cleanup
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)
await engine.dispose()
Output (when test runs):
tests/test_tasks.py::test_create_task PASSED
Why in-memory SQLite?
| Approach | Speed | Isolation | PostgreSQL Compatibility |
|---|---|---|---|
| In-memory SQLite | Fastest | Per-test | Basic types only |
| File SQLite | Fast | Per-file | Basic types only |
| Test PostgreSQL | Slower | Depends on cleanup | Full |
Use SQLite for unit tests, PostgreSQL for integration tests.
Session Fixtures
Tests need database sessions. Create fixtures that handle setup and cleanup.
Basic Session Fixture
# tests/conftest.py
from sqlmodel.ext.asyncio.session import AsyncSession
@pytest.fixture
async def session(engine):
"""Provide a transactional session for each test."""
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async with async_session() as session:
yield session
# Rollback any uncommitted changes
await session.rollback()
Usage in tests:
async def test_create_task(session):
task = Task(title="Test task", project_id=1)
session.add(task)
await session.commit()
await session.refresh(task)
assert task.id is not None
assert task.title == "Test task"
Transactional Isolation
For perfect isolation, wrap each test in a transaction that rolls back:
# tests/conftest.py
from sqlalchemy.ext.asyncio import AsyncConnection
@pytest.fixture
async def session(engine):
"""Session with automatic rollback - no cleanup needed."""
async with engine.connect() as conn:
# Start a transaction
await conn.begin()
async with AsyncSession(bind=conn) as session:
yield session
# Rollback everything - test never affects database
await conn.rollback()
Why rollback? Each test starts with a clean database. No test pollution, no cleanup code needed.
Test Data Fixtures
Create fixtures for common test data:
# tests/conftest.py
from models.project import Project
from models.worker import Worker
from models.task import Task
@pytest.fixture
async def project(session):
"""Create a test project."""
project = Project(name="Test Project", status="active")
session.add(project)
await session.flush()
return project
@pytest.fixture
async def worker(session):
"""Create a test worker."""
worker = Worker(handle="@tester", type="human", email="test@example.com")
session.add(worker)
await session.flush()
return worker
@pytest.fixture
async def task(session, project, worker):
"""Create a test task with dependencies."""
task = Task(
title="Test Task",
project_id=project.id,
created_by_id=worker.id,
)
session.add(task)
await session.flush()
return task
Usage:
async def test_task_belongs_to_project(task, project):
assert task.project_id == project.id
async def test_task_has_creator(task, worker):
assert task.created_by_id == worker.id
Output:
tests/test_tasks.py::test_task_belongs_to_project PASSED
tests/test_tasks.py::test_task_has_creator PASSED
Testing CRUD Operations
Test Create
# tests/test_task_service.py
from services.task_service import TaskService
from schemas.task import TaskCreate
async def test_create_task(session, project, worker):
service = TaskService(session)
data = TaskCreate(title="New Task", project_id=project.id)
task = await service.create(data, created_by_id=worker.id)
assert task.id is not None
assert task.title == "New Task"
assert task.status == "pending" # Default value
assert task.created_by_id == worker.id
Test Read
async def test_get_task_by_id(session, task):
service = TaskService(session)
found = await service.get(task.id)
assert found is not None
assert found.id == task.id
assert found.title == task.title
async def test_get_nonexistent_task(session):
service = TaskService(session)
found = await service.get(9999)
assert found is None
Test Update
from schemas.task import TaskUpdate
async def test_update_task(session, task):
service = TaskService(session)
update_data = TaskUpdate(title="Updated Title", status="in_progress")
updated = await service.update(task, update_data)
assert updated.title == "Updated Title"
assert updated.status == "in_progress"
assert updated.updated_at > task.created_at
Test Delete
async def test_delete_task(session, task):
service = TaskService(session)
task_id = task.id
await service.delete(task)
found = await service.get(task_id)
assert found is None
Testing Query Filters
async def test_list_by_status(session, project, worker):
service = TaskService(session)
# Create tasks with different statuses
pending = Task(title="Pending", project_id=project.id,
created_by_id=worker.id, status="pending")
done = Task(title="Done", project_id=project.id,
created_by_id=worker.id, status="completed")
session.add_all([pending, done])
await session.flush()
# Query pending only
results = await service.list_by_project(project.id, status="pending")
assert len(results) == 1
assert results[0].title == "Pending"
async def test_list_with_pagination(session, project, worker):
service = TaskService(session)
# Create 25 tasks
for i in range(25):
session.add(Task(
title=f"Task {i}",
project_id=project.id,
created_by_id=worker.id,
))
await session.flush()
# Get first page
page_1 = await service.list_paginated(limit=10, offset=0)
assert len(page_1) == 10
# Get second page
page_2 = await service.list_paginated(limit=10, offset=10)
assert len(page_2) == 10
# Pages should have different tasks
page_1_ids = {t.id for t in page_1}
page_2_ids = {t.id for t in page_2}
assert page_1_ids.isdisjoint(page_2_ids)
Testing Relationships
async def test_task_with_subtasks(session, project, worker):
# Create parent task
parent = Task(title="Parent", project_id=project.id, created_by_id=worker.id)
session.add(parent)
await session.flush()
# Create child tasks
child1 = Task(title="Child 1", project_id=project.id,
created_by_id=worker.id, parent_task_id=parent.id)
child2 = Task(title="Child 2", project_id=project.id,
created_by_id=worker.id, parent_task_id=parent.id)
session.add_all([child1, child2])
await session.flush()
# Verify relationship
from sqlmodel import select
from sqlalchemy.orm import selectinload
stmt = select(Task).where(Task.id == parent.id).options(
selectinload(Task.subtasks)
)
result = await session.exec(stmt)
loaded_parent = result.one()
assert len(loaded_parent.subtasks) == 2
Testing Error Cases
import pytest
from sqlalchemy.exc import IntegrityError
async def test_duplicate_handle_fails(session):
worker1 = Worker(handle="@duplicate", type="human")
worker2 = Worker(handle="@duplicate", type="agent")
session.add(worker1)
await session.flush()
session.add(worker2)
with pytest.raises(IntegrityError):
await session.flush()
async def test_missing_required_field(session, project):
# Task without required created_by_id
task = Task(title="Missing Creator", project_id=project.id)
session.add(task)
with pytest.raises(IntegrityError):
await session.flush()
Test Organization
File Structure
tests/
├── conftest.py # Shared fixtures
├── test_models.py # Model validation tests
├── test_task_service.py # CRUD service tests
├── test_queries.py # Complex query tests
└── test_relationships.py # Relationship tests
Running Tests
# Run all tests
pytest
# Run with output
pytest -v
# Run specific file
pytest tests/test_task_service.py
# Run specific test
pytest tests/test_task_service.py::test_create_task
# Run with coverage
pytest --cov=services --cov-report=html
Output:
========================= test session starts ==========================
platform darwin -- Python 3.11.0, pytest-8.0.0, pluggy-1.3.0
asyncio: mode=auto
collected 15 items
tests/test_task_service.py::test_create_task PASSED
tests/test_task_service.py::test_get_task_by_id PASSED
tests/test_task_service.py::test_get_nonexistent_task PASSED
tests/test_task_service.py::test_update_task PASSED
tests/test_task_service.py::test_delete_task PASSED
tests/test_queries.py::test_list_by_status PASSED
...
========================= 15 passed in 0.42s ===========================
Complete Test Conftest
Here's a production-ready conftest.py:
# tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel
from models.project import Project
from models.worker import Worker
from models.task import Task
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="function")
async def engine():
"""Create a fresh test database for each test."""
engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False},
)
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def session(engine):
"""Provide a transactional session with automatic rollback."""
async with engine.connect() as conn:
await conn.begin()
async_session = sessionmaker(
bind=conn,
class_=AsyncSession,
expire_on_commit=False,
)
async with async_session() as session:
yield session
await conn.rollback()
@pytest.fixture
async def project(session) -> Project:
"""Create a test project."""
project = Project(name="Test Project", status="active")
session.add(project)
await session.flush()
return project
@pytest.fixture
async def worker(session) -> Worker:
"""Create a test worker."""
worker = Worker(
handle="@tester",
type="human",
email="test@example.com",
)
session.add(worker)
await session.flush()
return worker
@pytest.fixture
async def task(session, project, worker) -> Task:
"""Create a test task with project and worker."""
task = Task(
title="Test Task",
project_id=project.id,
created_by_id=worker.id,
status="pending",
)
session.add(task)
await session.flush()
return task
Try With AI
Prompt 1: Generate Test Cases
I have a TaskService with these methods:
- create(data, created_by_id)
- get(task_id)
- list_by_project(project_id, status=None)
- update(task, data)
- soft_delete(task)
- restore(task)
Generate comprehensive test cases covering:
1. Happy paths for each method
2. Edge cases (empty results, invalid IDs)
3. Error cases (integrity violations)
What you're learning: Test case design—covering happy paths, edge cases, and error handling.
Prompt 2: Debug Async Test
My test is failing with this error:
"RuntimeError: Task attached to a different loop"
Here's my test:
async def test_something(session):
result = await session.exec(select(Task))
...
What's wrong and how do I fix it?
What you're learning: Async debugging—understanding event loop issues in tests.
Prompt 3: Test Factory Pattern
I'm creating a lot of test data manually. Show me how to use
factory_boy with async SQLModel to generate test fixtures.
I need factories for:
- Worker (human and agent variants)
- Project (with random names)
- Task (with various statuses)
What you're learning: Test factories—generating complex test data efficiently.
Safety Note
Tests should never run against production databases. Always use environment variables to switch between test and production configurations. Add safeguards that prevent tests from running if DATABASE_URL points to production.
Reflect on Your Skill
You built a relational-db-agent skill in Lesson 0. Test its testing knowledge.
Test Your Skill
Using my relational-db-agent skill, generate a conftest.py with:
- Test engine fixture (in-memory SQLite)
- Session fixture with automatic rollback
- Project, Worker, and Task data fixtures
Identify Gaps
Ask yourself:
- Did my skill use
sqlite+aiosqlite:///:memory:for in-memory database? - Did it include
await conn.rollback()for test isolation? - Did it use
expire_on_commit=Falsein the session factory? - Did it properly chain fixtures (task depends on project and worker)?
Improve Your Skill
If you found gaps:
My relational-db-agent skill doesn't include testing patterns.
Add this knowledge:
For test fixtures:
1. Use in-memory SQLite: sqlite+aiosqlite:///:memory:
2. Wrap each test in a transaction that rolls back
3. Chain fixtures with dependencies
4. Use expire_on_commit=False for detached objects
Your skill now generates testable database code.