Centralized Logging with Loki
Your Task API is running in production. Prometheus tells you the error rate spiked at 3am. Jaeger shows a slow request took 2.3 seconds. But what actually happened? What error message did the user see? What was the request payload that triggered the failure?
Metrics show you THAT something went wrong. Traces show you WHERE in the system it happened. Logs show you WHAT specifically occurred.
This is the needle-in-a-haystack problem of distributed systems. Without centralized logging, you're logging into individual pod shells, grepping through files, and hoping the container hasn't restarted and lost the evidence. With hundreds of pods across multiple nodes, this becomes impossible.
Grafana Loki solves this by aggregating logs from all your containers into a single queryable store. But unlike Elasticsearch (which indexes every word of every log), Loki takes a radically different approach: it indexes only the labels (metadata), not the log content itself. This makes Loki orders of magnitude cheaper to operate while still enabling fast queries when you know what you're looking for.
By the end of this lesson, you'll query logs across your entire cluster with LogQL, implement structured logging that plays well with Loki's architecture, and correlate logs with traces for full-stack debugging.
Why Loki Indexes Labels, Not Content
Traditional log aggregation tools like Elasticsearch create full-text indexes of every log line. This enables powerful searches ("find all logs containing 'timeout'") but comes with significant costs:
| Approach | Storage Cost | Query Speed | Best For |
|---|---|---|---|
| Full-text indexing (Elasticsearch) | High (index can exceed data size) | Fast for any query | Ad-hoc exploration |
| Label-only indexing (Loki) | Low (minimal index overhead) | Fast when labels narrow scope | Known patterns, targeted debugging |
Loki's insight: in practice, you rarely need to search "all logs everywhere." You search logs from:
- A specific namespace (
{namespace="production"}) - A specific application (
{app="task-api"}) - A specific time window (last 2 hours)
- With specific content filters (
|= "error")
By forcing you to narrow with labels first, then filter content, Loki achieves 10x lower storage costs while remaining fast for real debugging scenarios.
Loki Architecture
┌────────────────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Pod A │ │ Pod B │ │ Pod C │ (Application Pods) │
│ │ stdout→ │ │ stdout→ │ │ stdout→ │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Promtail (DaemonSet) │ │
│ │ - Tails /var/log/pods/*/*.log │ │
│ │ - Adds Kubernetes labels automatically │ │
│ │ - Pushes to Loki │ │
│ └──────────────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Loki │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Ingester │ │ Chunk Storage │ │ │
│ │ │ (index │ │ (compressed logs) │ │ │
│ │ │ labels) │ │ │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Grafana UI │ │
│ │ LogQL queries + Explore │ │
│ └──────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
Components:
- Promtail: Agent running on each node (DaemonSet) that discovers logs, adds Kubernetes labels, and pushes to Loki
- Loki: The log aggregation server that indexes labels and stores compressed log chunks
- Grafana: Query interface using LogQL (Loki's query language)
Installing the Loki Stack
Install Loki with Promtail using Helm:
# Add Grafana Helm repository
helm repo add grafana https://grafana.github.io/helm-charts
helm repo update
# Install Loki stack with Promtail
helm install loki grafana/loki-stack \
--namespace monitoring \
--set promtail.enabled=true \
--set grafana.enabled=false # Use existing Grafana from kube-prometheus-stack
Output:
NAME: loki
NAMESPACE: monitoring
STATUS: deployed
REVISION: 1
NOTES:
The Loki stack has been deployed to your cluster.
Loki is now available at:
http://loki:3100
Promtail is running on each node and sending logs to Loki.
Configure Grafana to Use Loki
Add Loki as a data source in your existing Grafana instance:
# loki-datasource.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-loki-datasource
namespace: monitoring
labels:
grafana_datasource: "1"
data:
loki-datasource.yaml: |
apiVersion: 1
datasources:
- name: Loki
type: loki
access: proxy
url: http://loki:3100
jsonData:
maxLines: 1000
Apply the configuration:
kubectl apply -f loki-datasource.yaml
Output:
configmap/grafana-loki-datasource created
Verify Logs Are Flowing
Check that Promtail is collecting logs:
kubectl logs -n monitoring -l app=promtail --tail=20
Output:
level=info ts=2025-01-15T10:30:45.123Z caller=main.go:112 msg="Starting Promtail"
level=info ts=2025-01-15T10:30:45.456Z caller=server.go:261 msg="Listening on" address="0.0.0.0:9080"
level=info ts=2025-01-15T10:30:46.789Z caller=tailer.go:190 msg="tailing new file" path="/var/log/pods/default_task-api-7d8f6c5b4d-abc12_uid/task-api/0.log"
level=info ts=2025-01-15T10:30:47.012Z caller=tailer.go:190 msg="tailing new file" path="/var/log/pods/monitoring_prometheus-0_uid/prometheus/0.log"
LogQL: Querying Logs
LogQL is Loki's query language, designed to feel familiar if you know PromQL. Every query starts with a stream selector (the labels) and optionally adds filters and parsers.
Stream Selectors
Stream selectors filter logs by label. These are the "index" in Loki:
# All logs from a specific namespace
{namespace="default"}
# Logs from a specific app
{app="task-api"}
# Combine selectors
{namespace="default", app="task-api"}
# Regex matching
{namespace=~"prod|staging"}
# Exclude a label value
{namespace="default", container!="istio-proxy"}
Line Filters
After selecting streams, filter log content:
# Contains "error" (case-sensitive)
{app="task-api"} |= "error"
# Does not contain "health"
{app="task-api"} != "health"
# Regex match
{app="task-api"} |~ "user_id=[0-9]+"
# Case-insensitive contains
{app="task-api"} |~ "(?i)error"
Log Parsers
Parse structured logs to extract fields:
# Parse JSON logs
{app="task-api"} | json
# Parse and filter by extracted field
{app="task-api"} | json | level="error"
# Parse specific fields
{app="task-api"} | json | line_format "{{.timestamp}} [{{.level}}] {{.message}}"
# Parse with logfmt (key=value format)
{app="task-api"} | logfmt | duration > 1s
Aggregation Queries
Count and aggregate log data:
# Count errors per minute
sum(rate({app="task-api"} |= "error" [1m])) by (pod)
# Count logs by level
sum by (level) (count_over_time({app="task-api"} | json [5m]))
# Find the top 5 error-producing endpoints
topk(5, sum by (endpoint) (count_over_time({app="task-api"} | json | level="error" [1h])))
Practical Query Examples
Here are queries you'll use daily:
# Recent errors from Task API
{namespace="default", app="task-api"} |= "error" | json
# Output (in Grafana Explore):
# 2025-01-15 10:32:45 {"timestamp":"2025-01-15T10:32:45.123Z","level":"error","message":"Database connection failed","error":"connection refused","trace_id":"abc123"}
# 2025-01-15 10:32:47 {"timestamp":"2025-01-15T10:32:47.456Z","level":"error","message":"Request timeout","endpoint":"/tasks/create","trace_id":"def456"}
# Slow requests (parse JSON, filter by duration)
{app="task-api"} | json | duration > 1000
# Output:
# 2025-01-15 10:33:12 {"timestamp":"2025-01-15T10:33:12.789Z","level":"info","message":"Request completed","endpoint":"/tasks/list","duration":2341,"trace_id":"ghi789"}
# Logs correlated with a specific trace
{app="task-api"} |= "trace_id=\"abc123\""
# Output:
# 2025-01-15 10:32:44 {"timestamp":"2025-01-15T10:32:44.100Z","level":"info","message":"Request received","endpoint":"/tasks/5","trace_id":"abc123"}
# 2025-01-15 10:32:44 {"timestamp":"2025-01-15T10:32:44.500Z","level":"debug","message":"Fetching from database","trace_id":"abc123"}
# 2025-01-15 10:32:45 {"timestamp":"2025-01-15T10:32:45.123Z","level":"error","message":"Database connection failed","trace_id":"abc123"}
Structured Logging in Python
For Loki's label-based architecture to work well, your application should produce structured logs. JSON is the standard format.
Basic Structured Logging
# logging_config.py
import logging
import json
import sys
from datetime import datetime
from typing import Any
class JSONFormatter(logging.Formatter):
"""Format logs as JSON for Loki consumption."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname.lower(),
"message": record.getMessage(),
"logger": record.name,
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Add extra fields if present
if hasattr(record, "trace_id"):
log_data["trace_id"] = record.trace_id
if hasattr(record, "span_id"):
log_data["span_id"] = record.span_id
if hasattr(record, "user_id"):
log_data["user_id"] = record.user_id
if hasattr(record, "endpoint"):
log_data["endpoint"] = record.endpoint
if hasattr(record, "duration"):
log_data["duration"] = record.duration
# Add exception info if present
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
def setup_logging(level: str = "INFO") -> logging.Logger:
"""Configure structured JSON logging."""
logger = logging.getLogger("task-api")
logger.setLevel(getattr(logging, level.upper()))
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
return logger
Output (when logging):
{"timestamp":"2025-01-15T10:35:00.123Z","level":"info","message":"Request received","logger":"task-api","module":"main","function":"get_task","line":42,"endpoint":"/tasks/5","trace_id":"abc123"}
Correlating Logs with Traces
The key to debugging distributed systems is connecting logs to traces. When you see a slow span in Jaeger, you want to jump directly to the corresponding logs:
# trace_context.py
from contextvars import ContextVar
from opentelemetry import trace
from opentelemetry.trace import SpanContext
import logging
# Context variable to hold trace context across async boundaries
trace_context: ContextVar[dict] = ContextVar("trace_context", default={})
class TraceContextFilter(logging.Filter):
"""Inject trace context into every log record."""
def filter(self, record: logging.LogRecord) -> bool:
span = trace.get_current_span()
span_context: SpanContext = span.get_span_context()
if span_context.is_valid:
# Format as hex strings for consistency with Jaeger
record.trace_id = format(span_context.trace_id, "032x")
record.span_id = format(span_context.span_id, "016x")
else:
record.trace_id = "00000000000000000000000000000000"
record.span_id = "0000000000000000"
return True
def setup_logging_with_traces(level: str = "INFO") -> logging.Logger:
"""Configure logging with automatic trace correlation."""
logger = logging.getLogger("task-api")
logger.setLevel(getattr(logging, level.upper()))
# Add trace context filter
logger.addFilter(TraceContextFilter())
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
return logger
FastAPI Integration with Trace Correlation
# main.py
from fastapi import FastAPI, Request
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import logging
import time
# Setup logging with trace correlation
from logging_config import setup_logging_with_traces
logger = setup_logging_with_traces("INFO")
app = FastAPI()
# Instrument FastAPI for automatic tracing
FastAPIInstrumentor.instrument_app(app)
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
"""Log every request with timing and trace context."""
start_time = time.time()
# Log request received
logger.info(
"Request received",
extra={
"endpoint": request.url.path,
"method": request.method,
}
)
response = await call_next(request)
# Log request completed with duration
duration_ms = (time.time() - start_time) * 1000
logger.info(
"Request completed",
extra={
"endpoint": request.url.path,
"method": request.method,
"status_code": response.status_code,
"duration": round(duration_ms, 2),
}
)
return response
@app.get("/tasks/{task_id}")
async def get_task(task_id: int):
logger.debug(f"Fetching task {task_id} from database")
# Simulate database lookup
task = {"id": task_id, "title": "Sample Task", "completed": False}
logger.info(
"Task retrieved",
extra={"task_id": task_id}
)
return task
Output (JSON logs with trace correlation):
{"timestamp":"2025-01-15T10:40:00.100Z","level":"info","message":"Request received","endpoint":"/tasks/5","method":"GET","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7"}
{"timestamp":"2025-01-15T10:40:00.105Z","level":"debug","message":"Fetching task 5 from database","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7"}
{"timestamp":"2025-01-15T10:40:00.150Z","level":"info","message":"Task retrieved","task_id":5,"trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7"}
{"timestamp":"2025-01-15T10:40:00.152Z","level":"info","message":"Request completed","endpoint":"/tasks/5","method":"GET","status_code":200,"duration":52.0,"trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7"}
Now in Grafana, you can:
- See a slow trace in Jaeger
- Copy the
trace_id - Query Loki:
{app="task-api"} |= "4bf92f3577b34da6a3ce929d0e0e4736" - See every log entry from that request across all services
Log Retention and Storage
Loki stores logs in chunks, compressed with gzip. Configure retention to balance storage costs with debugging needs:
# loki-config.yaml (ConfigMap for custom Loki config)
apiVersion: v1
kind: ConfigMap
metadata:
name: loki-config
namespace: monitoring
data:
loki.yaml: |
auth_enabled: false
server:
http_listen_port: 3100
ingester:
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
chunk_idle_period: 5m
chunk_retain_period: 30s
schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
storage_config:
boltdb_shipper:
active_index_directory: /data/loki/index
cache_location: /data/loki/cache
shared_store: filesystem
filesystem:
directory: /data/loki/chunks
limits_config:
enforce_metric_name: false
reject_old_samples: true
reject_old_samples_max_age: 168h # 7 days
max_entries_limit_per_query: 5000
compactor:
working_directory: /data/loki/compactor
shared_store: filesystem
retention_enabled: true
retention_delete_delay: 2h
retention_delete_worker_count: 150
table_manager:
retention_deletes_enabled: true
retention_period: 168h # Keep logs for 7 days
Retention Strategy Guidelines
| Environment | Retention | Rationale |
|---|---|---|
| Development | 24-48 hours | Quick iteration, minimal storage |
| Staging | 7 days | Debug release issues |
| Production | 14-30 days | Incident investigation, compliance |
| Audit logs | 90+ days | Regulatory requirements |
Promtail Configuration
Promtail automatically discovers pods and adds Kubernetes labels. Customize for your needs:
# promtail-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: promtail-config
namespace: monitoring
data:
promtail.yaml: |
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
relabel_configs:
# Keep only pods with logging enabled
- source_labels: [__meta_kubernetes_pod_annotation_logging_enabled]
action: keep
regex: "true"
# Use pod labels as log labels
- action: labelmap
regex: __meta_kubernetes_pod_label_(.+)
# Add namespace label
- source_labels: [__meta_kubernetes_namespace]
target_label: namespace
# Add pod name label
- source_labels: [__meta_kubernetes_pod_name]
target_label: pod
# Add container name label
- source_labels: [__meta_kubernetes_pod_container_name]
target_label: container
# Construct log file path
- replacement: /var/log/pods/*$1/*.log
separator: /
source_labels:
- __meta_kubernetes_pod_uid
- __meta_kubernetes_pod_container_name
target_label: __path__
pipeline_stages:
# Parse JSON logs
- json:
expressions:
level: level
trace_id: trace_id
# Add level as label for filtering
- labels:
level:
trace_id:
Enable logging for your pods with an annotation:
# task-api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-api
spec:
template:
metadata:
annotations:
logging/enabled: "true" # Promtail will collect these logs
spec:
containers:
- name: task-api
image: task-api:latest
env:
- name: LOG_LEVEL
value: "INFO"
- name: LOG_FORMAT
value: "json"
Reflect on Your Skill
Your observability-cost-engineer skill now needs logging capabilities. Consider:
-
Test your skill: Ask it to write a LogQL query for a specific debugging scenario. Does it generate correct syntax? Does it know when to use parsers vs line filters?
-
Identify gaps:
- Can your skill configure Promtail for custom log sources?
- Does it know retention best practices for different environments?
- Can it implement trace-correlated structured logging?
-
Improve your skill: Add these patterns:
- LogQL query templates for common scenarios (errors, slow requests, trace correlation)
- Structured logging implementation with trace context injection
- Retention configuration by environment type
## Skill Enhancement: Logging Patterns
### LogQL Query Templates
- Error investigation: `{app="$APP"} |= "error" | json | line_format "{{.timestamp}} {{.message}} trace={{.trace_id}}"`
- Slow requests: `{app="$APP"} | json | duration > $THRESHOLD_MS`
- Trace correlation: `{app="$APP"} |= "trace_id=\"$TRACE_ID\""`
- Error rate: `sum(rate({app="$APP"} |= "error" [$WINDOW])) by (pod)`
### Structured Logging Requirements
- JSON format with: timestamp, level, message, trace_id, span_id
- Use logging.Filter for trace context injection
- Include request context: endpoint, method, duration, status_code
### Retention Guidelines
- Dev: 24-48h | Staging: 7d | Prod: 14-30d | Audit: 90d+
Try With AI
Use your AI companion to deepen your logging expertise.
Prompt 1: Debug a Production Issue
I have a Loki installation with logs from my Task API. Users are reporting
intermittent 500 errors on the /tasks/create endpoint.
Help me build a LogQL query that:
1. Finds all error logs from task-api in the last 24 hours
2. Filters to only /tasks/create endpoint
3. Shows the error message and trace_id so I can investigate further
My logs are JSON structured with fields: timestamp, level, message, endpoint, trace_id, error
What you're learning: Translating a debugging scenario into LogQL syntax. You're practicing the stream selector -> filter -> parser pattern with real constraints.
Prompt 2: Design a Structured Logging Schema
I'm building a FastAPI application that will be deployed to Kubernetes and monitored
with Loki. Help me design a structured logging schema.
Requirements:
- JSON format for Loki parsing
- Automatic trace_id/span_id from OpenTelemetry
- Request context (endpoint, method, duration, status)
- Business context (user_id, organization_id)
- Error details with stack traces
Ask me clarifying questions about my application's domain before proposing the schema.
What you're learning: How to teach AI your constraints so it produces a logging schema that matches your specific needs rather than a generic template.
Prompt 3: Optimize Log Volume
My Loki instance is using more storage than expected. Current configuration:
- 200 pods across 10 namespaces
- JSON logs averaging 500 bytes per line
- DEBUG level logging in production
- 30-day retention
Help me analyze and optimize:
1. Calculate my approximate daily log volume
2. Identify which changes would have the biggest impact
3. Propose a Promtail pipeline that drops debug logs in production
Walk me through your reasoning so I can learn the optimization patterns.
What you're learning: Cost-aware observability design. AI helps you understand the math behind log volume and teaches you optimization techniques you can apply to future systems.
Safety Note
When querying logs, be mindful of sensitive data. Logs may contain PII, tokens, or secrets that were accidentally logged. Use LogQL filters to avoid displaying sensitive content, and configure Promtail pipelines to redact sensitive patterns before ingestion.