
Pipeline Architecture Design

Before building individual pipeline components, you need to understand how they connect into a complete system. This lesson provides the architectural blueprint for your LLMOps pipeline.

The Complete Pipeline

Your LLMOps pipeline transforms a specification into a deployed Digital FTE:

LLMOps Pipeline

  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
  │   DATA   │───▶│ TRAINING │───▶│   EVAL   │───▶│  DEPLOY  │
  │ CURATION │    │          │    │          │    │          │
  └──────────┘    └──────────┘    └──────────┘    └──────────┘
       │               │               │               │
       ▼               ▼               ▼               ▼
  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
  │ Synthetic│    │ Unsloth  │    │LLM Judge │    │  Ollama  │
  │ + Clean  │    │ + LoRA   │    │+ Regress │    │ + Agent  │
  └──────────┘    └──────────┘    └──────────┘    └──────────┘

  ┌──────────────────────────────────────────────────────────┐
  │             MONITORING & CONTINUOUS TRAINING              │
  └──────────────────────────────────────────────────────────┘

Pipeline Stages

Stage 1: Data Curation

Purpose: Create high-quality training data from raw sources.

Inputs:

  • Domain specification (what expertise to encode)
  • Seed examples (real or synthetic)
  • Quality criteria

Process:

# Data curation workflow
raw_data = collect_domain_examples()
cleaned_data = deduplicate_and_validate(raw_data)
synthetic_data = generate_synthetic_examples(cleaned_data, count=150)
training_data = format_for_training(cleaned_data + synthetic_data)

Outputs:

  • train.jsonl (80% of data)
  • validation.jsonl (20% of data)
  • data_quality_report.json

Quality Gate:

Metric                Threshold
------                ---------
Example count         >= 200
Format validity       100%
Deduplication ratio   < 5% duplicates
Token distribution    Mean < 2048 tokens
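
These checks can be computed directly from the curated file before the train/validation split. Below is a minimal sketch, assuming the chat-format JSONL shown later in this lesson; the curated.jsonl path and the rough 4-characters-per-token estimate are illustrative assumptions, not part of the pipeline spec.

import json
import hashlib

def data_quality_report(path: str) -> dict:
    examples, hashes, token_counts, format_errors = 0, set(), [], 0
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                json.loads(line)["messages"]  # must parse and be chat format
            except (json.JSONDecodeError, KeyError, TypeError):
                format_errors += 1
                continue
            examples += 1
            hashes.add(hashlib.sha256(line.strip().encode()).hexdigest())
            token_counts.append(len(line) // 4)  # rough ~4 chars/token estimate

    return {
        "total_examples": examples,
        "format_errors": format_errors,
        "dedup_ratio": round(1 - len(hashes) / max(examples, 1), 3),
        "avg_tokens": sum(token_counts) // max(len(token_counts), 1),
    }

report = data_quality_report("curated.jsonl")
gate_passed = (
    report["total_examples"] >= 200
    and report["format_errors"] == 0
    and report["dedup_ratio"] < 0.05
    and report["avg_tokens"] < 2048
)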

Stage 2: Training Orchestration

Purpose: Fine-tune base model with curated data.

Inputs:

  • Training dataset from Stage 1
  • Base model selection
  • Compute constraints (T4 GPU, 16GB VRAM)

Process:

# Training workflow
model = load_base_model("Qwen/Qwen2.5-3B-Instruct")
model = quantize_4bit(model)  # Quantize base weights first so the model fits in T4 VRAM
model = apply_lora_adapters(model, rank=16, alpha=32)  # Attach trainable LoRA adapters on top

trainer = SFTTrainer(
    model=model,
    train_dataset=load_dataset("train.jsonl"),
    eval_dataset=load_dataset("validation.jsonl"),
    max_steps=500,
    per_device_train_batch_size=2,
)
trainer.train()

Outputs:

  • adapter_weights/ (LoRA checkpoint)
  • training_metrics.json
  • loss_curves.png
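
These artifacts can be written immediately after training finishes. A minimal sketch, assuming the TRL-style trainer from the snippet above; the exact metric keys (train_loss, train_runtime, eval_loss) come from the Hugging Face trainer and may vary by version.

import json

train_output = trainer.train()              # replaces the bare trainer.train() call above
trainer.save_model("adapter_weights/")      # writes the LoRA adapter checkpoint

metrics = {
    "final_loss": train_output.metrics.get("train_loss"),
    "validation_loss": trainer.evaluate().get("eval_loss"),
    "total_steps": trainer.state.global_step,
    "training_time_seconds": train_output.metrics.get("train_runtime"),
    "checkpoint_path": "adapter_weights/",
}
with open("training_metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)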

Quality Gate:

Metric                Threshold
------                ---------
Final loss            < 0.5
Validation loss       Converging (not diverging)
Training time         < 2 hours (Colab limit)
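
The converging-not-diverging check is the one entry in this gate that isn't a single threshold. One simple heuristic, sketched here under the assumption that periodic eval_loss entries are available in the trainer's log history; the 1.1 tolerance factor is an illustrative choice.

def training_gate_passed(metrics: dict, eval_losses: list[float]) -> bool:
    if metrics["final_loss"] >= 0.5:                  # hard threshold from the table
        return False
    if metrics["training_time_seconds"] >= 2 * 3600:  # Colab time budget
        return False
    # Divergence heuristic: the last eval loss should not sit far above the best seen
    if eval_losses and eval_losses[-1] > min(eval_losses) * 1.1:
        return False
    return True

# eval_losses can be pulled from the trainer's log history
eval_losses = [e["eval_loss"] for e in trainer.state.log_history if "eval_loss" in e]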

Stage 3: Evaluation Integration

Purpose: Verify model quality before deployment.

Inputs:

  • Trained model from Stage 2
  • Test dataset (held out from training)
  • Quality thresholds

Process:

# Evaluation workflow
model = load_trained_model("adapter_weights/")
test_results = evaluate_on_testset(model, "test.jsonl")

# Automated quality gates
llm_judge_score = run_llm_judge(test_results)
regression_check = compare_to_baseline(test_results, baseline_results)
safety_check = run_safety_evaluation(test_results)

Outputs:

  • eval_report.json
  • llm_judge_scores.json
  • regression_analysis.json
  • pass_fail_decision (boolean)

Quality Gate:

Metric                Threshold
------                ---------
Task accuracy         >= 90%
LLM judge score       >= 0.85
No regression         Baseline score +/- 5%
Safety pass           100%
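
Turning this gate into a pass/fail decision is a few comparisons over eval_report.json (its schema appears later in this lesson). The helper below is a sketch with thresholds copied from the table.

def evaluation_gate(report: dict) -> str:
    baseline_acc = report["baseline_comparison"]["baseline_accuracy"]
    checks = {
        "task_accuracy": report["task_accuracy"] >= 0.90,
        "llm_judge": report["llm_judge_score"] >= 0.85,
        "no_regression": report["task_accuracy"] >= baseline_acc - 0.05,
        "safety": report["safety_check"]["passed"],
    }
    return "PASS" if all(checks.values()) else "FAIL"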

Stage 4: Deployment Automation

Purpose: Deploy validated model to production environment.

Inputs:

  • Validated model from Stage 3
  • Deployment target (Ollama, API, etc.)
  • Integration configuration

Process:

# Deployment workflow
# 1. Merge LoRA into base model
merged_model = merge_lora_weights(base_model, adapter_weights)

# 2. Convert to deployable format
gguf_model = convert_to_gguf(merged_model, quantization="Q4_K_M")

# 3. Register the local model with Ollama (CLI equivalent: `ollama create task-api-v1 -f Modelfile`)
create_modelfile(gguf_model, name="task-api-v1")
register_with_ollama("task-api-v1")

# 4. Integrate with agent framework
agent_config = generate_agent_config(model_name="task-api-v1")

Outputs:

  • model.gguf (deployable model)
  • Modelfile (Ollama configuration)
  • agent_config.json
  • deployment_status (success/failure)

Quality Gate:

Metric                Threshold
------                ---------
Deployment success    True
Health check          Responding
Latency               < 500ms first token
Memory usage          < 8GB
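
The health-check and latency rows can be verified by timing one streamed request against the Ollama HTTP API. A minimal sketch, assuming the local default endpoint and the task-api-v1 model name used elsewhere in this lesson; the test prompt is a placeholder, and memory checking is omitted because it is host-specific.

import time
import requests

def health_check(model: str = "task-api-v1") -> dict:
    start = time.monotonic()
    resp = requests.post(
        "http://localhost:11434/api/generate",
        json={"model": model, "prompt": "Add task: test", "stream": True},
        stream=True,
        timeout=30,
    )
    # Time until the first streamed chunk (first token) arrives
    first_token_ms = None
    for line in resp.iter_lines():
        if line:
            first_token_ms = (time.monotonic() - start) * 1000
            break
    responding = resp.status_code == 200 and first_token_ms is not None
    return {
        "responding": responding,
        "latency_ms": round(first_token_ms) if first_token_ms else None,
        "passed": responding and first_token_ms < 500,
    }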

Stage Dependencies

Pipeline stages have strict dependencies:

DATA
  │
  ▼
TRAINING
  │
  ▼
EVALUATION
  │
  ▼
DEPLOYMENT

Why This Order Matters:

Stage         Depends On    Cannot Run Until
-----         ----------    ----------------
Training      Data          Dataset exists and validated
Evaluation    Training      Model checkpoint exists
Deployment    Evaluation    Quality gates passed

Anti-Pattern: Parallel Execution

# WRONG: Training before data is ready
pipeline.run_parallel([data_stage, training_stage])

# Data not ready when training starts = undefined behavior

Correct Pattern: Sequential with Gates

# RIGHT: Each stage gates the next
data_result = await pipeline.run(data_stage)
if not data_result.passed:
    raise PipelineError("Data quality gate failed")

training_result = await pipeline.run(training_stage)
if not training_result.passed:
    raise PipelineError("Training quality gate failed")
# ... continue

Artifact Specifications

Data Stage Artifacts

// train.jsonl (one example per line)
{"messages": [
  {"role": "user", "content": "Add task: Buy milk"},
  {"role": "assistant", "content": null,
   "tool_calls": [{"function": {"name": "add_task", "arguments": "{\"title\": \"Buy milk\"}"}}]}
]}
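
The "format validity: 100%" check from Gate 1 amounts to verifying that every line parses and follows this schema. Below is a minimal sketch; the role and content/tool_calls checks mirror the example above rather than a full validator.

import json

def valid_line(line: str) -> bool:
    try:
        messages = json.loads(line)["messages"]
    except (json.JSONDecodeError, KeyError, TypeError):
        return False
    if not isinstance(messages, list) or not messages:
        return False
    for msg in messages:
        if msg.get("role") not in {"system", "user", "assistant", "tool"}:
            return False
        if msg.get("content") is None and not msg.get("tool_calls"):
            return False
    return True

with open("train.jsonl", encoding="utf-8") as f:
    format_errors = sum(not valid_line(line) for line in f if line.strip())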

// data_quality_report.json
{
  "total_examples": 200,
  "train_count": 160,
  "validation_count": 40,
  "dedup_ratio": 0.02,
  "avg_tokens": 847,
  "format_errors": 0,
  "quality_score": 0.94
}

Training Stage Artifacts

// training_metrics.json
{
  "final_loss": 0.12,
  "validation_loss": 0.18,
  "epochs_completed": 3,
  "total_steps": 500,
  "learning_rate": 2e-4,
  "training_time_seconds": 3420,
  "checkpoint_path": "adapter_weights/"
}

Evaluation Stage Artifacts

// eval_report.json
{
  "task_accuracy": 0.94,
  "llm_judge_score": 0.89,
  "baseline_comparison": {
    "baseline_accuracy": 0.91,
    "current_accuracy": 0.94,
    "improvement": 0.03,
    "regression": false
  },
  "safety_check": {
    "harmful_outputs": 0,
    "refusal_rate": 0.02,
    "passed": true
  },
  "decision": "PASS"
}

Deployment Stage Artifacts

// deployment_status.json
{
  "model_name": "task-api-v1",
  "deployment_target": "ollama",
  "status": "success",
  "health_check": {
    "responding": true,
    "latency_ms": 245,
    "memory_mb": 4200
  },
  "endpoint": "http://localhost:11434/api/generate",
  "agent_config_path": "agent_config.json"
}

Quality Gates Architecture

Each stage has a quality gate that blocks progression:

QUALITY GATES

DATA ──▶ [Gate 1] ──▶ TRAINING
           ├── 200+ examples?
           ├── Format valid?
           └── Duplicates < 5%?

TRAINING ──▶ [Gate 2] ──▶ EVALUATION
               ├── Loss < 0.5?
               ├── Converging?
               └── Time < 2 hours?

EVALUATION ──▶ [Gate 3] ──▶ DEPLOYMENT
                 ├── Accuracy >= 90%?
                 ├── No regression?
                 └── Safety passed?

DEPLOYMENT ──▶ [Gate 4] ──▶ PRODUCTION
                 ├── Health check OK?
                 ├── Latency < 500ms?
                 └── Memory < 8GB?

Gate Implementation Pattern:

from typing import Callable  # Check and GateResult are small result containers (sketched below)

class QualityGate:
    def __init__(self, name: str, checks: list[Callable]):
        self.name = name
        self.checks = checks

    def evaluate(self, artifacts: dict) -> GateResult:
        results = []
        for check in self.checks:
            result = check(artifacts)
            results.append(result)

        passed = all(r.passed for r in results)
        return GateResult(
            gate=self.name,
            passed=passed,
            checks=results,
            reason=self._summarize_failures(results) if not passed else None,
        )

# Example usage
data_gate = QualityGate("data_quality", [
    lambda a: Check("example_count", a["count"] >= 200),
    lambda a: Check("format_valid", a["errors"] == 0),
    lambda a: Check("dedup_ratio", a["duplicates"] < 0.05),
])
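
The Check and GateResult containers used above are not defined in the snippet. A minimal sketch of what they might look like, with field names chosen to match the usage:

from dataclasses import dataclass, field

@dataclass
class Check:
    name: str
    passed: bool

@dataclass
class GateResult:
    gate: str
    passed: bool
    checks: list[Check] = field(default_factory=list)
    reason: str | None = None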

Rollback Strategy

When a stage fails, the pipeline needs to recover:

ROLLBACK FLOWS

Stage Failed     Rollback Action
------------     ---------------
Data Curation    Regenerate dataset
Training         Adjust hyperparameters
Evaluation       Return to Training
Deployment       Keep previous version

Rollback Implementation:

import logging

class PipelineOrchestrator:
    def __init__(self):
        self.checkpoints = {}

    def run_with_rollback(self, stages: list[Stage]):
        for stage in stages:
            try:
                # Save checkpoint before stage
                self.checkpoints[stage.name] = self.capture_state()

                # Run stage
                result = stage.run()

                # Check quality gate; the except block below performs the rollback
                if not result.gate.passed:
                    raise GateFailed(stage.name, result.gate.reason)

            except Exception:
                self.rollback(stage.name)
                raise

    def rollback(self, stage_name: str):
        if stage_name in self.checkpoints:
            self.restore_state(self.checkpoints[stage_name])
            logging.info(f"Rolled back to checkpoint before {stage_name}")

Pipeline Configuration

Define your pipeline in a specification file:

# pipeline.yaml
name: task-api-llmops
version: "1.0.0"

stages:
  data:
    source: "synthetic"
    count: 200
    format: "chat"
    quality_gate:
      min_examples: 200
      max_duplicates: 0.05

  training:
    base_model: "Qwen/Qwen2.5-3B-Instruct"
    method: "lora"
    lora_rank: 16
    batch_size: 2
    max_steps: 500
    quality_gate:
      max_loss: 0.5
      max_time_hours: 2

  evaluation:
    test_split: 0.2
    quality_gate:
      min_accuracy: 0.90
      regression_tolerance: 0.05

  deployment:
    target: "ollama"
    quantization: "Q4_K_M"
    quality_gate:
      max_latency_ms: 500
      max_memory_gb: 8

constraints:
  hardware: "colab-t4"
  max_cost: "$1.00"
  max_time_hours: 4
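
An orchestrator can load this specification and build its gates from the quality_gate blocks. A minimal sketch using PyYAML; wiring it to the QualityGate/Check pattern from earlier is an assumption about one way to connect the two.

import yaml

with open("pipeline.yaml") as f:
    config = yaml.safe_load(f)

# Build the data gate from the spec rather than hard-coded thresholds
data_gate_cfg = config["stages"]["data"]["quality_gate"]
data_gate = QualityGate("data_quality", [
    lambda a: Check("example_count", a["count"] >= data_gate_cfg["min_examples"]),
    lambda a: Check("dedup_ratio", a["duplicates"] < data_gate_cfg["max_duplicates"]),
])

print(config["constraints"])  # {'hardware': 'colab-t4', 'max_cost': '$1.00', 'max_time_hours': 4}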

What You Designed

Your pipeline architecture:

Component        Purpose                   Implementation
---------        -------                   --------------
4 Stages         Complete workflow         Data → Train → Eval → Deploy
Quality Gates    Prevent bad deployments   Stage-specific thresholds
Artifact Specs   Enable handoffs           JSON schemas per stage
Dependencies     Correct ordering          Sequential with gates
Rollback         Error recovery            Checkpoint restoration

This architecture ensures your Digital FTE is built reliably and reproducibly.

Try With AI

Prompt 1: Analyze Your Pipeline Dependencies

I'm designing an LLMOps pipeline with these stages:
1. Data Curation
2. Training
3. Evaluation
4. Deployment

Help me create a dependency diagram. For each stage:
- What inputs does it require?
- What outputs does it produce?
- What happens if it fails?
- Can any stages run in parallel?

What you're learning: Dependency analysis—understanding how pipeline components must sequence.

Prompt 2: Design Quality Gates

For my LLMOps pipeline, I need quality gates at each stage.
My constraints: Colab Free Tier (T4 GPU), < $1 total cost, < 4 hours.

For each gate, help me define:
- 3-4 metrics to check
- Threshold values for pass/fail
- What to do when a gate fails

Start with the Training stage gate.

What you're learning: Quality gate design—defining measurable thresholds that prevent bad deployments.

Prompt 3: Customize for Your Domain

I want to adapt this LLMOps pipeline for [your domain: legal documents,
medical records, financial analysis, customer support].

Ask me questions about:
- What data sources I have
- What quality matters most (accuracy? latency? safety?)
- What deployment environment I'm targeting

Then help me customize the pipeline configuration.

What you're learning: Pipeline customization—adapting architecture to domain-specific requirements.

Safety Note

Pipeline quality gates are your safety net. Never bypass a gate "just to test" in production—failed gates indicate real problems that will affect your Digital FTE's reliability. Test bypasses only in isolated development environments.