Capstone: Event-Driven Agent Notifications
You've spent this chapter building event-driven system components piece by piece: producers with reliability guarantees, consumers with proper offset management, Avro schemas with Schema Registry, FastAPI integration patterns, and agent event designs. Now it's time to compose these skills into a complete system.
This capstone follows the spec-driven development approach you've seen throughout this book. You'll write a specification first, then implement it by composing the patterns you've learned. This mirrors how production AI agents are built: specify intent clearly, then orchestrate implementation using accumulated skills.
The goal is practical: add event-driven notifications to the Task API from Part 6. When a task is created, updated, or completed, the system should publish events that trigger notifications and create an immutable audit trail. By the end, you'll have a working event-driven notification system that demonstrates the patterns professional teams use in production.
Phase 1: Write the Specification
Before writing any code, define precisely what you're building. A clear specification enables focused implementation and provides acceptance criteria for validation.
Specification Structure
Every capstone specification answers these questions:
| Section | Question Answered |
|---|---|
| Intent | What problem does this solve? What's the business value? |
| Constraints | What boundaries must the implementation respect? |
| Success Criteria | How do we know it's working correctly? |
| Non-Goals | What are we explicitly NOT building? |
| Architecture | How do components interact? |
The Event-Driven Notifications Specification
Here's the specification for the notification system:
# Event-Driven Notifications Specification
## Intent
Add event-driven capabilities to the Task API that enable:
1. Decoupled notification delivery when task state changes
2. Immutable audit logging of all task lifecycle events
3. Foundation for future event consumers (reminders, analytics, integrations)
**Business Value**: The current Task API uses synchronous calls for notifications.
If the notification service is slow or unavailable, task creation blocks.
Event-driven architecture decouples these concerns.
## Constraints
### Technical Constraints
- **Kafka Cluster**: Use existing Strimzi-managed Kafka (from Lesson 4)
- **Python Client**: Use confluent-kafka-python (from Lessons 5-8)
- **Delivery Guarantee**: At-least-once for notifications (duplicates acceptable)
- **Audit Guarantee**: At-least-once with consumer deduplication
- **Schema**: JSON initially (Schema Registry optional extension)
### Operational Constraints
- **Deployment**: All services run on Docker Desktop Kubernetes
- **Resource Limits**: Single Kafka broker (development environment)
- **Topic Configuration**: Single partition (ordering within task_id)
## Success Criteria
### SC-1: Task Events Published
- [ ] When a task is created, a `task.created` event is published to `task-events` topic
- [ ] When a task is updated, a `task.updated` event is published
- [ ] When a task is completed, a `task.completed` event is published
- [ ] Each event includes: event_id, event_type, occurred_at, task data, metadata
### SC-2: Notification Service Consumes Events
- [ ] Notification service runs as separate consumer group (`notification-service`)
- [ ] Service receives task.created events and logs notification delivery
- [ ] Service receives task.completed events and logs completion notification
- [ ] Service commits offsets only after successful processing
### SC-3: Audit Service Consumes Events
- [ ] Audit service runs as separate consumer group (`audit-service`)
- [ ] Service receives ALL task events (created, updated, completed)
- [ ] Service appends each event to an immutable log file
- [ ] Service deduplicates by event_id to handle redeliveries
### SC-4: End-to-End Flow Verified
- [ ] Create task via API -> event published -> both consumers receive
- [ ] Consumer lag stays below 100 for normal operations
- [ ] Stopping and restarting consumers resumes from correct offset
## Non-Goals (What We're NOT Building)
- Real notification delivery (email, SMS, push) - we log instead
- Schema Registry integration - JSON schema sufficient for demonstration
- Multi-partition topics - single partition maintains ordering
- Exactly-once semantics - at-least-once with deduplication is sufficient
- Saga pattern implementation - covered in Lesson 17, not repeated here
## Architecture
┌─────────────────┐ ┌─────────────────────────┐ │ Task API │ │ Kafka Cluster │ │ (FastAPI) │────>│ topic: task-events │ │ │ │ partitions: 1 │ └─────────────────┘ └───────────┬─────────────┘ │ ┌───────────────┴───────────────┐ │ │ v v ┌───────────────────┐ ┌───────────────────┐ │ Notification Svc │ │ Audit Service │ │ group: notify-svc │ │ group: audit-svc │ │ events: created, │ │ events: ALL │ │ completed │ │ output: log file │ └───────────────────┘ └───────────────────┘
## Event Schema
```json
{
"event_id": "uuid",
"event_type": "task.created | task.updated | task.completed",
"occurred_at": "ISO-8601 timestamp",
"data": {
"task_id": "uuid",
"title": "string",
"status": "pending | in_progress | completed",
"owner_id": "uuid (optional)"
},
"metadata": {
"correlation_id": "uuid (request trace)",
"source": "task-api"
}
}
### Why This Specification Matters
Notice what the specification provides:
1. **Clear intent**: Not "add Kafka to Task API" but "enable decoupled notifications with audit trail"
2. **Measurable criteria**: Each success criterion is a checkbox that can be verified
3. **Explicit boundaries**: Non-goals prevent scope creep
4. **Visual architecture**: The diagram clarifies component relationships
When you work with AI to implement this specification, you have shared understanding of what "done" looks like.
## Phase 2: Implement the Specification
Now implement the specification by composing patterns from earlier lessons. This phase demonstrates the core skill of spec-driven development: translating clear requirements into working code.
### Step 1: Create Event Schema and Publisher
First, implement the event schema and producer integration for the Task API.
Create `events/schemas.py`:
```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4
import json
@dataclass
class TaskData:
"""Task information included in events."""
task_id: str
title: str
status: str
owner_id: Optional[str] = None
@dataclass
class EventMetadata:
"""Metadata for event tracing and debugging."""
correlation_id: str
source: str = "task-api"
@dataclass
class TaskEvent:
"""Base event schema for task lifecycle events."""
event_type: str
data: TaskData
metadata: EventMetadata
event_id: str = field(default_factory=lambda: str(uuid4()))
occurred_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
def to_json(self) -> str:
"""Serialize event to JSON string."""
return json.dumps(asdict(self))
@classmethod
def task_created(
cls,
task_id: str,
title: str,
correlation_id: str,
owner_id: Optional[str] = None
) -> "TaskEvent":
"""Factory for task.created events."""
return cls(
event_type="task.created",
data=TaskData(
task_id=task_id,
title=title,
status="pending",
owner_id=owner_id
),
metadata=EventMetadata(correlation_id=correlation_id)
)
@classmethod
def task_updated(
cls,
task_id: str,
title: str,
status: str,
correlation_id: str,
owner_id: Optional[str] = None
) -> "TaskEvent":
"""Factory for task.updated events."""
return cls(
event_type="task.updated",
data=TaskData(
task_id=task_id,
title=title,
status=status,
owner_id=owner_id
),
metadata=EventMetadata(correlation_id=correlation_id)
)
@classmethod
def task_completed(
cls,
task_id: str,
title: str,
correlation_id: str,
owner_id: Optional[str] = None
) -> "TaskEvent":
"""Factory for task.completed events."""
return cls(
event_type="task.completed",
data=TaskData(
task_id=task_id,
title=title,
status="completed",
owner_id=owner_id
),
metadata=EventMetadata(correlation_id=correlation_id)
)
Output:
>>> from events.schemas import TaskEvent
>>> event = TaskEvent.task_created("task-123", "Buy groceries", "req-456")
>>> print(event.to_json())
{"event_type": "task.created", "data": {"task_id": "task-123", "title": "Buy groceries", "status": "pending", "owner_id": null}, "metadata": {"correlation_id": "req-456", "source": "task-api"}, "event_id": "a1b2c3d4-...", "occurred_at": "2025-01-15T10:30:00.000000+00:00"}
Step 2: Implement Event Publisher
Create a reliable publisher that integrates with FastAPI's lifespan.
Create events/publisher.py:
from confluent_kafka import Producer
from typing import Optional, Callable
import logging
from .schemas import TaskEvent
logger = logging.getLogger(__name__)
class EventPublisher:
"""Reliable event publisher for Task API events."""
def __init__(self, bootstrap_servers: str):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'client.id': 'task-api-publisher',
'acks': 'all',
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
'retries': 2147483647,
'delivery.timeout.ms': 30000
})
self.topic = 'task-events'
def _delivery_callback(self, err, msg):
"""Handle delivery result."""
if err is not None:
logger.error(
f"Event delivery failed: {err}, "
f"key={msg.key()}, topic={msg.topic()}"
)
else:
logger.info(
f"Event delivered: topic={msg.topic()}, "
f"partition={msg.partition()}, offset={msg.offset()}"
)
def publish(self, event: TaskEvent) -> None:
"""Publish event to Kafka topic."""
self.producer.produce(
topic=self.topic,
key=event.data.task_id,
value=event.to_json(),
callback=self._delivery_callback
)
# Process callbacks without blocking
self.producer.poll(0)
def flush(self) -> None:
"""Ensure all pending events are delivered."""
remaining = self.producer.flush(timeout=10)
if remaining > 0:
logger.warning(f"{remaining} events not delivered on flush")
# Global publisher instance (initialized in lifespan)
_publisher: Optional[EventPublisher] = None
def get_publisher() -> EventPublisher:
"""Get the global publisher instance."""
if _publisher is None:
raise RuntimeError("Publisher not initialized. Check lifespan setup.")
return _publisher
def init_publisher(bootstrap_servers: str) -> EventPublisher:
"""Initialize the global publisher."""
global _publisher
_publisher = EventPublisher(bootstrap_servers)
return _publisher
def shutdown_publisher() -> None:
"""Shutdown the global publisher."""
global _publisher
if _publisher is not None:
_publisher.flush()
_publisher = None
Output:
>>> from events.publisher import init_publisher, get_publisher
>>> from events.schemas import TaskEvent
>>> publisher = init_publisher("localhost:30092")
>>> event = TaskEvent.task_created("task-789", "Review PR", "req-111")
>>> publisher.publish(event)
>>> publisher.flush()
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=42
Step 3: Integrate with FastAPI
Connect the publisher to FastAPI endpoints.
Update main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from uuid import uuid4
import os
from events.publisher import init_publisher, shutdown_publisher, get_publisher
from events.schemas import TaskEvent
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan for Kafka producer."""
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
init_publisher(bootstrap_servers)
yield
shutdown_publisher()
app = FastAPI(title="Task API", lifespan=lifespan)
class TaskCreate(BaseModel):
title: str
owner_id: Optional[str] = None
class TaskUpdate(BaseModel):
title: Optional[str] = None
status: Optional[str] = None
class Task(BaseModel):
id: str
title: str
status: str
owner_id: Optional[str] = None
# In-memory task storage (replace with database in production)
tasks: dict[str, Task] = {}
@app.post("/tasks", response_model=Task)
async def create_task(task_in: TaskCreate):
"""Create a new task and publish task.created event."""
correlation_id = str(uuid4())
task_id = str(uuid4())
task = Task(
id=task_id,
title=task_in.title,
status="pending",
owner_id=task_in.owner_id
)
tasks[task_id] = task
# Publish event
event = TaskEvent.task_created(
task_id=task_id,
title=task.title,
correlation_id=correlation_id,
owner_id=task.owner_id
)
get_publisher().publish(event)
return task
@app.patch("/tasks/{task_id}", response_model=Task)
async def update_task(task_id: str, task_in: TaskUpdate):
"""Update a task and publish task.updated event."""
if task_id not in tasks:
raise HTTPException(status_code=404, detail="Task not found")
correlation_id = str(uuid4())
task = tasks[task_id]
if task_in.title is not None:
task.title = task_in.title
if task_in.status is not None:
task.status = task_in.status
# Publish event
event = TaskEvent.task_updated(
task_id=task_id,
title=task.title,
status=task.status,
correlation_id=correlation_id,
owner_id=task.owner_id
)
get_publisher().publish(event)
return task
@app.post("/tasks/{task_id}/complete", response_model=Task)
async def complete_task(task_id: str):
"""Complete a task and publish task.completed event."""
if task_id not in tasks:
raise HTTPException(status_code=404, detail="Task not found")
correlation_id = str(uuid4())
task = tasks[task_id]
task.status = "completed"
# Publish event
event = TaskEvent.task_completed(
task_id=task_id,
title=task.title,
correlation_id=correlation_id,
owner_id=task.owner_id
)
get_publisher().publish(event)
return task
Output:
$ curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Write capstone lesson"}'
{"id":"abc123","title":"Write capstone lesson","status":"pending","owner_id":null}
# In the Task API logs:
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=43
Step 4: Implement Notification Service
Create a consumer that processes task events for notifications.
Create services/notification_service.py:
from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NotificationService:
"""Consume task events and deliver notifications."""
def __init__(self, bootstrap_servers: str):
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': 'notification-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manual commit after processing
})
self.running = True
self.subscribed_events = {'task.created', 'task.completed'}
def process_event(self, event: dict) -> bool:
"""Process a single event. Returns True if successful."""
event_type = event.get('event_type')
if event_type not in self.subscribed_events:
# Skip events we don't handle
return True
task_data = event.get('data', {})
task_id = task_data.get('task_id')
title = task_data.get('title')
if event_type == 'task.created':
logger.info(
f"NOTIFICATION: New task created - '{title}' (ID: {task_id})"
)
# In production: send email, push notification, etc.
elif event_type == 'task.completed':
logger.info(
f"NOTIFICATION: Task completed - '{title}' (ID: {task_id})"
)
# In production: send completion notification
return True
def run(self):
"""Main consumer loop."""
self.consumer.subscribe(['task-events'])
logger.info("Notification service started, waiting for events...")
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
if self.process_event(event):
# Commit only after successful processing
self.consumer.commit(message=msg)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in message: {e}")
# Commit to skip malformed message
self.consumer.commit(message=msg)
def shutdown(self):
"""Graceful shutdown."""
logger.info("Shutting down notification service...")
self.running = False
self.consumer.close()
def main():
import os
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
service = NotificationService(bootstrap_servers)
# Handle graceful shutdown
def signal_handler(sig, frame):
service.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
service.run()
if __name__ == "__main__":
main()
Output:
$ python services/notification_service.py
INFO:__main__:Notification service started, waiting for events...
INFO:__main__:NOTIFICATION: New task created - 'Write capstone lesson' (ID: abc123)
INFO:__main__:NOTIFICATION: Task completed - 'Write capstone lesson' (ID: abc123)
Step 5: Implement Audit Service
Create a consumer that logs all events to an immutable audit trail.
Create services/audit_service.py:
from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AuditService:
"""Consume ALL task events and write to immutable audit log."""
def __init__(self, bootstrap_servers: str, audit_log_path: str = "audit.log"):
self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': 'audit-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False
})
self.running = True
self.audit_log_path = Path(audit_log_path)
self.seen_events: set[str] = set()
self._load_seen_events()
def _load_seen_events(self):
"""Load previously seen event IDs for deduplication."""
if self.audit_log_path.exists():
with open(self.audit_log_path, 'r') as f:
for line in f:
try:
entry = json.loads(line)
self.seen_events.add(entry.get('event_id'))
except json.JSONDecodeError:
continue
logger.info(f"Loaded {len(self.seen_events)} previously seen events")
def process_event(self, event: dict) -> bool:
"""Process event and append to audit log. Returns True if successful."""
event_id = event.get('event_id')
# Deduplicate: skip if we've seen this event
if event_id in self.seen_events:
logger.debug(f"Skipping duplicate event: {event_id}")
return True
# Create audit entry
audit_entry = {
'event_id': event_id,
'event_type': event.get('event_type'),
'occurred_at': event.get('occurred_at'),
'data': event.get('data'),
'metadata': event.get('metadata'),
'audited_at': datetime.utcnow().isoformat()
}
# Append to audit log (immutable append-only)
with open(self.audit_log_path, 'a') as f:
f.write(json.dumps(audit_entry) + '\n')
self.seen_events.add(event_id)
logger.info(
f"AUDIT: {event.get('event_type')} - "
f"task_id={event.get('data', {}).get('task_id')}"
)
return True
def run(self):
"""Main consumer loop."""
self.consumer.subscribe(['task-events'])
logger.info("Audit service started, logging all events...")
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
if self.process_event(event):
self.consumer.commit(message=msg)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in message: {e}")
self.consumer.commit(message=msg)
def shutdown(self):
"""Graceful shutdown."""
logger.info("Shutting down audit service...")
self.running = False
self.consumer.close()
def main():
import os
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
audit_log_path = os.getenv("AUDIT_LOG_PATH", "audit.log")
service = AuditService(bootstrap_servers, audit_log_path)
def signal_handler(sig, frame):
service.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
service.run()
if __name__ == "__main__":
main()
Output:
$ python services/audit_service.py
INFO:__main__:Loaded 0 previously seen events
INFO:__main__:Audit service started, logging all events...
INFO:__main__:AUDIT: task.created - task_id=abc123
INFO:__main__:AUDIT: task.updated - task_id=abc123
INFO:__main__:AUDIT: task.completed - task_id=abc123
$ cat audit.log
{"event_id":"evt-001","event_type":"task.created","occurred_at":"2025-01-15T10:30:00Z","data":{"task_id":"abc123","title":"Write capstone lesson","status":"pending","owner_id":null},"metadata":{"correlation_id":"req-789","source":"task-api"},"audited_at":"2025-01-15T10:30:01.000000"}
Phase 3: Validate Against Specification
The implementation is complete. Now verify each success criterion from the specification.
Validation Checklist
SC-1: Task Events Published
# Start Kafka console consumer to observe events
kubectl exec -it task-events-kafka-0 -n kafka -- \
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:30092 \
--topic task-events \
--from-beginning
# In another terminal, create a task via API
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Validate capstone"}'
# Console consumer shows:
{"event_type":"task.created","data":{"task_id":"xyz789","title":"Validate capstone",...}
| Criterion | Status |
|---|---|
| task.created event published | PASS |
| task.updated event published | PASS |
| task.completed event published | PASS |
| Event schema includes all required fields | PASS |
SC-2: Notification Service Consumes Events
# Start notification service
python services/notification_service.py
# Create and complete a task
curl -X POST http://localhost:8000/tasks -d '{"title": "Test notification"}'
curl -X POST http://localhost:8000/tasks/xyz789/complete
# Notification service logs:
INFO: NOTIFICATION: New task created - 'Test notification' (ID: xyz789)
INFO: NOTIFICATION: Task completed - 'Test notification' (ID: xyz789)
| Criterion | Status |
|---|---|
| Runs as separate consumer group | PASS |
| Receives task.created events | PASS |
| Receives task.completed events | PASS |
| Commits after successful processing | PASS |
SC-3: Audit Service Consumes Events
# Start audit service
python services/audit_service.py
# Create, update, and complete a task
curl -X POST http://localhost:8000/tasks -d '{"title": "Test audit"}'
curl -X PATCH http://localhost:8000/tasks/abc123 -d '{"status": "in_progress"}'
curl -X POST http://localhost:8000/tasks/abc123/complete
# Verify audit log contains all three events
cat audit.log | wc -l
3
# Restart audit service and verify deduplication
python services/audit_service.py
# Logs: "Loaded 3 previously seen events"
# No duplicate entries in audit.log
| Criterion | Status |
|---|---|
| Runs as separate consumer group | PASS |
| Receives ALL task events | PASS |
| Appends to immutable log file | PASS |
| Deduplicates by event_id | PASS |
SC-4: End-to-End Flow Verified
# Run all services simultaneously
# Terminal 1: Task API
uvicorn main:app --reload
# Terminal 2: Notification Service
python services/notification_service.py
# Terminal 3: Audit Service
python services/audit_service.py
# Terminal 4: Test the flow
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "End-to-end test"}'
# Verify:
# - Notification service logged the creation
# - Audit service logged the creation
# - audit.log has the entry
# Check consumer lag
kubectl exec task-events-kafka-0 -n kafka -- \
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:30092 \
--describe --group notification-service
| Criterion | Status |
|---|---|
| Create task triggers both consumers | PASS |
| Consumer lag below 100 | PASS |
| Resume from correct offset after restart | PASS |
Validation Summary
All success criteria from the specification have been verified:
| Success Criteria | Result |
|---|---|
| SC-1: Task Events Published | 4/4 PASS |
| SC-2: Notification Service | 4/4 PASS |
| SC-3: Audit Service | 4/4 PASS |
| SC-4: End-to-End Flow | 3/3 PASS |
The implementation matches the specification.
What You Built
This capstone demonstrated the spec-driven development pattern for event-driven systems:
| Phase | Activity | Outcome |
|---|---|---|
| Specification | Define intent, constraints, success criteria | Clear requirements document |
| Implementation | Compose patterns from previous lessons | Working code matching spec |
| Validation | Verify each success criterion | Evidence of correctness |
The system you built includes:
- Event schema with factory methods for type-safe event creation
- Reliable publisher with idempotent producer and proper lifespan management
- FastAPI integration publishing events from API endpoints
- Notification service consuming specific event types with manual offset commits
- Audit service with deduplication for immutable event logging
These patterns compose into production event-driven architectures. The same structure scales to dozens of consumers processing millions of events.
Try With AI
Use AI to extend and refine your capstone implementation.
Prompt 1: Add Schema Registry Integration
I have a working event-driven notification system with JSON events.
I want to add Avro schemas with Schema Registry for type safety.
Here's my current TaskEvent schema (Python dataclass):
[paste TaskEvent class]
Help me:
1. Create an Avro schema that matches this structure
2. Modify the publisher to use AvroSerializer
3. Modify the consumers to use AvroDeserializer
4. Handle schema evolution (what if I add a new field later?)
Show the code changes needed for each component.
What you're learning: Schema Registry integration adds type safety and evolution management. This is the pattern production systems use to prevent schema drift between producers and consumers.
Prompt 2: Add Dead Letter Queue
My notification service might fail to process some events
(e.g., invalid task data, external service timeout).
Help me implement a dead letter queue pattern:
1. Catch processing failures in the consumer
2. Publish failed events to a 'task-events-dlq' topic
3. Include original event, error message, and retry count
4. Create a simple DLQ processor that logs failed events
Show me the modified NotificationService and the DLQ processor.
What you're learning: Production consumers need failure handling. Dead letter queues capture problematic events for investigation without blocking the main consumer.
Prompt 3: Improve the Specification
Review my event-driven notifications specification and suggest improvements:
[paste the specification from Phase 1]
Consider:
1. Are the success criteria specific enough to automate testing?
2. What edge cases are missing (network failures, duplicate events)?
3. Should the architecture include error handling components?
4. What monitoring requirements should be added?
Propose an enhanced specification with these improvements.
What you're learning: Specifications evolve through iteration. Reviewing your spec after implementation reveals gaps and improvements for future systems.
Safety Note: When testing event-driven systems, always verify consumer offsets are committed correctly before stopping services. Uncommitted offsets can cause duplicate processing on restart. Use kafka-consumer-groups.sh --describe to check consumer group state before any planned maintenance.