
Data Cleaning and Validation

You've generated 500 synthetic examples. But not all of them are training-ready. Some are duplicates, some have format errors, and some are simply low quality.

This lesson builds a cleaning pipeline that transforms raw generated data into a polished training dataset. By the end, you'll have scripts that automate quality control.


The Cleaning Pipeline

The pipeline has three stages:

Raw Generated Data (500 examples)
        │
        ▼
┌──────────────┐
│   STAGE 1    │
│   Schema     │──▶ Remove format violations
│   Validation │
└──────────────┘
        │
        ▼
┌──────────────┐
│   STAGE 2    │
│   Dedupli-   │──▶ Remove near-duplicates
│   cation     │
└──────────────┘
        │
        ▼
┌──────────────┐
│   STAGE 3    │
│   Quality    │──▶ Filter low-quality examples
│   Scoring    │
└──────────────┘
        │
        ▼
Cleaned Training Data (400-450 examples)

Each stage catches different problems:

  • Schema validation catches structural issues (wrong format, missing fields)
  • Deduplication catches repetition (same example multiple times)
  • Quality scoring catches semantic issues (correct format but poor content)

Stage 1: Schema Validation

First, validate that every example matches your expected format.

Pydantic Schema for ShareGPT

from pydantic import BaseModel, validator, ValidationError
from typing import List, Literal

# Pydantic v1-style validators; on Pydantic v2, use field_validator and ConfigDict instead.

class ShareGPTMessage(BaseModel):
    """A single message in a ShareGPT conversation."""
    from_: Literal["human", "gpt"]
    value: str

    class Config:
        fields = {"from_": "from"}  # Accept "from" in the JSON, store as from_
        extra = "forbid"  # Reject unexpected fields

    @validator("value")
    def value_not_empty(cls, v):
        if not v.strip():
            raise ValueError("Message cannot be empty")
        return v.strip()


class ShareGPTConversation(BaseModel):
    """A complete ShareGPT conversation."""
    conversations: List[ShareGPTMessage]

    @validator("conversations")
    def validate_structure(cls, v):
        if len(v) < 2:
            raise ValueError("Conversation must have at least 2 messages")

        # First message must be from human
        if v[0].from_ != "human":
            raise ValueError("First message must be from human")

        # Last message must be from gpt (what we're training)
        if v[-1].from_ != "gpt":
            raise ValueError("Last message must be from gpt")

        # Messages must alternate
        for i, msg in enumerate(v):
            expected = "human" if i % 2 == 0 else "gpt"
            if msg.from_ != expected:
                raise ValueError(f"Message {i} should be from {expected}")

        return v


def validate_example(raw_example: dict) -> tuple[ShareGPTConversation | None, str | None]:
    """Validate a single example. Returns (validated, error_message)."""
    try:
        validated = ShareGPTConversation(**raw_example)
        return validated, None
    except ValidationError as e:
        return None, str(e)


def validate_dataset(examples: List[dict]) -> tuple[List[dict], List[tuple[dict, str]]]:
    """Validate entire dataset. Returns (valid_examples, invalid_with_reasons)."""
    valid = []
    invalid = []

    for ex in examples:
        validated, error = validate_example(ex)
        if validated:
            # by_alias=True keeps the original "from" keys instead of "from_"
            valid.append(validated.dict(by_alias=True))
        else:
            invalid.append((ex, error))

    return valid, invalid

Running the validator on sample data:

examples = [
    {"conversations": [
        {"from": "human", "value": "Create task"},
        {"from": "gpt", "value": "Created task: 'task'"}
    ]},
    {"conversations": [
        {"from": "gpt", "value": "Wrong order"},  # Error: human must come first
        {"from": "human", "value": "Should be first"}
    ]},
    {"conversations": [
        {"from": "human", "value": "Missing response"}
        # Error: no gpt response
    ]},
]

valid, invalid = validate_dataset(examples)
print(f"Valid: {len(valid)}, Invalid: {len(invalid)}")
for ex, error in invalid:
    print(f"  Error: {error[:80]}...")

Output:

Valid: 1, Invalid: 2
Error: 1 validation error for ShareGPTConversation
conversations
First message ...
Error: 1 validation error for ShareGPTConversation
conversations
Conversation must have at least 2 ...
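
Rather than discarding rejected examples silently, it helps to persist them with their error messages so you can fix the generation prompt later. A minimal sketch, assuming the validate_dataset output above; the save_invalid helper and the rejected_examples.jsonl filename are illustrative, not part of the pipeline:

import json
from typing import List

def save_invalid(invalid: List[tuple[dict, str]], path: str = "rejected_examples.jsonl") -> None:
    """Write each rejected example plus its validation error to a JSONL file for review."""
    with open(path, "w") as f:
        for ex, error in invalid:
            f.write(json.dumps({"example": ex, "error": error}) + "\n")

# Usage: save_invalid(invalid) right after validate_dataset(examples)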

Stage 2: Deduplication

Synthetic generation often produces near-duplicate examples. These waste training capacity and bias the model toward repeated patterns.

Exact Deduplication

First, remove exact duplicates:

import hashlib
import json

def exact_dedup(examples: List[dict]) -> List[dict]:
    """Remove exact duplicate examples based on content hash."""
    seen_hashes = set()
    unique = []

    for ex in examples:
        # Hash the entire example
        content = json.dumps(ex, sort_keys=True)
        content_hash = hashlib.md5(content.encode()).hexdigest()

        if content_hash not in seen_hashes:
            seen_hashes.add(content_hash)
            unique.append(ex)

    print(f"Exact dedup: {len(examples)} -> {len(unique)} ({len(examples) - len(unique)} duplicates removed)")
    return unique

Output:

Exact dedup: 500 -> 497 (3 duplicates removed)

Semantic Deduplication

Near-duplicates are harder. Use embeddings to find similar examples:

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def semantic_dedup(
    examples: List[dict],
    similarity_threshold: float = 0.92,
    model_name: str = "all-MiniLM-L6-v2"
) -> List[dict]:
    """Remove semantically similar examples using embeddings."""

    # Load embedding model
    model = SentenceTransformer(model_name)

    # Extract text to embed (concatenate all messages)
    texts = []
    for ex in examples:
        text = " ".join(msg["value"] for msg in ex["conversations"])
        texts.append(text)

    # Generate embeddings
    print("Generating embeddings...")
    embeddings = model.encode(texts, show_progress_bar=True)

    # Find examples to keep (not too similar to any previously kept example)
    keep_indices = []
    keep_embeddings = []

    for i, emb in enumerate(embeddings):
        if not keep_embeddings:
            # First example always kept
            keep_indices.append(i)
            keep_embeddings.append(emb)
            continue

        # Check similarity against all kept examples
        similarities = cosine_similarity([emb], keep_embeddings)[0]
        max_sim = np.max(similarities)

        if max_sim < similarity_threshold:
            # Sufficiently different, keep it
            keep_indices.append(i)
            keep_embeddings.append(emb)

    unique = [examples[i] for i in keep_indices]
    removed = len(examples) - len(unique)
    print(f"Semantic dedup: {len(examples)} -> {len(unique)} ({removed} near-duplicates removed)")

    return unique

Output:

Generating embeddings...
Batches: 100%|██████████| 16/16 [00:02<00:00, 5.33batch/s]
Semantic dedup: 497 -> 463 (34 near-duplicates removed)
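
Before settling on a threshold, it is worth seeing which pairs the embedding model actually treats as near-duplicates. A minimal sketch using the same sentence-transformers model as above; the top_similar_pairs helper is illustrative, not part of the pipeline:

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def top_similar_pairs(texts: list[str], top_k: int = 5, model_name: str = "all-MiniLM-L6-v2") -> None:
    """Print the most similar text pairs so you can judge whether 0.92 fits your data."""
    model = SentenceTransformer(model_name)
    embeddings = model.encode(texts)
    sims = cosine_similarity(embeddings)
    # Upper triangle only, so each pair is reported once
    pairs = [(sims[i, j], i, j) for i in range(len(texts)) for j in range(i + 1, len(texts))]
    for sim, i, j in sorted(pairs, reverse=True)[:top_k]:
        print(f"similarity={sim:.3f}")
        print(f"  A: {texts[i][:80]}")
        print(f"  B: {texts[j][:80]}")

If the highest-similarity pairs still look meaningfully different to you, raise the threshold; if obvious rewordings slip through, lower it.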

Stage 3: Quality Scoring with LLM-as-Judge

Format validation catches structural issues. Deduplication catches repetition. But neither catches semantically poor examples—responses that are technically correct but unhelpful or inconsistent.

LLM-as-Judge uses a language model to evaluate quality:

import openai
import json
from typing import List

def score_example(client: openai.OpenAI, example: dict) -> tuple[int, str]:
    """Score a single example from 1-5 with explanation."""

    # Format example for evaluation
    conversation_text = "\n".join(
        f"{msg['from'].upper()}: {msg['value']}"
        for msg in example["conversations"]
    )

    prompt = f"""Evaluate this training example for a Task API assistant.

CONVERSATION:
{conversation_text}

SCORING CRITERIA:
1. Format compliance: Does the assistant response follow the expected format?
2. Accuracy: Is the response correct given the request?
3. Helpfulness: Is the response useful and complete?
4. Naturalness: Does the conversation flow naturally?
5. Edge case handling: Does it handle the request appropriately?

Score from 1-5:
- 5: Excellent example, use for training
- 4: Good example, minor improvements possible
- 3: Acceptable, but has issues
- 2: Poor quality, should not use
- 1: Unusable, format violations or errors

Return JSON: {{"score": <1-5>, "reason": "<brief explanation>"}}"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You evaluate training data quality. Return only valid JSON."},
            {"role": "user", "content": prompt}
        ],
        response_format={"type": "json_object"},
        temperature=0.1,  # Low temperature for consistent scoring
        max_tokens=100
    )

    result = json.loads(response.choices[0].message.content)
    return result["score"], result["reason"]


def filter_by_quality(
    client: openai.OpenAI,
    examples: List[dict],
    min_score: int = 4,
    sample_size: int | None = None
) -> tuple[List[dict], List[tuple[dict, int, str]]]:
    """Filter examples by LLM-as-Judge quality score."""

    # Optionally sample for cost control
    to_score = examples if sample_size is None else examples[:sample_size]

    passed = []
    failed = []

    print(f"Scoring {len(to_score)} examples...")
    for i, ex in enumerate(to_score):
        score, reason = score_example(client, ex)

        if score >= min_score:
            passed.append(ex)
        else:
            failed.append((ex, score, reason))

        if (i + 1) % 50 == 0:
            print(f"  Scored {i + 1}/{len(to_score)}...")

    print(f"Quality filter: {len(to_score)} -> {len(passed)} (min_score={min_score})")
    return passed, failed

Output:

Scoring 463 examples...
Scored 50/463...
Scored 100/463...
...
Quality filter: 463 -> 421 (min_score=4)
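
Scoring hundreds of examples means hundreds of sequential API calls, which can hit rate limits. A minimal retry wrapper, sketched under the assumption that the OpenAI Python SDK (v1+) raises openai.RateLimitError; the score_with_retry name is illustrative:

import time
import openai

def score_with_retry(client: openai.OpenAI, example: dict, max_retries: int = 3) -> tuple[int, str]:
    """Call score_example with simple exponential backoff on rate-limit errors."""
    for attempt in range(max_retries):
        try:
            return score_example(client, example)
        except openai.RateLimitError:
            # Back off 2, 4, 8... seconds before retrying
            time.sleep(2 ** (attempt + 1))
    # Final attempt; let any remaining error propagate
    return score_example(client, example)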

Cost Control for Quality Scoring

Scoring 500 examples with GPT-4o-mini costs ~$0.05. For tighter budgets:

import random

# Option 1: Sample scoring
# Score a random 20%, extrapolate quality
sample = random.sample(examples, len(examples) // 5)
passed, failed = filter_by_quality(client, sample)
estimated_quality = len(passed) / len(sample)
print(f"Estimated dataset quality: {estimated_quality:.1%}")

# Option 2: Score only borderline examples
# Use embedding clustering to find unusual examples, score only those
# (see the clustering sketch below)
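
Option 2 can be sketched with k-means: cluster the same embeddings you computed for deduplication and send only the examples farthest from their cluster centroid to the judge. The select_outliers helper and the fraction and n_clusters values are assumptions for illustration, not a fixed recipe:

import numpy as np
from sklearn.cluster import KMeans

def select_outliers(embeddings: np.ndarray, fraction: float = 0.2,
                    n_clusters: int = 10, seed: int = 42) -> list[int]:
    """Return indices of the examples farthest from their cluster centroid (the 'unusual' ones)."""
    km = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
    labels = km.fit_predict(embeddings)
    # Distance of each example to its own centroid
    dists = np.linalg.norm(embeddings - km.cluster_centers_[labels], axis=1)
    k = max(1, int(len(embeddings) * fraction))
    return np.argsort(dists)[-k:].tolist()  # indices of the k most unusual examples

# Usage: score only [examples[i] for i in select_outliers(embeddings)]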

Complete Cleaning Pipeline

Putting it all together:

#!/usr/bin/env python3
"""
Data cleaning pipeline for fine-tuning datasets.
Runs: Schema validation -> Deduplication -> Quality scoring
"""

import json
import os
from openai import OpenAI

# validate_dataset, exact_dedup, semantic_dedup, and filter_by_quality are the
# functions defined in the earlier stages (same module or imported from it).

def clean_dataset(
    input_file: str,
    output_file: str,
    quality_threshold: int = 4
) -> dict:
    """Run complete cleaning pipeline."""

    # Load data
    print(f"Loading {input_file}...")
    with open(input_file) as f:
        examples = [json.loads(line) for line in f]
    print(f"Loaded {len(examples)} examples")

    # Stage 1: Schema validation
    print("\n--- Stage 1: Schema Validation ---")
    valid, invalid = validate_dataset(examples)
    print(f"  Valid: {len(valid)}, Invalid: {len(invalid)}")

    # Stage 2: Exact deduplication
    print("\n--- Stage 2: Exact Deduplication ---")
    after_exact = exact_dedup(valid)

    # Stage 3: Semantic deduplication
    print("\n--- Stage 3: Semantic Deduplication ---")
    after_semantic = semantic_dedup(after_exact, similarity_threshold=0.92)

    # Stage 4: Quality scoring (optional, costs money)
    print("\n--- Stage 4: Quality Scoring ---")
    if os.environ.get("OPENAI_API_KEY"):
        client = OpenAI()
        passed, failed = filter_by_quality(
            client, after_semantic, min_score=quality_threshold
        )
    else:
        print("  Skipping (no OPENAI_API_KEY)")
        passed = after_semantic

    # Save cleaned data
    print(f"\nSaving {len(passed)} examples to {output_file}...")
    with open(output_file, "w") as f:
        for ex in passed:
            f.write(json.dumps(ex) + "\n")

    # Report
    stats = {
        "input": len(examples),
        "after_validation": len(valid),
        "after_exact_dedup": len(after_exact),
        "after_semantic_dedup": len(after_semantic),
        "after_quality_filter": len(passed),
        "final_output": len(passed)
    }

    print("\n--- Pipeline Summary ---")
    for stage, count in stats.items():
        print(f"  {stage}: {count}")

    return stats


if __name__ == "__main__":
    clean_dataset(
        input_file="task_api_raw.jsonl",
        output_file="task_api_clean.jsonl",
        quality_threshold=4
    )

Output:

Loading task_api_raw.jsonl...
Loaded 500 examples

--- Stage 1: Schema Validation ---
Valid: 492, Invalid: 8

--- Stage 2: Exact Deduplication ---
Exact dedup: 492 -> 489 (3 duplicates removed)

--- Stage 3: Semantic Deduplication ---
Generating embeddings...
Batches: 100%|██████████| 16/16 [00:02<00:00, 5.33batch/s]
Semantic dedup: 489 -> 456 (33 near-duplicates removed)

--- Stage 4: Quality Scoring ---
Scoring 456 examples...
Scored 50/456...
Scored 100/456...
...
Quality filter: 456 -> 421 (min_score=4)

Saving 421 examples to task_api_clean.jsonl...

--- Pipeline Summary ---
input: 500
after_validation: 492
after_exact_dedup: 489
after_semantic_dedup: 456
after_quality_filter: 421
final_output: 421

Train/Validation/Test Splits

After cleaning, split your data for proper evaluation:

import json
import random
from typing import List

def split_dataset(
    examples: List[dict],
    train_ratio: float = 0.8,
    val_ratio: float = 0.1,
    test_ratio: float = 0.1,
    seed: int = 42
) -> tuple[List[dict], List[dict], List[dict]]:
    """Split dataset into train/validation/test sets."""

    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 0.001

    random.seed(seed)
    shuffled = examples.copy()
    random.shuffle(shuffled)

    n = len(shuffled)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)

    train = shuffled[:train_end]
    val = shuffled[train_end:val_end]
    test = shuffled[val_end:]

    print(f"Split: train={len(train)}, val={len(val)}, test={len(test)}")
    return train, val, test


# Save splits (cleaned_examples is the cleaning pipeline's output,
# e.g., loaded back from task_api_clean.jsonl)
train, val, test = split_dataset(cleaned_examples)

with open("train.jsonl", "w") as f:
    for ex in train:
        f.write(json.dumps(ex) + "\n")

with open("val.jsonl", "w") as f:
    for ex in val:
        f.write(json.dumps(ex) + "\n")

with open("test.jsonl", "w") as f:
    for ex in test:
        f.write(json.dumps(ex) + "\n")

Output:

Split: train=336, val=42, test=43

Debugging Common Issues

Issue: Too Many Validation Failures

Symptom: More than 20% of examples fail schema validation

Diagnosis:

# Count error types
from collections import Counter

error_types = Counter()
for ex, error in invalid:
    # The last line of a Pydantic error string carries the specific message
    error_type = error.strip().split("\n")[-1]
    error_types[error_type] += 1

print(error_types.most_common())

Fix: Improve your generation prompt to match schema requirements.

Issue: High Duplicate Rate

Symptom: More than 15% of examples are removed by semantic dedup

Diagnosis:

# Check what's being removed
# (have semantic_dedup also return the dropped indices, or recompute them
# by diffing keep_indices against range(len(examples)))
for i in removed_indices[:10]:
    print(examples[i])

Fix: Increase diversity in generation (rotate seeds, vary temperature, add explicit diversity requirements).

Issue: Low Quality Scores

Symptom: More than 30% of examples score below the threshold

Diagnosis:

# Examine low-scoring examples
for ex, score, reason in failed[:10]:
    print(f"Score: {score}, Reason: {reason}")
    print(ex)

Fix: Improve seed examples, tighten format specifications, add more explicit output requirements.


Try With AI

Prompt 1: Build Custom Schema

I need to validate training examples for a [your domain] assistant.
The assistant should [describe behavior].

Help me build a Pydantic schema that:
1. Enforces the correct message structure
2. Validates response format (include your format spec)
3. Catches common errors in generated data

Include example valid and invalid cases with expected validation results.

What you're learning: Schema design for your specific use case. The Task API schema is a template—you'll adapt it.

Prompt 2: Tune Deduplication Threshold

I ran semantic deduplication on my 500-example dataset with threshold 0.92
and removed 45 examples. Some examples that were removed look different to me.

Here are 3 pairs that were marked as duplicates:
[paste example pairs]

Help me:
1. Understand why the model considers these similar
2. Decide if 0.92 is the right threshold for my use case
3. Suggest alternative approaches if I want to keep more examples

What you're learning: Threshold tuning based on your data. The right threshold depends on how much diversity you need versus how much repetition you can tolerate.

Prompt 3: Design Quality Rubric

I'm using LLM-as-Judge to score my Task API training examples.
Currently I'm using generic criteria: format, accuracy, helpfulness, naturalness.

Help me design a Task API-specific rubric that:
1. Has 5 clear criteria specific to task management
2. Includes explicit examples of each score (1-5)
3. Catches common issues in task assistant training data

Then show me how to update my scoring prompt to use this rubric.

What you're learning: Domain-specific quality evaluation. Generic rubrics miss domain-specific issues. Your rubric should catch problems specific to your task.

Safety Note

The cleaning pipeline removes examples—that's its job. But aggressive cleaning can remove good examples with unusual phrasing. Always review a sample of removed examples to ensure you're not systematically removing valuable diversity. Check that your filters catch problems, not just differences.
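
One practical way to do that review: collect the examples each stage removed into a single list and write a random sample to disk for manual inspection. A minimal sketch; the sample_removed helper and the removed_review.jsonl filename are illustrative, and it assumes you gather the removed examples yourself (e.g., from invalid, the dedup diffs, and failed):

import json
import random
from typing import List

def sample_removed(removed: List[dict], n: int = 20,
                   path: str = "removed_review.jsonl", seed: int = 42) -> None:
    """Write a random sample of removed examples to a JSONL file for manual review."""
    random.seed(seed)
    sample = random.sample(removed, min(n, len(removed)))
    with open(path, "w") as f:
        for ex in sample:
            f.write(json.dumps(ex) + "\n")

If the sample contains examples you would have wanted to keep, loosen the relevant filter (raise the similarity threshold or lower the quality cutoff) before training.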