Updated Feb 23, 2026

Message Schemas: Avro and Schema Registry

Your Task API is publishing events to Kafka. Today, your task.created event looks like this:

{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z"}

Next month, the product team wants to add task priority. You add the field:

{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z", "priority": 1}

But the notification service consuming these events wasn't updated. When it receives a message with the new priority field, it crashes. Or worse, it silently ignores the field and loses data. You've just experienced schema drift---the silent killer of event-driven systems.

In production, you'll have dozens of services reading and writing events. Without schema enforcement, any producer can add, remove, or rename fields at will. Consumers break in unpredictable ways. Debugging becomes a forensic investigation: "Which version of the event was this consumer built for?"

This lesson introduces Apache Avro for binary schema-based serialization and Schema Registry for centralized schema management. By the end, you'll design event schemas that evolve safely, enforce contracts between producers and consumers, and prevent the integration failures that plague untyped messaging systems.

Why Schemas Matter: The Contract Problem

In a typical Kafka deployment without schemas, producers and consumers communicate through implicit contracts:

Producer (v1)                          Consumer (v1)
├── Sends: {"id", "title"}    ───→    ├── Expects: {"id", "title"}
└── No enforcement                    └── No validation

This works until someone changes something:

Producer (v2)                          Consumer (v1)
├── Sends: {"task_id", "name"}  ───→  ├── Still expects: {"id", "title"}
└── Renamed fields                    └── KeyError: 'id'

The core problem: JSON doesn't enforce structure. Any producer can send anything, and you won't discover the mismatch until runtime---often in production.

What Schemas Provide

| Capability | Without Schema | With Avro + Schema Registry |
| --- | --- | --- |
| Contract enforcement | None | Validation at serialization time |
| Documentation | Implicit in code | Explicit in schema definition |
| Evolution rules | Hope and pray | Backward/forward compatibility |
| Message size | JSON verbosity | Binary encoding (50-70% smaller) |
| Type safety | None | Enforced types (int, string, etc.) |
| Versioning | Manual tracking | Automatic with schema IDs |
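The size row is easy to sanity-check without a Kafka cluster. The sketch below hand-encodes the three string fields the way Avro's binary format does (each string is a zigzag-varint length prefix plus UTF-8 bytes, with no field names) and compares the result against the JSON rendering. The helper functions are illustrative, not library APIs:

```python
import json

def zigzag_varint(n: int) -> bytes:
    """Encode a non-negative long the way Avro does: zigzag, then varint."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def avro_encode_strings(*values: str) -> bytes:
    """Avro binary for a record whose fields are all strings:
    each field is a length prefix followed by UTF-8 bytes; no field names."""
    out = b""
    for v in values:
        data = v.encode("utf-8")
        out += zigzag_varint(len(data)) + data
    return out

event = {"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z"}

json_size = len(json.dumps(event).encode())
avro_size = len(avro_encode_strings(*event.values()))
print(json_size, avro_size)
```

The binary form drops the field names, quotes, and punctuation, which is where most of the savings come from.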

Apache Avro Fundamentals

Apache Avro is a data serialization system that provides:

  • Schema-based serialization: Data is always encoded with a schema
  • Binary encoding: Compact messages without field names in payload
  • Schema evolution: Add/remove fields with compatibility rules
  • Language-agnostic: Works with Python, Java, Go, and more

Avro Schema Syntax

An Avro schema is JSON that defines your data structure:

{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"}
  ]
}

Key components:

  • type: Always "record" for structured data
  • name: The schema name (used in subject naming)
  • namespace: Package-like qualifier for uniqueness
  • fields: Array of field definitions with names and types

Avro Field Types

| Type | Avro Syntax | Example Value |
| --- | --- | --- |
| String | "string" | "task-123" |
| Integer | "int" | 42 |
| Long | "long" | 1705312800000 |
| Boolean | "boolean" | true |
| Float | "float" | 3.14 |
| Double | "double" | 3.14159265359 |
| Bytes | "bytes" | Binary data |
| Null | "null" | null |

Optional Fields with Union Types

To make a field optional, use a union type with null:

{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}

The ["null", "int"] union means the field can be either null or an integer. The default: null makes it optional---old messages without priority will deserialize with priority = null.

Complex Schema Example

Here's a complete schema for Task API events:

{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "doc": "Event published when a new task is created",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "Unique identifier for this event instance"
    },
    {
      "name": "event_type",
      "type": "string",
      "doc": "Event type identifier"
    },
    {
      "name": "occurred_at",
      "type": "string",
      "doc": "ISO-8601 timestamp when event occurred"
    },
    {
      "name": "task_id",
      "type": "string",
      "doc": "Unique identifier for the task"
    },
    {
      "name": "title",
      "type": "string",
      "doc": "Task title"
    },
    {
      "name": "owner_id",
      "type": "string",
      "doc": "User ID of task owner"
    },
    {
      "name": "priority",
      "type": ["null", "int"],
      "default": null,
      "doc": "Task priority (1=highest, 5=lowest). Optional."
    },
    {
      "name": "due_date",
      "type": ["null", "string"],
      "default": null,
      "doc": "ISO-8601 date when task is due. Optional."
    }
  ]
}

Schema Registry: Centralized Schema Management

Confluent Schema Registry provides:

  • Central schema storage: Single source of truth for all schemas
  • Schema versioning: Track all versions of each schema
  • Compatibility enforcement: Block incompatible schema changes
  • Schema ID in messages: Messages include schema ID, not full schema

How Schema Registry Works

When a producer sends a message:

1. Producer → Schema Registry: "Register this schema for topic 'task-created'"
2. Schema Registry → Producer: "Schema ID is 42"
3. Producer → Kafka: [Magic byte][Schema ID: 42][Avro-encoded data]
4. Consumer → Schema Registry: "Give me schema for ID 42"
5. Schema Registry → Consumer: "Here's the schema"
6. Consumer: Deserializes using schema

The message payload starts with 5 bytes of metadata:

[0x00][Schema ID: 4 bytes][Avro binary data]
  ↑           ↑                  ↑
Magic     Registry ID      Your actual data
byte      (e.g., 42)
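To make the layout concrete, here is a stdlib-only sketch that unpacks the magic byte and the big-endian 4-byte schema ID from such a message; the payload below is fabricated for illustration:

```python
import struct

def parse_wire_header(payload: bytes) -> int:
    """Return the schema ID from a Confluent wire-format message."""
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id

# Fabricated payload: magic byte 0x00, schema ID 42, then the Avro body
payload = b"\x00" + (42).to_bytes(4, "big") + b"\x10task-123"
print(parse_wire_header(payload))  # 42
```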

Installing Dependencies

Add the required packages to your project:

uv add "confluent-kafka[avro]"

Or with pip:

pip install "confluent-kafka[avro]"

Deploying Schema Registry

Important: Strimzi doesn't include Schema Registry. You need to deploy it separately. We'll use Apicurio Registry, which is Confluent Schema Registry-compatible and works well on Kubernetes.

Deploy Apicurio Registry

Create schema-registry.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: apicurio
          image: apicurio/apicurio-registry:3.0.6
          ports:
            - containerPort: 8080
          env:
            - name: APICURIO_STORAGE_KIND
              value: kafkasql
            - name: APICURIO_KAFKASQL_BOOTSTRAP_SERVERS
              value: task-events-kafka-bootstrap:9092
          resources:
            requests:
              memory: 256Mi
              cpu: 100m
            limits:
              memory: 512Mi
              cpu: 500m
---
apiVersion: v1
kind: Service
metadata:
  name: schema-registry
  namespace: kafka
spec:
  type: NodePort
  ports:
    - port: 8081
      targetPort: 8080
      nodePort: 30081
  selector:
    app: schema-registry

Apply the configuration:

kubectl apply -f schema-registry.yaml

Output:

deployment.apps/schema-registry created
service/schema-registry created

Wait for the pod to be ready:

kubectl wait --for=condition=ready pod -l app=schema-registry -n kafka --timeout=120s

Connection Reference

| Service | Local URL (Mac/Windows) | K8s Internal URL |
| --- | --- | --- |
| Kafka | localhost:30092 | task-events-kafka-bootstrap:9092 |
| Schema Registry | http://localhost:30081 | http://schema-registry:8081 |

For local development, we use the NodePort URLs. For code running inside Kubernetes, use the internal URLs. One Apicurio-specific caveat: it serves its Confluent-compatible REST API under a path prefix (typically /apis/ccompat/v7), so Confluent clients may need that path appended to the base URL rather than the registry root.

Integrating Schema Registry with Python

Setting Up the Schema Registry Client

import os
from confluent_kafka.schema_registry import SchemaRegistryClient

# Environment-aware configuration
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Connect to Schema Registry
sr_client = SchemaRegistryClient({
    'url': SCHEMA_REGISTRY_URL
})

# Get schema for a subject
schema = sr_client.get_latest_version('task-created-value')
print(f"Schema ID: {schema.schema_id}")
print(f"Schema: {schema.schema.schema_str}")

Output:

Schema ID: 42
Schema: {"type":"record","name":"TaskCreated","namespace":"com.taskapi.events"...}

Producer with Avro Serialization

import os
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

# Avro schema
task_schema = """
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}
"""

# Create serializer
avro_serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    to_dict=lambda obj, ctx: obj  # Object is already a dict
)

# Producer configuration
producer = Producer({
    'bootstrap.servers': KAFKA_BOOTSTRAP,
    'acks': 'all',
    'enable.idempotence': True
})

def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Create and send event
task_event = {
    'event_id': 'evt-001',
    'task_id': 'task-123',
    'title': 'Buy groceries',
    'occurred_at': '2025-01-15T10:00:00Z',
    'priority': 1
}

producer.produce(
    topic='task-created',
    key='task-123',
    value=avro_serializer(
        task_event,
        SerializationContext('task-created', MessageField.VALUE)
    ),
    callback=delivery_report
)

producer.flush()

Output:

Delivered to task-created [0] @ 57

Consumer with Avro Deserialization

import os
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

# Create deserializer (schema fetched automatically from registry)
avro_deserializer = AvroDeserializer(
    schema_registry_client=sr_client,
    from_dict=lambda obj, ctx: obj  # Return as dict
)

# Consumer configuration
consumer = Consumer({
    'bootstrap.servers': KAFKA_BOOTSTRAP,
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})

consumer.subscribe(['task-created'])

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            print(f'Error: {msg.error()}')
            continue

        # Deserialize Avro message
        task_event = avro_deserializer(
            msg.value(),
            SerializationContext('task-created', MessageField.VALUE)
        )

        print(f"Received: {task_event}")
        print(f"  Task ID: {task_event['task_id']}")
        print(f"  Title: {task_event['title']}")
        print(f"  Priority: {task_event.get('priority', 'Not set')}")

        consumer.commit(message=msg)

finally:
    consumer.close()

Output:

Received: {'event_id': 'evt-001', 'task_id': 'task-123', 'title': 'Buy groceries', 'occurred_at': '2025-01-15T10:00:00Z', 'priority': 1}
Task ID: task-123
Title: Buy groceries
Priority: 1

Schema Evolution: Changing Schemas Safely

The real power of Schema Registry is compatibility enforcement. You can evolve schemas over time without breaking consumers.

Compatibility Modes

| Mode | Rule | Use Case |
| --- | --- | --- |
| BACKWARD (default) | New schema can read old data | Upgrading consumers first |
| FORWARD | Old schema can read new data | Upgrading producers first |
| FULL | Both backward and forward | Maximum flexibility |
| NONE | No compatibility check | Development only |

Backward Compatibility: The Default

With BACKWARD compatibility, consumers using the new schema can read data written with the old schema.

Safe changes (backward compatible):

| Change | Requirement | Why It Works |
| --- | --- | --- |
| Add field | Must have default value | Old messages get the default |
| Remove field | Field must have been optional | New consumer ignores it |

Unsafe changes (breaks compatibility):

| Change | Problem |
| --- | --- |
| Add required field (no default) | Old messages have no value to supply |
| Remove field that had no default | Readers still on the old schema require it |
| Change field type | Type mismatch on deserialization |
| Rename field | Treated as remove + add |

Example: Adding a Field Safely

Original schema (v1):

{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"}
  ]
}

New schema (v2) - Adding priority:

{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}

This is backward compatible because:

  1. New consumers can read old messages: when priority is absent, the reader fills in its default (null). This is exactly what BACKWARD compatibility requires.
  2. As a bonus, because the new field is optional, old consumers can also read new messages (they skip the unfamiliar field), so the change is forward compatible too.

Example: Breaking Compatibility

Original schema:

{
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"}
  ]
}

Incompatible change - Adding required field:

{
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}  // No default = required
  ]
}

When you try to register this schema:

import os
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

new_schema = Schema("""{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}""", schema_type='AVRO')

# This will fail!
sr_client.register_schema('task-created-value', new_schema)

Output (error):

SchemaRegistryError: Schema being registered is incompatible with an earlier schema

Schema Registry blocks the incompatible change, preventing production breakage.

Checking Compatibility Before Registration

Always check compatibility before deploying schema changes:

import os
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

proposed_schema = Schema("""{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}""", schema_type='AVRO')

# Check if compatible before registering
is_compatible = sr_client.test_compatibility(
    subject_name='task-created-value',
    schema=proposed_schema
)

if is_compatible:
    schema_id = sr_client.register_schema('task-created-value', proposed_schema)
    print(f"Registered with ID: {schema_id}")
else:
    print("Schema is NOT compatible - review changes")

Output:

Registered with ID: 43

Designing Schemas Through Collaboration

You've learned the mechanics of Avro schemas. Now let's design a real schema for the Task API.

Your starting point:

"I need to design event schemas for my Task API. I want to publish task lifecycle events."

Identifying requirements:

Consider what information each event needs:

  • Event metadata: event_id, event_type, occurred_at
  • Correlation: correlation_id for tracing across services
  • Entity data: task_id, title, owner_id
  • Optional data: priority, due_date, tags

Initial design attempt:

{
  "type": "record",
  "name": "TaskEvent",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}

Evaluating the design:

This schema has problems:

  1. No event metadata (how do you trace events across services?)
  2. Priority is required (old producers can't send without it)
  3. No versioning strategy (what happens when you add fields?)

Refining based on production requirements:

A better design separates event metadata from entity data:

{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier"},
    {"name": "event_type", "type": "string", "doc": "Always 'task.created'"},
    {"name": "occurred_at", "type": "string", "doc": "ISO-8601 timestamp"},
    {
      "name": "correlation_id",
      "type": ["null", "string"],
      "default": null,
      "doc": "Request correlation ID for tracing"
    },
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "owner_id", "type": "string"},
    {
      "name": "priority",
      "type": ["null", "int"],
      "default": null,
      "doc": "1=highest, 5=lowest"
    },
    {
      "name": "due_date",
      "type": ["null", "string"],
      "default": null,
      "doc": "ISO-8601 date"
    }
  ]
}

What emerged from refinement:

  • Event metadata enables distributed tracing
  • Optional fields with defaults enable safe evolution
  • Documentation in schema serves as contract
  • Namespace prevents naming collisions

Subject Naming Strategies

Schema Registry organizes schemas by subject. The default naming strategy is:

<topic>-<key|value>

For topic task-created:

  • Key schema subject: task-created-key
  • Value schema subject: task-created-value

Alternative Strategies

| Strategy | Subject Name | Use Case |
| --- | --- | --- |
| TopicNameStrategy (default) | task-created-value | One schema per topic |
| RecordNameStrategy | com.taskapi.events.TaskCreated | Share schema across topics |
| TopicRecordNameStrategy | task-created-com.taskapi.events.TaskCreated | Schema per topic+type |
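A small hypothetical helper (subject_name is illustrative, not part of any client library) mirrors how each strategy derives the subject:

```python
def subject_name(strategy: str, topic: str, record_fullname: str) -> str:
    """Derive the Schema Registry subject for a value schema under each strategy."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-value"
    if strategy == "RecordNameStrategy":
        return record_fullname
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")

for s in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    print(s, "->", subject_name(s, "task-created", "com.taskapi.events.TaskCreated"))
```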

Configure in producer:

from confluent_kafka.schema_registry import record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer

serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    conf={'subject.name.strategy': record_subject_name_strategy}
)

Note that in confluent-kafka-python the strategy is passed as a callable, not a string.

Common Patterns and Anti-Patterns

Pattern: Envelope with Metadata

Wrap all events in a standard envelope:

{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "correlation_id", "type": ["null", "string"], "default": null},
    {"name": "causation_id", "type": ["null", "string"], "default": null},
    {"name": "data", "type": {...}}
  ]
}

Anti-Pattern: Overusing Union Types

Don't use unions to represent "any type":

// BAD: Too flexible, defeats schema purpose
{"name": "metadata", "type": ["null", "string", "int", "boolean", "map"]}

If you need this flexibility, you've lost the schema's contract value.

Anti-Pattern: Deeply Nested Optional Objects

// BAD: Hard to evolve, null checks everywhere
{
  "name": "task",
  "type": ["null", {
    "type": "record",
    "name": "Task",
    "fields": [
      {"name": "owner", "type": ["null", {
        "type": "record",
        "name": "Owner",
        "fields": [...]
      }]}
    ]
  }]
}

Flatten when possible, or use separate events for different entity states.


Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, design an Avro schema for task lifecycle events with backward compatibility.
Does my skill show proper schema evolution patterns (optional fields with defaults, logical types)?

Identify Gaps

Ask yourself:

  • Did my skill explain schema compatibility types (backward, forward, full)?
  • Did it show how to use Schema Registry and handle schema evolution?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing schema design patterns (Avro schemas, Schema Registry, compatibility rules).
Update it to include when to use Avro vs JSON and how to evolve schemas without breaking consumers.

Try With AI

Apply schema design and evolution to your Task API events.

Setup: Open Claude Code or your preferred AI assistant in your project directory.


Prompt 1: Design an Event Schema

I need to design an Avro schema for TaskCompleted events in my Task API.
The event should include:
- Standard event metadata (event_id, event_type, occurred_at)
- Task identification (task_id, title)
- Completion details (completed_by user_id, completed_at timestamp)
- Optional: completion_notes, duration_minutes

Design the schema with proper types, documentation, and consider future evolution.
What fields should be required vs optional with defaults?

What you're learning: Schema design decisions---which fields are essential to the event's meaning (required) versus context that might not always be available (optional with defaults).


Prompt 2: Plan a Schema Evolution

My current TaskCreated schema has these fields:
- event_id (string, required)
- task_id (string, required)
- title (string, required)
- created_at (string, required)
- priority (int, optional with default null)

I need to add:
1. owner_id (required for all new tasks)
2. tags (optional array of strings)
3. estimated_minutes (optional integer)

Which of these changes are backward compatible?
How should I implement each change?
Show me the evolved schema.

What you're learning: Compatibility analysis---understanding which changes are safe and how to work around the restrictions when you need to add required fields to existing schemas.


Prompt 3: Debug a Compatibility Error

I'm trying to register this schema change and getting a compatibility error:

Current schema:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "title", "type": "string"}
]
}

New schema:
{
"type": "record",
"name": "TaskCreated",
"fields": [
{"name": "task_id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "priority", "type": "int"}
]
}

Error: Schema being registered is incompatible with an earlier schema

What exactly makes this incompatible? How do I fix it while achieving my goal
of renaming 'title' to 'name' and adding 'priority'?

What you're learning: Compatibility debugging---understanding that renaming a field is effectively a delete+add operation, and how to handle migrations that require breaking changes (versioning strategies, new topics, dual-writes).


Safety Note: Schema changes affect all producers and consumers. Always test compatibility in a staging environment before production, and coordinate deployment order based on your compatibility mode (BACKWARD = upgrade consumers first).