Skip to main content

Testing Database Code

Untested database code breaks in production. Tests catch bugs before users do.

Async database testing requires specific patterns. pytest-asyncio handles the event loop, fixtures manage state, and isolation prevents test pollution.

Test Setup

Installation

pip install pytest pytest-asyncio aiosqlite

Output:

Successfully installed pytest-8.0.0 pytest-asyncio-0.23.0 aiosqlite-0.19.0

Configuration

Create pytest.ini or add to pyproject.toml:

# pytest.ini
[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function

Or in pyproject.toml:

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

What asyncio_mode = auto does: Automatically marks async tests without needing @pytest.mark.asyncio on every test.

Test Database Engine

Never test against your production database. Use SQLite for fast, isolated tests.

# tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel

# Test database - in-memory SQLite
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"

@pytest.fixture(scope="function")
async def engine():
"""Create a fresh test database for each test."""
engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False},
)

# Create all tables
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)

yield engine

# Cleanup
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)

await engine.dispose()

Output (when test runs):

tests/test_tasks.py::test_create_task PASSED

Why in-memory SQLite?

ApproachSpeedIsolationPostgreSQL Compatibility
In-memory SQLiteFastestPer-testBasic types only
File SQLiteFastPer-fileBasic types only
Test PostgreSQLSlowerDepends on cleanupFull

Use SQLite for unit tests, PostgreSQL for integration tests.

Session Fixtures

Tests need database sessions. Create fixtures that handle setup and cleanup.

Basic Session Fixture

# tests/conftest.py
from sqlmodel.ext.asyncio.session import AsyncSession

@pytest.fixture
async def session(engine):
"""Provide a transactional session for each test."""
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)

async with async_session() as session:
yield session
# Rollback any uncommitted changes
await session.rollback()

Usage in tests:

async def test_create_task(session):
task = Task(title="Test task", project_id=1)
session.add(task)
await session.commit()
await session.refresh(task)

assert task.id is not None
assert task.title == "Test task"

Transactional Isolation

For perfect isolation, wrap each test in a transaction that rolls back:

# tests/conftest.py
from sqlalchemy.ext.asyncio import AsyncConnection

@pytest.fixture
async def session(engine):
"""Session with automatic rollback - no cleanup needed."""
async with engine.connect() as conn:
# Start a transaction
await conn.begin()

async with AsyncSession(bind=conn) as session:
yield session

# Rollback everything - test never affects database
await conn.rollback()

Why rollback? Each test starts with a clean database. No test pollution, no cleanup code needed.

Test Data Fixtures

Create fixtures for common test data:

# tests/conftest.py
from models.project import Project
from models.worker import Worker
from models.task import Task

@pytest.fixture
async def project(session):
"""Create a test project."""
project = Project(name="Test Project", status="active")
session.add(project)
await session.flush()
return project

@pytest.fixture
async def worker(session):
"""Create a test worker."""
worker = Worker(handle="@tester", type="human", email="test@example.com")
session.add(worker)
await session.flush()
return worker

@pytest.fixture
async def task(session, project, worker):
"""Create a test task with dependencies."""
task = Task(
title="Test Task",
project_id=project.id,
created_by_id=worker.id,
)
session.add(task)
await session.flush()
return task

Usage:

async def test_task_belongs_to_project(task, project):
assert task.project_id == project.id

async def test_task_has_creator(task, worker):
assert task.created_by_id == worker.id

Output:

tests/test_tasks.py::test_task_belongs_to_project PASSED
tests/test_tasks.py::test_task_has_creator PASSED

Testing CRUD Operations

Test Create

# tests/test_task_service.py
from services.task_service import TaskService
from schemas.task import TaskCreate

async def test_create_task(session, project, worker):
service = TaskService(session)
data = TaskCreate(title="New Task", project_id=project.id)

task = await service.create(data, created_by_id=worker.id)

assert task.id is not None
assert task.title == "New Task"
assert task.status == "pending" # Default value
assert task.created_by_id == worker.id

Test Read

async def test_get_task_by_id(session, task):
service = TaskService(session)

found = await service.get(task.id)

assert found is not None
assert found.id == task.id
assert found.title == task.title

async def test_get_nonexistent_task(session):
service = TaskService(session)

found = await service.get(9999)

assert found is None

Test Update

from schemas.task import TaskUpdate

async def test_update_task(session, task):
service = TaskService(session)
update_data = TaskUpdate(title="Updated Title", status="in_progress")

updated = await service.update(task, update_data)

assert updated.title == "Updated Title"
assert updated.status == "in_progress"
assert updated.updated_at > task.created_at

Test Delete

async def test_delete_task(session, task):
service = TaskService(session)
task_id = task.id

await service.delete(task)

found = await service.get(task_id)
assert found is None

Testing Query Filters

async def test_list_by_status(session, project, worker):
service = TaskService(session)

# Create tasks with different statuses
pending = Task(title="Pending", project_id=project.id,
created_by_id=worker.id, status="pending")
done = Task(title="Done", project_id=project.id,
created_by_id=worker.id, status="completed")
session.add_all([pending, done])
await session.flush()

# Query pending only
results = await service.list_by_project(project.id, status="pending")

assert len(results) == 1
assert results[0].title == "Pending"


async def test_list_with_pagination(session, project, worker):
service = TaskService(session)

# Create 25 tasks
for i in range(25):
session.add(Task(
title=f"Task {i}",
project_id=project.id,
created_by_id=worker.id,
))
await session.flush()

# Get first page
page_1 = await service.list_paginated(limit=10, offset=0)
assert len(page_1) == 10

# Get second page
page_2 = await service.list_paginated(limit=10, offset=10)
assert len(page_2) == 10

# Pages should have different tasks
page_1_ids = {t.id for t in page_1}
page_2_ids = {t.id for t in page_2}
assert page_1_ids.isdisjoint(page_2_ids)

Testing Relationships

async def test_task_with_subtasks(session, project, worker):
# Create parent task
parent = Task(title="Parent", project_id=project.id, created_by_id=worker.id)
session.add(parent)
await session.flush()

# Create child tasks
child1 = Task(title="Child 1", project_id=project.id,
created_by_id=worker.id, parent_task_id=parent.id)
child2 = Task(title="Child 2", project_id=project.id,
created_by_id=worker.id, parent_task_id=parent.id)
session.add_all([child1, child2])
await session.flush()

# Verify relationship
from sqlmodel import select
from sqlalchemy.orm import selectinload

stmt = select(Task).where(Task.id == parent.id).options(
selectinload(Task.subtasks)
)
result = await session.exec(stmt)
loaded_parent = result.one()

assert len(loaded_parent.subtasks) == 2

Testing Error Cases

import pytest
from sqlalchemy.exc import IntegrityError

async def test_duplicate_handle_fails(session):
worker1 = Worker(handle="@duplicate", type="human")
worker2 = Worker(handle="@duplicate", type="agent")

session.add(worker1)
await session.flush()

session.add(worker2)

with pytest.raises(IntegrityError):
await session.flush()


async def test_missing_required_field(session, project):
# Task without required created_by_id
task = Task(title="Missing Creator", project_id=project.id)
session.add(task)

with pytest.raises(IntegrityError):
await session.flush()

Test Organization

File Structure

tests/
├── conftest.py # Shared fixtures
├── test_models.py # Model validation tests
├── test_task_service.py # CRUD service tests
├── test_queries.py # Complex query tests
└── test_relationships.py # Relationship tests

Running Tests

# Run all tests
pytest

# Run with output
pytest -v

# Run specific file
pytest tests/test_task_service.py

# Run specific test
pytest tests/test_task_service.py::test_create_task

# Run with coverage
pytest --cov=services --cov-report=html

Output:

========================= test session starts ==========================
platform darwin -- Python 3.11.0, pytest-8.0.0, pluggy-1.3.0
asyncio: mode=auto
collected 15 items

tests/test_task_service.py::test_create_task PASSED
tests/test_task_service.py::test_get_task_by_id PASSED
tests/test_task_service.py::test_get_nonexistent_task PASSED
tests/test_task_service.py::test_update_task PASSED
tests/test_task_service.py::test_delete_task PASSED
tests/test_queries.py::test_list_by_status PASSED
...

========================= 15 passed in 0.42s ===========================

Complete Test Conftest

Here's a production-ready conftest.py:

# tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel

from models.project import Project
from models.worker import Worker
from models.task import Task

TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture(scope="function")
async def engine():
"""Create a fresh test database for each test."""
engine = create_async_engine(
TEST_DATABASE_URL,
echo=False,
connect_args={"check_same_thread": False},
)

async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)

yield engine

async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.drop_all)

await engine.dispose()


@pytest.fixture
async def session(engine):
"""Provide a transactional session with automatic rollback."""
async with engine.connect() as conn:
await conn.begin()

async_session = sessionmaker(
bind=conn,
class_=AsyncSession,
expire_on_commit=False,
)

async with async_session() as session:
yield session

await conn.rollback()


@pytest.fixture
async def project(session) -> Project:
"""Create a test project."""
project = Project(name="Test Project", status="active")
session.add(project)
await session.flush()
return project


@pytest.fixture
async def worker(session) -> Worker:
"""Create a test worker."""
worker = Worker(
handle="@tester",
type="human",
email="test@example.com",
)
session.add(worker)
await session.flush()
return worker


@pytest.fixture
async def task(session, project, worker) -> Task:
"""Create a test task with project and worker."""
task = Task(
title="Test Task",
project_id=project.id,
created_by_id=worker.id,
status="pending",
)
session.add(task)
await session.flush()
return task

Try With AI

Prompt 1: Generate Test Cases

I have a TaskService with these methods:
- create(data, created_by_id)
- get(task_id)
- list_by_project(project_id, status=None)
- update(task, data)
- soft_delete(task)
- restore(task)

Generate comprehensive test cases covering:
1. Happy paths for each method
2. Edge cases (empty results, invalid IDs)
3. Error cases (integrity violations)

What you're learning: Test case design—covering happy paths, edge cases, and error handling.

Prompt 2: Debug Async Test

My test is failing with this error:
"RuntimeError: Task attached to a different loop"

Here's my test:
async def test_something(session):
result = await session.exec(select(Task))
...

What's wrong and how do I fix it?

What you're learning: Async debugging—understanding event loop issues in tests.

Prompt 3: Test Factory Pattern

I'm creating a lot of test data manually. Show me how to use
factory_boy with async SQLModel to generate test fixtures.

I need factories for:
- Worker (human and agent variants)
- Project (with random names)
- Task (with various statuses)

What you're learning: Test factories—generating complex test data efficiently.

Safety Note

Tests should never run against production databases. Always use environment variables to switch between test and production configurations. Add safeguards that prevent tests from running if DATABASE_URL points to production.


Reflect on Your Skill

You built a relational-db-agent skill in Lesson 0. Test its testing knowledge.

Test Your Skill

Using my relational-db-agent skill, generate a conftest.py with:
- Test engine fixture (in-memory SQLite)
- Session fixture with automatic rollback
- Project, Worker, and Task data fixtures

Identify Gaps

Ask yourself:

  • Did my skill use sqlite+aiosqlite:///:memory: for in-memory database?
  • Did it include await conn.rollback() for test isolation?
  • Did it use expire_on_commit=False in the session factory?
  • Did it properly chain fixtures (task depends on project and worker)?

Improve Your Skill

If you found gaps:

My relational-db-agent skill doesn't include testing patterns.
Add this knowledge:

For test fixtures:
1. Use in-memory SQLite: sqlite+aiosqlite:///:memory:
2. Wrap each test in a transaction that rolls back
3. Chain fixtures with dependencies
4. Use expire_on_commit=False for detached objects

Your skill now generates testable database code.