Data Curation Workflow
Your LLMOps pipeline begins with data. This lesson builds the automated workflow that transforms domain knowledge into training-ready datasets.
The Data Curation Challenge
Fine-tuning requires high-quality, domain-specific training data. But most domains lack curated datasets:
| Challenge | Impact | Solution |
|---|---|---|
| No existing dataset | Cannot train | Generate synthetic examples |
| Low quality data | Model learns bad patterns | Automated quality checks |
| Duplicate examples | Wasted training compute | Deduplication pipeline |
| Wrong format | Training fails | Format validation |
| Insufficient quantity | Underfitting | Synthetic augmentation |
Your data pipeline solves all these challenges automatically.
Complete Data Pipeline
┌─────────────────────────────────────────────────────────────────┐
│ DATA CURATION PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ SEED │──▶│ GENERATE │──▶│ CLEAN │──▶│ FORMAT │ │
│ │ EXAMPLES │ │ SYNTHETIC│ │ DATA │ │ DATA │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 5-10 manual 100-200 LLM Deduplicate JSONL format │
│ examples generated Validate Train/Val split │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ QUALITY VALIDATION │ │
│ │ ├── Format check (100% valid) │ │
│ │ ├── Deduplication (< 5% duplicates) │ │
│ │ └── Token distribution (mean < 2048) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ OUTPUT: train.jsonl + validation.jsonl + quality_report.json │
│ │
└─────────────────────────────────────────────────────────────────┘
Stage 1: Seed Examples
Create 5-10 high-quality examples that demonstrate your domain:
# seed_examples.py
SEED_EXAMPLES = [
{
"messages": [
{"role": "user", "content": "Add a task: Buy groceries"},
{"role": "assistant", "content": None,
"tool_calls": [{
"type": "function",
"function": {
"name": "add_task",
"arguments": '{"title": "Buy groceries", "priority": "medium"}'
}
}]}
]
},
{
"messages": [
{"role": "user", "content": "What tasks are due today?"},
{"role": "assistant", "content": None,
"tool_calls": [{
"type": "function",
"function": {
"name": "get_tasks",
"arguments": '{"due_date": "today"}'
}
}]}
]
},
{
"messages": [
{"role": "user", "content": "Mark the groceries task as done"},
{"role": "assistant", "content": None,
"tool_calls": [{
"type": "function",
"function": {
"name": "update_task",
"arguments": '{"task_id": "search:groceries", "status": "completed"}'
}
}]}
]
},
# Add 2-7 more examples covering different operations
]
Seed Example Principles:
| Principle | Why | Example |
|---|---|---|
| Cover all operations | Model learns full capability | add, get, update, delete |
| Vary input patterns | Model generalizes | Formal, casual, abbreviated |
| Include edge cases | Model handles unusual inputs | Empty fields, special chars |
| Match production format | Training = inference | Same tool calling schema |
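Before generating synthetic data, it's worth verifying the first principle mechanically. The sketch below is a minimal coverage check under the seed format above; the EXPECTED_OPERATIONS set is a hypothetical placeholder for your actual tool names.
# check_coverage.py -- minimal seed-coverage check (a sketch, not part of the pipeline)
from seed_examples import SEED_EXAMPLES

# Hypothetical set -- replace with the tool names your model must learn
EXPECTED_OPERATIONS = {"add_task", "get_tasks", "update_task", "delete_task"}

def covered_operations(examples: list) -> set:
    """Collect every tool name that appears in the examples."""
    seen = set()
    for example in examples:
        for msg in example["messages"]:
            for tc in msg.get("tool_calls") or []:
                seen.add(tc["function"]["name"])
    return seen

missing = EXPECTED_OPERATIONS - covered_operations(SEED_EXAMPLES)
if missing:
    print(f"Seed set is missing operations: {missing}")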
Stage 2: Synthetic Generation
Use an LLM to generate diverse examples from your seeds:
# synthetic_generator.py
import json
from anthropic import Anthropic
def generate_synthetic_examples(seed_examples: list, count: int = 200,
                                batch_size: int = 25) -> list:
    """Generate synthetic training examples from seed examples.

    Examples are requested in batches: asking for all 200 in a single
    call would overflow max_tokens and truncate the JSON mid-array.
    """
    client = Anthropic()
    synthetic_examples = []

    num_batches = (count + batch_size - 1) // batch_size
    for batch in range(num_batches):
        remaining = min(batch_size, count - len(synthetic_examples))
        if remaining <= 0:
            break

        # Create prompt with seed examples as demonstrations
        prompt = f"""You are generating training data for a task management AI.

Here are example interactions:

{json.dumps(seed_examples[:3], indent=2)}

Generate {remaining} NEW, DIVERSE examples following the same format.

Vary:
- User phrasing (formal, casual, abbreviated)
- Task types (work, personal, urgent, routine)
- Operations (add, update, delete, query, list)
- Edge cases (empty fields, special characters, long titles)

Output ONLY a JSON array, with no surrounding text. Each example must
have "messages" with user and assistant turns. The assistant should use
tool_calls, not text responses.

IMPORTANT: Generate realistic, diverse examples. No duplicates."""

        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=8000,
            messages=[{"role": "user", "content": prompt}]
        )

        # Parse response; warn and skip a bad batch rather than
        # silently returning an empty dataset
        try:
            generated = json.loads(response.content[0].text)
            synthetic_examples.extend(generated)
        except json.JSONDecodeError:
            print(f"  Warning: batch {batch + 1} returned invalid JSON, skipping")

    return synthetic_examples
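Usage, wiring in the SEED_EXAMPLES list from Stage 1:
synthetic_examples = generate_synthetic_examples(SEED_EXAMPLES, count=200)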
Output:
[
{
"messages": [
{"role": "user", "content": "remind me to call mom tomorrow"},
{"role": "assistant", "content": null,
"tool_calls": [{"type": "function", "function": {
"name": "add_task",
"arguments": "{\"title\": \"Call mom\", \"due_date\": \"tomorrow\"}"
}}]}
]
},
{
"messages": [
{"role": "user", "content": "delete all completed tasks"},
{"role": "assistant", "content": null,
"tool_calls": [{"type": "function", "function": {
"name": "delete_tasks",
"arguments": "{\"filter\": {\"status\": \"completed\"}}"
}}]}
]
}
// ... 198 more examples
]
Cost Estimate:
Input tokens:  ~2,000 per batch x 8 batches = ~16,000
Output tokens: ~50,000 total (200 examples)
Claude Sonnet: $0.003/1K input + $0.015/1K output
Total: ~$0.80 per generation run
Stage 3: Data Cleaning
Clean and validate your combined dataset:
# data_cleaner.py
import json
import hashlib
from typing import List, Dict, Set
class DataCleaner:
def __init__(self):
self.seen_hashes: Set[str] = set()
self.format_errors: List[Dict] = []
def clean_dataset(self, examples: List[Dict]) -> List[Dict]:
"""Clean and deduplicate examples."""
cleaned = []
for i, example in enumerate(examples):
# Validate format
if not self._validate_format(example, i):
continue
# Check for duplicates
example_hash = self._compute_hash(example)
if example_hash in self.seen_hashes:
continue
self.seen_hashes.add(example_hash)
cleaned.append(example)
return cleaned
    def _validate_format(self, example: Dict, index: int) -> bool:
        """Validate example follows required format."""
        errors = []

        # Check messages exist
        if "messages" not in example:
            errors.append("Missing 'messages' field")
        else:
            messages = example["messages"]

            # Check minimum message count before indexing into the list
            if len(messages) < 2:
                errors.append("Need at least 2 messages (user + assistant)")
            else:
                # Check roles
                if messages[0].get("role") != "user":
                    errors.append("First message must be from user")

                assistant = messages[1]
                if assistant.get("role") != "assistant":
                    errors.append("Second message must be from assistant")

                # Check tool_calls format
                if "tool_calls" in assistant:
                    for tc in assistant["tool_calls"]:
                        if "function" not in tc:
                            errors.append("Tool call missing 'function'")

        if errors:
            self.format_errors.append({
                "index": index,
                "errors": errors,
                "example": example
            })
            return False

        return True
def _compute_hash(self, example: Dict) -> str:
"""Compute hash for deduplication."""
# Hash based on user message content (main semantic content)
user_content = example["messages"][0].get("content", "")
return hashlib.md5(user_content.encode()).hexdigest()
    def get_quality_report(self, original_count: int, cleaned_count: int) -> Dict:
        """Generate quality report."""
        # Duplicates = removed examples that were not format errors
        duplicate_count = original_count - cleaned_count - len(self.format_errors)
        return {
            "original_count": original_count,
            "cleaned_count": cleaned_count,
            "removed_count": original_count - cleaned_count,
            "duplicate_ratio": duplicate_count / original_count if original_count > 0 else 0,
            "format_error_count": len(self.format_errors),
            "format_errors": self.format_errors[:10]  # First 10 errors
        }
Usage:
cleaner = DataCleaner()
all_examples = seed_examples + synthetic_examples
cleaned_examples = cleaner.clean_dataset(all_examples)
quality_report = cleaner.get_quality_report(len(all_examples), len(cleaned_examples))
print(f"Cleaned: {len(cleaned_examples)} examples")
print(f"Removed: {quality_report['removed_count']} examples")
print(f"Format errors: {quality_report['format_error_count']}")
Output:
Cleaned: 200 examples
Removed: 8 examples
Format errors: 0
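The MD5 hash above only catches byte-for-byte duplicates: "add task: buy milk" and "Add a task: Buy milk" both survive. A common mitigation is normalizing the text before hashing. Here is a minimal sketch of a drop-in replacement for _compute_hash; true semantic deduplication would need embeddings, which is beyond this stage.
# Drop-in replacement for DataCleaner._compute_hash (a sketch)
import hashlib
import re

def _compute_hash(self, example: dict) -> str:
    """Hash a normalized form of the user message so trivial
    variations (case, punctuation, extra whitespace) collide."""
    user_content = example["messages"][0].get("content") or ""
    normalized = re.sub(r"[^a-z0-9 ]", "", user_content.lower())
    normalized = re.sub(r"\s+", " ", normalized).strip()
    return hashlib.md5(normalized.encode()).hexdigest()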
Stage 4: Format for Training
Convert cleaned data to training format with train/validation split:
# data_formatter.py
import json
import random
from typing import List, Dict, Tuple
class DataFormatter:
def __init__(self, train_ratio: float = 0.8, seed: int = 42):
self.train_ratio = train_ratio
self.seed = seed
def format_and_split(self, examples: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
"""Format examples and split into train/validation sets."""
# Shuffle with fixed seed for reproducibility
random.seed(self.seed)
shuffled = examples.copy()
random.shuffle(shuffled)
# Split
split_idx = int(len(shuffled) * self.train_ratio)
train_set = shuffled[:split_idx]
val_set = shuffled[split_idx:]
return train_set, val_set
def save_jsonl(self, examples: List[Dict], path: str):
"""Save examples as JSONL (one JSON per line)."""
with open(path, 'w') as f:
for example in examples:
f.write(json.dumps(example) + '\n')
def compute_token_stats(self, examples: List[Dict]) -> Dict:
"""Compute token statistics (approximate)."""
token_counts = []
for example in examples:
# Rough estimate: 4 chars per token
total_chars = 0
for msg in example.get("messages", []):
content = msg.get("content") or ""
total_chars += len(content)
if "tool_calls" in msg:
total_chars += len(json.dumps(msg["tool_calls"]))
token_counts.append(total_chars // 4)
return {
"min_tokens": min(token_counts) if token_counts else 0,
"max_tokens": max(token_counts) if token_counts else 0,
"mean_tokens": sum(token_counts) / len(token_counts) if token_counts else 0,
"total_tokens": sum(token_counts)
}
Usage:
formatter = DataFormatter(train_ratio=0.8)
train_set, val_set = formatter.format_and_split(cleaned_examples)
formatter.save_jsonl(train_set, "train.jsonl")
formatter.save_jsonl(val_set, "validation.jsonl")
token_stats = formatter.compute_token_stats(cleaned_examples)
print(f"Train examples: {len(train_set)}")
print(f"Validation examples: {len(val_set)}")
print(f"Mean tokens per example: {token_stats['mean_tokens']:.0f}")
Output:
Train examples: 160
Validation examples: 40
Mean tokens per example: 847
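The 4-characters-per-token heuristic is good enough for a quality gate, but you can swap in a real tokenizer for exact counts. Below is a sketch using tiktoken's cl100k_base encoding; this is an assumption, since your training stack may use a different tokenizer.
# token_stats_exact.py -- exact counts (assumes `pip install tiktoken`;
# your training stack's tokenizer may differ)
import json
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(example: dict) -> int:
    """Count tokens across message contents and serialized tool calls."""
    total = 0
    for msg in example.get("messages", []):
        total += len(enc.encode(msg.get("content") or ""))
        if "tool_calls" in msg:
            total += len(enc.encode(json.dumps(msg["tool_calls"])))
    return total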
Complete Pipeline Script
Combine all stages into one executable pipeline:
# data_pipeline.py
import json
import argparse
from pathlib import Path

# Stage implementations from the previous sections
from synthetic_generator import generate_synthetic_examples
from data_cleaner import DataCleaner
from data_formatter import DataFormatter
def run_data_pipeline(
seed_file: str,
output_dir: str,
synthetic_count: int = 200,
train_ratio: float = 0.8
) -> dict:
"""Run complete data curation pipeline."""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Stage 1: Load seed examples
print("Stage 1: Loading seed examples...")
with open(seed_file) as f:
seed_examples = json.load(f)
print(f" Loaded {len(seed_examples)} seed examples")
# Stage 2: Generate synthetic examples
print(f"Stage 2: Generating {synthetic_count} synthetic examples...")
synthetic_examples = generate_synthetic_examples(seed_examples, synthetic_count)
print(f" Generated {len(synthetic_examples)} examples")
# Stage 3: Clean data
print("Stage 3: Cleaning data...")
cleaner = DataCleaner()
all_examples = seed_examples + synthetic_examples
cleaned_examples = cleaner.clean_dataset(all_examples)
quality_report = cleaner.get_quality_report(len(all_examples), len(cleaned_examples))
print(f" Cleaned: {len(cleaned_examples)} examples (removed {quality_report['removed_count']})")
# Stage 4: Format and split
print("Stage 4: Formatting and splitting...")
formatter = DataFormatter(train_ratio=train_ratio)
train_set, val_set = formatter.format_and_split(cleaned_examples)
# Save outputs
formatter.save_jsonl(train_set, output_path / "train.jsonl")
formatter.save_jsonl(val_set, output_path / "validation.jsonl")
# Compute final statistics
token_stats = formatter.compute_token_stats(cleaned_examples)
# Generate quality report
final_report = {
"pipeline_status": "success",
"seed_examples": len(seed_examples),
"synthetic_generated": len(synthetic_examples),
"after_cleaning": len(cleaned_examples),
"train_count": len(train_set),
"validation_count": len(val_set),
"quality": {
"duplicate_ratio": quality_report["duplicate_ratio"],
"format_errors": quality_report["format_error_count"],
},
"tokens": token_stats,
"quality_gate": {
"min_examples": len(cleaned_examples) >= 200,
"max_duplicates": quality_report["duplicate_ratio"] < 0.05,
"format_valid": quality_report["format_error_count"] == 0,
"passed": (len(cleaned_examples) >= 200 and
quality_report["duplicate_ratio"] < 0.05)
}
}
with open(output_path / "data_quality_report.json", "w") as f:
json.dump(final_report, f, indent=2)
print("\nPipeline complete!")
print(f" Train: {output_path / 'train.jsonl'}")
print(f" Validation: {output_path / 'validation.jsonl'}")
print(f" Quality gate: {'PASSED' if final_report['quality_gate']['passed'] else 'FAILED'}")
return final_report
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Data Curation Pipeline")
parser.add_argument("--seed-file", required=True, help="Path to seed examples JSON")
parser.add_argument("--output-dir", default="./data", help="Output directory")
parser.add_argument("--synthetic-count", type=int, default=200)
args = parser.parse_args()
report = run_data_pipeline(
args.seed_file,
args.output_dir,
args.synthetic_count
)
Run:
python data_pipeline.py --seed-file seeds.json --output-dir ./training_data
Output:
Stage 1: Loading seed examples...
Loaded 8 seed examples
Stage 2: Generating 200 synthetic examples...
Generated 200 examples
Stage 3: Cleaning data...
  Cleaned: 200 examples (removed 8)
Stage 4: Formatting and splitting...
Pipeline complete!
Train: training_data/train.jsonl
Validation: training_data/validation.jsonl
Quality gate: PASSED
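Because the gate result is serialized to data_quality_report.json, a CI step can block training when it fails. A minimal sketch (the report path follows the run above):
# check_gate.py -- exit nonzero so CI blocks the training job
import json
import sys

with open("training_data/data_quality_report.json") as f:
    report = json.load(f)

if not report["quality_gate"]["passed"]:
    print("Quality gate FAILED -- blocking training run")
    sys.exit(1)

print("Quality gate passed -- proceeding to training")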
Quality Report Analysis
Your pipeline produces a quality report:
{
"pipeline_status": "success",
"seed_examples": 8,
"synthetic_generated": 200,
"after_cleaning": 195,
"train_count": 156,
"validation_count": 39,
"quality": {
"duplicate_ratio": 0.024,
"format_errors": 0
},
"tokens": {
"min_tokens": 120,
"max_tokens": 1847,
"mean_tokens": 847,
"total_tokens": 165165
},
"quality_gate": {
"min_examples": true,
"max_duplicates": true,
"format_valid": true,
"passed": true
}
}
Interpreting the Report:
| Metric | Value | Status | Action if Failed |
|---|---|---|---|
| Example count | 200 | Pass (>= 200) | Generate more synthetic |
| Duplicate ratio | 3.8% | Pass (< 5%) | Improve diversity prompt |
| Format errors | 0 | Pass (= 0) | Fix generation prompt |
| Mean tokens | 847 | OK (< 2048) | Check for truncation risk |
What You Built
Your data curation pipeline:
| Component | Purpose | Output |
|---|---|---|
| Seed Examples | Define domain patterns | 5-10 high-quality examples |
| Synthetic Generator | Scale dataset | 200+ diverse examples |
| Data Cleaner | Remove problems | Deduplicated, validated data |
| Formatter | Prepare for training | train.jsonl + validation.jsonl |
| Quality Report | Verify readiness | JSON with pass/fail status |
Total pipeline cost: ~$0.80 (Claude API for synthetic generation)
Try With AI
Prompt 1: Design Your Seed Examples
I'm creating training data for a fine-tuned model that will [describe your use case].
The model needs to handle these operations: [list operations].
Help me create 5-8 seed examples. For each example, show:
1. A realistic user input (vary phrasing: formal, casual, abbreviated)
2. The expected tool call response in proper JSON format
Make sure I cover all operations and include at least one edge case.
What you're learning: Seed example design—creating the foundation examples that define your domain.
Prompt 2: Debug Data Quality Issues
My data pipeline quality report shows:
- 200 examples generated
- 15 format errors
- 8% duplicate ratio (above 5% threshold)
Here's a sample format error:
[paste error example]
Help me diagnose:
1. What's causing the format errors?
2. How can I improve my generation prompt to reduce duplicates?
3. Should I adjust my cleaning thresholds?
What you're learning: Data quality debugging—identifying and fixing issues in automated pipelines.
Prompt 3: Expand Your Dataset
My current dataset has 200 examples covering basic task operations.
I want to expand it with:
- More complex multi-step operations
- Error handling scenarios
- Edge cases (empty inputs, special characters)
Generate 20 new examples for each category. Make them realistic
and diverse, following this format:
[paste your example format]
What you're learning: Dataset expansion—systematically growing your training data with targeted examples.
Safety Note
Synthetic data generation can produce biased or problematic examples. Always review a sample of generated data manually before training. Include diverse edge cases to prevent the model from learning narrow patterns that fail in production.
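One lightweight way to make that review habitual is to sample a fixed slice of the training set into a separate file and read it before every run. A minimal sketch (the 20-example sample size is an arbitrary choice):
# sample_for_review.py -- pull a random sample for manual review
import json
import random

with open("training_data/train.jsonl") as f:
    examples = [json.loads(line) for line in f]

# Fixed seed so reruns produce the same sample for comparison
random.seed(0)
sample = random.sample(examples, min(20, len(examples)))

with open("training_data/review_sample.jsonl", "w") as f:
    for example in sample:
        f.write(json.dumps(example) + "\n")

print(f"Wrote {len(sample)} examples for review before training")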