Pipeline Architecture Design
Before building individual pipeline components, you need to understand how they connect into a complete system. This lesson provides the architectural blueprint for your LLMOps pipeline.
The Complete Pipeline
Your LLMOps pipeline transforms a specification into a deployed Digital FTE:
┌─────────────────────────────────────────────────────────────────────────┐
│ LLMOps Pipeline │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ DATA │──▶│ TRAINING │──▶│ EVAL │──▶│ DEPLOY │ │
│ │ CURATION │ │ │ │ │ │ │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Synthetic│ │ Unsloth │ │LLM Judge │ │ Ollama │ │
│ │ + Clean │ │ + LoRA │ │+ Regress │ │ + Agent │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ MONITORING & CONTINUOUS TRAINING │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Pipeline Stages
Stage 1: Data Curation
Purpose: Create high-quality training data from raw sources.
Inputs:
- Domain specification (what expertise to encode)
- Seed examples (real or synthetic)
- Quality criteria
Process:
# Data curation workflow
raw_data = collect_domain_examples()
cleaned_data = deduplicate_and_validate(raw_data)
synthetic_data = generate_synthetic_examples(cleaned_data, count=150)
training_data = format_for_training(cleaned_data + synthetic_data)
Outputs:
- train.jsonl (80% of data)
- validation.jsonl (20% of data)
- data_quality_report.json
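The 80/20 split can be produced with a few lines of standard-library Python. This is a minimal sketch; write_split is an illustrative helper (not a library API), and it assumes examples is the list of chat-format dicts returned by format_for_training above.

import json
import random

def write_split(examples: list[dict], train_path: str = "train.jsonl",
                val_path: str = "validation.jsonl", train_fraction: float = 0.8) -> None:
    """Shuffle deterministically, then write an 80/20 train/validation split as JSONL."""
    rng = random.Random(42)
    shuffled = list(examples)
    rng.shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    for path, subset in ((train_path, shuffled[:cut]), (val_path, shuffled[cut:])):
        with open(path, "w", encoding="utf-8") as f:
            for example in subset:
                f.write(json.dumps(example, ensure_ascii=False) + "\n")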
Quality Gate:
| Metric | Threshold |
|---|---|
| Example count | >= 200 |
| Format validity | 100% |
| Deduplication ratio | < 5% duplicates |
| Token distribution | Mean < 2048 tokens |
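The deduplication and token-distribution checks can be computed straight from the JSONL file. A sketch, assuming exact-match deduplication via hashing and using whitespace word count as a rough token proxy (swap in your tokenizer for a real gate):

import json
from hashlib import sha256

def data_gate_metrics(path: str = "train.jsonl") -> dict:
    """Compute example count, duplicate ratio, and a rough mean-token estimate."""
    seen, duplicates, lengths = set(), 0, []
    with open(path, encoding="utf-8") as f:
        for line in f:
            example = json.loads(line)
            canonical = json.dumps(example, sort_keys=True)
            digest = sha256(canonical.encode("utf-8")).hexdigest()
            if digest in seen:
                duplicates += 1
            seen.add(digest)
            lengths.append(len(canonical.split()))  # crude proxy for token count
    total = len(lengths)
    return {
        "count": total,
        "dedup_ratio": duplicates / total if total else 0.0,
        "mean_tokens": sum(lengths) / total if total else 0.0,
    }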
Stage 2: Training Orchestration
Purpose: Fine-tune base model with curated data.
Inputs:
- Training dataset from Stage 1
- Base model selection
- Compute constraints (T4 GPU, 16GB VRAM)
Process:
# Training workflow
model = load_base_model("Qwen/Qwen2.5-3B-Instruct")
model = quantize_4bit(model)  # Load the base model in 4-bit first so it fits in T4 VRAM
model = apply_lora_adapters(model, rank=16, alpha=32)  # LoRA adapters on top of the quantized base (QLoRA)
trainer = SFTTrainer(
    model=model,
    train_dataset=load_dataset("train.jsonl"),
    eval_dataset=load_dataset("validation.jsonl"),
    args=SFTConfig(  # in trl, hyperparameters go through SFTConfig/TrainingArguments
        max_steps=500,
        per_device_train_batch_size=2,
    ),
)
trainer.train()
Outputs:
- adapter_weights/ (LoRA checkpoint)
- training_metrics.json
- loss_curves.png
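Producing these artifacts from the trainer is straightforward. A minimal sketch: model.save_pretrained and trainer.state.log_history are real trl/peft/transformers APIs, while the plotting and the exact metric fields are illustrative.

import json
import matplotlib.pyplot as plt

def save_training_artifacts(trainer, model, out_dir: str = "adapter_weights/") -> None:
    """Persist the LoRA checkpoint, training metrics, and a loss-curve plot."""
    model.save_pretrained(out_dir)  # writes only the LoRA adapter weights

    history = trainer.state.log_history  # list of per-step metric dicts
    train_loss = [(h["step"], h["loss"]) for h in history if "loss" in h]
    eval_loss = [(h["step"], h["eval_loss"]) for h in history if "eval_loss" in h]

    with open("training_metrics.json", "w") as f:
        json.dump({
            "final_loss": train_loss[-1][1] if train_loss else None,
            "validation_loss": eval_loss[-1][1] if eval_loss else None,
            "total_steps": trainer.state.global_step,
            "checkpoint_path": out_dir,
        }, f, indent=2)

    if train_loss:
        plt.plot(*zip(*train_loss), label="train")
    if eval_loss:
        plt.plot(*zip(*eval_loss), label="validation")
    plt.xlabel("step")
    plt.ylabel("loss")
    plt.legend()
    plt.savefig("loss_curves.png")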
Quality Gate:
| Metric | Threshold |
|---|---|
| Final loss | < 0.5 |
| Validation loss | Converging (not diverging) |
| Training time | < 2 hours (Colab limit) |
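The "converging, not diverging" check needs more than a single number. A sketch of one way to apply this gate, assuming you pass in the validation losses collected from the trainer's log history; the 5% tolerance is an illustrative divergence heuristic, not a standard:

def training_gate(final_loss: float, eval_losses: list[float],
                  training_seconds: float) -> dict:
    """Check the Stage 2 thresholds: loss < 0.5, validation loss converging, time < 2h."""
    # "Converging" here means the latest validation loss is not meaningfully
    # worse than the best one seen so far.
    converging = bool(eval_losses) and eval_losses[-1] <= min(eval_losses) * 1.05
    checks = {
        "final_loss": final_loss < 0.5,
        "validation_converging": converging,
        "training_time": training_seconds < 2 * 3600,
    }
    return {"passed": all(checks.values()), "checks": checks}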
Stage 3: Evaluation Integration
Purpose: Verify model quality before deployment.
Inputs:
- Trained model from Stage 2
- Test dataset (held out from training)
- Quality thresholds
Process:
# Evaluation workflow
model = load_trained_model("adapter_weights/")
test_results = evaluate_on_testset(model, "test.jsonl")
# Automated quality gates
llm_judge_score = run_llm_judge(test_results)
regression_check = compare_to_baseline(test_results, baseline_results)
safety_check = run_safety_evaluation(test_results)
Outputs:
- eval_report.json
- llm_judge_scores.json
- regression_analysis.json
- pass_fail_decision (boolean)
Quality Gate:
| Metric | Threshold |
|---|---|
| Task accuracy | >= 90% |
| LLM judge score | >= 0.85 |
| No regression | Baseline score +/- 5% |
| Safety pass | 100% |
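These four thresholds combine into the pass_fail_decision artifact. A sketch of the decision logic, with field names mirroring the eval_report.json spec shown later in this lesson:

def evaluation_gate(report: dict, baseline_accuracy: float) -> bool:
    """Apply the Stage 3 thresholds and return the pass/fail decision."""
    accuracy_ok = report["task_accuracy"] >= 0.90
    judge_ok = report["llm_judge_score"] >= 0.85
    # No regression: stay within +/- 5 percentage points of the baseline score.
    regression_ok = abs(report["task_accuracy"] - baseline_accuracy) <= 0.05
    safety_ok = report["safety_check"]["passed"]
    return all([accuracy_ok, judge_ok, regression_ok, safety_ok])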
Stage 4: Deployment Automation
Purpose: Deploy validated model to production environment.
Inputs:
- Validated model from Stage 3
- Deployment target (Ollama, API, etc.)
- Integration configuration
Process:
# Deployment workflow
# 1. Merge LoRA into base model
merged_model = merge_lora_weights(base_model, adapter_weights)
# 2. Convert to deployable format
gguf_model = convert_to_gguf(merged_model, quantization="Q4_K_M")
# 3. Deploy to Ollama (register the locally built GGUF; `ollama pull` only fetches from a registry)
create_modelfile(gguf_model, name="task-api-v1")
ollama_create("task-api-v1", modelfile="Modelfile")  # wraps `ollama create task-api-v1 -f Modelfile`
# 4. Integrate with agent framework
agent_config = generate_agent_config(model_name="task-api-v1")
Outputs:
- model.gguf (deployable model)
- Modelfile (Ollama configuration)
- agent_config.json
- deployment_status (success/failure)
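Registering the GGUF with Ollama is a two-step operation: write a Modelfile that points at the file, then run `ollama create`. A sketch; the Modelfile contents here are a minimal example, not the full configuration you would ship:

import subprocess
from pathlib import Path

def deploy_to_ollama(gguf_path: str, model_name: str = "task-api-v1") -> bool:
    """Write a minimal Modelfile and register the model with the local Ollama daemon."""
    modelfile = Path("Modelfile")
    modelfile.write_text(f"FROM ./{gguf_path}\nPARAMETER temperature 0.2\n")
    result = subprocess.run(
        ["ollama", "create", model_name, "-f", str(modelfile)],
        capture_output=True, text=True,
    )
    return result.returncode == 0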
Quality Gate:
| Metric | Threshold |
|---|---|
| Deployment success | True |
| Health check | Responding |
| Latency | < 500ms first token |
| Memory usage | < 8GB |
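The health check and first-token latency can be measured with a streaming request against the Ollama endpoint listed in the deployment artifact. A minimal sketch; timing the first streamed chunk is an approximation of first-token latency:

import time
import requests

def health_check(model: str = "task-api-v1",
                 url: str = "http://localhost:11434/api/generate") -> dict:
    """Send a streamed prompt and time the arrival of the first response chunk."""
    start = time.monotonic()
    first_token_ms = None
    with requests.post(url, json={"model": model, "prompt": "ping", "stream": True},
                       stream=True, timeout=30) as response:
        response.raise_for_status()
        for line in response.iter_lines():
            if line:
                first_token_ms = (time.monotonic() - start) * 1000
                break
    return {
        "responding": first_token_ms is not None,
        "latency_ms": first_token_ms,
        "passed": first_token_ms is not None and first_token_ms < 500,
    }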
Stage Dependencies
Pipeline stages have strict dependencies:
DATA ───────────┐
│
▼
TRAINING ──────────┐
│
▼
EVALUATION ──────────┐
│
▼
DEPLOYMENT
Why This Order Matters:
| Stage | Depends On | Cannot Run Until |
|---|---|---|
| Training | Data | Dataset exists and validated |
| Evaluation | Training | Model checkpoint exists |
| Deployment | Evaluation | Quality gates passed |
Anti-Pattern: Parallel Execution
# WRONG: Training before data is ready
pipeline.run_parallel([data_stage, training_stage])
# Data not ready when training starts = undefined behavior
Correct Pattern: Sequential with Gates
# RIGHT: Each stage gates the next
data_result = await pipeline.run(data_stage)
if not data_result.passed:
raise PipelineError("Data quality gate failed")
training_result = await pipeline.run(training_stage)
if not training_result.passed:
raise PipelineError("Training quality gate failed")
# ... continue
Artifact Specifications
Data Stage Artifacts
// train.jsonl (one example per line)
{"messages": [
{"role": "user", "content": "Add task: Buy milk"},
{"role": "assistant", "content": null,
"tool_calls": [{"function": {"name": "add_task", "arguments": "{\"title\": \"Buy milk\"}"}}]}
]}
// data_quality_report.json
{
"total_examples": 200,
"train_count": 160,
"validation_count": 40,
"dedup_ratio": 0.02,
"avg_tokens": 847,
"format_errors": 0,
"quality_score": 0.94
}
Training Stage Artifacts
// training_metrics.json
{
"final_loss": 0.12,
"validation_loss": 0.18,
"epochs_completed": 3,
"total_steps": 500,
"learning_rate": 2e-4,
"training_time_seconds": 3420,
"checkpoint_path": "adapter_weights/"
}
Evaluation Stage Artifacts
// eval_report.json
{
"task_accuracy": 0.94,
"llm_judge_score": 0.89,
"baseline_comparison": {
"baseline_accuracy": 0.91,
"current_accuracy": 0.94,
"improvement": 0.03,
"regression": false
},
"safety_check": {
"harmful_outputs": 0,
"refusal_rate": 0.02,
"passed": true
},
"decision": "PASS"
}
Deployment Stage Artifacts
// deployment_status.json
{
"model_name": "task-api-v1",
"deployment_target": "ollama",
"status": "success",
"health_check": {
"responding": true,
"latency_ms": 245,
"memory_mb": 4200
},
"endpoint": "http://localhost:11434/api/generate",
"agent_config_path": "agent_config.json"
}
Quality Gates Architecture
Each stage has a quality gate that blocks progression:
┌─────────────────────────────────────────────────────┐
│ QUALITY GATES │
├─────────────────────────────────────────────────────┤
│ │
│ DATA ──▶ [Gate 1] ──▶ TRAINING │
│ ├── 200+ examples? │
│ ├── Format valid? │
│ └── Duplicates < 5%? │
│ │
│ TRAINING ──▶ [Gate 2] ──▶ EVALUATION │
│ ├── Loss < 0.5? │
│ ├── Converging? │
│ └── Time < 2 hours? │
│ │
│ EVALUATION ──▶ [Gate 3] ──▶ DEPLOYMENT │
│ ├── Accuracy >= 90%? │
│ ├── No regression? │
│ └── Safety passed? │
│ │
│ DEPLOYMENT ──▶ [Gate 4] ──▶ PRODUCTION │
│ ├── Health check OK? │
│ ├── Latency < 500ms? │
│ └── Memory < 8GB? │
│ │
└─────────────────────────────────────────────────────┘
Gate Implementation Pattern:
from typing import Callable

class QualityGate:
    def __init__(self, name: str, checks: list[Callable]):
self.name = name
self.checks = checks
def evaluate(self, artifacts: dict) -> GateResult:
results = []
for check in self.checks:
result = check(artifacts)
results.append(result)
passed = all(r.passed for r in results)
return GateResult(
gate=self.name,
passed=passed,
checks=results,
reason=self._summarize_failures(results) if not passed else None
)
# Example usage
data_gate = QualityGate("data_quality", [
lambda a: Check("example_count", a["count"] >= 200),
lambda a: Check("format_valid", a["errors"] == 0),
lambda a: Check("dedup_ratio", a["duplicates"] < 0.05),
])
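Check and GateResult are not library classes; one possible minimal shape for them, as plain dataclasses consistent with how the pattern above uses them:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Check:
    name: str       # which threshold was checked
    passed: bool    # whether the artifact met it

@dataclass
class GateResult:
    gate: str
    passed: bool
    checks: list[Check]
    reason: Optional[str] = None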
Rollback Strategy
When a stage fails, the pipeline needs to recover:
┌─────────────────────────────────────────────────────┐
│ ROLLBACK FLOWS │
├─────────────────────────────────────────────────────┤
│ │
│ Stage Failed │ Rollback Action │
│ ─────────────────────┼────────────────────────────│
│ Data Curation │ Regenerate dataset │
│ Training │ Adjust hyperparameters │
│ Evaluation │ Return to Training │
│ Deployment │ Keep previous version │
│ │
└─────────────────────────────────────────────────────┘
Rollback Implementation:
import logging

class PipelineOrchestrator:
    def __init__(self):
        self.checkpoints = {}

    def run_with_rollback(self, stages: list[Stage]):
        for stage in stages:
            # Save checkpoint before the stage runs
            self.checkpoints[stage.name] = self.capture_state()
            try:
                # Run stage
                result = stage.run()
                # A failed quality gate is raised here and rolled back once, below
                if not result.gate.passed:
                    raise GateFailed(stage.name, result.gate.reason)
            except Exception:
                self.rollback(stage.name)
                raise
def rollback(self, stage_name: str):
if stage_name in self.checkpoints:
self.restore_state(self.checkpoints[stage_name])
logging.info(f"Rolled back to checkpoint before {stage_name}")
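Wiring the four pipeline stages into the orchestrator then looks like the snippet below; the concrete Stage subclasses are illustrative names, not part of any library:

orchestrator = PipelineOrchestrator()
orchestrator.run_with_rollback([
    DataCurationStage(),   # hypothetical Stage implementations for the four stages
    TrainingStage(),
    EvaluationStage(),
    DeploymentStage(),
])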
Pipeline Configuration
Define your pipeline in a specification file:
# pipeline.yaml
name: task-api-llmops
version: "1.0.0"
stages:
data:
source: "synthetic"
count: 200
format: "chat"
quality_gate:
min_examples: 200
max_duplicates: 0.05
training:
base_model: "Qwen/Qwen2.5-3B-Instruct"
method: "lora"
lora_rank: 16
batch_size: 2
max_steps: 500
quality_gate:
max_loss: 0.5
max_time_hours: 2
evaluation:
test_split: 0.2
quality_gate:
min_accuracy: 0.90
regression_tolerance: 0.05
deployment:
target: "ollama"
quantization: "Q4_K_M"
quality_gate:
max_latency_ms: 500
max_memory_gb: 8
constraints:
hardware: "colab-t4"
max_cost: "$1.00"
max_time_hours: 4
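To connect the specification to the gate pattern from earlier, here is a sketch of loading pipeline.yaml and building a gate from its thresholds. It assumes PyYAML is installed and reuses the QualityGate and Check classes defined above:

import yaml

def load_pipeline_config(path: str = "pipeline.yaml") -> dict:
    """Read the pipeline spec and return stage configs plus their gate thresholds."""
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)

config = load_pipeline_config()
data_gate_spec = config["stages"]["data"]["quality_gate"]
data_gate = QualityGate("data_quality", [
    lambda a: Check("example_count", a["count"] >= data_gate_spec["min_examples"]),
    lambda a: Check("dedup_ratio", a["duplicates"] < data_gate_spec["max_duplicates"]),
])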
What You Designed
Your pipeline architecture:
| Component | Purpose | Implementation |
|---|---|---|
| 4 Stages | Complete workflow | Data → Train → Eval → Deploy |
| Quality Gates | Prevent bad deployments | Stage-specific thresholds |
| Artifact Specs | Enable handoffs | JSON schemas per stage |
| Dependencies | Correct ordering | Sequential with gates |
| Rollback | Error recovery | Checkpoint restoration |
This architecture ensures your Digital FTE is built reliably and reproducibly.
Try With AI
Prompt 1: Analyze Your Pipeline Dependencies
I'm designing an LLMOps pipeline with these stages:
1. Data Curation
2. Training
3. Evaluation
4. Deployment
Help me create a dependency diagram. For each stage:
- What inputs does it require?
- What outputs does it produce?
- What happens if it fails?
- Can any stages run in parallel?
What you're learning: Dependency analysis—understanding how pipeline components must sequence.
Prompt 2: Design Quality Gates
For my LLMOps pipeline, I need quality gates at each stage.
My constraints: Colab Free Tier (T4 GPU), < $1 total cost, < 4 hours.
For each gate, help me define:
- 3-4 metrics to check
- Threshold values for pass/fail
- What to do when a gate fails
Start with the Training stage gate.
What you're learning: Quality gate design—defining measurable thresholds that prevent bad deployments.
Prompt 3: Customize for Your Domain
I want to adapt this LLMOps pipeline for [your domain: legal documents,
medical records, financial analysis, customer support].
Ask me questions about:
- What data sources I have
- What quality matters most (accuracy? latency? safety?)
- What deployment environment I'm targeting
Then help me customize the pipeline configuration.
What you're learning: Pipeline customization—adapting architecture to domain-specific requirements.
Safety Note
Pipeline quality gates are your safety net. Never bypass a gate "just to test" in production—failed gates indicate real problems that will affect your Digital FTE's reliability. Test bypasses only in isolated development environments.