Skip to main content
Updated Feb 23, 2026

Your First Producer (Python)

You have a Kafka cluster running on Docker Desktop Kubernetes. Now it's time to send your first message.

In the request-response world, you call an API and wait for a response. In event-driven systems, you publish an event and move on. But "move on" doesn't mean "forget about it." You need to know whether your message actually reached Kafka. Did it land on a partition? Which offset was assigned? Did something go wrong?

The confluent-kafka-python library handles this through a pattern that might feel unusual at first: asynchronous production with delivery callbacks. You call produce(), which returns immediately. Later, you call poll() to process callbacks that tell you what happened. This pattern maximizes throughput while still giving you visibility into delivery success or failure.

By the end of this lesson, you'll have working producer code that sends messages to your Kafka cluster and confirms each delivery.

Installing the Client Library

The confluent-kafka-python library is the official Confluent client for Python. It wraps the high-performance librdkafka C library, giving you the best performance available for Python Kafka clients.

Why not aiokafka? You might see aiokafka in tutorials—it has cleaner async/await syntax. We use confluent-kafka because:

confluent-kafkaaiokafka
Official (Confluent maintains it)Community-maintained
~10x faster (C library underneath)Pure Python
Native Schema Registry supportRequires extra libraries
Production standardLess common in production

The callback pattern takes getting used to, but it's what you'll see in real Kafka jobs, and you'll need Schema Registry support in Lesson 10.

Install it with uv:

uv add confluent-kafka

Output:

Resolved 1 package in 0.5s
Installed 1 package in 0.3s
+ confluent-kafka==2.6.1

If you're using pip:

pip install confluent-kafka

The library requires librdkafka to be available on your system. On macOS, the pip/uv installation handles this automatically. On Linux, you may need to install it separately (apt-get install librdkafka-dev on Debian/Ubuntu).

Connect to Your Kafka Cluster

Your Kafka cluster from Lesson 4 includes a NodePort listener on port 30092. This exposes Kafka directly on your localhost—no extra setup needed.

Verify the NodePort is working:

kubectl get svc -n kafka | grep external

Output:

task-events-kafka-external-bootstrap   NodePort   10.96.x.x   <none>   9094:30092/TCP

Connection Reference:

Where Your Code RunsBootstrap Server
Local machine (Mac/Windows)localhost:30092
Pod in same namespacetask-events-kafka-bootstrap:9092
Pod in different namespacetask-events-kafka-bootstrap.kafka.svc.cluster.local:9092

For this lesson, you run code locally, so use localhost:30092.

The Minimal Producer

Let's start with the simplest producer that actually works:

from confluent_kafka import Producer

# Create producer with minimal configuration
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'my-first-producer'
})

# Send a message
producer.produce(
topic='task-created',
value='Hello, Kafka!'
)

# Wait for all messages to be delivered
producer.flush()

print("Message sent!")

Output:

Message sent!

This works, but it's blind. You have no idea whether the message actually reached Kafka or where it landed. Let's add visibility.

Understanding the Asynchronous Model

The produce() method is non-blocking. When you call it, the message goes into an internal buffer, and the method returns immediately. The actual network transmission happens in a background thread.

This creates a problem: how do you know if delivery succeeded?

The answer is delivery callbacks. You provide a function that Kafka calls after each message is delivered (or fails). But there's a catch: callbacks don't execute automatically. You must call poll() to trigger them.

Here's the mental model:

produce() → Message enters buffer → Background thread sends to Kafka

Kafka acknowledges

poll() → Triggers your callback with result

Adding Delivery Callbacks

A delivery callback receives two arguments:

  • err: An error object if delivery failed, or None if successful
  • msg: A message object with metadata about the delivered message
from confluent_kafka import Producer

def delivery_report(err, msg):
"""Called once for each message produced."""
if err is not None:
print(f'FAILED: {err}')
else:
print(f'SUCCESS: topic={msg.topic()} partition={msg.partition()} offset={msg.offset()}')

producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api-producer'
})

# Send with callback
producer.produce(
topic='task-created',
value='{"id": "task-001", "title": "Buy groceries"}',
callback=delivery_report
)

# Trigger callback processing
producer.poll(0)

# Ensure delivery before exit
producer.flush()

Output:

SUCCESS: topic=task-created partition=0 offset=0

Now you can see exactly where your message landed: topic task-created, partition 0, offset 0.

Why Message Keys Matter

So far, we've sent messages without keys. Kafka accepts this, but you lose an important guarantee.

When you provide a key, Kafka uses it to determine the partition:

# Messages with same key always go to same partition
producer.produce(
topic='task-created',
key='user-123', # User ID as key
value='{"id": "task-001", "title": "Buy groceries", "user": "user-123"}',
callback=delivery_report
)

Why this matters:

  1. Ordering: Messages with the same key are always ordered (within a partition)
  2. Locality: All events for an entity stay together, simplifying consumer logic
  3. Scaling: Different keys can be processed in parallel across partitions

For task events, using task_id or user_id as the key ensures all events for that entity arrive in order.

# All events for task-001 go to same partition, preserving order
producer.produce(topic='task-events', key='task-001', value='{"type": "created", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "updated", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "completed", ...}')

Complete Producer Example

Here's a production-ready producer that sends multiple messages with proper error handling:

from confluent_kafka import Producer
import json
from datetime import datetime, timezone

def delivery_report(err, msg):
"""Callback triggered by poll() or flush() after message delivery."""
if err is not None:
print(f'Delivery failed for {msg.key()}: {err}')
else:
print(f'Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

def create_producer():
"""Create a configured Kafka producer."""
return Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api-producer',
})

def send_task_event(producer, task_id: str, title: str):
"""Send a task created event."""
event = {
'id': task_id,
'title': title,
'created_at': datetime.now(timezone.utc).isoformat()
}

producer.produce(
topic='task-created',
key=task_id,
value=json.dumps(event),
callback=delivery_report
)

# Process any pending callbacks (non-blocking)
producer.poll(0)

def main():
producer = create_producer()

# Send some task events
tasks = [
('task-001', 'Buy groceries'),
('task-002', 'Review pull request'),
('task-003', 'Deploy to production'),
]

for task_id, title in tasks:
send_task_event(producer, task_id, title)
print(f'Queued: {task_id}')

# Wait for all messages to be delivered
remaining = producer.flush(timeout=10)

if remaining > 0:
print(f'WARNING: {remaining} messages were not delivered')
else:
print('All messages delivered successfully')

if __name__ == '__main__':
main()

Output:

Queued: task-001
Queued: task-002
Queued: task-003
Delivered: task-created [0] @ 1
Delivered: task-created [1] @ 0
Delivered: task-created [2] @ 0
All messages delivered successfully

Notice how messages landed on different partitions (0, 1, 2). Kafka distributed them based on the key hash.

Understanding poll() and flush()

These two methods are often confused. Here's the difference:

MethodBehaviorWhen to Use
poll(timeout)Process callbacks for delivered messages; returns number of events processedCall regularly in loops to handle callbacks without blocking
flush(timeout)Block until all buffered messages are delivered (or timeout); processes callbacksCall before shutdown to ensure no messages are lost

The pattern:

# In a loop: poll(0) for non-blocking callback processing
for message in messages:
producer.produce(topic, value=message, callback=callback)
producer.poll(0) # Non-blocking, process any ready callbacks

# Before shutdown: flush() to ensure all messages delivered
producer.flush(timeout=10) # Block up to 10 seconds

What happens if you skip poll()?

Callbacks accumulate in memory. If you never call poll() or flush(), your callback functions never execute, and you never learn about delivery failures until the program exits.

Verifying Messages with Kafka CLI

Your producer is sending messages, but let's verify they actually arrived. Use the Kafka console consumer:

# First, port-forward to access Kafka from your machine
kubectl port-forward svc/task-events-kafka-bootstrap 9092:9092 -n kafka &

# Then consume from the beginning
kubectl exec -it task-events-dual-role-0 -n kafka -- \
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic task-created \
--from-beginning

Output:

{"id": "task-001", "title": "Buy groceries", "created_at": "2025-01-15T10:30:00Z"}
{"id": "task-002", "title": "Review pull request", "created_at": "2025-01-15T10:30:01Z"}
{"id": "task-003", "title": "Deploy to production", "created_at": "2025-01-15T10:30:02Z"}

Your messages are in Kafka, persisted and ready for any consumer to read.

Common Producer Errors

ErrorCauseFix
NoBrokersAvailableCan't connect to bootstrap serversVerify port-forward is running; check bootstrap.servers address
UNKNOWN_TOPIC_OR_PARTITIONTopic doesn't existCreate topic first using KafkaTopic CRD or auto.create.topics.enable
MSG_SIZE_TOO_LARGEMessage exceeds max.message.bytesIncrease broker limit or reduce message size
Callback never calledForgot to call poll()/flush()Add poll(0) after produce(), flush() before exit

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, generate Python producer code that sends task.created events to a Kafka topic.
Does my skill include proper error handling, serialization, and callback patterns?

Identify Gaps

Ask yourself:

  • Did my skill show synchronous vs asynchronous producer patterns?
  • Did it include delivery callbacks and error handling?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing producer implementation patterns (sync vs async, callbacks, error handling).
Update it to include when to use synchronous vs asynchronous producers and how to handle delivery failures.

Try With AI

Prompt 1: Debug a Silent Producer

My Kafka producer runs without errors but my delivery_report callback
never prints anything. Here's my code:

producer.produce(topic='events', value='test', callback=delivery_report)
# ... more produce calls ...
print("Done sending")

What am I missing? Walk me through the produce/poll/flush lifecycle.

What you're learning: The asynchronous callback model—understanding that produce() is non-blocking and callbacks require explicit triggering.

Prompt 2: Design a Key Strategy

I'm building an event-driven task management system. Each task has:
- task_id (unique)
- user_id (who owns it)
- project_id (which project it belongs to)

Help me choose the right message key. I need ordering guarantees for
task lifecycle events (created → updated → completed). But I also
want to scale consumer processing. What are the trade-offs between
using task_id vs user_id vs project_id as the key?

What you're learning: Key design decisions—balancing ordering guarantees against parallelism and understanding partition assignment.

Prompt 3: Add Error Handling

My producer works in development but I'm worried about production.
What happens if:
1. Kafka is temporarily unreachable?
2. A message is too large?
3. The topic doesn't exist?

Help me add robust error handling to my delivery_report callback.
Show me how to log failures, potentially retry, and alert on
critical errors.

What you're learning: Production resilience patterns—moving from "it works on my machine" to handling real-world failure scenarios.

Safety note: When testing producer code, start with a development topic. Avoid producing to production topics until you've verified your error handling works correctly.