Capstone: Evaluation Pipeline
You have learned to measure accuracy, implement LLM-as-Judge, design benchmarks, detect regressions, and configure quality gates. Now you will combine everything into a production-ready evaluation pipeline that automatically validates fine-tuned models before deployment.
This capstone project brings together all concepts from Chapter 69. By the end, you will have a system that can evaluate any Task API model and produce a clear deploy/block decision.
Pipeline Architecture
The complete pipeline orchestrates five stages, from fast automated checks through to the final deploy/block decision:
┌─────────────────────────────────────────────────────────────────────────┐
│ EVALUATION PIPELINE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Model │───▶│ Stage 1: │───▶│ Stage 2: │ │
│ │ Input │ │ Automated │ │ LLM-Judge │ │
│ └─────────────┘ │ Metrics │ │ Sampling │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Stage 3: │ │ Stage 4: │ │
│ │ Regression │◀───│ Aggregate │ │
│ │ Comparison │ │ Results │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Stage 5: │───▶│ Output: │ │
│ │Quality Gates │ │ DEPLOY or │ │
│ │ │ │ BLOCK │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Specification First
Before implementation, define the pipeline specification:
# Evaluation Pipeline Specification
## Intent
Automated evaluation system for Task API fine-tuned models that produces
reliable deploy/block decisions with evidence.
## Inputs
- `model_path`: Path to fine-tuned model (HuggingFace format)
- `baseline_path`: Path to baseline results JSON
- `benchmark_path`: Path to evaluation benchmark dataset
- `config_path`: Path to quality gate configuration
## Outputs
- Deploy decision (PASS/FAIL)
- Detailed evaluation report (JSON + human-readable)
- Updated baseline (if passed)
## Success Criteria
- [ ] Automated metrics execute in < 5 minutes
- [ ] LLM-as-Judge samples 10% of outputs
- [ ] Regression comparison uses statistical significance
- [ ] Quality gates enforce blocking/warning thresholds
- [ ] Pipeline exits with code 0 (pass) or 1 (fail)
- [ ] Report includes actionable failure explanations
## Constraints
- Must run on Colab Free Tier (T4, 15GB VRAM)
- LLM-Judge budget: $2 per evaluation run
- Total runtime target: < 15 minutes
Implementation
Core Pipeline Class
import json
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional
class PipelineStatus(Enum):
PENDING = "pending"
RUNNING = "running"
PASSED = "passed"
FAILED = "failed"
ERROR = "error"
@dataclass
class StageResult:
name: str
status: PipelineStatus
duration_seconds: float
metrics: dict = field(default_factory=dict)
errors: list = field(default_factory=list)
@dataclass
class PipelineResult:
overall_status: PipelineStatus
deploy_allowed: bool
stages: list[StageResult]
quality_gate_result: dict
total_duration_seconds: float
report_path: str
class EvaluationPipeline:
"""Complete evaluation pipeline for Task API models."""
def __init__(self, config: dict):
self.config = config
self.stages = []
def run(self, model_path: str, benchmark_path: str,
baseline_path: Optional[str] = None) -> PipelineResult:
"""Execute complete evaluation pipeline."""
start_time = time.time()
stage_results = []
try:
# Stage 1: Automated Metrics
stage1 = self._run_automated_metrics(model_path, benchmark_path)
stage_results.append(stage1)
if stage1.status == PipelineStatus.ERROR:
return self._create_error_result(stage_results, start_time)
# Stage 2: LLM-as-Judge (sampled)
stage2 = self._run_llm_judge(model_path, stage1.metrics)
stage_results.append(stage2)
# Stage 3: Regression Comparison (if baseline exists)
if baseline_path and Path(baseline_path).exists():
stage3 = self._run_regression_comparison(
stage1.metrics,
baseline_path
)
stage_results.append(stage3)
else:
stage_results.append(StageResult(
name="regression_comparison",
status=PipelineStatus.PASSED,
duration_seconds=0,
metrics={"skipped": True, "reason": "No baseline provided"}
))
# Stage 4: Aggregate Results
stage4 = self._aggregate_results(stage_results)
stage_results.append(stage4)
# Stage 5: Quality Gates
stage5 = self._evaluate_quality_gates(stage4.metrics)
stage_results.append(stage5)
# Generate final result
total_duration = time.time() - start_time
report_path = self._generate_report(stage_results, total_duration)
return PipelineResult(
overall_status=stage5.status,
deploy_allowed=stage5.metrics.get('deploy_allowed', False),
stages=stage_results,
quality_gate_result=stage5.metrics,
total_duration_seconds=total_duration,
report_path=report_path
)
except Exception as e:
return self._create_error_result(
stage_results, start_time, str(e)
)
def _run_automated_metrics(self, model_path: str,
benchmark_path: str) -> StageResult:
"""Stage 1: Run automated evaluation metrics."""
start = time.time()
try:
# Load model
model = self._load_model(model_path)
# Load benchmark
benchmark = json.loads(Path(benchmark_path).read_text())
# Run evaluation
results = {
'accuracy': 0,
'json_valid_rate': 0,
'schema_valid_rate': 0,
'action_accuracy': 0,
'outputs': []
}
correct = 0
json_valid = 0
schema_valid = 0
action_correct = 0
for test_case in benchmark:
output = model.generate(test_case['input'])
results['outputs'].append({
'input': test_case['input'],
'output': output,
'expected': test_case.get('expected_output')
})
# JSON validity
try:
parsed = json.loads(output)
json_valid += 1
# Schema validation
if self._validate_schema(parsed):
schema_valid += 1
# Action accuracy
if parsed.get('action') == test_case.get('expected_action'):
action_correct += 1
# Overall correctness
if output.strip() == test_case.get('expected_output', '').strip():
correct += 1
except json.JSONDecodeError:
pass
total = len(benchmark)
results['accuracy'] = correct / total
results['json_valid_rate'] = json_valid / total
results['schema_valid_rate'] = schema_valid / total
results['action_accuracy'] = action_correct / total
results['total_examples'] = total
return StageResult(
name="automated_metrics",
status=PipelineStatus.PASSED,
duration_seconds=time.time() - start,
metrics=results
)
except Exception as e:
return StageResult(
name="automated_metrics",
status=PipelineStatus.ERROR,
duration_seconds=time.time() - start,
errors=[str(e)]
)
def _run_llm_judge(self, model_path: str,
automated_results: dict) -> StageResult:
"""Stage 2: Run LLM-as-Judge on sampled outputs."""
start = time.time()
try:
outputs = automated_results.get('outputs', [])
sample_rate = self.config.get('llm_judge_sample_rate', 0.10)
sample_size = max(10, int(len(outputs) * sample_rate))
# Sample outputs for judging
import random
sample = random.sample(outputs, min(sample_size, len(outputs)))
judge_results = []
for item in sample:
score = self._judge_single_output(
item['input'],
item['output'],
item.get('expected')
)
judge_results.append(score)
# Aggregate judge scores
avg_scores = {}
criteria = ['accuracy', 'helpfulness', 'format_quality']
for criterion in criteria:
scores = [r['scores'].get(criterion, 0) for r in judge_results]
avg_scores[criterion] = sum(scores) / len(scores) if scores else 0
avg_scores['overall'] = sum(avg_scores.values()) / len(avg_scores)
return StageResult(
name="llm_judge",
status=PipelineStatus.PASSED,
duration_seconds=time.time() - start,
metrics={
'sample_size': len(sample),
'scores': avg_scores,
'detailed_results': judge_results
}
)
except Exception as e:
return StageResult(
name="llm_judge",
status=PipelineStatus.ERROR,
duration_seconds=time.time() - start,
errors=[str(e)]
)
def _run_regression_comparison(self, current_metrics: dict,
baseline_path: str) -> StageResult:
"""Stage 3: Compare with baseline for regression."""
start = time.time()
try:
baseline = json.loads(Path(baseline_path).read_text())
# Statistical comparison
comparison = {
'accuracy': {
'baseline': baseline.get('accuracy', 0),
'current': current_metrics.get('accuracy', 0),
'difference': current_metrics.get('accuracy', 0) - baseline.get('accuracy', 0)
},
'json_valid_rate': {
'baseline': baseline.get('json_valid_rate', 0),
'current': current_metrics.get('json_valid_rate', 0),
'difference': current_metrics.get('json_valid_rate', 0) - baseline.get('json_valid_rate', 0)
}
}
# Check for significant regression
regression_threshold = self.config.get('regression_threshold', -0.05)
has_regression = any(
v['difference'] < regression_threshold
for v in comparison.values()
)
return StageResult(
name="regression_comparison",
status=PipelineStatus.FAILED if has_regression else PipelineStatus.PASSED,
duration_seconds=time.time() - start,
metrics={
'comparison': comparison,
'has_regression': has_regression,
'threshold': regression_threshold
}
)
except Exception as e:
return StageResult(
name="regression_comparison",
status=PipelineStatus.ERROR,
duration_seconds=time.time() - start,
errors=[str(e)]
)
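    # NOTE: The success criteria call for statistical significance, while the
    # comparison above uses a fixed difference threshold. The helper below is a
    # hedged sketch (not part of the earlier lessons) that you could call from
    # _run_regression_comparison when the baseline records correct/total counts:
    # a one-sided two-proportion z-test on accuracy.
    def _accuracy_drop_is_significant(self, baseline_correct: int,
                                      baseline_total: int,
                                      current_correct: int,
                                      current_total: int,
                                      alpha: float = 0.05) -> bool:
        """Return True if current accuracy is significantly below the baseline."""
        from math import sqrt
        from statistics import NormalDist
        p_base = baseline_correct / baseline_total
        p_curr = current_correct / current_total
        pooled = (baseline_correct + current_correct) / (baseline_total + current_total)
        se = sqrt(pooled * (1 - pooled) * (1 / baseline_total + 1 / current_total))
        if se == 0:
            return False  # Degenerate case: identical, extreme proportions
        z = (p_curr - p_base) / se
        # Flag a regression only for drops beyond the critical value
        # (roughly z < -1.645 at alpha = 0.05)
        return z < NormalDist().inv_cdf(alpha)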
def _aggregate_results(self, stage_results: list[StageResult]) -> StageResult:
"""Stage 4: Aggregate all metrics."""
start = time.time()
aggregated = {}
# Collect from automated metrics
automated = next((s for s in stage_results if s.name == "automated_metrics"), None)
if automated:
aggregated['accuracy'] = automated.metrics.get('accuracy', 0)
aggregated['json_valid_rate'] = automated.metrics.get('json_valid_rate', 0)
aggregated['schema_valid_rate'] = automated.metrics.get('schema_valid_rate', 0)
aggregated['action_accuracy'] = automated.metrics.get('action_accuracy', 0)
# Collect from LLM judge
llm_judge = next((s for s in stage_results if s.name == "llm_judge"), None)
if llm_judge:
aggregated['llm_judge_overall'] = llm_judge.metrics.get('scores', {}).get('overall', 0)
# Collect from regression
regression = next((s for s in stage_results if s.name == "regression_comparison"), None)
if regression and not regression.metrics.get('skipped'):
comparison = regression.metrics.get('comparison', {})
aggregated['accuracy_vs_baseline'] = comparison.get('accuracy', {}).get('difference', 0)
        # Safety placeholder: in production, run red-team prompts here and
        # compute a real harmful-output rate. The hardcoded value below exists
        # only so the blocking safety gate has a metric to check.
        aggregated['harmful_rate'] = 0.02  # Example value
return StageResult(
name="aggregation",
status=PipelineStatus.PASSED,
duration_seconds=time.time() - start,
metrics=aggregated
)
def _evaluate_quality_gates(self, aggregated_metrics: dict) -> StageResult:
"""Stage 5: Evaluate quality gates."""
start = time.time()
gates = self.config.get('quality_gates', [])
evaluator = QualityGateEvaluator(gates)
result = evaluator.evaluate(aggregated_metrics)
return StageResult(
name="quality_gates",
status=PipelineStatus.PASSED if result['deploy_allowed'] else PipelineStatus.FAILED,
duration_seconds=time.time() - start,
metrics=result
)
def _generate_report(self, stages: list[StageResult],
duration: float) -> str:
"""Generate evaluation report."""
report = {
'timestamp': datetime.now().isoformat(),
'total_duration_seconds': duration,
'stages': [
{
'name': s.name,
'status': s.status.value,
'duration': s.duration_seconds,
'metrics': s.metrics,
'errors': s.errors
}
for s in stages
]
}
report_path = f"reports/evaluation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
Path(report_path).parent.mkdir(exist_ok=True)
Path(report_path).write_text(json.dumps(report, indent=2))
return report_path
def _load_model(self, model_path: str):
"""Load model for evaluation."""
# Implementation depends on model format
pass
def _validate_schema(self, parsed: dict) -> bool:
"""Validate against Task API schema."""
required = ['action']
return all(k in parsed for k in required)
def _judge_single_output(self, input_text: str, output: str,
expected: Optional[str]) -> dict:
"""Run LLM-as-Judge on single output."""
# Implementation from Lesson 02
pass
    def _create_error_result(self, stages: list, start_time: float,
                             error: Optional[str] = None) -> PipelineResult:
"""Create error result for pipeline failure."""
return PipelineResult(
overall_status=PipelineStatus.ERROR,
deploy_allowed=False,
stages=stages,
quality_gate_result={'error': error or 'Pipeline error'},
total_duration_seconds=time.time() - start_time,
report_path=""
)
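The `_load_model` and `_judge_single_output` stubs above are meant to be filled with the code you wrote in earlier lessons. If you are assembling the capstone standalone, the hedged sketches below show one possible shape: a thin Transformers wrapper exposing the `generate(prompt)` call Stage 1 expects, and an OpenAI-based judge returning the `{'scores': {...}}` structure Stage 2 aggregates. The judge model, prompt wording, and generation settings are illustrative assumptions, not requirements.
import json
from typing import Optional

import torch
from openai import OpenAI
from transformers import AutoModelForCausalLM, AutoTokenizer


class GenerationWrapper:
    """Minimal wrapper exposing the generate(prompt) interface Stage 1 expects."""

    def __init__(self, model_path: str):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path, torch_dtype=torch.float16, device_map="auto"
        )

    def generate(self, prompt: str, max_new_tokens: int = 256) -> str:
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        output_ids = self.model.generate(
            **inputs, max_new_tokens=max_new_tokens, do_sample=False
        )
        # Return only the newly generated tokens, not the echoed prompt
        new_tokens = output_ids[0][inputs["input_ids"].shape[-1]:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True)


JUDGE_PROMPT = """Rate the model output on accuracy, helpfulness, and format_quality
(each 1-5). Respond as JSON: {{"accuracy": n, "helpfulness": n, "format_quality": n}}

Input: {input}
Expected (may be empty): {expected}
Output: {output}"""


def judge_single_output(input_text: str, output: str, expected: Optional[str]) -> dict:
    """One LLM-as-Judge call; returns the {'scores': ...} shape Stage 2 aggregates."""
    client = OpenAI()  # Reads OPENAI_API_KEY from the environment
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # Assumed judge model; use whatever you chose in Lesson 02
        messages=[{
            "role": "user",
            "content": JUDGE_PROMPT.format(
                input=input_text, expected=expected or "", output=output
            ),
        }],
        response_format={"type": "json_object"},
    )
    return {"scores": json.loads(response.choices[0].message.content)}
With these in place, `_load_model` can simply `return GenerationWrapper(model_path)` and `_judge_single_output` can delegate to `judge_single_output(...)`.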
Pipeline Configuration
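The configuration below reuses the `QualityGate`, `Comparison`, `GateType`, and `QualityGateEvaluator` types you built in the quality-gates lesson. If you are running the capstone standalone, a minimal sketch of those types (field and method names assumed to match what the config and Stage 5 expect) might look like this; otherwise import your own versions.
from dataclasses import dataclass
from enum import Enum


class GateType(Enum):
    BLOCKING = "blocking"   # Failure blocks deployment
    WARNING = "warning"     # Failure is reported but does not block


class Comparison(Enum):
    LESS_THAN = "less_than"
    GREATER_EQUAL = "greater_equal"


@dataclass
class QualityGate:
    name: str
    metric: str
    threshold: float
    comparison: Comparison
    gate_type: GateType
    description: str

    def passes(self, value: float) -> bool:
        if self.comparison is Comparison.LESS_THAN:
            return value < self.threshold
        return value >= self.threshold


class QualityGateEvaluator:
    """Checks aggregated metrics against a list of gates."""

    def __init__(self, gates: list):
        self.gates = gates

    def evaluate(self, metrics: dict) -> dict:
        blocking_failures, warnings = [], []
        for gate in self.gates:
            value = metrics.get(gate.metric)
            # A missing metric fails the gate rather than silently passing
            if value is None or not gate.passes(value):
                failure = {"gate": gate.name, "value": value,
                           "description": gate.description}
                if gate.gate_type is GateType.BLOCKING:
                    blocking_failures.append(failure)
                else:
                    warnings.append(failure)
        return {
            "deploy_allowed": not blocking_failures,
            "blocking_failures": blocking_failures,
            "warnings": warnings,
        }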
PIPELINE_CONFIG = {
'llm_judge_sample_rate': 0.10, # Judge 10% of outputs
'regression_threshold': -0.05, # Allow 5% regression max
'quality_gates': [
QualityGate(
name="safety",
metric="harmful_rate",
threshold=0.05,
comparison=Comparison.LESS_THAN,
gate_type=GateType.BLOCKING,
description="Harmful rate must be < 5%"
),
QualityGate(
name="format",
metric="json_valid_rate",
threshold=0.95,
comparison=Comparison.GREATER_EQUAL,
gate_type=GateType.BLOCKING,
description="JSON validity >= 95%"
),
QualityGate(
name="accuracy",
metric="accuracy",
threshold=0.85,
comparison=Comparison.GREATER_EQUAL,
gate_type=GateType.BLOCKING,
description="Accuracy >= 85%"
),
QualityGate(
name="accuracy_target",
metric="accuracy",
threshold=0.90,
comparison=Comparison.GREATER_EQUAL,
gate_type=GateType.WARNING,
description="Target accuracy 90%+"
)
]
}
Running the Pipeline
# Initialize and run
pipeline = EvaluationPipeline(PIPELINE_CONFIG)
result = pipeline.run(
model_path="./models/task-api-v2",
benchmark_path="./benchmarks/task_api_500.json",
baseline_path="./baselines/current.json"
)
# Print summary
print(f"Status: {result.overall_status.value}")
print(f"Deploy Allowed: {result.deploy_allowed}")
print(f"Duration: {result.total_duration_seconds:.1f}s")
print(f"Report: {result.report_path}")
# Exit with appropriate code for CI/CD
import sys
sys.exit(0 if result.deploy_allowed else 1)
Output:
Status: passed
Deploy Allowed: True
Duration: 287.3s
Report: reports/evaluation_20250102_143215.json
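One gap between the spec and the class: the spec lists an updated baseline as an output when the run passes, and the CI workflow below uploads `baselines/new_baseline.json`, but nothing above writes that file. A small promotion step closes the gap; the sketch below (the output path and the `"aggregation"` stage name are assumptions carried over from the class) writes the new baseline from a passing run:
# Hedged sketch: promote a passing run's aggregated metrics to a new baseline
def promote_baseline(result: PipelineResult,
                     output_path: str = "baselines/new_baseline.json") -> None:
    if not result.deploy_allowed:
        return  # Never overwrite the baseline with a failing run
    aggregated = next(
        (s.metrics for s in result.stages if s.name == "aggregation"), {}
    )
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(aggregated, indent=2))

promote_baseline(result)
The aggregated metrics contain the same `accuracy` and `json_valid_rate` keys that `_run_regression_comparison` reads, so the next run can diff against this file directly.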
CI/CD Integration
GitHub Actions Workflow
# .github/workflows/model-evaluation.yml
name: Model Evaluation Pipeline
on:
push:
paths:
- 'models/**'
workflow_dispatch:
inputs:
model_path:
description: 'Path to model'
required: true
jobs:
evaluate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements-eval.txt
- name: Download baseline
uses: actions/download-artifact@v4
with:
name: baseline-latest
path: baselines/
continue-on-error: true # First run won't have baseline
- name: Run Evaluation Pipeline
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python -m evaluation.pipeline \
--model ${{ github.event.inputs.model_path || 'models/latest' }} \
--benchmark benchmarks/task_api.json \
--baseline baselines/current.json \
--config configs/production.yaml
- name: Upload Report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: reports/
- name: Update Baseline on Success
if: success()
uses: actions/upload-artifact@v4
with:
name: baseline-latest
path: baselines/new_baseline.json
- name: Notify on Failure
if: failure()
uses: slackapi/slack-github-action@v1
with:
payload: |
{
"text": "Model evaluation failed for ${{ github.sha }}"
}
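The workflow calls `python -m evaluation.pipeline` with `--model`, `--benchmark`, `--baseline`, and `--config` flags, which the class above does not expose on its own. A minimal entrypoint sketch follows; for brevity it accepts `--config` but falls back to the in-code `PIPELINE_CONFIG` rather than parsing the YAML file the workflow passes, so adapt the config loading to your own format.
# evaluation/pipeline.py -- hedged sketch of the CLI entrypoint the workflow invokes.
# In a real module, import EvaluationPipeline, PIPELINE_CONFIG, and promote_baseline here.
import argparse
import sys


def main() -> int:
    parser = argparse.ArgumentParser(description="Run the model evaluation pipeline")
    parser.add_argument("--model", required=True)
    parser.add_argument("--benchmark", required=True)
    parser.add_argument("--baseline", default=None)
    parser.add_argument("--config", default=None)  # Accepted but not parsed in this sketch
    args = parser.parse_args()

    pipeline = EvaluationPipeline(PIPELINE_CONFIG)
    result = pipeline.run(
        model_path=args.model,
        benchmark_path=args.benchmark,
        baseline_path=args.baseline,
    )
    promote_baseline(result)  # Writes baselines/new_baseline.json on a pass
    print(f"Status: {result.overall_status.value}  Report: {result.report_path}")
    return 0 if result.deploy_allowed else 1


if __name__ == "__main__":
    sys.exit(main())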
Finalize Your Skill
Complete your llmops-evaluator/SKILL.md with the full pipeline reference:
## Complete Pipeline Reference
### Pipeline Stages
| Stage | Purpose | Output |
|-------|---------|--------|
| 1. Automated Metrics | Fast, deterministic checks | Accuracy, format rates |
| 2. LLM-as-Judge | Subjective quality sampling | Helpfulness scores |
| 3. Regression | Compare to baseline | Delta metrics |
| 4. Aggregation | Combine all results | Unified metrics dict |
| 5. Quality Gates | Pass/fail decision | Deploy allowed bool |
### Configuration Checklist
- [ ] Define quality gates (blocking + warning)
- [ ] Set regression threshold (typically -5%)
- [ ] Configure LLM-Judge sample rate (10-20%)
- [ ] Set up baseline management
- [ ] Integrate with CI/CD
### Troubleshooting
| Symptom | Cause | Fix |
|---------|-------|-----|
| All evaluations fail | Invalid model path | Verify model loads |
| Random gate failures | Low sample size | Increase benchmark size |
| Slow pipeline | Too many LLM-Judge calls | Reduce sample rate |
| False regressions | Baseline stale | Update baseline regularly |
### Production Deployment
1. Run pipeline in CI on every model update
2. Block deployment on any blocking gate failure
3. Review warnings before manual deployment
4. Update baseline after successful deployment
5. Monitor production for issues not caught by evaluation
Try With AI
Prompt 1: Extend the Pipeline
My evaluation pipeline covers:
- Automated metrics (accuracy, format)
- LLM-as-Judge (helpfulness)
- Regression testing
- Quality gates
I want to add these capabilities:
1. Safety evaluation (red-team prompts)
2. Latency testing (measure response time)
3. Cost estimation (tokens used per request)
For each, describe:
- How to implement
- Where it fits in the pipeline
- What gates/thresholds to add
What you are learning: Pipeline extension. Production pipelines evolve over time. Your AI partner helps you plan additions that integrate cleanly.
Prompt 2: Debug Pipeline Issues
My pipeline produces inconsistent results:
- Run 1: accuracy 87.2%, PASS
- Run 2: accuracy 84.1%, FAIL
- Run 3: accuracy 88.5%, PASS
Same model, same benchmark. Temperature is set to 0.
Diagnose:
1. What could cause this variance?
2. How can I make results reproducible?
3. Should my thresholds account for this variance?
What you are learning: Pipeline debugging. Non-determinism is common in ML systems. Your AI partner helps identify and address sources of variance.
Prompt 3: Optimize for Scale
My pipeline currently takes 15 minutes and costs $3 per run.
At 50 model updates per week, this is:
- 12.5 hours of CI time
- $150 in evaluation costs
I need to reduce both by 50% while maintaining quality.
Propose optimizations with trade-off analysis for:
1. Parallel execution
2. Smarter sampling
3. Caching strategies
4. Cheaper LLM-Judge alternatives
What you are learning: Cost and performance optimization. Production systems must balance quality against resource constraints. Your AI partner helps find efficient solutions.
Reflect on Your Skill
You have built a complete evaluation skill. Review what you created:
# Check your skill file
cat llmops-evaluator/SKILL.md | wc -l
# Should be 200+ lines covering all evaluation patterns
Your skill now includes:
- Evaluation taxonomy and metric selection
- LLM-as-Judge implementation patterns
- Custom benchmark design
- Regression testing with statistical significance
- Quality gate configuration
- Complete pipeline architecture
This skill is now part of your Digital FTE toolkit. You can invoke it whenever you evaluate a fine-tuned model.
Safety Note
Evaluation pipelines are one layer of defense in depth, not a perfect barrier. Even models that pass every evaluation can fail in production. Always implement:
- Production monitoring and alerting
- User feedback collection
- Periodic human review
- Incident response procedures
Never assume a passing pipeline means a safe deployment.