Data Curation Workflow

Your LLMOps pipeline begins with data. This lesson builds the automated workflow that transforms domain knowledge into training-ready datasets.

The Data Curation Challenge

Fine-tuning requires high-quality, domain-specific training data. But most domains lack curated datasets:

Challenge | Impact | Solution
No existing dataset | Cannot train | Generate synthetic examples
Low-quality data | Model learns bad patterns | Automated quality checks
Duplicate examples | Wasted training compute | Deduplication pipeline
Wrong format | Training fails | Format validation
Insufficient quantity | Underfitting | Synthetic augmentation

Your data pipeline solves all these challenges automatically.

Complete Data Pipeline

┌──────────────────────────────────────────────────────────────────┐
│                      DATA CURATION PIPELINE                      │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐       │
│  │   SEED   │──▶│ GENERATE │──▶│  CLEAN   │──▶│  FORMAT  │       │
│  │ EXAMPLES │   │ SYNTHETIC│   │   DATA   │   │   DATA   │       │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘       │
│       │              │              │              │             │
│       ▼              ▼              ▼              ▼             │
│  5-10 manual    100-200 LLM    Deduplicate    JSONL format       │
│  examples       generated      Validate       Train/Val split    │
│                                                                  │
│  ┌────────────────────────────────────────────────────────┐      │
│  │                   QUALITY VALIDATION                   │      │
│  │  ├── Format check (100% valid)                         │      │
│  │  ├── Deduplication (< 5% duplicates)                   │      │
│  │  └── Token distribution (mean < 2048)                  │      │
│  └────────────────────────────────────────────────────────┘      │
│                                                                  │
│  OUTPUT: train.jsonl + validation.jsonl + quality_report.json    │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Stage 1: Seed Examples

Create 5-10 high-quality examples that demonstrate your domain:

# seed_examples.py
SEED_EXAMPLES = [
    {
        "messages": [
            {"role": "user", "content": "Add a task: Buy groceries"},
            {"role": "assistant", "content": None,
             "tool_calls": [{
                 "type": "function",
                 "function": {
                     "name": "add_task",
                     "arguments": '{"title": "Buy groceries", "priority": "medium"}'
                 }
             }]}
        ]
    },
    {
        "messages": [
            {"role": "user", "content": "What tasks are due today?"},
            {"role": "assistant", "content": None,
             "tool_calls": [{
                 "type": "function",
                 "function": {
                     "name": "get_tasks",
                     "arguments": '{"due_date": "today"}'
                 }
             }]}
        ]
    },
    {
        "messages": [
            {"role": "user", "content": "Mark the groceries task as done"},
            {"role": "assistant", "content": None,
             "tool_calls": [{
                 "type": "function",
                 "function": {
                     "name": "update_task",
                     "arguments": '{"task_id": "search:groceries", "status": "completed"}'
                 }
             }]}
        ]
    },
    # Add 2-7 more examples covering different operations
]

Seed Example Principles:

Principle | Why | Example
Cover all operations | Model learns full capability | add, get, update, delete
Vary input patterns | Model generalizes | Formal, casual, abbreviated
Include edge cases | Model handles unusual inputs | Empty fields, special chars
Match production format | Training = inference | Same tool calling schema
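
To make the edge-case principle concrete, here is a sketch of one more seed example you might append to SEED_EXAMPLES. The "description" and "priority" fields are assumed schema fields; adapt them to whatever your add_task tool actually accepts.

# Hypothetical edge-case seed: special characters in the title and an
# explicitly empty description field.
EDGE_CASE_EXAMPLE = {
    "messages": [
        {"role": "user", "content": "add task: review PR #42 (deadline @ 5pm!!)"},
        {"role": "assistant", "content": None,
         "tool_calls": [{
             "type": "function",
             "function": {
                 "name": "add_task",
                 "arguments": '{"title": "Review PR #42 (deadline @ 5pm!!)", "description": "", "priority": "high"}'
             }
         }]}
    ]
}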

Stage 2: Synthetic Generation

Use an LLM to generate diverse examples from your seeds:

# synthetic_generator.py
import json
from anthropic import Anthropic

def generate_synthetic_examples(seed_examples: list, count: int = 200) -> list:
    """Generate synthetic training examples from seed examples."""

    client = Anthropic()
    synthetic_examples = []

    # Create prompt with seed examples as demonstrations
    prompt = f"""You are generating training data for a task management AI.

Here are example interactions:

{json.dumps(seed_examples[:3], indent=2)}

Generate {count} NEW, DIVERSE examples following the same format.
Vary:
- User phrasing (formal, casual, abbreviated)
- Task types (work, personal, urgent, routine)
- Operations (add, update, delete, query, list)
- Edge cases (empty fields, special characters, long titles)

Output as JSON array. Each example must have "messages" with user and assistant turns.
The assistant should use tool_calls, not text responses.

IMPORTANT: Generate realistic, diverse examples. No duplicates."""

    # Note: max_tokens caps the response, so a single call cannot hold all 200
    # examples at the ~50K-token estimate below. In practice you run several
    # calls and pool the results; one call is shown here for clarity.
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=8000,
        messages=[{"role": "user", "content": prompt}]
    )

    # Parse response; a truncated response produces invalid JSON
    try:
        generated = json.loads(response.content[0].text)
        synthetic_examples.extend(generated)
    except json.JSONDecodeError:
        # Don't fail silently: a truncated or malformed response should be visible
        print("Warning: could not parse generated JSON; rerun or request fewer examples")

    return synthetic_examples

Output:

[
  {
    "messages": [
      {"role": "user", "content": "remind me to call mom tomorrow"},
      {"role": "assistant", "content": null,
       "tool_calls": [{"type": "function", "function": {
         "name": "add_task",
         "arguments": "{\"title\": \"Call mom\", \"due_date\": \"tomorrow\"}"
       }}]}
    ]
  },
  {
    "messages": [
      {"role": "user", "content": "delete all completed tasks"},
      {"role": "assistant", "content": null,
       "tool_calls": [{"type": "function", "function": {
         "name": "delete_tasks",
         "arguments": "{\"filter\": {\"status\": \"completed\"}}"
       }}]}
    ]
  }
  // ... 198 more examples
]

Cost Estimate:

Input tokens: ~2,000 (prompt)
Output tokens: ~50,000 (200 examples)
Claude Sonnet: $0.003/1K input + $0.015/1K output
Total: ~$0.76 per generation run
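
A quick sanity check of that estimate, using the per-token prices listed above (adjust if your account's pricing differs):

# cost_check.py -- back-of-envelope cost for one generation run
input_tokens = 2_000        # prompt with seed examples
output_tokens = 50_000      # ~200 generated examples
input_price = 0.003 / 1_000     # dollars per input token
output_price = 0.015 / 1_000    # dollars per output token

total = input_tokens * input_price + output_tokens * output_price
print(f"Estimated cost: ${total:.2f}")   # -> Estimated cost: $0.76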

Stage 3: Data Cleaning

Clean and validate your combined dataset:

# data_cleaner.py
import hashlib
from typing import List, Dict, Set

class DataCleaner:
    def __init__(self):
        self.seen_hashes: Set[str] = set()
        self.format_errors: List[Dict] = []
        self.duplicates_removed: int = 0

    def clean_dataset(self, examples: List[Dict]) -> List[Dict]:
        """Clean and deduplicate examples."""
        cleaned = []

        for i, example in enumerate(examples):
            # Validate format
            if not self._validate_format(example, i):
                continue

            # Check for duplicates
            example_hash = self._compute_hash(example)
            if example_hash in self.seen_hashes:
                self.duplicates_removed += 1
                continue

            self.seen_hashes.add(example_hash)
            cleaned.append(example)

        return cleaned

    def _validate_format(self, example: Dict, index: int) -> bool:
        """Validate example follows required format."""
        errors = []

        # Check messages exist
        if "messages" not in example:
            errors.append("Missing 'messages' field")
        else:
            messages = example["messages"]

            # Check minimum message count
            if len(messages) < 2:
                errors.append("Need at least 2 messages (user + assistant)")

            # Check roles (guard against an empty messages list)
            if messages and messages[0].get("role") != "user":
                errors.append("First message must be from user")

            if len(messages) > 1:
                assistant = messages[1]
                if assistant.get("role") != "assistant":
                    errors.append("Second message must be from assistant")

                # Check tool_calls format
                if "tool_calls" in assistant:
                    for tc in assistant["tool_calls"]:
                        if "function" not in tc:
                            errors.append("Tool call missing 'function'")

        if errors:
            self.format_errors.append({
                "index": index,
                "errors": errors,
                "example": example
            })
            return False

        return True

    def _compute_hash(self, example: Dict) -> str:
        """Compute hash for deduplication."""
        # Hash based on user message content (main semantic content)
        user_content = example["messages"][0].get("content", "")
        return hashlib.md5(user_content.encode()).hexdigest()

    def get_quality_report(self, original_count: int, cleaned_count: int) -> Dict:
        """Generate quality report."""
        return {
            "original_count": original_count,
            "cleaned_count": cleaned_count,
            "removed_count": original_count - cleaned_count,
            # Fraction of incoming examples dropped as duplicates
            "duplicate_ratio": self.duplicates_removed / original_count if original_count > 0 else 0,
            "format_error_count": len(self.format_errors),
            "format_errors": self.format_errors[:10]  # First 10 errors
        }

Usage:

cleaner = DataCleaner()
all_examples = seed_examples + synthetic_examples

cleaned_examples = cleaner.clean_dataset(all_examples)
quality_report = cleaner.get_quality_report(len(all_examples), len(cleaned_examples))

print(f"Cleaned: {len(cleaned_examples)} examples")
print(f"Removed: {quality_report['removed_count']} examples")
print(f"Format errors: {quality_report['format_error_count']}")

Output:

Cleaned: 195 examples
Removed: 15 examples
Format errors: 3
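
The MD5 hash above only catches byte-for-byte duplicate user messages. If your report still shows a high duplicate ratio, a common refinement (a sketch, not part of the pipeline above) is to normalize the text before hashing so trivially rephrased copies collapse to the same key:

import hashlib
import re

def normalized_hash(example: dict) -> str:
    """Hash the user message after lowercasing and stripping punctuation,
    so 'Add a task: Buy groceries!' and 'add a task buy groceries' collide."""
    text = example["messages"][0].get("content", "")
    text = re.sub(r"[^a-z0-9\s]", "", text.lower())  # drop case and punctuation
    text = " ".join(text.split())                    # collapse whitespace
    return hashlib.md5(text.encode()).hexdigest()

Swapping this in for _compute_hash catches near-duplicates at the cost of occasionally merging genuinely different requests, so review what it removes.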

Stage 4: Format for Training

Convert cleaned data to training format with train/validation split:

# data_formatter.py
import json
import random
from typing import List, Dict, Tuple

class DataFormatter:
    def __init__(self, train_ratio: float = 0.8, seed: int = 42):
        self.train_ratio = train_ratio
        self.seed = seed

    def format_and_split(self, examples: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
        """Format examples and split into train/validation sets."""

        # Shuffle with fixed seed for reproducibility
        random.seed(self.seed)
        shuffled = examples.copy()
        random.shuffle(shuffled)

        # Split
        split_idx = int(len(shuffled) * self.train_ratio)
        train_set = shuffled[:split_idx]
        val_set = shuffled[split_idx:]

        return train_set, val_set

    def save_jsonl(self, examples: List[Dict], path: str):
        """Save examples as JSONL (one JSON object per line)."""
        with open(path, 'w') as f:
            for example in examples:
                f.write(json.dumps(example) + '\n')

    def compute_token_stats(self, examples: List[Dict]) -> Dict:
        """Compute token statistics (approximate)."""
        token_counts = []

        for example in examples:
            # Rough estimate: 4 chars per token
            total_chars = 0
            for msg in example.get("messages", []):
                content = msg.get("content") or ""
                total_chars += len(content)
                if "tool_calls" in msg:
                    total_chars += len(json.dumps(msg["tool_calls"]))

            token_counts.append(total_chars // 4)

        return {
            "min_tokens": min(token_counts) if token_counts else 0,
            "max_tokens": max(token_counts) if token_counts else 0,
            "mean_tokens": sum(token_counts) / len(token_counts) if token_counts else 0,
            "total_tokens": sum(token_counts)
        }

Usage:

formatter = DataFormatter(train_ratio=0.8)
train_set, val_set = formatter.format_and_split(cleaned_examples)

formatter.save_jsonl(train_set, "train.jsonl")
formatter.save_jsonl(val_set, "validation.jsonl")

token_stats = formatter.compute_token_stats(cleaned_examples)
print(f"Train examples: {len(train_set)}")
print(f"Validation examples: {len(val_set)}")
print(f"Mean tokens per example: {token_stats['mean_tokens']:.0f}")

Output:

Train examples: 156
Validation examples: 39
Mean tokens per example: 847
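
Before handing the files to a trainer, it is worth confirming that every line of the JSONL written by save_jsonl parses cleanly. A small check along these lines (assuming the train.jsonl path from above) catches encoding or truncation problems early:

# verify_jsonl.py -- sanity-check that every line parses and carries messages
import json

def verify_jsonl(path: str) -> int:
    count = 0
    with open(path) as f:
        for line_no, line in enumerate(f, start=1):
            record = json.loads(line)  # raises if a line is not valid JSON
            assert "messages" in record, f"line {line_no} missing 'messages'"
            count += 1
    return count

print(verify_jsonl("train.jsonl"), "training examples verified")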

Complete Pipeline Script

Combine all stages into one executable pipeline:

# data_pipeline.py
import json
import argparse
from pathlib import Path

from synthetic_generator import generate_synthetic_examples
from data_cleaner import DataCleaner
from data_formatter import DataFormatter

def run_data_pipeline(
    seed_file: str,
    output_dir: str,
    synthetic_count: int = 200,
    train_ratio: float = 0.8
) -> dict:
    """Run complete data curation pipeline."""

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Stage 1: Load seed examples
    print("Stage 1: Loading seed examples...")
    with open(seed_file) as f:
        seed_examples = json.load(f)
    print(f"  Loaded {len(seed_examples)} seed examples")

    # Stage 2: Generate synthetic examples
    print(f"Stage 2: Generating {synthetic_count} synthetic examples...")
    synthetic_examples = generate_synthetic_examples(seed_examples, synthetic_count)
    print(f"  Generated {len(synthetic_examples)} examples")

    # Stage 3: Clean data
    print("Stage 3: Cleaning data...")
    cleaner = DataCleaner()
    all_examples = seed_examples + synthetic_examples
    cleaned_examples = cleaner.clean_dataset(all_examples)
    quality_report = cleaner.get_quality_report(len(all_examples), len(cleaned_examples))
    print(f"  Cleaned: {len(cleaned_examples)} examples (removed {quality_report['removed_count']})")

    # Stage 4: Format and split
    print("Stage 4: Formatting and splitting...")
    formatter = DataFormatter(train_ratio=train_ratio)
    train_set, val_set = formatter.format_and_split(cleaned_examples)

    # Save outputs
    formatter.save_jsonl(train_set, output_path / "train.jsonl")
    formatter.save_jsonl(val_set, output_path / "validation.jsonl")

    # Compute final statistics
    token_stats = formatter.compute_token_stats(cleaned_examples)

    # Quality gate: all checks must pass before the dataset is used for training
    gate = {
        "min_examples": len(cleaned_examples) >= 150,
        "max_duplicates": quality_report["duplicate_ratio"] < 0.05,
        "format_valid": quality_report["format_error_count"] == 0,
    }
    gate["passed"] = all(gate.values())

    # Generate quality report
    final_report = {
        "pipeline_status": "success",
        "seed_examples": len(seed_examples),
        "synthetic_generated": len(synthetic_examples),
        "after_cleaning": len(cleaned_examples),
        "train_count": len(train_set),
        "validation_count": len(val_set),
        "quality": {
            "duplicate_ratio": quality_report["duplicate_ratio"],
            "format_errors": quality_report["format_error_count"],
        },
        "tokens": token_stats,
        "quality_gate": gate
    }

    with open(output_path / "data_quality_report.json", "w") as f:
        json.dump(final_report, f, indent=2)

    print("\nPipeline complete!")
    print(f"  Train: {output_path / 'train.jsonl'}")
    print(f"  Validation: {output_path / 'validation.jsonl'}")
    print(f"  Quality gate: {'PASSED' if final_report['quality_gate']['passed'] else 'FAILED'}")

    return final_report

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Data Curation Pipeline")
    parser.add_argument("--seed-file", required=True, help="Path to seed examples JSON")
    parser.add_argument("--output-dir", default="./data", help="Output directory")
    parser.add_argument("--synthetic-count", type=int, default=200)
    args = parser.parse_args()

    report = run_data_pipeline(
        args.seed_file,
        args.output_dir,
        args.synthetic_count
    )

Run:

python data_pipeline.py --seed-file seeds.json --output-dir ./training_data

Output:

Stage 1: Loading seed examples...
Loaded 8 seed examples
Stage 2: Generating 200 synthetic examples...
Generated 200 examples
Stage 3: Cleaning data...
Cleaned: 195 examples (removed 13)
Stage 4: Formatting and splitting...

Pipeline complete!
Train: training_data/train.jsonl
Validation: training_data/validation.jsonl
Quality gate: PASSED

Quality Report Analysis

Your pipeline produces a quality report:

{
  "pipeline_status": "success",
  "seed_examples": 8,
  "synthetic_generated": 200,
  "after_cleaning": 195,
  "train_count": 156,
  "validation_count": 39,
  "quality": {
    "duplicate_ratio": 0.024,
    "format_errors": 0
  },
  "tokens": {
    "min_tokens": 120,
    "max_tokens": 1847,
    "mean_tokens": 847,
    "total_tokens": 165165
  },
  "quality_gate": {
    "min_examples": true,
    "max_duplicates": true,
    "format_valid": true,
    "passed": true
  }
}

Interpreting the Report:

Metric | Value | Status | Action if Failed
Example count | 195 | Pass (>= 150) | Generate more synthetic examples
Duplicate ratio | 2.4% | Pass (< 5%) | Improve diversity prompt
Format errors | 0 | Pass (= 0) | Fix generation prompt
Mean tokens | 847 | OK (< 2048) | Check for truncation risk
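
To act on the report automatically rather than reading it by hand, a small check script (a sketch; the path matches the run example above) can fail the build whenever any gate check is false, so CI blocks fine-tuning on a bad dataset:

# check_quality_gate.py -- exit nonzero if the data quality gate failed
import json
import sys

with open("training_data/data_quality_report.json") as f:
    report = json.load(f)

gate = report["quality_gate"]
for check, ok in gate.items():
    if check != "passed":
        print(f"{check}: {'PASS' if ok else 'FAIL'}")

sys.exit(0 if gate["passed"] else 1)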

What You Built

Your data curation pipeline:

Component | Purpose | Output
Seed Examples | Define domain patterns | 5-10 high-quality examples
Synthetic Generator | Scale dataset | 200+ diverse examples
Data Cleaner | Remove problems | Deduplicated, validated data
Formatter | Prepare for training | train.jsonl + validation.jsonl
Quality Report | Verify readiness | JSON with pass/fail status

Total pipeline cost: ~$0.80 (Claude API for synthetic generation)

Try With AI

Prompt 1: Design Your Seed Examples

I'm creating training data for a fine-tuned model that will [describe your use case].
The model needs to handle these operations: [list operations].

Help me create 5-8 seed examples. For each example, show:
1. A realistic user input (vary phrasing: formal, casual, abbreviated)
2. The expected tool call response in proper JSON format

Make sure I cover all operations and include at least one edge case.

What you're learning: Seed example design—creating the foundation examples that define your domain.

Prompt 2: Debug Data Quality Issues

My data pipeline quality report shows:
- 200 examples generated
- 15 format errors
- 8% duplicate ratio (above 5% threshold)

Here's a sample format error:
[paste error example]

Help me diagnose:
1. What's causing the format errors?
2. How can I improve my generation prompt to reduce duplicates?
3. Should I adjust my cleaning thresholds?

What you're learning: Data quality debugging—identifying and fixing issues in automated pipelines.

Prompt 3: Expand Your Dataset

My current dataset has 200 examples covering basic task operations.
I want to expand it with:
- More complex multi-step operations
- Error handling scenarios
- Edge cases (empty inputs, special characters)

Generate 20 new examples for each category. Make them realistic
and diverse, following this format:
[paste your example format]

What you're learning: Dataset expansion—systematically growing your training data with targeted examples.

Safety Note

Synthetic data generation can produce biased or problematic examples. Always review a sample of generated data manually before training. Include diverse edge cases to prevent the model from learning narrow patterns that fail in production.
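
One lightweight way to honor that review step is to pull a random sample of the curated data for human inspection before training (a sketch; the path assumes the training_data output directory used above, and the sample size is a judgment call):

# review_sample.py -- dump a random sample of examples for manual review
import json
import random

with open("training_data/train.jsonl") as f:
    examples = [json.loads(line) for line in f]

random.seed(0)
for example in random.sample(examples, k=min(20, len(examples))):
    print(json.dumps(example, indent=2))
    print("-" * 40)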