Message Schemas: Avro and Schema Registry
Your Task API is publishing events to Kafka. Today, your task.created event looks like this:
{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z"}
Next month, the product team wants to add task priority. You add the field:
{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z", "priority": 1}
But the notification service consuming these events wasn't updated. When it receives a message with the new priority field, it crashes. Or worse, it silently ignores the field and loses data. You've just experienced schema drift---the silent killer of event-driven systems.
In production, you'll have dozens of services reading and writing events. Without schema enforcement, any producer can add, remove, or rename fields at will. Consumers break in unpredictable ways. Debugging becomes a forensic investigation: "Which version of the event was this consumer built for?"
This lesson introduces Apache Avro for binary schema-based serialization and Schema Registry for centralized schema management. By the end, you'll design event schemas that evolve safely, enforce contracts between producers and consumers, and prevent the integration failures that plague untyped messaging systems.
Why Schemas Matter: The Contract Problem
In a typical Kafka deployment without schemas, producers and consumers communicate through implicit contracts:
Producer (v1) Consumer (v1)
├── Sends: {"id", "title"} → ├── Expects: {"id", "title"}
└── No enforcement └── No validation
This works until someone changes something:
Producer (v2) Consumer (v1)
├── Sends: {"task_id", "name"} → ├── Still expects: {"id", "title"}
└── Renamed fields └── KeyError: 'id'
The core problem: JSON doesn't enforce structure. Any producer can send anything, and you won't discover the mismatch until runtime---often in production.
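To see the failure concretely, here is a minimal sketch of what the v1 consumer does with a renamed v2 payload (the dict literal stands in for a deserialized message, and the field names are illustrative):
# The v2 producer renamed its fields
event = {"task_id": "task-123", "name": "Buy groceries"}
# The v1 consumer still codes against the old implicit contract
print(event["id"])  # KeyError: 'id', discovered only at runtime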
What Schemas Provide
| Capability | Without Schema | With Avro + Schema Registry |
|---|---|---|
| Contract enforcement | None | Validation at registration and serialization time |
| Documentation | Implicit in code | Explicit in schema definition |
| Evolution rules | Hope and pray | Backward/forward compatibility |
| Message size | JSON verbosity | Binary encoding (typically 50-70% smaller) |
| Type safety | None | Enforced types (int, string, etc.) |
| Versioning | Manual tracking | Automatic with schema IDs |
Apache Avro Fundamentals
Apache Avro is a data serialization system that provides:
- Schema-based serialization: Data is always encoded with a schema
- Binary encoding: Compact messages without field names in payload
- Schema evolution: Add/remove fields with compatibility rules
- Language-agnostic: Works with Python, Java, Go, and more
Avro Schema Syntax
An Avro schema is JSON that defines your data structure:
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.taskapi.events",
"fields": [
{"name": "id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "created_at", "type": "string"}
]
}
Key components:
- type: Always "record" for structured data
- name: The schema name (used in subject naming)
- namespace: Package-like qualifier for uniqueness
- fields: Array of field definitions with names and types
Avro Field Types
| Type | Avro Syntax | Example Value |
|---|---|---|
| String | "string" | "task-123" |
| Integer | "int" | 42 |
| Long | "long" | 1705312800000 |
| Boolean | "boolean" | true |
| Float | "float" | 3.14 |
| Double | "double" | 3.14159265359 |
| Bytes | "bytes" | Binary data |
| Null | "null" | null |
Optional Fields with Union Types
To make a field optional, use a union type with null:
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.taskapi.events",
"fields": [
{"name": "id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "created_at", "type": "string"},
{"name": "priority", "type": ["null", "int"], "default": null}
]
}
The ["null", "int"] union means the field can be either null or an integer. The default: null makes it optional---old messages without priority will deserialize with priority = null.
Complex Schema Example
Here's a complete schema for Task API events:
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.taskapi.events",
"doc": "Event published when a new task is created",
"fields": [
{
"name": "event_id",
"type": "string",
"doc": "Unique identifier for this event instance"
},
{
"name": "event_type",
"type": "string",
"doc": "Event type identifier"
},
{
"name": "occurred_at",
"type": "string",
"doc": "ISO-8601 timestamp when event occurred"
},
{
"name": "task_id",
"type": "string",
"doc": "Unique identifier for the task"
},
{
"name": "title",
"type": "string",
"doc": "Task title"
},
{
"name": "owner_id",
"type": "string",
"doc": "User ID of task owner"
},
{
"name": "priority",
"type": ["null", "int"],
"default": null,
"doc": "Task priority (1=highest, 5=lowest). Optional."
},
{
"name": "due_date",
"type": ["null", "string"],
"default": null,
"doc": "ISO-8601 date when task is due. Optional."
}
]
}
Schema Registry: Centralized Schema Management
Confluent Schema Registry provides:
- Central schema storage: Single source of truth for all schemas
- Schema versioning: Track all versions of each schema
- Compatibility enforcement: Block incompatible schema changes
- Schema ID in messages: Messages include schema ID, not full schema
How Schema Registry Works
When a producer sends a message:
1. Producer → Schema Registry: "Register this schema for topic 'task-created'"
2. Schema Registry → Producer: "Schema ID is 42"
3. Producer → Kafka: [Magic byte][Schema ID: 42][Avro-encoded data]
4. Consumer → Schema Registry: "Give me schema for ID 42"
5. Schema Registry → Consumer: "Here's the schema"
6. Consumer: Deserializes using schema
The message payload starts with 5 bytes of metadata:
[0x00][Schema ID: 4 bytes][Avro binary data]
↑ ↑ ↑
Magic Registry ID Your actual data
byte (e.g., 42)
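To make the framing concrete, this small sketch reads the magic byte and the 4-byte big-endian schema ID from a raw message value (the example bytes are illustrative):
import struct

def peek_schema_id(raw: bytes) -> int:
    """Return the Schema Registry ID from a Confluent-framed Avro message."""
    # 1 magic byte (0x00) followed by a 4-byte big-endian schema ID
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent-framed Avro message")
    return struct.unpack(">I", raw[1:5])[0]

# A frame for schema ID 42, followed by (truncated) Avro data
print(peek_schema_id(b"\x00\x00\x00\x00\x2a" + b"..."))  # 42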
Installing Dependencies
Add the required packages to your project:
uv add "confluent-kafka[avro]"
Or with pip:
pip install "confluent-kafka[avro]"
Deploying Schema Registry
Important: Strimzi doesn't include Schema Registry. You need to deploy it separately. We'll use Apicurio Registry, which is Confluent Schema Registry-compatible and works well on Kubernetes.
Deploy Apicurio Registry
Create schema-registry.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: schema-registry
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app: schema-registry
template:
metadata:
labels:
app: schema-registry
spec:
containers:
- name: apicurio
image: apicurio/apicurio-registry:3.0.6
ports:
- containerPort: 8080
env:
- name: APICURIO_STORAGE_KIND
value: kafkasql
- name: APICURIO_KAFKASQL_BOOTSTRAP_SERVERS
value: task-events-kafka-bootstrap:9092
resources:
requests:
memory: 256Mi
cpu: 100m
limits:
memory: 512Mi
cpu: 500m
---
apiVersion: v1
kind: Service
metadata:
name: schema-registry
namespace: kafka
spec:
type: NodePort
ports:
- port: 8081
targetPort: 8080
nodePort: 30081
selector:
app: schema-registry
Apply the configuration:
kubectl apply -f schema-registry.yaml
Output:
deployment.apps/schema-registry created
service/schema-registry created
Wait for the pod to be ready:
kubectl wait --for=condition=ready pod -l app=schema-registry -n kafka --timeout=120s
Connection Reference
| Service | Local URL (Mac/Windows) | K8s Internal URL |
|---|---|---|
| Kafka | localhost:30092 | task-events-kafka-bootstrap:9092 |
| Schema Registry | http://localhost:30081 | http://schema-registry:8081 |
For local development, we use the NodePort URLs. For code running inside Kubernetes, use the internal URLs. Note that Apicurio serves its Confluent-compatible REST API under a path prefix (typically /apis/ccompat/v7), so depending on your Apicurio version you may need to append that path to the Schema Registry URL you pass to the confluent-kafka client.
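Once the pod is ready, a quick check from your machine confirms the registry answers the Confluent-compatible /subjects endpoint (a sketch; adjust the URL, including any path prefix your Apicurio version requires, to match your deployment):
import json
import os
import urllib.request

SCHEMA_REGISTRY_URL = os.environ.get("SCHEMA_REGISTRY_URL", "http://localhost:30081")

# List registered subjects; a fresh registry returns an empty list
with urllib.request.urlopen(f"{SCHEMA_REGISTRY_URL}/subjects") as resp:
    print(json.load(resp))  # e.g. []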
Integrating Schema Registry with Python
Setting Up the Schema Registry Client
import os
from confluent_kafka.schema_registry import SchemaRegistryClient
# Environment-aware configuration
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
# Connect to Schema Registry
sr_client = SchemaRegistryClient({
'url': SCHEMA_REGISTRY_URL
})
# Get schema for a subject
schema = sr_client.get_latest_version('task-created-value')
print(f"Schema ID: {schema.schema_id}")
print(f"Schema: {schema.schema.schema_str}")
Output:
Schema ID: 42
Schema: {"type":"record","name":"TaskCreated","namespace":"com.taskapi.events"...}
Producer with Avro Serialization
import os
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})
# Avro schema
task_schema = """
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.taskapi.events",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "occurred_at", "type": "string"},
{"name": "priority", "type": ["null", "int"], "default": null}
]
}
"""
# Create serializer
avro_serializer = AvroSerializer(
schema_registry_client=sr_client,
schema_str=task_schema,
to_dict=lambda obj, ctx: obj # Object is already a dict
)
# Producer configuration
producer = Producer({
'bootstrap.servers': KAFKA_BOOTSTRAP,
'acks': 'all',
'enable.idempotence': True
})
def delivery_report(err, msg):
if err:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
# Create and send event
task_event = {
'event_id': 'evt-001',
'task_id': 'task-123',
'title': 'Buy groceries',
'occurred_at': '2025-01-15T10:00:00Z',
'priority': 1
}
producer.produce(
topic='task-created',
key='task-123',
value=avro_serializer(
task_event,
SerializationContext('task-created', MessageField.VALUE)
),
callback=delivery_report
)
producer.flush()
Output:
Delivered to task-created [0] @ 57
Consumer with Avro Deserialization
import os
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})
# Create deserializer (schema fetched automatically from registry)
avro_deserializer = AvroDeserializer(
schema_registry_client=sr_client,
from_dict=lambda obj, ctx: obj # Return as dict
)
# Consumer configuration
consumer = Consumer({
'bootstrap.servers': KAFKA_BOOTSTRAP,
'group.id': 'notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['task-created'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f'Error: {msg.error()}')
continue
# Deserialize Avro message
task_event = avro_deserializer(
msg.value(),
SerializationContext('task-created', MessageField.VALUE)
)
print(f"Received: {task_event}")
print(f" Task ID: {task_event['task_id']}")
print(f" Title: {task_event['title']}")
print(f" Priority: {task_event.get('priority', 'Not set')}")
consumer.commit(message=msg)
finally:
consumer.close()
Output:
Received: {'event_id': 'evt-001', 'task_id': 'task-123', 'title': 'Buy groceries', 'occurred_at': '2025-01-15T10:00:00Z', 'priority': 1}
Task ID: task-123
Title: Buy groceries
Priority: 1
Schema Evolution: Changing Schemas Safely
The real power of Schema Registry is compatibility enforcement. You can evolve schemas over time without breaking consumers.
Compatibility Modes
| Mode | Rule | Use Case |
|---|---|---|
| BACKWARD (default) | New schema can read old data | Upgrading consumers first |
| FORWARD | Old schema can read new data | Upgrading producers first |
| FULL | Both backward and forward | Maximum flexibility |
| NONE | No compatibility check | Development only |
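Compatibility is enforced per subject (with a global default). Here is a sketch of pinning and reading the mode, reusing the SchemaRegistryClient configured earlier in this lesson; set_compatibility and get_compatibility are assumed to be available in your confluent-kafka version:
# Pin the compatibility mode for the task-created value subject
sr_client.set_compatibility(subject_name='task-created-value', level='BACKWARD')

# Confirm what the registry will enforce for this subject
print(sr_client.get_compatibility(subject_name='task-created-value'))  # e.g. 'BACKWARD'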
Backward Compatibility: The Default
With BACKWARD compatibility, consumers using the new schema can read data written with the old schema.
Safe changes (backward compatible):
| Change | Requirement | Why It Works |
|---|---|---|
| Add field | Must have a default value | Old messages missing the field get the default |
| Remove field | None for BACKWARD (removing only optional fields also preserves FORWARD) | The new consumer skips the removed field when reading old messages |
Unsafe changes (breaks compatibility):
| Change | Problem |
|---|---|
| Add required field (no default) | Old messages have no value to supply, so new consumers can't read them |
| Remove a field old consumers still require | Passes the BACKWARD check, but not-yet-upgraded consumers break (a forward-compatibility problem) |
| Change field type | Type mismatch on deserialization (only a few promotions, such as int to long, are allowed) |
| Rename field | Treated as remove + add; old messages carry no value for the new name |
Example: Adding a Field Safely
Original schema (v1):
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"}
]
}
New schema (v2) - Adding priority:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "priority", "type": ["null", "int"], "default": null}
]
}
This change is compatible in both directions:
- New consumers can read old messages (priority falls back to its default, null)
- Old consumers can read new messages (Avro schema resolution skips writer fields the reader doesn't know about)
Example: Breaking Compatibility
Original schema:
{
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"}
]
}
Incompatible change - Adding required field:
{
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "priority", "type": "int"} // No default = required
]
}
When you try to register this schema:
import os
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

new_schema = Schema("""{
    "type": "record",
    "name": "TaskCreated",
    "fields": [
        {"name": "task_id", "type": "string"},
        {"name": "title", "type": "string"},
        {"name": "priority", "type": "int"}
    ]
}""", schema_type='AVRO')

# This will fail!
sr_client.register_schema('task-created-value', new_schema)
Output (error):
SchemaRegistryError: Schema being registered is incompatible with an earlier schema
Schema Registry blocks the incompatible change, preventing production breakage.
Checking Compatibility Before Registration
Always check compatibility before deploying schema changes:
import os
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

proposed_schema = Schema("""{
    "type": "record",
    "name": "TaskCreated",
    "fields": [
        {"name": "task_id", "type": "string"},
        {"name": "title", "type": "string"},
        {"name": "priority", "type": ["null", "int"], "default": null}
    ]
}""", schema_type='AVRO')

# Check compatibility against the latest registered version before registering
is_compatible = sr_client.test_compatibility(
    subject_name='task-created-value',
    schema=proposed_schema
)

if is_compatible:
    schema_id = sr_client.register_schema('task-created-value', proposed_schema)
    print(f"Registered with ID: {schema_id}")
else:
    print("Schema is NOT compatible - review changes")
Output:
Registered with ID: 43
Designing Schemas Through Collaboration
You've learned the mechanics of Avro schemas. Now let's design a real schema for the Task API.
Your starting point:
"I need to design event schemas for my Task API. I want to publish task lifecycle events."
Identifying requirements:
Consider what information each event needs:
- Event metadata: event_id, event_type, occurred_at
- Correlation: correlation_id for tracing across services
- Entity data: task_id, title, owner_id
- Optional data: priority, due_date, tags
Initial design attempt:
{
"type": "record",
"name": "TaskEvent",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "priority", "type": "int"}
]
}
Evaluating the design:
This schema has problems:
- No event metadata (how do you trace events across services?)
- Priority is required (old producers can't send without it)
- No versioning strategy (what happens when you add fields?)
Refining based on production requirements:
A better design separates event metadata from entity data:
{
"type": "record",
"name": "TaskCreated",
"namespace": "com.taskapi.events",
"fields": [
{"name": "event_id", "type": "string", "doc": "Unique event identifier"},
{"name": "event_type", "type": "string", "doc": "Always 'task.created'"},
{"name": "occurred_at", "type": "string", "doc": "ISO-8601 timestamp"},
{
"name": "correlation_id",
"type": ["null", "string"],
"default": null,
"doc": "Request correlation ID for tracing"
},
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "owner_id", "type": "string"},
{
"name": "priority",
"type": ["null", "int"],
"default": null,
"doc": "1=highest, 5=lowest"
},
{
"name": "due_date",
"type": ["null", "string"],
"default": null,
"doc": "ISO-8601 date"
}
]
}
What emerged from refinement:
- Event metadata enables distributed tracing
- Optional fields with defaults enable safe evolution
- Documentation in schema serves as contract
- Namespace prevents naming collisions
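One way to keep those metadata fields consistent is to build events through a small helper. Here is a sketch (uuid4 event IDs and UTC timestamps are assumptions, not requirements of the schema):
import uuid
from datetime import datetime, timezone

def task_created_event(task_id, title, owner_id, *, correlation_id=None,
                       priority=None, due_date=None):
    """Build a dict matching the refined TaskCreated schema."""
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "task.created",
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "correlation_id": correlation_id,
        "task_id": task_id,
        "title": title,
        "owner_id": owner_id,
        "priority": priority,
        "due_date": due_date,
    }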
Subject Naming Strategies
Schema Registry organizes schemas by subject. The default naming strategy is:
<topic>-<key|value>
For topic task-created:
- Key schema subject: task-created-key
- Value schema subject: task-created-value
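You can see this naming in practice by listing the registered subjects, reusing the sr_client from earlier in the lesson (a sketch):
# After the producer above registers its value schema, the subject appears
print(sr_client.get_subjects())
# e.g. ['task-created-value']  (plus 'task-created-key' if you register a key schema)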
Alternative Strategies
| Strategy | Subject Name | Use Case |
|---|---|---|
| TopicNameStrategy (default) | task-created-value | One schema per topic |
| RecordNameStrategy | com.taskapi.events.TaskCreated | Share schema across topics |
| TopicRecordNameStrategy | task-created-com.taskapi.events.TaskCreated | Schema per topic+type |
Configure in producer:
from confluent_kafka.schema_registry import record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer

serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    conf={'subject.name.strategy': record_subject_name_strategy}  # a callable, not a string
)
Common Patterns and Anti-Patterns
Pattern: Envelope with Metadata
Wrap all events in a standard envelope:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "occurred_at", "type": "string"},
{"name": "correlation_id", "type": ["null", "string"], "default": null},
{"name": "causation_id", "type": ["null", "string"], "default": null},
{"name": "data", "type": {...}}
]
}
Anti-Pattern: Overusing Union Types
Don't use unions to represent "any type":
// BAD: Too flexible, defeats the schema's purpose
{"name": "metadata", "type": ["null", "string", "int", "boolean", {"type": "map", "values": "string"}]}
If you need this flexibility, you've lost the schema's contract value.
Anti-Pattern: Deeply Nested Optional Objects
// BAD: Hard to evolve, null checks everywhere
{
"name": "task",
"type": ["null", {
"type": "record",
"name": "Task",
"fields": [
{"name": "owner", "type": ["null", {
"type": "record",
"name": "Owner",
"fields": [...]
}]}
]
}]
}
Flatten when possible, or use separate events for different entity states.
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, design an Avro schema for task lifecycle events with backward compatibility.
Does my skill show proper schema evolution patterns (optional fields with defaults, logical types)?
Identify Gaps
Ask yourself:
- Did my skill explain schema compatibility types (backward, forward, full)?
- Did it show how to use Schema Registry and handle schema evolution?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing schema design patterns (Avro schemas, Schema Registry, compatibility rules).
Update it to include when to use Avro vs JSON and how to evolve schemas without breaking consumers.
Try With AI
Apply schema design and evolution to your Task API events.
Setup: Open Claude Code or your preferred AI assistant in your project directory.
Prompt 1: Design an Event Schema
I need to design an Avro schema for TaskCompleted events in my Task API.
The event should include:
- Standard event metadata (event_id, event_type, occurred_at)
- Task identification (task_id, title)
- Completion details (completed_by user_id, completed_at timestamp)
- Optional: completion_notes, duration_minutes
Design the schema with proper types, documentation, and consider future evolution.
What fields should be required vs optional with defaults?
What you're learning: Schema design decisions---which fields are essential to the event's meaning (required) versus context that might not always be available (optional with defaults).
Prompt 2: Plan a Schema Evolution
My current TaskCreated schema has these fields:
- event_id (string, required)
- task_id (string, required)
- title (string, required)
- created_at (string, required)
- priority (int, optional with default null)
I need to add:
1. owner_id (required for all new tasks)
2. tags (optional array of strings)
3. estimated_minutes (optional integer)
Which of these changes are backward compatible?
How should I implement each change?
Show me the evolved schema.
What you're learning: Compatibility analysis---understanding which changes are safe and how to work around the restrictions when you need to add required fields to existing schemas.
Prompt 3: Debug a Compatibility Error
I'm trying to register this schema change and getting a compatibility error:
Current schema:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"}
]
}
New schema:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "priority", "type": "int"}
]
}
Error: Schema being registered is incompatible with an earlier schema
What exactly makes this incompatible? How do I fix it while achieving my goal
of renaming 'title' to 'name' and adding 'priority'?
What you're learning: Compatibility debugging---understanding that renaming a field is effectively a delete+add operation, and how to handle migrations that require breaking changes (versioning strategies, new topics, dual-writes).
Safety Note: Schema changes affect all producers and consumers. Always test compatibility in a staging environment before production, and coordinate deployment order based on your compatibility mode (BACKWARD = upgrade consumers first).