RAG for Task API

In Chapter 40, you built a Task API with full CRUD operations. Users can filter tasks by status, priority, and exact field matches. But what happens when a user asks: "Show me tasks related to container deployment" or "Find anything about database migrations"?

Exact matches fail. Your API only finds tasks where status="pending" or priority="high". It cannot understand that "container deployment" relates to tasks about Docker, Kubernetes, or orchestration.

This lesson bridges that gap. You will add semantic search to your Task API, enabling users to find tasks by meaning rather than exact keywords. The same RAG patterns you learned in previous lessons now become production endpoints.

The Integration Architecture

Your Task API extension follows this flow:

User Query: "database migration tasks"
                 │
                 ▼
┌──────────────────────────────────┐
│      /tasks/search/semantic      │
│         FastAPI Endpoint         │
└──────────────────────────────────┘
                 │
                 ▼
┌──────────────────────────────────┐
│        OpenAI Embeddings         │
│      text-embedding-3-small      │
└──────────────────────────────────┘
                 │
                 ▼
┌──────────────────────────────────┐
│       Qdrant Vector Store        │
│  Collection: task_descriptions   │
└──────────────────────────────────┘
                 │
                 ▼
┌──────────────────────────────────┐
│          Ranked Results          │
│     [task_id, title, score]      │
└──────────────────────────────────┘

Step 1: Add Dependencies

Your Task API needs LangChain and Qdrant libraries:

uv add langchain-qdrant langchain-openai qdrant-client

Add the OpenAI API key to your .env:

# .env
OPENAI_API_KEY=sk-your-key-here
DATABASE_URL=postgresql://...

Then update your Settings class in config.py:

from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    """Application settings loaded from environment."""

    database_url: str
    openai_api_key: str
    qdrant_url: str = "http://localhost:6333"

    class Config:
        env_file = ".env"


@lru_cache
def get_settings() -> Settings:
    return Settings()

Output (if OPENAI_API_KEY missing):

pydantic_core.ValidationError: 1 validation error for Settings
openai_api_key
  Field required [type=missing]

Step 2: Create the Vector Store Service

Separate your RAG logic from your endpoints.

Create services/vector_store.py:

from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from langchain_core.documents import Document
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from typing import Optional
from uuid import uuid4

from config import get_settings


class TaskVectorStore:
    """Manages task embeddings in Qdrant."""

    def __init__(self):
        settings = get_settings()
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=settings.openai_api_key,
        )
        self.client = QdrantClient(url=settings.qdrant_url)
        self.collection_name = "task_descriptions"
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the collection if it doesn't exist."""
        collections = self.client.get_collections().collections
        exists = any(c.name == self.collection_name for c in collections)

        if not exists:
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=1536,  # text-embedding-3-small dimensions
                    distance=Distance.COSINE,
                ),
            )

    @property
    def vector_store(self) -> QdrantVectorStore:
        return QdrantVectorStore(
            client=self.client,
            collection_name=self.collection_name,
            embedding=self.embeddings,
        )

On first initialization, _ensure_collection creates the task_descriptions collection with 1536-dimensional vectors.
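
You can confirm the collection exists with a quick REPL check (this assumes Qdrant is running on its default port):

>>> from qdrant_client import QdrantClient
>>> client = QdrantClient(url="http://localhost:6333")
>>> client.get_collection("task_descriptions").points_count
0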

Step 3: Index Task Descriptions

When tasks are created or updated, index them for semantic search. Add these methods to TaskVectorStore:

class TaskVectorStore:
    # ... previous methods ...

    def index_task(
        self,
        task_id: int,
        title: str,
        description: str,
        status: str = "pending",
        priority: str = "medium",
    ) -> str:
        """Index a single task for semantic search."""
        content = f"Task: {title}\n\n{description}"

        document = Document(
            page_content=content,
            metadata={
                "task_id": task_id,
                "title": title,
                "status": status,      # needed for the status filter (see Common Mistakes)
                "priority": priority,  # needed for the priority filter
            },
        )

        doc_id = str(uuid4())
        self.vector_store.add_documents(
            documents=[document],
            ids=[doc_id],
        )
        return doc_id

    def index_tasks_batch(self, tasks: list[dict]) -> list[str]:
        """Index multiple tasks in one call."""
        documents = [
            Document(
                page_content=f"Task: {task['title']}\n\n{task['description']}",
                metadata={
                    "task_id": task["id"],
                    "title": task["title"],
                    "status": task.get("status", "pending"),
                    "priority": task.get("priority", "medium"),
                },
            )
            for task in tasks
        ]

        doc_ids = [str(uuid4()) for _ in documents]
        self.vector_store.add_documents(documents=documents, ids=doc_ids)
        return doc_ids

Output (indexing 3 tasks):

>>> store = TaskVectorStore()
>>> tasks = [
... {"id": 1, "title": "Setup Docker", "description": "Install Docker Desktop..."},
... {"id": 2, "title": "Deploy API", "description": "Deploy FastAPI to Railway..."},
... {"id": 3, "title": "Configure database", "description": "Set up PostgreSQL..."}
... ]
>>> store.index_tasks_batch(tasks)
['a1b2c3d4-...', 'e5f6a7b8-...', 'c9d0e1f2-...']
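
One caveat before moving on: index_task generates a fresh UUID on every call, so re-indexing the same task adds a duplicate vector instead of replacing the old one. A minimal mitigation (a sketch, not part of the lesson's code) is to derive a deterministic ID from the task ID with uuid5; Qdrant upserts by point ID, so re-indexing then overwrites the existing entry:

from uuid import NAMESPACE_URL, uuid5


def stable_doc_id(task_id: int) -> str:
    """Map each task to a fixed point ID so re-indexing overwrites it."""
    return str(uuid5(NAMESPACE_URL, f"task:{task_id}"))

Prompt 3 in "Try With AI" below digs into this lifecycle problem.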

Step 4: Add the Search Method

Add the search method that powers your endpoint. Add this import and method to services/vector_store.py:

from qdrant_client import models


class TaskVectorStore:
    # ... previous methods ...

    def semantic_search(
        self,
        query: str,
        k: int = 5,
        status_filter: Optional[str] = None,
        priority_filter: Optional[str] = None,
    ) -> list[dict]:
        """Search tasks by semantic meaning with optional filters."""
        # Build filter conditions
        filter_conditions = []

        if status_filter:
            filter_conditions.append(
                models.FieldCondition(
                    key="metadata.status",
                    match=models.MatchValue(value=status_filter),
                )
            )

        if priority_filter:
            filter_conditions.append(
                models.FieldCondition(
                    key="metadata.priority",
                    match=models.MatchValue(value=priority_filter),
                )
            )

        # Create a filter only if conditions exist
        search_filter = None
        if filter_conditions:
            search_filter = models.Filter(must=filter_conditions)

        # Perform the search with scores
        results = self.vector_store.similarity_search_with_score(
            query=query,
            k=k,
            filter=search_filter,
        )

        # Format results
        return [
            {
                "task_id": doc.metadata.get("task_id"),
                "title": doc.metadata.get("title"),
                "relevance_score": round(float(score), 3),
                "snippet": doc.page_content[:200],
            }
            for doc, score in results
        ]

Output (semantic search):

>>> results = store.semantic_search("container orchestration", k=3)
>>> for r in results:
... print(f"{r['relevance_score']:.3f} - {r['title']}")
0.847 - Setup Docker
0.721 - Deploy API
0.534 - Configure database

Step 5: Create the FastAPI Endpoint

Wire the vector store service into your API.

Create routers/semantic_search.py:

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from typing import Optional

from services.vector_store import TaskVectorStore


router = APIRouter(prefix="/tasks/search", tags=["semantic-search"])


class SemanticSearchResult(BaseModel):
    """Individual search result with relevance score."""

    task_id: int
    title: str
    relevance_score: float
    snippet: str


class SemanticSearchResponse(BaseModel):
    """Response containing ranked search results."""

    query: str
    results: list[SemanticSearchResult]
    count: int


def get_vector_store() -> TaskVectorStore:
    """Dependency for vector store access."""
    return TaskVectorStore()


@router.post("/semantic", response_model=SemanticSearchResponse)
async def semantic_search(
    query: str,
    k: int = Query(default=5, ge=1, le=20, description="Number of results"),
    status: Optional[str] = Query(default=None, description="Filter by status"),
    priority: Optional[str] = Query(default=None, description="Filter by priority"),
    vector_store: TaskVectorStore = Depends(get_vector_store),
) -> SemanticSearchResponse:
    """
    Search tasks by semantic meaning.

    Unlike exact-match filters, this endpoint understands that:
    - "deployment" relates to Docker, Kubernetes, CI/CD
    - "database issues" relates to migrations, queries, connections
    - "authentication" relates to login, JWT, OAuth
    """
    results = vector_store.semantic_search(
        query=query,
        k=k,
        status_filter=status,
        priority_filter=priority,
    )

    return SemanticSearchResponse(
        query=query,
        results=[SemanticSearchResult(**r) for r in results],
        count=len(results),
    )

Output (API request):

curl -X POST "http://localhost:8000/tasks/search/semantic?query=container%20deployment&k=3"
{
"query": "container deployment",
"results": [
{
"task_id": 1,
"title": "Setup Docker",
"relevance_score": 0.847,
"snippet": "Task: Setup Docker\n\nInstall Docker Desktop and configure..."
},
{
"task_id": 2,
"title": "Deploy API",
"relevance_score": 0.721,
"snippet": "Task: Deploy API\n\nDeploy FastAPI application to Railway..."
}
],
"count": 2
}
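
A performance note on the dependency: get_vector_store as written constructs a new TaskVectorStore, and with it a new Qdrant client, on every request. If that becomes a bottleneck, you can reuse one instance across requests; a minimal sketch with functools.lru_cache, which works because FastAPI accepts any callable as a dependency:

from functools import lru_cache


@lru_cache
def get_vector_store() -> TaskVectorStore:
    """Build the vector store once and reuse it for every request."""
    return TaskVectorStore()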

Step 6: Register the Router

Add the semantic search router to your main app.

Update main.py:

from fastapi import FastAPI
from routers import tasks, semantic_search

app = FastAPI(title="Task API with Semantic Search")

app.include_router(tasks.router)
app.include_router(semantic_search.router)


@app.get("/health")
async def health_check():
    return {"status": "healthy", "features": ["crud", "semantic_search"]}

Output:

{
"status": "healthy",
"features": ["crud", "semantic_search"]
}

Step 7: Auto-Index on Task Creation

Integrate indexing with your existing CRUD operations.

Update routers/tasks.py:

from fastapi import APIRouter, Depends, HTTPException

from routers.semantic_search import get_vector_store
from services.vector_store import TaskVectorStore
# Session, get_db, Task, TaskCreate, and TaskResponse come from your
# existing Chapter 40 Task API and are assumed to be imported already.

router = APIRouter(prefix="/tasks", tags=["tasks"])


@router.post("/", response_model=TaskResponse)
async def create_task(
    task: TaskCreate,
    db: Session = Depends(get_db),
    vector_store: TaskVectorStore = Depends(get_vector_store),
) -> TaskResponse:
    """Create a task and index it for semantic search."""
    # Create in database
    db_task = Task(**task.model_dump())
    db.add(db_task)
    db.commit()
    db.refresh(db_task)

    # Index for semantic search (status/priority make the task filterable)
    vector_store.index_task(
        task_id=db_task.id,
        title=db_task.title,
        description=db_task.description or "",
        status=db_task.status,
        priority=db_task.priority,
    )

    return TaskResponse.model_validate(db_task)

Output (creating a task):

curl -X POST "http://localhost:8000/tasks/" \
-H "Content-Type: application/json" \
-d '{"title": "Setup Kubernetes", "description": "Configure K8s cluster for production deployment"}'
{
"id": 4,
"title": "Setup Kubernetes",
"description": "Configure K8s cluster for production deployment",
"status": "pending"
}

Now searching "container orchestration" also finds this new task.

The power of this integration: semantic similarity AND structured constraints.

# Find deployment-related tasks that are still pending
curl -X POST "http://localhost:8000/tasks/search/semantic?query=deployment&status=pending"

# Find high-priority database tasks
curl -X POST "http://localhost:8000/tasks/search/semantic?query=database&priority=high"

Output (filtered search):

{
"query": "deployment",
"results": [
{
"task_id": 4,
"title": "Setup Kubernetes",
"relevance_score": 0.892,
"snippet": "Task: Setup Kubernetes\n\nConfigure K8s cluster..."
}
],
"count": 1
}

Tasks with status="completed" are excluded even if semantically relevant.

Common Mistakes

Mistake 1: Not indexing existing tasks

When you add semantic search to an existing API, previous tasks won't appear in results:

# Migration script to index existing tasks
def backfill_vector_store():
    """Index all existing tasks into Qdrant."""
    db = next(get_db())  # get_db is a generator dependency; take one session
    vector_store = TaskVectorStore()

    tasks = db.query(Task).all()
    task_dicts = [
        {
            "id": t.id,
            "title": t.title,
            "description": t.description or "",
            "status": t.status,
            "priority": t.priority,
        }
        for t in tasks
    ]

    vector_store.index_tasks_batch(task_dicts)
    print(f"Indexed {len(task_dicts)} existing tasks")

Mistake 2: Missing metadata for filters

If you don't include status/priority in metadata, filters silently return empty results:

# Wrong - no filterable metadata
Document(page_content=content, metadata={"task_id": task_id})

# Correct - include filterable fields
Document(
    page_content=content,
    metadata={
        "task_id": task_id,
        "title": title,
        "status": status,      # Required for status filter
        "priority": priority,  # Required for priority filter
    },
)

Mistake 3: Synchronous indexing blocking requests

Embedding and Qdrant calls are slow compared to a database insert. For production traffic, move indexing off the request path:

# For high-traffic APIs, consider background indexing
from fastapi import BackgroundTasks


@router.post("/")
async def create_task(
    task: TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    db_task = Task(**task.model_dump())
    db.add(db_task)
    db.commit()
    db.refresh(db_task)

    # Index in background - doesn't block the response
    background_tasks.add_task(
        index_task_background,
        task_id=db_task.id,
        title=db_task.title,
        description=db_task.description,
    )

    return TaskResponse.model_validate(db_task)
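
The index_task_background helper is referenced above but never defined; a minimal sketch, assuming the TaskVectorStore from Step 2:

def index_task_background(task_id: int, title: str, description: str | None) -> None:
    """Runs after the response is sent; an error here won't fail the request."""
    store = TaskVectorStore()
    store.index_task(
        task_id=task_id,
        title=title,
        description=description or "",
    )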

Security Note

Your semantic search endpoint accepts user input as the query parameter. Ensure you:

  • Rate limit the endpoint (embedding calls cost money)
  • Validate query length (prevent abuse with massive queries)
  • Log queries for monitoring (helps identify problematic patterns)
@router.post("/semantic")
async def semantic_search(
query: str = Query(..., min_length=3, max_length=500),
# ... rest of parameters
):
# Query is validated by FastAPI
pass
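
For the rate limiting and logging points, FastAPI has no built-in limiter; one common choice is the slowapi package (an assumption here; any rate limiter works). A minimal sketch:

import logging

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

logger = logging.getLogger("semantic_search")

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/tasks/search/semantic")
@limiter.limit("10/minute")  # each client IP gets 10 searches per minute
async def semantic_search(request: Request, query: str):
    # Log the query to help spot problematic patterns
    logger.info("semantic_search query=%r client=%s", query, request.client.host)
    ...  # embed + search as in Step 5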

Try With AI

After completing the integration, explore these extensions.

Prompt 1: Adding Hybrid Search

My semantic search works well for conceptual queries like "deployment tasks" but
misses exact keyword matches. A user searching "JWT" expects tasks with "JWT"
in the title even if semantically different.

How do I add hybrid search that combines:
1. Vector similarity (semantic meaning)
2. BM25 keyword matching (exact terms)

Show me how to modify my TaskVectorStore to use Qdrant's hybrid search.

What you're learning: Pure semantic search can miss exact matches. Hybrid search combines both approaches for better recall without sacrificing precision.

Prompt 2: Re-Ranking Results

My semantic search returns 10 results but the most relevant one sometimes
appears at position 4 or 5. I want to use a re-ranking model to improve
result ordering.

Can I add a re-ranker that takes my initial Qdrant results and reorders
them using a cross-encoder model? Show me how to integrate this into my
FastAPI endpoint.

What you're learning: Initial retrieval is fast but approximate. Re-ranking is slower but more accurate. The two-stage pattern balances speed and quality.

Prompt 3: Handling Task Updates and Deletes

When a task is updated or deleted, my vector store gets out of sync with
the database. How do I keep them synchronized?

Specifically:
1. Update task description -> update vector embedding
2. Delete task -> remove from Qdrant collection

Show me the patterns for maintaining consistency.

What you're learning: Vector stores need lifecycle management just like databases. Keeping them synchronized is critical for accurate search results.


Reflect on Your Skill

You built a rag-deployment skill in Lesson 0. Test and improve it based on what you learned about API integration.

Test Your Skill

Using my rag-deployment skill, help me add semantic search to an existing
FastAPI application. The app has User and Project models. I want to search
projects by semantic meaning.

Identify Gaps

Ask yourself:

  • Did my skill include FastAPI integration patterns?
  • Did it handle the indexing lifecycle (create, update, delete)?
  • Did it show how to combine structured filters with vector search?
  • Did it address production concerns like rate limiting and background indexing?

Improve Your Skill

If you found gaps:

My rag-deployment skill is missing FastAPI integration patterns.
Update it to include:
1. Vector store service class with dependency injection
2. Semantic search endpoint with Pydantic models
3. Auto-indexing on CRUD operations
4. Combined filter + vector search patterns
5. Background task indexing for performance