Monitoring and Observability
Your Digital FTE is deployed. Customers are using it. But how do you know it's working correctly? Production models fail silently—response quality degrades, latency creeps up, edge cases emerge that training didn't cover.
Monitoring transforms your deployment from a black box into an observable system. This lesson shows you how to instrument your model serving, detect quality drift, and respond to incidents before customers notice.
The Observability Stack
┌─────────────────────────────────────────────────────────────────────────────┐
│ OBSERVABILITY ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ DASHBOARDS │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Health │ │ Quality │ │ Alerts │ │ │
│ │ │ Dashboard │ │ Dashboard │ │ Dashboard │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌────────────────────────────────┴─────────────────────────────────────┐ │
│ │ ANALYSIS │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Drift │ │ Quality │ │ Alert │ │ │
│ │ │ Detector │ │ Scorer │ │ Engine │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌────────────────────────────────┴─────────────────────────────────────┐ │
│ │ DATA COLLECTION │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Metrics │ │ Logs │ │ Traces │ │ │
│ │ │ (Prometheus)│ │ (JSON) │ │ (Optional) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ MODEL SERVING LAYER │ │
│ │ │ │
│ │ TaskMaster (Ollama + LiteLLM) │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Project Structure
monitoring/
├── __init__.py
├── config.py
├── metrics/
│ ├── __init__.py
│ ├── collector.py # Prometheus metrics
│ └── exporter.py # Metrics endpoint
├── logging/
│ ├── __init__.py
│ └── structured.py # JSON logging
├── drift/
│ ├── __init__.py
│ ├── detector.py # Drift detection
│ └── scorer.py # Quality scoring
├── alerts/
│ ├── __init__.py
│ ├── rules.py # Alert definitions
│ └── notifiers.py # Notification channels
└── dashboards/
└── grafana/
└── taskmaster.json
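The structure also lists metrics/exporter.py, which this lesson assumes simply exposes the Prometheus scrape endpoint. A minimal sketch (the port is an assumption; align it with your Prometheus scrape config):
from prometheus_client import start_http_server
def start_metrics_server(port: int = 8001) -> None:
    """Expose all registered metrics at http://localhost:<port>/metrics."""
    start_http_server(port)
# Call once at startup, before serving requests:
# start_metrics_server()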
Metrics Collection
Create monitoring/metrics/collector.py:
from prometheus_client import Counter, Histogram, Gauge
from dataclasses import dataclass
from typing import Optional
import time
from functools import wraps
# Request metrics
REQUEST_COUNT = Counter(
'taskmaster_requests_total',
'Total requests to TaskMaster',
['endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'taskmaster_request_latency_seconds',
'Request latency in seconds',
['endpoint'],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
# Token metrics
TOKENS_INPUT = Counter(
'taskmaster_tokens_input_total',
'Total input tokens processed'
)
TOKENS_OUTPUT = Counter(
'taskmaster_tokens_output_total',
'Total output tokens generated'
)
# Quality metrics
RESPONSE_LENGTH = Histogram(
'taskmaster_response_length_chars',
'Response length in characters',
buckets=[50, 100, 200, 500, 1000, 2000]
)
# Health metrics
MODEL_LOADED = Gauge(
'taskmaster_model_loaded',
'Whether the model is loaded and ready'
)
LAST_SUCCESSFUL_REQUEST = Gauge(
'taskmaster_last_successful_request_timestamp',
'Timestamp of last successful request'
)
@dataclass
class RequestMetrics:
"""Metrics for a single request."""
endpoint: str
start_time: float
input_tokens: int = 0
output_tokens: int = 0
response_length: int = 0
status: str = "success"
error: Optional[str] = None
class MetricsCollector:
"""Collect and record metrics for TaskMaster."""
def record_request(self, metrics: RequestMetrics):
"""Record metrics for a completed request."""
latency = time.time() - metrics.start_time
# Record counters
REQUEST_COUNT.labels(
endpoint=metrics.endpoint,
status=metrics.status
).inc()
# Record latency
REQUEST_LATENCY.labels(endpoint=metrics.endpoint).observe(latency)
# Record tokens
TOKENS_INPUT.inc(metrics.input_tokens)
TOKENS_OUTPUT.inc(metrics.output_tokens)
# Record response length
RESPONSE_LENGTH.observe(metrics.response_length)
# Update health
if metrics.status == "success":
LAST_SUCCESSFUL_REQUEST.set(time.time())
def set_model_status(self, loaded: bool):
"""Set model loaded status."""
MODEL_LOADED.set(1 if loaded else 0)
def track_request(endpoint: str):
"""Decorator to track request metrics."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
collector = MetricsCollector()
metrics = RequestMetrics(
endpoint=endpoint,
start_time=time.time()
)
try:
result = func(*args, **kwargs)
# Extract metrics from result if available
if hasattr(result, 'usage'):
metrics.input_tokens = result.usage.prompt_tokens
metrics.output_tokens = result.usage.completion_tokens
if hasattr(result, 'content'):
metrics.response_length = len(result.content)
metrics.status = "success"
return result
except Exception as e:
metrics.status = "error"
metrics.error = str(e)
raise
finally:
collector.record_request(metrics)
return wrapper
return decorator
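A short usage sketch for the decorator. The handler below is a hypothetical stand-in for your real model call:
# Hypothetical handler; replace the body with your actual model call
@track_request("generate")
def handle_generate(prompt: str) -> str:
    return f"Echo: {prompt}"
handle_generate("Create a task for budget review")
# REQUEST_COUNT, REQUEST_LATENCY, and LAST_SUCCESSFUL_REQUEST now reflect this call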
Structured Logging
Create monitoring/logging/structured.py:
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional
import uuid
class StructuredLogger:
"""JSON structured logging for observability."""
    def __init__(self, service_name: str = "taskmaster"):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        # Attach a handler so INFO entries are emitted even when the
        # application never configures logging globally
        if not self.logger.handlers:
            self.logger.addHandler(logging.StreamHandler())
            self.logger.setLevel(logging.INFO)
def _create_log_entry(
self,
level: str,
message: str,
**kwargs
) -> Dict[str, Any]:
"""Create structured log entry."""
return {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": level,
"service": self.service_name,
"message": message,
"request_id": kwargs.pop("request_id", str(uuid.uuid4())),
**kwargs
}
def info(self, message: str, **kwargs):
entry = self._create_log_entry("INFO", message, **kwargs)
self.logger.info(json.dumps(entry))
def error(self, message: str, **kwargs):
entry = self._create_log_entry("ERROR", message, **kwargs)
self.logger.error(json.dumps(entry))
def warning(self, message: str, **kwargs):
entry = self._create_log_entry("WARNING", message, **kwargs)
self.logger.warning(json.dumps(entry))
def request(
self,
prompt: str,
response: str,
latency_ms: float,
tokens_in: int,
tokens_out: int,
request_id: Optional[str] = None
):
"""Log a request with full details."""
self.info(
"Request completed",
request_id=request_id or str(uuid.uuid4()),
prompt_length=len(prompt),
response_length=len(response),
latency_ms=latency_ms,
tokens_in=tokens_in,
tokens_out=tokens_out,
event_type="request"
)
# Usage example
logger = StructuredLogger()
# In your request handler:
# logger.request(
# prompt="Create a task for budget review",
# response="I'll create that task...",
# latency_ms=245.3,
# tokens_in=12,
# tokens_out=85
# )
Output (log entry):
{
"timestamp": "2026-01-02T16:30:45.123Z",
"level": "INFO",
"service": "taskmaster",
"message": "Request completed",
"request_id": "abc123-def456",
"prompt_length": 32,
"response_length": 145,
"latency_ms": 245.3,
"tokens_in": 12,
"tokens_out": 85,
"event_type": "request"
}
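Because each entry is one JSON object per line, ad-hoc analysis needs no extra tooling. A minimal sketch that computes P95 latency straight from a log file (the path is an assumption):
import json
import statistics
latencies = []
with open("taskmaster.log") as f:  # log file path is an assumption
    for line in f:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any non-JSON lines
        if entry.get("event_type") == "request":
            latencies.append(entry["latency_ms"])
if len(latencies) >= 2:
    p95 = statistics.quantiles(latencies, n=20)[-1]  # 95th percentile
    print(f"requests={len(latencies)} p95={p95:.1f}ms")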
Drift Detection
Create monitoring/drift/detector.py:
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime
import statistics
@dataclass
class DriftAlert:
metric: str
baseline_value: float
current_value: float
deviation_percent: float
severity: str # warning, critical
timestamp: str
class DriftDetector:
"""Detect model quality drift over time."""
def __init__(
self,
baseline_metrics: Dict[str, float],
warning_threshold: float = 0.15, # 15% deviation
critical_threshold: float = 0.30 # 30% deviation
):
self.baseline = baseline_metrics
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.history: Dict[str, List[float]] = {k: [] for k in baseline_metrics}
def record_metric(self, metric: str, value: float):
"""Record a metric observation."""
if metric in self.history:
self.history[metric].append(value)
# Keep last 1000 observations
if len(self.history[metric]) > 1000:
self.history[metric] = self.history[metric][-1000:]
def check_drift(self) -> List[DriftAlert]:
"""Check all metrics for drift."""
alerts = []
for metric, baseline_value in self.baseline.items():
if not self.history.get(metric):
continue
# Calculate recent average (last 100 observations)
recent = self.history[metric][-100:]
current_value = statistics.mean(recent)
# Calculate deviation
if baseline_value == 0:
continue
deviation = abs(current_value - baseline_value) / baseline_value
# Determine severity
severity = None
if deviation >= self.critical_threshold:
severity = "critical"
elif deviation >= self.warning_threshold:
severity = "warning"
if severity:
alerts.append(DriftAlert(
metric=metric,
baseline_value=baseline_value,
current_value=current_value,
deviation_percent=deviation * 100,
severity=severity,
timestamp=datetime.utcnow().isoformat()
))
return alerts
class QualityScorer:
"""Score response quality for drift detection."""
def __init__(self):
self.scores: List[float] = []
def score_response(
self,
prompt: str,
response: str,
latency_ms: float
) -> Dict[str, float]:
"""Score a response on multiple dimensions."""
scores = {}
# Length appropriateness (not too short, not too long)
length = len(response)
if 50 <= length <= 500:
scores["length_score"] = 1.0
elif 20 <= length < 50 or 500 < length <= 1000:
scores["length_score"] = 0.7
else:
scores["length_score"] = 0.3
# Latency score
if latency_ms < 500:
scores["latency_score"] = 1.0
elif latency_ms < 1000:
scores["latency_score"] = 0.8
elif latency_ms < 2000:
scores["latency_score"] = 0.6
else:
scores["latency_score"] = 0.3
# Format score (has structure indicators)
        structure_indicators = [":", "-", "*", "\n"]
has_structure = any(ind in response for ind in structure_indicators)
scores["format_score"] = 1.0 if has_structure else 0.5
# Coherence score (no repetition loops)
words = response.split()
if len(words) > 5:
unique_ratio = len(set(words)) / len(words)
scores["coherence_score"] = min(1.0, unique_ratio * 1.5)
else:
scores["coherence_score"] = 1.0
# Overall quality
scores["overall_quality"] = statistics.mean(scores.values())
return scores
# Example usage
baseline = {
"latency_score": 0.85,
"length_score": 0.90,
"format_score": 0.88,
"coherence_score": 0.95,
"overall_quality": 0.89
}
detector = DriftDetector(baseline)
scorer = QualityScorer()
# In your request handler:
# scores = scorer.score_response(prompt, response, latency)
# for metric, value in scores.items():
# detector.record_metric(metric, value)
#
# alerts = detector.check_drift()
# for alert in alerts:
# handle_drift_alert(alert)
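The baseline dictionary above is hard-coded for illustration. In practice you would derive it by running QualityScorer over your pre-deployment evaluation set. A minimal sketch, assuming eval_results holds (prompt, response, latency_ms) tuples from that run:
def compute_baseline(eval_results) -> Dict[str, float]:
    """Average quality scores over an evaluation set to form a drift baseline."""
    scorer = QualityScorer()
    per_metric: Dict[str, List[float]] = {}
    for prompt, response, latency_ms in eval_results:
        for metric, value in scorer.score_response(prompt, response, latency_ms).items():
            per_metric.setdefault(metric, []).append(value)
    return {metric: statistics.mean(values) for metric, values in per_metric.items()}
# detector = DriftDetector(compute_baseline(eval_results))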
Alert Configuration
Create monitoring/alerts/rules.py:
from dataclasses import dataclass
from typing import List
from enum import Enum
class Severity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class AlertRule:
name: str
description: str
condition: str # Prometheus-style expression
severity: Severity
for_duration: str # e.g., "5m"
annotations: dict
# Define alert rules
ALERT_RULES: List[AlertRule] = [
AlertRule(
name="TaskMasterHighLatency",
description="TaskMaster response latency is too high",
condition="histogram_quantile(0.95, rate(taskmaster_request_latency_seconds_bucket[5m])) > 2",
severity=Severity.WARNING,
for_duration="5m",
annotations={
"summary": "High latency detected",
"description": "P95 latency is above 2 seconds for the last 5 minutes",
"runbook": "Check GPU utilization and concurrent request count"
}
),
AlertRule(
name="TaskMasterHighErrorRate",
description="TaskMaster error rate is elevated",
condition="rate(taskmaster_requests_total{status='error'}[5m]) / rate(taskmaster_requests_total[5m]) > 0.05",
severity=Severity.CRITICAL,
for_duration="2m",
annotations={
"summary": "High error rate detected",
"description": "Error rate is above 5% for the last 2 minutes",
"runbook": "Check model health and review error logs"
}
),
AlertRule(
name="TaskMasterModelDown",
description="TaskMaster model is not responding",
condition="taskmaster_model_loaded == 0",
severity=Severity.CRITICAL,
for_duration="1m",
annotations={
"summary": "Model is down",
"description": "TaskMaster model is not loaded or responding",
"runbook": "Restart Ollama service and check model loading"
}
),
AlertRule(
name="TaskMasterQualityDrift",
description="TaskMaster response quality has degraded",
condition="taskmaster_quality_score < 0.7",
severity=Severity.WARNING,
for_duration="15m",
annotations={
"summary": "Quality degradation detected",
"description": "Response quality score has dropped below threshold",
"runbook": "Review recent responses and consider retraining"
}
),
AlertRule(
name="TaskMasterNoRequests",
description="TaskMaster has not received requests",
condition="increase(taskmaster_requests_total[10m]) == 0",
severity=Severity.WARNING,
for_duration="10m",
annotations={
"summary": "No requests received",
"description": "TaskMaster has not received any requests in 10 minutes",
"runbook": "Check client connectivity and load balancer health"
}
)
]
def generate_prometheus_rules() -> str:
"""Generate Prometheus alerting rules YAML."""
rules = []
for rule in ALERT_RULES:
rules.append({
"alert": rule.name,
"expr": rule.condition,
"for": rule.for_duration,
"labels": {
"severity": rule.severity.value
},
"annotations": rule.annotations
})
import yaml
return yaml.dump({
"groups": [{
"name": "taskmaster",
"rules": rules
}]
})
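Note that generate_prometheus_rules() depends on PyYAML. To activate the rules, write the generated YAML to a file and reference it from rule_files in your prometheus.yml. A minimal sketch (the script name and output path are assumptions):
# generate_rules.py -- hypothetical helper script
from monitoring.alerts.rules import generate_prometheus_rules
with open("taskmaster_alerts.yml", "w") as f:  # output path is an assumption
    f.write(generate_prometheus_rules())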
Grafana Dashboard
Create monitoring/dashboards/grafana/taskmaster.json:
{
"dashboard": {
"title": "TaskMaster Digital FTE",
"uid": "taskmaster-main",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "rate(taskmaster_requests_total[5m])",
"legendFormat": "{{status}}"
}
]
},
{
"title": "Latency (P50, P95, P99)",
"type": "graph",
"gridPos": {"x": 12, "y": 0, "w": 12, "h": 8},
"targets": [
{
"expr": "histogram_quantile(0.50, rate(taskmaster_request_latency_seconds_bucket[5m]))",
"legendFormat": "P50"
},
{
"expr": "histogram_quantile(0.95, rate(taskmaster_request_latency_seconds_bucket[5m]))",
"legendFormat": "P95"
},
{
"expr": "histogram_quantile(0.99, rate(taskmaster_request_latency_seconds_bucket[5m]))",
"legendFormat": "P99"
}
]
},
{
"title": "Error Rate",
"type": "stat",
"gridPos": {"x": 0, "y": 8, "w": 6, "h": 4},
"targets": [
{
"expr": "rate(taskmaster_requests_total{status='error'}[5m]) / rate(taskmaster_requests_total[5m]) * 100",
"legendFormat": "Error %"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 1},
{"color": "red", "value": 5}
]
}
}
}
},
{
"title": "Model Status",
"type": "stat",
"gridPos": {"x": 6, "y": 8, "w": 6, "h": 4},
"targets": [
{
"expr": "taskmaster_model_loaded",
"legendFormat": "Status"
}
],
"fieldConfig": {
"defaults": {
"mappings": [
{"type": "value", "options": {"0": {"text": "DOWN", "color": "red"}}},
{"type": "value", "options": {"1": {"text": "UP", "color": "green"}}}
]
}
}
},
{
"title": "Quality Score",
"type": "gauge",
"gridPos": {"x": 12, "y": 8, "w": 6, "h": 4},
"targets": [
{
"expr": "taskmaster_quality_score",
"legendFormat": "Quality"
}
],
"fieldConfig": {
"defaults": {
"min": 0,
"max": 1,
"thresholds": {
"mode": "absolute",
"steps": [
{"color": "red", "value": null},
{"color": "yellow", "value": 0.7},
{"color": "green", "value": 0.85}
]
}
}
}
},
{
"title": "Tokens Processed",
"type": "graph",
"gridPos": {"x": 0, "y": 12, "w": 24, "h": 8},
"targets": [
{
"expr": "rate(taskmaster_tokens_input_total[5m])",
"legendFormat": "Input tokens/s"
},
{
"expr": "rate(taskmaster_tokens_output_total[5m])",
"legendFormat": "Output tokens/s"
}
]
}
]
}
}
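You can import this file through the Grafana UI, or push it with Grafana's dashboard HTTP API. A minimal sketch, assuming Grafana runs locally and a service-account token is available in the GRAFANA_TOKEN environment variable:
import json
import os
import urllib.request
with open("monitoring/dashboards/grafana/taskmaster.json") as f:
    payload = json.load(f)
payload["overwrite"] = True  # replace any existing dashboard with this uid
req = urllib.request.Request(
    "http://localhost:3000/api/dashboards/db",  # Grafana URL is an assumption
    data=json.dumps(payload).encode(),
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['GRAFANA_TOKEN']}",
    },
    method="POST",
)
print(urllib.request.urlopen(req).read().decode())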
Integration with Serving Layer
Create monitoring/middleware.py:
from functools import wraps
from typing import Callable
import time
import uuid
from .metrics.collector import MetricsCollector, RequestMetrics
from .logging.structured import StructuredLogger
from .drift.detector import DriftDetector, QualityScorer
# Initialize components
collector = MetricsCollector()
logger = StructuredLogger()
scorer = QualityScorer()
# Baseline scores captured from a previous evaluation run
BASELINE_METRICS = {
"latency_score": 0.85,
"length_score": 0.90,
"format_score": 0.88,
"coherence_score": 0.95,
"overall_quality": 0.89
}
detector = DriftDetector(BASELINE_METRICS)
def monitor_request(endpoint: str):
"""Decorator to add full observability to a request handler."""
def decorator(func: Callable):
@wraps(func)
def wrapper(prompt: str, *args, **kwargs):
start_time = time.time()
            request_id = kwargs.get("request_id", str(uuid.uuid4()))
try:
# Execute the request
response = func(prompt, *args, **kwargs)
# Calculate metrics
latency_ms = (time.time() - start_time) * 1000
# Score quality
scores = scorer.score_response(prompt, response, latency_ms)
# Record drift metrics
for metric, value in scores.items():
detector.record_metric(metric, value)
# Record Prometheus metrics
collector.record_request(RequestMetrics(
endpoint=endpoint,
start_time=start_time,
response_length=len(response),
status="success"
))
# Structured log
logger.request(
prompt=prompt,
response=response,
latency_ms=latency_ms,
tokens_in=len(prompt.split()), # Approximate
tokens_out=len(response.split()),
request_id=request_id
)
# Check for drift
alerts = detector.check_drift()
for alert in alerts:
logger.warning(
f"Drift detected: {alert.metric}",
request_id=request_id,
alert=alert.__dict__
)
return response
except Exception as e:
# Record error metrics
collector.record_request(RequestMetrics(
endpoint=endpoint,
start_time=start_time,
status="error",
error=str(e)
))
logger.error(
"Request failed",
request_id=request_id,
error=str(e),
prompt_length=len(prompt)
)
raise
return wrapper
return decorator
# Usage example:
#
# @monitor_request("generate")
# def generate(prompt: str) -> str:
# response = model.generate(prompt)
# return response
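Putting it together: a minimal sketch of a FastAPI route that calls a local model through LiteLLM with full observability attached. The model name, route, and metrics port are assumptions:
from fastapi import FastAPI
from litellm import completion
from prometheus_client import start_http_server
app = FastAPI()
start_http_server(8001)  # expose /metrics for Prometheus; port is an assumption
@monitor_request("generate")
def generate(prompt: str) -> str:
    result = completion(
        model="ollama/taskmaster",  # hypothetical Ollama model name
        messages=[{"role": "user", "content": prompt}],
    )
    return result.choices[0].message.content
@app.post("/generate")
def generate_endpoint(body: dict) -> dict:
    return {"response": generate(body["prompt"])}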
Try With AI
Prompt 1: Build Custom Dashboards
I want to create a custom Grafana dashboard for TaskMaster that shows:
1. Business metrics (requests by task type, average task complexity)
2. SLA compliance (% of requests under 500ms, uptime)
3. Cost metrics (tokens consumed, estimated cost)
4. Comparison view (this week vs last week)
Design the dashboard layout and provide the Prometheus queries and
Grafana JSON configuration.
What you're learning: Designing business-focused observability dashboards.
Prompt 2: Implement Automated Remediation
I want my monitoring system to automatically respond to certain conditions:
1. High latency → Scale up replicas
2. Model down → Restart service
3. Quality drift → Switch to fallback model
4. High error rate → Enable circuit breaker
Design an automated remediation system that:
- Integrates with my alerting
- Has safeguards against runaway automation
- Logs all actions for audit
- Supports manual override
Show the implementation.
What you're learning: Building self-healing production systems.
Prompt 3: Add User Feedback Loop
I want to collect user feedback to improve drift detection:
1. Add thumbs up/down buttons to responses
2. Collect feedback with request context
3. Use feedback to validate quality scores
4. Generate reports on user satisfaction trends
5. Trigger alerts on satisfaction drops
Design the feedback collection system and its integration with
my existing monitoring infrastructure.
What you're learning: Incorporating human feedback into production monitoring.