Your First Producer (Python)
You have a Kafka cluster running on Docker Desktop Kubernetes. Now it's time to send your first message.
In the request-response world, you call an API and wait for a response. In event-driven systems, you publish an event and move on. But "move on" doesn't mean "forget about it." You need to know whether your message actually reached Kafka. Did it land on a partition? Which offset was assigned? Did something go wrong?
The confluent-kafka-python library handles this through a pattern that might feel unusual at first: asynchronous production with delivery callbacks. You call produce(), which returns immediately. Later, you call poll() to process callbacks that tell you what happened. This pattern maximizes throughput while still giving you visibility into delivery success or failure.
By the end of this lesson, you'll have working producer code that sends messages to your Kafka cluster and confirms each delivery.
Installing the Client Library
The confluent-kafka-python library is the official Confluent client for Python. It wraps librdkafka, a high-performance C library, which makes it one of the fastest Kafka clients available for Python.
Why not aiokafka? You might see aiokafka in tutorials—it has cleaner async/await syntax. We use confluent-kafka because:
| confluent-kafka | aiokafka |
|---|---|
| Official (Confluent maintains it) | Community-maintained |
| Higher throughput (librdkafka in C underneath) | Mostly pure Python |
| Native Schema Registry support | Requires extra libraries |
| Production standard | Less common in production |
The callback pattern takes getting used to, but it's what you'll see in real Kafka jobs, and you'll need Schema Registry support in Lesson 10.
Install it with uv:
uv add confluent-kafka
Output:
Resolved 1 package in 0.5s
Installed 1 package in 0.3s
+ confluent-kafka==2.6.1
If you're using pip:
pip install confluent-kafka
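The library depends on librdkafka. The prebuilt wheels on PyPI bundle it for macOS, most Linux distributions, and Windows, so uv or pip normally installs everything in one step. Only if no wheel matches your platform does pip fall back to building from source; in that case, install a recent librdkafka development package first (apt-get install librdkafka-dev on Debian/Ubuntu).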
Connect to Your Kafka Cluster
Your Kafka cluster from Lesson 4 includes a NodePort listener on port 30092. This exposes Kafka directly on your localhost—no extra setup needed.
Verify the NodePort is working:
kubectl get svc -n kafka | grep external
Output:
task-events-kafka-external-bootstrap NodePort 10.96.x.x <none> 9094:30092/TCP
Connection Reference:
| Where Your Code Runs | Bootstrap Server |
|---|---|
| Local machine (Mac/Windows) | localhost:30092 |
| Pod in same namespace | task-events-kafka-bootstrap:9092 |
| Pod in different namespace | task-events-kafka-bootstrap.kafka.svc.cluster.local:9092 |
For this lesson, you run code locally, so use localhost:30092.
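Since the bootstrap address changes with where the code runs, one small convenience is to read it from the environment and fall back to the local NodePort address. This is a minimal sketch; the variable name KAFKA_BOOTSTRAP_SERVERS and the helper names are assumptions for illustration, not something Kafka or confluent-kafka defines.

```python
import os

# Hypothetical environment variable name; use whatever your deployment defines.
BOOTSTRAP_ENV_VAR = "KAFKA_BOOTSTRAP_SERVERS"
# Default for code running on your machine against the NodePort listener.
LOCAL_BOOTSTRAP = "localhost:30092"


def bootstrap_servers() -> str:
    """Return the bootstrap address, preferring the environment over the local default."""
    return os.environ.get(BOOTSTRAP_ENV_VAR, LOCAL_BOOTSTRAP)


def producer_config(client_id: str) -> dict:
    """Build a minimal config dict that can be passed to confluent_kafka.Producer."""
    return {
        "bootstrap.servers": bootstrap_servers(),
        "client.id": client_id,
    }
```

Run locally, producer_config('task-api-producer') points at localhost:30092. In a pod in the kafka namespace you would set KAFKA_BOOTSTRAP_SERVERS=task-events-kafka-bootstrap:9092 and pass the same dict to Producer().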
The Minimal Producer
Let's start with the simplest producer that actually works:
from confluent_kafka import Producer
# Create producer with minimal configuration
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'my-first-producer'
})
# Send a message
producer.produce(
topic='task-created',
value='Hello, Kafka!'
)
# Wait for all messages to be delivered
producer.flush()
print("Message sent!")
Output:
Message sent!
This works, but it's blind. You have no idea whether the message actually reached Kafka or where it landed. Let's add visibility.
Understanding the Asynchronous Model
The produce() method is non-blocking. When you call it, the message goes into an internal buffer, and the method returns immediately. The actual network transmission happens in a background thread.
This creates a problem: how do you know if delivery succeeded?
The answer is delivery callbacks. You provide a function that the client library calls once for each message, after it is delivered (or fails). But there's a catch: callbacks don't execute automatically. You must call poll() (or flush(), which polls internally) to trigger them.
Here's the mental model:
produce() → Message enters buffer → Background thread sends to Kafka
                                                 ↓
                                         Kafka acknowledges
                                                 ↓
poll()    → Triggers your callback with result
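To make the model concrete, here is a toy, single-threaded simulation in plain Python. ToyProducer and simulate_broker_ack() are hypothetical names, and this is not how confluent-kafka is implemented; it only mimics the rule that delivery results wait in a queue until poll() hands them to your callback.

```python
from collections import deque


class ToyProducer:
    """Toy model of the produce/poll lifecycle (NOT confluent_kafka.Producer)."""

    def __init__(self):
        self._buffer = deque()   # messages waiting to be sent
        self._reports = deque()  # delivery reports waiting for poll()

    def produce(self, topic, value, callback=None):
        # Returns immediately: the message is only added to the buffer.
        self._buffer.append((topic, value, callback))

    def simulate_broker_ack(self):
        # Stand-in for the background thread plus the broker's acknowledgement:
        # every buffered message becomes a queued delivery report, but no
        # callback runs here.
        while self._buffer:
            topic, value, callback = self._buffer.popleft()
            self._reports.append((callback, None, (topic, value)))

    def poll(self, timeout=0):
        # Callbacks run here, in the caller's thread. The timeout is ignored
        # in this toy; the real poll() can wait up to `timeout` seconds.
        served = 0
        while self._reports:
            callback, err, msg = self._reports.popleft()
            if callback is not None:
                callback(err, msg)
            served += 1
        return served
```

After produce() and simulate_broker_ack(), your callback still has not run; only poll() runs it, and it returns how many reports it served, much like the real poll() returns the number of events processed.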
Adding Delivery Callbacks
A delivery callback receives two arguments:
- err: an error object (KafkaError) if delivery failed, or None if it succeeded
- msg: a message object with metadata about the delivered message (topic, partition, offset, key)
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Called once for each message produced, from poll() or flush()."""
    if err is not None:
        print(f'FAILED: {err}')
    else:
        print(f'SUCCESS: topic={msg.topic()} partition={msg.partition()} offset={msg.offset()}')


producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'client.id': 'task-api-producer'
})

# Send with callback
producer.produce(
    topic='task-created',
    value='{"id": "task-001", "title": "Buy groceries"}',
    callback=delivery_report
)

# Serve any callbacks that are already ready (non-blocking)
producer.poll(0)

# Block until delivery completes; this also runs the callback
producer.flush()
Output:
SUCCESS: topic=task-created partition=0 offset=0
Now you can see exactly where your message landed: topic task-created, partition 0, offset 0.
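Printing is fine for a first run, but often you want to inspect results afterwards. Since the callback can be any callable taking (err, msg), one option is a small class that records outcomes. DeliveryTracker is a hypothetical name for this sketch.

```python
class DeliveryTracker:
    """Delivery callback that records outcomes instead of printing them.

    Pass an instance as callback=tracker; it is called with (err, msg)
    just like a plain delivery_report function.
    """

    def __init__(self):
        self.delivered = []  # (topic, partition, offset) tuples
        self.failed = []     # (key, error string) tuples

    def __call__(self, err, msg):
        if err is not None:
            self.failed.append((msg.key(), str(err)))
        else:
            self.delivered.append((msg.topic(), msg.partition(), msg.offset()))

    def summary(self) -> str:
        return f"{len(self.delivered)} delivered, {len(self.failed)} failed"
```

Usage: create tracker = DeliveryTracker(), pass callback=tracker to each produce() call, call producer.flush(), then print(tracker.summary()) or inspect tracker.failed.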
Why Message Keys Matter
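So far, we've sent messages without keys. Kafka accepts this, but keyless messages are spread across partitions, so you lose any ordering guarantee between related events.
When you provide a key, the producer hashes it to choose the partition, so the same key always maps to the same partition: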
# Messages with same key always go to same partition
producer.produce(
topic='task-created',
key='user-123', # User ID as key
value='{"id": "task-001", "title": "Buy groceries", "user": "user-123"}',
callback=delivery_report
)
Why this matters:
- Ordering: Messages with the same key land on the same partition, and Kafka preserves order within a partition (as long as the topic's partition count doesn't change)
- Locality: All events for an entity stay together, simplifying consumer logic
- Scaling: Different keys can be processed in parallel across partitions
For task events, using task_id or user_id as the key ensures all events for that entity arrive in order.
# All events for task-001 go to same partition, preserving order
producer.produce(topic='task-events', key='task-001', value='{"type": "created", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "updated", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "completed", ...}')
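To see why a key pins messages to one partition, here is a rough sketch of key-based partitioning. It assumes, to our understanding, that librdkafka's default partitioner (consistent_random) computes CRC32 of the key modulo the partition count for keyed messages; treat the exact hash as an assumption. Java clients default to murmur2, so a Java producer may place the same key on a different partition. partition_for_key is a hypothetical helper.

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Approximate the default partition choice for a keyed message.

    Assumption: librdkafka maps a non-empty key to CRC32(key) % partition_count.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always yields the same partition for a fixed partition count.
for key in ["task-001", "task-002", "task-003"]:
    print(key, "->", partition_for_key(key, 3))
```

Because the result depends on num_partitions, adding partitions to an existing topic can move a key to a different partition, which is why per-key ordering only holds while the partition count stays fixed.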
Complete Producer Example
Here's a more complete producer that sends several keyed JSON messages and checks whether each one was delivered:
import json
from datetime import datetime, timezone

from confluent_kafka import Producer

# Keys of messages whose delivery failed (filled in by delivery_report)
failed_keys = []


def delivery_report(err, msg):
    """Callback triggered by poll() or flush() after each delivery attempt."""
    if err is not None:
        failed_keys.append(msg.key())
        print(f'Delivery failed for {msg.key()}: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')


def create_producer():
    """Create a configured Kafka producer."""
    return Producer({
        'bootstrap.servers': 'localhost:30092',
        'client.id': 'task-api-producer',
    })


def send_task_event(producer, task_id: str, title: str):
    """Send a task-created event keyed by task ID."""
    event = {
        'id': task_id,
        'title': title,
        'created_at': datetime.now(timezone.utc).isoformat()
    }
    producer.produce(
        topic='task-created',
        key=task_id,
        value=json.dumps(event),
        callback=delivery_report
    )
    # Process any pending callbacks (non-blocking)
    producer.poll(0)


def main():
    producer = create_producer()

    # Send some task events
    tasks = [
        ('task-001', 'Buy groceries'),
        ('task-002', 'Review pull request'),
        ('task-003', 'Deploy to production'),
    ]
    for task_id, title in tasks:
        send_task_event(producer, task_id, title)
        print(f'Queued: {task_id}')

    # Wait up to 10 seconds; flush() returns how many messages are still queued
    remaining = producer.flush(timeout=10)
    if remaining > 0:
        print(f'WARNING: {remaining} messages were not delivered')
    elif failed_keys:
        # flush() returns 0 once every callback has run, including failures
        print(f'WARNING: {len(failed_keys)} messages failed: {failed_keys}')
    else:
        print('All messages delivered successfully')


if __name__ == '__main__':
    main()
Output:
Queued: task-001
Queued: task-002
Queued: task-003
Delivered: task-created [0] @ 1
Delivered: task-created [1] @ 0
Delivered: task-created [2] @ 0
All messages delivered successfully
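Notice that these messages landed on different partitions (0, 1, 2). The producer chose each partition from a hash of the key; here three distinct keys happened to map to three different partitions, but two keys can also share one. What the hash guarantees is that the same key always goes to the same partition.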
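If you want to unit-test send_task_event without a running broker, one option is to pull event construction into its own function. build_task_created is a hypothetical helper for this sketch. It encodes the JSON to UTF-8 bytes explicitly; confluent-kafka also accepts str values and encodes them as UTF-8, so this is a matter of making the wire format obvious.

```python
import json
from datetime import datetime, timezone


def build_task_created(task_id: str, title: str, now=None) -> tuple[str, bytes]:
    """Return (key, value) for a task-created event.

    `now` can be injected in tests so the timestamp is predictable.
    """
    created_at = (now or datetime.now(timezone.utc)).isoformat()
    event = {"id": task_id, "title": title, "created_at": created_at}
    return task_id, json.dumps(event).encode("utf-8")
```

In send_task_event you would then call key, value = build_task_created(task_id, title) and pass key=key, value=value to producer.produce(), leaving the Kafka call itself as the only part that needs a broker.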
Understanding poll() and flush()
These two methods are often confused. Here's the difference:
| Method | Behavior | When to Use |
|---|---|---|
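| poll(timeout) | Wait up to timeout seconds for events and run callbacks for delivered (or failed) messages; returns the number of events processed | Call regularly in loops to handle callbacks; poll(0) never blocks |
| flush(timeout) | Block until all buffered messages are delivered or the timeout expires, running callbacks along the way; returns the number of messages still queued | Call before shutdown to ensure no messages are lost, and check the return value |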
The pattern:
# Assumes producer and delivery_report from the examples above
messages = ['event-1', 'event-2', 'event-3']

# In a loop: poll(0) for non-blocking callback processing
for message in messages:
    producer.produce('task-created', value=message, callback=delivery_report)
    producer.poll(0)  # Non-blocking: serve any callbacks that are ready

# Before shutdown: flush() to ensure all messages are delivered
remaining = producer.flush(timeout=10)  # Block up to 10 seconds
What happens if you skip poll()?
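Delivery reports pile up in the client's internal queue. If you never call poll() or flush(), your callback functions never execute, so you never learn about delivery failures. In a long-running producer the local queue eventually fills, and produce() starts raising BufferError. And if the program exits without flush(), any messages still in the buffer are lost.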
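When the client's local queue is full, confluent-kafka's produce() raises BufferError rather than blocking. A common way to handle this is to call poll() so delivery reports drain and free space, then try again. This is a sketch; produce_with_backpressure and max_attempts are hypothetical names, and the function accepts any object with produce() and poll(), such as a Producer.

```python
def produce_with_backpressure(producer, topic, value, key=None, callback=None,
                              max_attempts=5):
    """Produce one message, waiting for queue space if the local buffer is full."""
    for _ in range(max_attempts):
        try:
            producer.produce(topic, value=value, key=key, callback=callback)
            producer.poll(0)  # serve any delivery reports that are ready
            return
        except BufferError:
            # Queue full: block up to 1 second while delivery reports drain.
            producer.poll(1)
    raise BufferError(f"local queue still full after {max_attempts} attempts")
```

The retry bound keeps a stuck producer (for example, one that cannot reach any broker) from spinning forever; what to do with the final BufferError is up to your application.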
Verifying Messages with Kafka CLI
Your producer is sending messages, but let's verify they actually arrived. Run the Kafka console consumer inside a broker pod, so no port-forward is needed:
# Consume from the beginning (localhost:9092 here is the broker pod's own listener)
kubectl exec -it task-events-dual-role-0 -n kafka -- \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic task-created \
--from-beginning
Output:
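{"id": "task-001", "title": "Buy groceries", "created_at": "2025-01-15T10:30:00.104312+00:00"}
{"id": "task-002", "title": "Review pull request", "created_at": "2025-01-15T10:30:00.104871+00:00"}
{"id": "task-003", "title": "Deploy to production", "created_at": "2025-01-15T10:30:00.105190+00:00"}
Your messages are in Kafka, persisted and ready for any consumer to read. Your timestamps will differ. If you ran the earlier examples, their messages appear as well, and because the keys went to different partitions, the console consumer may print messages in a different order than you sent them.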
Common Producer Errors
| Error | Cause | Fix |
|---|---|---|
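| Delivery fails with _MSG_TIMED_OUT; logs show connection errors | Can't reach the bootstrap servers (failure is reported after message.timeout.ms, 5 minutes by default) | Check that bootstrap.servers is localhost:30092 and the NodePort service exists (kubectl get svc -n kafka) |
| UNKNOWN_TOPIC_OR_PART (or local _UNKNOWN_TOPIC) | Topic doesn't exist and auto-creation is disabled | Create the topic first with a KafkaTopic resource, or enable the broker setting auto.create.topics.enable |
| MSG_SIZE_TOO_LARGE | Message exceeds the producer's message.max.bytes or the topic's max.message.bytes | Raise the relevant limits (client and topic/broker) or send smaller messages |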
| Callback never called | Forgot to call poll()/flush() | Add poll(0) after produce(), flush() before exit |
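Most of these errors surface in your delivery callback. Below is a sketch of a callback that logs failures by category, assuming err is a confluent_kafka KafkaError, which provides name(), retriable(), and fatal(). The logger name and the failure_counts counter are hypothetical. Keep in mind that a delivery report error is final: librdkafka has already done its internal retries, so any re-send is your application's decision.

```python
import logging
from collections import Counter

logger = logging.getLogger("task-producer")  # hypothetical logger name
failure_counts = Counter()  # error name -> number of failed messages


def delivery_report(err, msg):
    """Log delivery failures by category; successes are ignored here."""
    if err is None:
        return
    failure_counts[err.name()] += 1
    if err.fatal():
        # The producer instance is unusable; alert and recreate it.
        logger.critical("fatal error %s for key %r: %s", err.name(), msg.key(), err)
    elif err.retriable():
        # Possibly transient (e.g. broker unreachable); a later re-send may succeed.
        logger.warning("retriable failure %s for key %r: %s", err.name(), msg.key(), err)
    else:
        # Permanent for this message (e.g. too large); re-sending won't help.
        logger.error("permanent failure %s for key %r: %s", err.name(), msg.key(), err)
```

Counting by err.name() gives you a cheap signal for alerting, for example when _MSG_TIMED_OUT failures start climbing.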
Reflect on Your Skill
You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my kafka-events skill, generate Python producer code that sends task.created events to a Kafka topic.
Does my skill include proper error handling, serialization, and callback patterns?
Identify Gaps
Ask yourself:
- Did my skill show synchronous vs asynchronous producer patterns?
- Did it include delivery callbacks and error handling?
Improve Your Skill
If you found gaps:
My kafka-events skill is missing producer implementation patterns (sync vs async, callbacks, error handling).
Update it to include when to use synchronous vs asynchronous producers and how to handle delivery failures.
Try With AI
Prompt 1: Debug a Silent Producer
My Kafka producer runs without errors but my delivery_report callback
never prints anything. Here's my code:
producer.produce(topic='events', value='test', callback=delivery_report)
# ... more produce calls ...
print("Done sending")
What am I missing? Walk me through the produce/poll/flush lifecycle.
What you're learning: The asynchronous callback model—understanding that produce() is non-blocking and callbacks require explicit triggering.
Prompt 2: Design a Key Strategy
I'm building an event-driven task management system. Each task has:
- task_id (unique)
- user_id (who owns it)
- project_id (which project it belongs to)
Help me choose the right message key. I need ordering guarantees for
task lifecycle events (created → updated → completed). But I also
want to scale consumer processing. What are the trade-offs between
using task_id vs user_id vs project_id as the key?
What you're learning: Key design decisions—balancing ordering guarantees against parallelism and understanding partition assignment.
Prompt 3: Add Error Handling
My producer works in development but I'm worried about production.
What happens if:
1. Kafka is temporarily unreachable?
2. A message is too large?
3. The topic doesn't exist?
Help me add robust error handling to my delivery_report callback.
Show me how to log failures, potentially retry, and alert on
critical errors.
What you're learning: Production resilience patterns—moving from "it works on my machine" to handling real-world failure scenarios.
Safety note: When testing producer code, start with a development topic. Avoid producing to production topics until you've verified your error handling works correctly.