Skip to main content
Updated Feb 23, 2026

Agent Event Patterns

Your Task API creates tasks, but that's where it stops. When a task is created, nothing else happens automatically. You want to send email notifications, post to Slack, update a mobile app, log for compliance, and trigger reminder scheduling. But if you add all that logic to the Task API, you've created a monolith that knows about email services, Slack webhooks, mobile push notifications, and audit databases.

This is exactly where event-driven architecture shines. The Task API publishes a single event: "A task was created." It doesn't know or care who's listening. Email service, Slack integration, mobile push, audit logging, and reminder scheduling all subscribe independently. Each service evolves separately. The Task API stays focused on task management.

In this lesson, you'll design event schemas that enable this decoupling and implement the notification fanout pattern that makes it work. The patterns here connect directly to your Part 6 Task API agents, turning isolated API calls into a coordinated system of collaborating services.

Event Naming: The domain.action Convention

Consistent event naming is surprisingly important. When you have dozens of event types flowing through your system, predictable names make debugging, monitoring, and documentation dramatically easier.

The standard pattern is domain.action, using past tense to indicate something that happened:

Event NameWhat It Means
task.createdA task was created
task.updatedA task was modified
task.completedA task was marked done
task.deletedA task was removed
user.registeredA new user signed up
order.placedAn order was submitted

Why past tense? Events represent facts about things that already happened. They're immutable historical records, not requests to do something. Compare:

TypeExampleSemantics
Event (past tense)task.createdImmutable fact: "This happened"
Command (imperative)CreateTaskRequest: "Please do this" (can fail)

This distinction matters when you're designing systems. Events are facts you publish and forget. Commands require acknowledgment and error handling.

The Complete Event Schema

Every event in your system needs consistent metadata for tracing, debugging, and compliance. Here's the schema pattern that works at scale:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

@dataclass
class EventMetadata:
"""Metadata for distributed tracing and compliance."""
correlation_id: str # Traces request across services
causation_id: str # What event caused this event
source: str # Service that produced the event

@dataclass
class TaskEvent:
"""Base structure for all task-related events."""
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = ""
occurred_at: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
data: dict = field(default_factory=dict)
metadata: EventMetadata = None

def to_dict(self) -> dict:
return {
"event_id": self.event_id,
"event_type": self.event_type,
"occurred_at": self.occurred_at,
"data": self.data,
"metadata": {
"correlation_id": self.metadata.correlation_id,
"causation_id": self.metadata.causation_id,
"source": self.metadata.source
} if self.metadata else {}
}

Output (example event):

{
"event_id": "e7c5a8f2-3b4d-4e6a-9f1c-2d8e7a6b5c4d",
"event_type": "task.created",
"occurred_at": "2025-01-15T14:30:22.456Z",
"data": {
"task_id": "task-123",
"title": "Review quarterly report",
"owner_id": "user-456",
"priority": 2
},
"metadata": {
"correlation_id": "req-abc-123",
"causation_id": "api-call-789",
"source": "task-api"
}
}

Correlation ID vs Causation ID

These two IDs serve different purposes in distributed tracing:

Correlation ID: Traces a single user request across all services it touches. When a user creates a task through your API, that request gets a correlation ID. The task event, the notification, the Slack message, the audit log entry—all share the same correlation ID so you can reconstruct the complete picture.

User Request (correlation_id: "req-abc-123")
└─> Task API creates task
└─> task.created event (correlation_id: "req-abc-123")
├─> Email service sends notification (correlation_id: "req-abc-123")
├─> Slack service posts message (correlation_id: "req-abc-123")
└─> Audit service logs entry (correlation_id: "req-abc-123")

Causation ID: Links each event to what directly caused it. This creates an event chain that shows causal relationships, not just temporal relationships.

Event: task.created (causation_id: "api-call-789")
└─> Event: notification.sent (causation_id: "e7c5a8f2-...")
└─> Event: email.delivered (causation_id: "notif-456-...")

When something goes wrong, correlation ID shows you everything that happened for a user request. Causation ID shows you why each thing happened.

Publishing Events from Task API

Here's how your Task API publishes events when tasks are created. The producer integrates with FastAPI's lifespan to ensure proper initialization and cleanup:

import json
import uuid
from contextlib import asynccontextmanager
from datetime import datetime

from confluent_kafka import Producer
from fastapi import FastAPI, Request
from pydantic import BaseModel

# Event producer singleton
producer: Producer = None

def delivery_callback(err, msg):
if err:
print(f"Event delivery failed: {err}")
else:
print(f"Event delivered to {msg.topic()}[{msg.partition()}]")

@asynccontextmanager
async def lifespan(app: FastAPI):
global producer
producer = Producer({
'bootstrap.servers': 'localhost:30092',
'client.id': 'task-api',
'acks': 'all',
'enable.idempotence': True
})
yield
producer.flush()

app = FastAPI(lifespan=lifespan)

class TaskCreate(BaseModel):
title: str
owner_id: str
priority: int = 1

def publish_task_event(
event_type: str,
task_data: dict,
correlation_id: str,
causation_id: str
):
"""Publish a task lifecycle event."""
event = {
"event_id": str(uuid.uuid4()),
"event_type": event_type,
"occurred_at": datetime.utcnow().isoformat() + "Z",
"data": task_data,
"metadata": {
"correlation_id": correlation_id,
"causation_id": causation_id,
"source": "task-api"
}
}

producer.produce(
topic='task-events',
key=task_data.get('task_id', str(uuid.uuid4())),
value=json.dumps(event),
callback=delivery_callback
)
producer.poll(0)

@app.post("/tasks")
async def create_task(task: TaskCreate, request: Request):
# Get or create correlation ID from request headers
correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))

# Create the task (your database logic here)
task_id = str(uuid.uuid4())
task_data = {
"task_id": task_id,
"title": task.title,
"owner_id": task.owner_id,
"priority": task.priority
}

# Publish event
publish_task_event(
event_type="task.created",
task_data=task_data,
correlation_id=correlation_id,
causation_id=f"api-create-{task_id}"
)

return {"id": task_id, "status": "created"}

Output (console when task created):

Event delivered to task-events[0]

The key insight here: the Task API publishes an event and returns immediately. It doesn't wait for email, Slack, or audit services. Those happen asynchronously, triggered by the event.

Notification Fanout: Multiple Consumers, One Event

The power of event-driven architecture becomes clear in the fanout pattern. Multiple services subscribe to the same topic, each processing events for their specific purpose. They operate in separate consumer groups, so each service receives every event independently.

                         task.created event

┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Email Service│ │ Slack Service│ │ Audit Service│
│ (group:email)│ │ (group:slack)│ │ (group:audit)│
└──────────────┘ └──────────────┘ └──────────────┘

Each service uses a different group.id, ensuring complete independence:

Email Notification Consumer

import json
from confluent_kafka import Consumer, KafkaError

def create_email_consumer():
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-email', # Unique group for email
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['task-events'])
return consumer

def send_email(to: str, subject: str, body: str):
"""Placeholder for actual email sending logic."""
print(f"Sending email to {to}: {subject}")

def process_email_notifications():
consumer = create_email_consumer()

try:
while True:
msg = consumer.poll(1.0)

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue

event = json.loads(msg.value().decode())

# Only process task.created events
if event.get('event_type') == 'task.created':
task_data = event.get('data', {})
correlation_id = event.get('metadata', {}).get('correlation_id', 'unknown')

send_email(
to=f"{task_data.get('owner_id')}@company.com",
subject=f"New task assigned: {task_data.get('title')}",
body=f"Task ID: {task_data.get('task_id')}\nCorrelation: {correlation_id}"
)

consumer.commit(message=msg)

finally:
consumer.close()

if __name__ == "__main__":
process_email_notifications()

Output:

Sending email to user-456@company.com: New task assigned: Review quarterly report

Slack Notification Consumer

import json
from confluent_kafka import Consumer, KafkaError

def create_slack_consumer():
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'notification-slack', # Different group from email
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
consumer.subscribe(['task-events'])
return consumer

def post_to_slack(channel: str, message: str):
"""Placeholder for Slack webhook integration."""
print(f"Slack [{channel}]: {message}")

def process_slack_notifications():
consumer = create_slack_consumer()

try:
while True:
msg = consumer.poll(1.0)

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue

event = json.loads(msg.value().decode())

# Handle multiple event types
event_type = event.get('event_type', '')
task_data = event.get('data', {})

if event_type == 'task.created':
post_to_slack(
channel="#tasks",
message=f":new: Task created: {task_data.get('title')}"
)
elif event_type == 'task.completed':
post_to_slack(
channel="#tasks",
message=f":white_check_mark: Task completed: {task_data.get('title')}"
)

consumer.commit(message=msg)

finally:
consumer.close()

Output:

Slack [#tasks]: :new: Task created: Review quarterly report

The Immutable Audit Log Pattern

For compliance, debugging, and analytics, you often need a complete record of everything that happened. The audit log consumer implements this with a strict append-only pattern:

import json
import os
from datetime import datetime
from confluent_kafka import Consumer, KafkaError

def create_audit_consumer():
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'audit-log', # Dedicated group for audit
'auto.offset.reset': 'earliest', # Never miss an event
'enable.auto.commit': False
})
consumer.subscribe(['task-events'])
return consumer

class AuditLogger:
"""Append-only audit log for compliance."""

def __init__(self, log_dir: str = "/var/log/audit"):
self.log_dir = log_dir
os.makedirs(log_dir, exist_ok=True)

def append(self, event: dict):
"""Append event to immutable log. Never delete or modify."""
# Daily log files for easier management
date_str = datetime.utcnow().strftime("%Y-%m-%d")
log_file = os.path.join(self.log_dir, f"audit-{date_str}.jsonl")

log_entry = {
"logged_at": datetime.utcnow().isoformat() + "Z",
"event": event
}

with open(log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')

print(f"Audit logged: {event.get('event_type')} [{event.get('event_id')}]")

def run_audit_logger():
consumer = create_audit_consumer()
audit = AuditLogger()

try:
while True:
msg = consumer.poll(1.0)

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Error: {msg.error()}")
continue

event = json.loads(msg.value().decode())

# Log ALL events, not just specific types
audit.append(event)

consumer.commit(message=msg)

finally:
consumer.close()

if __name__ == "__main__":
run_audit_logger()

Output:

Audit logged: task.created [e7c5a8f2-3b4d-4e6a-9f1c-2d8e7a6b5c4d]

Audit log file (/var/log/audit/audit-2025-01-15.jsonl):

{"logged_at": "2025-01-15T14:30:23.001Z", "event": {"event_id": "e7c5a8f2-...", "event_type": "task.created", ...}}

Key audit log principles:

PrincipleImplementation
Append-onlyNever modify or delete log entries
CompleteLog ALL events, not filtered subset
TimestampedBoth event time and logged time recorded
SearchableJSON lines format for easy grep/analysis

Topic Design: Single vs Multiple Topics

You have a choice: put all task events in one topic (task-events) or create separate topics per event type (task-created, task-updated, task-completed).

ApproachProsCons
Single topic (task-events)Simple configuration, consumers filter by event_typeAll consumers receive all events
Multiple topicsConsumers subscribe only to what they needMore topics to manage, more complex routing

Recommendation: Start with a single topic per domain (task-events, user-events, order-events). Only split when you have clear performance or isolation requirements.

# Single topic approach (recommended for most cases)
producer.produce(topic='task-events', ...)

# Consumer filters by event_type
if event.get('event_type') == 'task.created':
handle_task_created(event)
elif event.get('event_type') == 'task.completed':
handle_task_completed(event)

Connecting to Part 6 Agents

Your Part 6 Task API agents can now communicate through events. When an orchestrating agent creates a task, it publishes an event that notification agents, scheduling agents, and analytics agents all consume independently.

Orchestrating Agent (Task Creation)

▼ publish: task.created

├─────────────────────────────────────────────┐
│ │
▼ ▼
Notification Agent Scheduling Agent
(sends email, Slack) (creates calendar reminders)
│ │
▼ publish: notification.sent ▼ publish: reminder.scheduled
│ │
└──────────────────┬──────────────────────────┘


Audit Agent
(logs all events for compliance)

This architecture means agents evolve independently. Adding a new notification channel (mobile push, SMS) requires only deploying a new consumer—no changes to the Task API or other agents.


Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

Using my kafka-events skill, design an event schema with correlation_id and causation_id for an agent-based workflow.
Does my skill show how to trace events across multiple agents and reconstruct causality chains?

Identify Gaps

Ask yourself:

  • Did my skill explain correlation_id (workflow tracing) vs causation_id (event causality)?
  • Did it show how agents use these IDs to coordinate distributed workflows?

Improve Your Skill

If you found gaps:

My kafka-events skill is missing agent event patterns (correlation_id, causation_id, event chaining).
Update it to include how to implement distributed tracing and causality tracking in agent systems.

Try With AI

Setup: You're designing event schemas for an AI agent system that manages tasks, sends notifications, and maintains audit trails.

Prompt 1: Design a complete event schema

I'm designing events for a Task API that needs to support:
- Multiple notification channels (email, Slack, mobile push)
- Distributed tracing across agent services
- Compliance audit logging
- Future analytics on task completion times

Here's my current event structure:
{
"event_id": "uuid",
"event_type": "task.created",
"data": { "task_id": "...", "title": "..." }
}

What fields am I missing? Design a complete schema that supports
all my requirements. Explain why each field is necessary.

What you're learning: AI will suggest additions like occurred_at for analytics, correlation_id for tracing, causation_id for event chains, and metadata.source for debugging. You'll see how requirements translate to schema decisions.

Prompt 2: Troubleshoot a fanout issue

I have three consumers on the same 'task-events' topic:
- Email service (group: notification-email)
- Slack service (group: notification-slack)
- Audit service (group: audit-log)

Email and Slack work fine, but audit is missing events.
The audit consumer shows it's connected and subscribed.

Here's my audit consumer config:
consumer = Consumer({
'bootstrap.servers': 'localhost:30092',
'group.id': 'audit-log',
'auto.offset.reset': 'latest' # I want only new events
})

What could cause the audit consumer to miss events?
Walk me through the diagnosis.

What you're learning: AI helps you understand how auto.offset.reset: latest means you miss events that occurred before the consumer started. For audit logging, you likely want earliest to never miss an event.

Prompt 3: Extend for your domain

I'm building an agent system for [your domain: e-commerce/healthcare/finance/education].

The main entity lifecycle events would be:
- [entity].created (e.g., order.placed, patient.admitted, trade.executed)
- [entity].updated
- [entity].completed/cancelled

Design the event schema and consumer architecture for my domain.
Consider:
1. What notification channels make sense?
2. What compliance requirements might apply?
3. What analytics would be valuable?
4. How should I handle failed events?

What you're learning: AI helps translate generic patterns to your specific domain, suggesting notification channels appropriate for healthcare (pager, EHR integration) vs e-commerce (email, SMS) and compliance requirements (HIPAA vs PCI-DSS).

Safety note: When publishing events containing user data, consider GDPR and privacy requirements. Events in Kafka are retained based on topic configuration and may need data masking or separate topics for sensitive information.