Updated Feb 23, 2026

Capstone: Event-Driven Agent Notifications

You've spent this chapter building event-driven system components piece by piece: producers with reliability guarantees, consumers with proper offset management, Avro schemas with Schema Registry, FastAPI integration patterns, and agent event designs. Now it's time to compose these skills into a complete system.

This capstone follows the spec-driven development approach you've seen throughout this book. You'll write a specification first, then implement it by composing the patterns you've learned. This mirrors how production AI agents are built: specify intent clearly, then orchestrate implementation using accumulated skills.

The goal is practical: add event-driven notifications to the Task API from Part 6. When a task is created, updated, or completed, the system should publish events that trigger notifications and create an immutable audit trail. By the end, you'll have a working event-driven notification system that demonstrates the patterns professional teams use in production.

Phase 1: Write the Specification

Before writing any code, define precisely what you're building. A clear specification enables focused implementation and provides acceptance criteria for validation.

Specification Structure

Every capstone specification answers these questions:

| Section | Question Answered |
| --- | --- |
| Intent | What problem does this solve? What's the business value? |
| Constraints | What boundaries must the implementation respect? |
| Success Criteria | How do we know it's working correctly? |
| Non-Goals | What are we explicitly NOT building? |
| Architecture | How do components interact? |

The Event-Driven Notifications Specification

Here's the specification for the notification system:

# Event-Driven Notifications Specification

## Intent

Add event-driven capabilities to the Task API that enable:
1. Decoupled notification delivery when task state changes
2. Immutable audit logging of all task lifecycle events
3. Foundation for future event consumers (reminders, analytics, integrations)

**Business Value**: The current Task API uses synchronous calls for notifications.
If the notification service is slow or unavailable, task creation blocks.
Event-driven architecture decouples these concerns.

## Constraints

### Technical Constraints
- **Kafka Cluster**: Use existing Strimzi-managed Kafka (from Lesson 4)
- **Python Client**: Use confluent-kafka-python (from Lessons 5-8)
- **Delivery Guarantee**: At-least-once for notifications (duplicates acceptable)
- **Audit Guarantee**: At-least-once with consumer deduplication
- **Schema**: JSON initially (Schema Registry optional extension)

### Operational Constraints
- **Deployment**: All services run on Docker Desktop Kubernetes
- **Resource Limits**: Single Kafka broker (development environment)
- **Topic Configuration**: Single partition (ordering within task_id)

## Success Criteria

### SC-1: Task Events Published
- [ ] When a task is created, a `task.created` event is published to `task-events` topic
- [ ] When a task is updated, a `task.updated` event is published
- [ ] When a task is completed, a `task.completed` event is published
- [ ] Each event includes: event_id, event_type, occurred_at, task data, metadata

### SC-2: Notification Service Consumes Events
- [ ] Notification service runs as separate consumer group (`notification-service`)
- [ ] Service receives task.created events and logs notification delivery
- [ ] Service receives task.completed events and logs completion notification
- [ ] Service commits offsets only after successful processing

### SC-3: Audit Service Consumes Events
- [ ] Audit service runs as separate consumer group (`audit-service`)
- [ ] Service receives ALL task events (created, updated, completed)
- [ ] Service appends each event to an immutable log file
- [ ] Service deduplicates by event_id to handle redeliveries

### SC-4: End-to-End Flow Verified
- [ ] Create task via API -> event published -> both consumers receive
- [ ] Consumer lag stays below 100 messages for normal operations
- [ ] Stopping and restarting consumers resumes from correct offset

## Non-Goals (What We're NOT Building)

- Real notification delivery (email, SMS, push) - we log instead
- Schema Registry integration - JSON schema sufficient for demonstration
- Multi-partition topics - single partition maintains ordering
- Exactly-once semantics - at-least-once with deduplication is sufficient
- Saga pattern implementation - covered in Lesson 17, not repeated here

## Architecture

┌─────────────────┐     ┌─────────────────────────┐
│    Task API     │     │      Kafka Cluster      │
│    (FastAPI)    │────>│  topic: task-events     │
│                 │     │  partitions: 1          │
└─────────────────┘     └───────────┬─────────────┘
                                    │
                    ┌───────────────┴───────────────┐
                    │                               │
                    v                               v
          ┌───────────────────┐           ┌───────────────────┐
          │ Notification Svc  │           │ Audit Service     │
          │ group: notify-svc │           │ group: audit-svc  │
          │ events: created,  │           │ events: ALL       │
          │   completed       │           │ output: log file  │
          └───────────────────┘           └───────────────────┘


## Event Schema

```json
{
  "event_id": "uuid",
  "event_type": "task.created | task.updated | task.completed",
  "occurred_at": "ISO-8601 timestamp",
  "data": {
    "task_id": "uuid",
    "title": "string",
    "status": "pending | in_progress | completed",
    "owner_id": "uuid (optional)"
  },
  "metadata": {
    "correlation_id": "uuid (request trace)",
    "source": "task-api"
  }
}
```

### Why This Specification Matters

Notice what the specification provides:

1. **Clear intent**: Not "add Kafka to Task API" but "enable decoupled notifications with audit trail"
2. **Measurable criteria**: Each success criterion is a checkbox that can be verified
3. **Explicit boundaries**: Non-goals prevent scope creep
4. **Visual architecture**: The diagram clarifies component relationships

When you work with AI to implement this specification, you have a shared understanding of what "done" looks like.

## Phase 2: Implement the Specification

Now implement the specification by composing patterns from earlier lessons. This phase demonstrates the core skill of spec-driven development: translating clear requirements into working code.

### Step 1: Create Event Schema

First, implement the event schema for the Task API. The publisher that sends these events follows in Step 2.

Create `events/schemas.py`:

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4
import json

@dataclass
class TaskData:
    """Task information included in events."""
    task_id: str
    title: str
    status: str
    owner_id: Optional[str] = None

@dataclass
class EventMetadata:
    """Metadata for event tracing and debugging."""
    correlation_id: str
    source: str = "task-api"

@dataclass
class TaskEvent:
    """Base event schema for task lifecycle events."""
    event_type: str
    data: TaskData
    metadata: EventMetadata
    event_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_json(self) -> str:
        """Serialize event to JSON string."""
        return json.dumps(asdict(self))

    @classmethod
    def task_created(
        cls,
        task_id: str,
        title: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.created events."""
        return cls(
            event_type="task.created",
            data=TaskData(
                task_id=task_id,
                title=title,
                status="pending",
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )

    @classmethod
    def task_updated(
        cls,
        task_id: str,
        title: str,
        status: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.updated events."""
        return cls(
            event_type="task.updated",
            data=TaskData(
                task_id=task_id,
                title=title,
                status=status,
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )

    @classmethod
    def task_completed(
        cls,
        task_id: str,
        title: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.completed events."""
        return cls(
            event_type="task.completed",
            data=TaskData(
                task_id=task_id,
                title=title,
                status="completed",
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )
```

Output:

>>> from events.schemas import TaskEvent
>>> event = TaskEvent.task_created("task-123", "Buy groceries", "req-456")
>>> print(event.to_json())
{"event_type": "task.created", "data": {"task_id": "task-123", "title": "Buy groceries", "status": "pending", "owner_id": null}, "metadata": {"correlation_id": "req-456", "source": "task-api"}, "event_id": "a1b2c3d4-...", "occurred_at": "2025-01-15T10:30:00.000000+00:00"}
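Consumers receive the event as JSON text, so it can help to confirm that the serialized form round-trips cleanly. The sketch below is a minimal illustration using a trimmed-down copy of the dataclasses above (factory methods omitted); the `event_from_json` helper is hypothetical and not part of the lesson's code.

```python
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4


@dataclass
class TaskData:
    task_id: str
    title: str
    status: str
    owner_id: Optional[str] = None


@dataclass
class EventMetadata:
    correlation_id: str
    source: str = "task-api"


@dataclass
class TaskEvent:
    event_type: str
    data: TaskData
    metadata: EventMetadata
    event_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def event_from_json(raw: str) -> TaskEvent:
    """Hypothetical helper: rebuild a TaskEvent from its JSON form."""
    payload = json.loads(raw)
    return TaskEvent(
        event_type=payload["event_type"],
        data=TaskData(**payload["data"]),
        metadata=EventMetadata(**payload["metadata"]),
        event_id=payload["event_id"],
        occurred_at=payload["occurred_at"],
    )


original = TaskEvent(
    event_type="task.created",
    data=TaskData(task_id="task-123", title="Buy groceries", status="pending"),
    metadata=EventMetadata(correlation_id="req-456"),
)
# Serialize the same way to_json() does, then parse it back.
restored = event_from_json(json.dumps(asdict(original)))
```

Because dataclasses compare field by field, `restored == original` holds when nothing was lost in serialization.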

Step 2: Implement Event Publisher

Create a reliable publisher that integrates with FastAPI's lifespan.

Create events/publisher.py:

from confluent_kafka import Producer
from typing import Optional, Callable
import logging

from .schemas import TaskEvent

logger = logging.getLogger(__name__)

class EventPublisher:
    """Reliable event publisher for Task API events."""

    def __init__(self, bootstrap_servers: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'task-api-publisher',
            'acks': 'all',
            'enable.idempotence': True,
            'max.in.flight.requests.per.connection': 5,
            'retries': 2147483647,
            'delivery.timeout.ms': 30000
        })
        self.topic = 'task-events'

    def _delivery_callback(self, err, msg):
        """Handle delivery result."""
        if err is not None:
            logger.error(
                f"Event delivery failed: {err}, "
                f"key={msg.key()}, topic={msg.topic()}"
            )
        else:
            logger.info(
                f"Event delivered: topic={msg.topic()}, "
                f"partition={msg.partition()}, offset={msg.offset()}"
            )

    def publish(self, event: TaskEvent) -> None:
        """Publish event to Kafka topic."""
        self.producer.produce(
            topic=self.topic,
            key=event.data.task_id,
            value=event.to_json(),
            callback=self._delivery_callback
        )
        # Process callbacks without blocking
        self.producer.poll(0)

    def flush(self) -> None:
        """Ensure all pending events are delivered."""
        remaining = self.producer.flush(timeout=10)
        if remaining > 0:
            logger.warning(f"{remaining} events not delivered on flush")

# Global publisher instance (initialized in lifespan)
_publisher: Optional[EventPublisher] = None

def get_publisher() -> EventPublisher:
    """Get the global publisher instance."""
    if _publisher is None:
        raise RuntimeError("Publisher not initialized. Check lifespan setup.")
    return _publisher

def init_publisher(bootstrap_servers: str) -> EventPublisher:
    """Initialize the global publisher."""
    global _publisher
    _publisher = EventPublisher(bootstrap_servers)
    return _publisher

def shutdown_publisher() -> None:
    """Shutdown the global publisher."""
    global _publisher
    if _publisher is not None:
        _publisher.flush()
        _publisher = None

Output:

>>> from events.publisher import init_publisher, get_publisher
>>> from events.schemas import TaskEvent
>>> publisher = init_publisher("localhost:30092")
>>> event = TaskEvent.task_created("task-789", "Review PR", "req-111")
>>> publisher.publish(event)
>>> publisher.flush()
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=42
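The publisher keys every message by `task_id`. Kafka partitioners map equal keys to the same partition, which is what would keep a single task's events in order if the topic later grew beyond one partition. The sketch below only illustrates that idea with a CRC32 hash; it is not the exact algorithm the client library uses, and `pick_partition` is a hypothetical name.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Illustrative only: map a key to a partition with a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [
    ("task-1", "task.created"),
    ("task-2", "task.created"),
    ("task-1", "task.updated"),
    ("task-1", "task.completed"),
]

# Group events by the partition their key maps to, preserving publish order.
by_partition: dict[int, list[tuple[str, str]]] = {}
for task_id, event_type in events:
    partition = pick_partition(task_id, 3)
    by_partition.setdefault(partition, []).append((task_id, event_type))

# All events for task-1 share one partition, so their relative order survives.
task_1_partition = pick_partition("task-1", 3)
task_1_events = [
    event_type
    for task_id, event_type in by_partition[task_1_partition]
    if task_id == "task-1"
]
```

With the single-partition topic from the specification, every key trivially maps to partition 0; the key starts to matter once partitions are added.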

Step 3: Integrate with FastAPI

Connect the publisher to FastAPI endpoints.

Update main.py:

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from uuid import uuid4
import os

from events.publisher import init_publisher, shutdown_publisher, get_publisher
from events.schemas import TaskEvent

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan for Kafka producer."""
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
    init_publisher(bootstrap_servers)
    yield
    shutdown_publisher()

app = FastAPI(title="Task API", lifespan=lifespan)

class TaskCreate(BaseModel):
    title: str
    owner_id: Optional[str] = None

class TaskUpdate(BaseModel):
    title: Optional[str] = None
    status: Optional[str] = None

class Task(BaseModel):
    id: str
    title: str
    status: str
    owner_id: Optional[str] = None

# In-memory task storage (replace with database in production)
tasks: dict[str, Task] = {}

@app.post("/tasks", response_model=Task)
async def create_task(task_in: TaskCreate):
    """Create a new task and publish task.created event."""
    correlation_id = str(uuid4())
    task_id = str(uuid4())

    task = Task(
        id=task_id,
        title=task_in.title,
        status="pending",
        owner_id=task_in.owner_id
    )
    tasks[task_id] = task

    # Publish event
    event = TaskEvent.task_created(
        task_id=task_id,
        title=task.title,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task

@app.patch("/tasks/{task_id}", response_model=Task)
async def update_task(task_id: str, task_in: TaskUpdate):
    """Update a task and publish task.updated event."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    correlation_id = str(uuid4())
    task = tasks[task_id]

    if task_in.title is not None:
        task.title = task_in.title
    if task_in.status is not None:
        task.status = task_in.status

    # Publish event
    event = TaskEvent.task_updated(
        task_id=task_id,
        title=task.title,
        status=task.status,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task

@app.post("/tasks/{task_id}/complete", response_model=Task)
async def complete_task(task_id: str):
    """Complete a task and publish task.completed event."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    correlation_id = str(uuid4())
    task = tasks[task_id]
    task.status = "completed"

    # Publish event
    event = TaskEvent.task_completed(
        task_id=task_id,
        title=task.title,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task

Output:

$ curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Write capstone lesson"}'

{"id":"abc123","title":"Write capstone lesson","status":"pending","owner_id":null}

# In the Task API logs:
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=43
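Because the endpoints only ever call `publish(event)`, their event logic can be exercised without a broker by swapping in a test double. The following is a sketch under that assumption; `RecordingPublisher` and this plain-function `create_task` are hypothetical stand-ins that use dicts, not the lesson's FastAPI handlers or `TaskEvent` class.

```python
from uuid import uuid4


class RecordingPublisher:
    """Hypothetical test double: stores events instead of sending them."""

    def __init__(self) -> None:
        self.published: list[dict] = []

    def publish(self, event: dict) -> None:
        self.published.append(event)


def create_task(title: str, publisher: RecordingPublisher) -> dict:
    """Simplified stand-in for the POST /tasks handler."""
    task = {"id": str(uuid4()), "title": title, "status": "pending"}
    publisher.publish(
        {
            "event_type": "task.created",
            "data": {
                "task_id": task["id"],
                "title": task["title"],
                "status": task["status"],
            },
            "metadata": {"correlation_id": str(uuid4()), "source": "task-api"},
        }
    )
    return task


publisher = RecordingPublisher()
task = create_task("Write capstone lesson", publisher)
# Exactly one event was recorded, and it refers to the task just created.
recorded = publisher.published[0]
```

The same idea applies to the update and complete paths: one call in, one recorded event out, with the event's `task_id` matching the task.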

Step 4: Implement Notification Service

Create a consumer that processes task events for notifications.

Create services/notification_service.py:

from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NotificationService:
    """Consume task events and deliver notifications."""

    def __init__(self, bootstrap_servers: str):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'notification-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False  # Manual commit after processing
        })
        self.running = True
        self.subscribed_events = {'task.created', 'task.completed'}

    def process_event(self, event: dict) -> bool:
        """Process a single event. Returns True if successful."""
        event_type = event.get('event_type')

        if event_type not in self.subscribed_events:
            # Skip events we don't handle
            return True

        task_data = event.get('data', {})
        task_id = task_data.get('task_id')
        title = task_data.get('title')

        if event_type == 'task.created':
            logger.info(
                f"NOTIFICATION: New task created - '{title}' (ID: {task_id})"
            )
            # In production: send email, push notification, etc.

        elif event_type == 'task.completed':
            logger.info(
                f"NOTIFICATION: Task completed - '{title}' (ID: {task_id})"
            )
            # In production: send completion notification

        return True

    def run(self):
        """Main consumer loop."""
        self.consumer.subscribe(['task-events'])
        logger.info("Notification service started, waiting for events...")

        while self.running:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                event = json.loads(msg.value().decode('utf-8'))
                if self.process_event(event):
                    # Commit only after successful processing
                    self.consumer.commit(message=msg)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in message: {e}")
                # Commit to skip malformed message
                self.consumer.commit(message=msg)

    def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down notification service...")
        self.running = False
        self.consumer.close()

def main():
    import os
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")

    service = NotificationService(bootstrap_servers)

    # Handle graceful shutdown
    def signal_handler(sig, frame):
        service.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    service.run()

if __name__ == "__main__":
    main()

Output:

$ python services/notification_service.py
INFO:__main__:Notification service started, waiting for events...
INFO:__main__:NOTIFICATION: New task created - 'Write capstone lesson' (ID: abc123)
INFO:__main__:NOTIFICATION: Task completed - 'Write capstone lesson' (ID: abc123)
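The filtering rule inside `process_event` is easier to test in isolation when it is pulled out into a pure function. A minimal sketch, assuming the same event shape as above; `notification_message` is a hypothetical helper name, not part of the service as written.

```python
from typing import Optional

SUBSCRIBED_EVENTS = {"task.created", "task.completed"}


def notification_message(event: dict) -> Optional[str]:
    """Return the notification text, or None for event types we skip."""
    event_type = event.get("event_type")
    if event_type not in SUBSCRIBED_EVENTS:
        return None

    data = event.get("data", {})
    if event_type == "task.created":
        verb = "New task created"
    else:
        verb = "Task completed"
    return f"NOTIFICATION: {verb} - '{data.get('title')}' (ID: {data.get('task_id')})"


completed = {
    "event_type": "task.completed",
    "data": {"task_id": "abc123", "title": "Write capstone lesson"},
}
message = notification_message(completed)

# task.updated is outside the subscribed set, so nothing is produced.
skipped = notification_message({"event_type": "task.updated", "data": {}})
```

Skipped events still count as successfully processed in the service, which is why their offsets are committed like any other.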

Step 5: Implement Audit Service

Create a consumer that logs all events to an immutable audit trail.

Create services/audit_service.py:

from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AuditService:
    """Consume ALL task events and write to immutable audit log."""

    def __init__(self, bootstrap_servers: str, audit_log_path: str = "audit.log"):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'audit-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False
        })
        self.running = True
        self.audit_log_path = Path(audit_log_path)
        self.seen_events: set[str] = set()
        self._load_seen_events()

    def _load_seen_events(self):
        """Load previously seen event IDs for deduplication."""
        if self.audit_log_path.exists():
            with open(self.audit_log_path, 'r') as f:
                for line in f:
                    try:
                        entry = json.loads(line)
                        self.seen_events.add(entry.get('event_id'))
                    except json.JSONDecodeError:
                        continue
        logger.info(f"Loaded {len(self.seen_events)} previously seen events")

    def process_event(self, event: dict) -> bool:
        """Process event and append to audit log. Returns True if successful."""
        event_id = event.get('event_id')

        # Deduplicate: skip if we've seen this event
        if event_id in self.seen_events:
            logger.debug(f"Skipping duplicate event: {event_id}")
            return True

        # Create audit entry (timezone-aware UTC, matching occurred_at)
        audit_entry = {
            'event_id': event_id,
            'event_type': event.get('event_type'),
            'occurred_at': event.get('occurred_at'),
            'data': event.get('data'),
            'metadata': event.get('metadata'),
            'audited_at': datetime.now(timezone.utc).isoformat()
        }

        # Append to audit log (immutable append-only)
        with open(self.audit_log_path, 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

        self.seen_events.add(event_id)
        logger.info(
            f"AUDIT: {event.get('event_type')} - "
            f"task_id={event.get('data', {}).get('task_id')}"
        )
        return True

    def run(self):
        """Main consumer loop."""
        self.consumer.subscribe(['task-events'])
        logger.info("Audit service started, logging all events...")

        while self.running:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                event = json.loads(msg.value().decode('utf-8'))
                if self.process_event(event):
                    self.consumer.commit(message=msg)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in message: {e}")
                self.consumer.commit(message=msg)

    def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down audit service...")
        self.running = False
        self.consumer.close()

def main():
    import os
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
    audit_log_path = os.getenv("AUDIT_LOG_PATH", "audit.log")

    service = AuditService(bootstrap_servers, audit_log_path)

    def signal_handler(sig, frame):
        service.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    service.run()

if __name__ == "__main__":
    main()

Output:

$ python services/audit_service.py
INFO:__main__:Loaded 0 previously seen events
INFO:__main__:Audit service started, logging all events...
INFO:__main__:AUDIT: task.created - task_id=abc123
INFO:__main__:AUDIT: task.updated - task_id=abc123
INFO:__main__:AUDIT: task.completed - task_id=abc123

$ cat audit.log
{"event_id":"evt-001","event_type":"task.created","occurred_at":"2025-01-15T10:30:00Z","data":{"task_id":"abc123","title":"Write capstone lesson","status":"pending","owner_id":null},"metadata":{"correlation_id":"req-789","source":"task-api"},"audited_at":"2025-01-15T10:30:01.000000"}
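At-least-once delivery means the audit service may see the same event twice, for example when the process stops after the file append but before the offset commit. The sketch below replays a batch against a temporary file to show the effect of deduplicating by `event_id`; `append_if_new` is a hypothetical, simplified version of `process_event`, and the events are sample data.

```python
import json
import tempfile
from pathlib import Path


def append_if_new(event: dict, log_path: Path, seen: set) -> bool:
    """Append the event unless its event_id was already recorded."""
    event_id = event["event_id"]
    if event_id in seen:
        return False
    with open(log_path, "a") as f:
        f.write(json.dumps(event) + "\n")
    seen.add(event_id)
    return True


batch = [
    {"event_id": "evt-001", "event_type": "task.created"},
    {"event_id": "evt-002", "event_type": "task.updated"},
]

with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "audit.log"
    seen: set = set()
    first_pass = [append_if_new(e, log_path, seen) for e in batch]
    # Simulate Kafka redelivering the same batch.
    second_pass = [append_if_new(e, log_path, seen) for e in batch]
    line_count = len(log_path.read_text().splitlines())
```

The second pass writes nothing, so the log still holds one line per distinct event.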

Phase 3: Validate Against Specification

The implementation is complete. Now verify each success criterion from the specification.

Validation Checklist

SC-1: Task Events Published

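# Start Kafka console consumer to observe events.
# This command runs inside the broker pod, so it targets the broker's internal
# listener (9092 is assumed here, the conventional Strimzi plain listener).
# 30092 is the external NodePort that clients on the host use.
kubectl exec -it task-events-kafka-0 -n kafka -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic task-events \
  --from-beginning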

# In another terminal, create a task via API
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "Validate capstone"}'

# Console consumer shows:
{"event_type":"task.created","data":{"task_id":"xyz789","title":"Validate capstone",...}

| Criterion | Status |
| --- | --- |
| task.created event published | PASS |
| task.updated event published | PASS |
| task.completed event published | PASS |
| Event schema includes all required fields | PASS |
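The last SC-1 criterion can also be checked mechanically rather than by eye. A small sketch, assuming the field names from the Event Schema section of the specification; `missing_fields` is a hypothetical helper, and the sample events are made up.

```python
import json

REQUIRED_TOP_LEVEL = ("event_id", "event_type", "occurred_at", "data", "metadata")
REQUIRED_DATA = ("task_id", "title", "status")  # owner_id is optional


def missing_fields(raw_event: str) -> list:
    """Return the names of required fields absent from a JSON event."""
    event = json.loads(raw_event)
    missing = [name for name in REQUIRED_TOP_LEVEL if name not in event]
    data = event.get("data") or {}
    missing += [f"data.{name}" for name in REQUIRED_DATA if name not in data]
    return missing


complete_event = json.dumps(
    {
        "event_id": "evt-001",
        "event_type": "task.created",
        "occurred_at": "2025-01-15T10:30:00+00:00",
        "data": {"task_id": "xyz789", "title": "Validate capstone", "status": "pending"},
        "metadata": {"correlation_id": "req-1", "source": "task-api"},
    }
)
incomplete_event = json.dumps(
    {"event_type": "task.created", "data": {"task_id": "xyz789"}}
)

complete_result = missing_fields(complete_event)
incomplete_result = missing_fields(incomplete_event)
```

An empty list means the event satisfies the criterion; anything else names exactly what is missing.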

SC-2: Notification Service Consumes Events

# Start notification service
python services/notification_service.py

# Create and complete a task (replace xyz789 with the id returned by the first call)
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test notification"}'
curl -X POST http://localhost:8000/tasks/xyz789/complete

# Notification service logs:
INFO:__main__:NOTIFICATION: New task created - 'Test notification' (ID: xyz789)
INFO:__main__:NOTIFICATION: Task completed - 'Test notification' (ID: xyz789)

| Criterion | Status |
| --- | --- |
| Runs as separate consumer group | PASS |
| Receives task.created events | PASS |
| Receives task.completed events | PASS |
| Commits after successful processing | PASS |

SC-3: Audit Service Consumes Events

# Start audit service
python services/audit_service.py

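# Create, update, and complete a task (replace abc123 with the id returned by the first call)
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test audit"}'
curl -X PATCH http://localhost:8000/tasks/abc123 \
  -H "Content-Type: application/json" \
  -d '{"status": "in_progress"}'
curl -X POST http://localhost:8000/tasks/abc123/complete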

# Verify audit log contains all three events
cat audit.log | wc -l
3

# Restart audit service and verify deduplication
python services/audit_service.py
# Logs: "Loaded 3 previously seen events"
# No duplicate entries in audit.log

| Criterion | Status |
| --- | --- |
| Runs as separate consumer group | PASS |
| Receives ALL task events | PASS |
| Appends to immutable log file | PASS |
| Deduplicates by event_id | PASS |
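Counting lines confirms that three entries exist, but not that they are distinct. One way to check for duplicates directly is sketched below; `duplicate_event_ids` is a hypothetical helper that accepts any iterable of audit log lines, and the sample lines are made up.

```python
import json
from collections import Counter


def duplicate_event_ids(lines) -> list:
    """Return event_ids that appear more than once in the audit log lines."""
    counts = Counter(
        json.loads(line)["event_id"] for line in lines if line.strip()
    )
    return sorted(event_id for event_id, n in counts.items() if n > 1)


clean_log = [
    '{"event_id": "evt-001", "event_type": "task.created"}',
    '{"event_id": "evt-002", "event_type": "task.updated"}',
    '{"event_id": "evt-003", "event_type": "task.completed"}',
]
# A log where evt-002 was written twice, as would happen without deduplication.
dirty_log = clean_log + ['{"event_id": "evt-002", "event_type": "task.updated"}']

clean_result = duplicate_event_ids(clean_log)
dirty_result = duplicate_event_ids(dirty_log)
```

To run it against the real file, pass an open file handle for `audit.log` as `lines`; an empty result supports the deduplication criterion.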

SC-4: End-to-End Flow Verified

# Run all services simultaneously
# Terminal 1: Task API
uvicorn main:app --reload

# Terminal 2: Notification Service
python services/notification_service.py

# Terminal 3: Audit Service
python services/audit_service.py

# Terminal 4: Test the flow
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"title": "End-to-end test"}'

# Verify:
# - Notification service logged the creation
# - Audit service logged the creation
# - audit.log has the entry

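# Check consumer lag.
# As above, this runs inside the broker pod, so it uses the internal listener
# (9092 assumed) rather than the external NodePort 30092.
kubectl exec task-events-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group notification-service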

| Criterion | Status |
| --- | --- |
| Create task triggers both consumers | PASS |
| Consumer lag below 100 | PASS |
| Resume from correct offset after restart | PASS |
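In the `kafka-consumer-groups.sh --describe` output, the lag for a partition is its log end offset minus the group's committed offset. The arithmetic behind the "below 100" criterion can be sketched as follows; the offsets are made-up sample values, `total_lag` is a hypothetical helper, and treating a missing committed offset as 0 is an assumption of this sketch.

```python
def total_lag(end_offsets: dict, committed_offsets: dict) -> int:
    """Sum per-partition lag: log end offset minus committed offset."""
    lag = 0
    for partition, end_offset in end_offsets.items():
        # Assumption: a partition with no committed offset counts from 0.
        committed = committed_offsets.get(partition, 0)
        lag += max(end_offset - committed, 0)
    return lag


# Sample values only: one partition, 44 messages written, 42 committed.
lag = total_lag({0: 44}, {0: 42})
within_budget = lag < 100
```

A lag that stays small and returns to zero when traffic stops indicates the consumer is keeping up; a lag that only grows means it is falling behind.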

Validation Summary

All success criteria from the specification have been verified:

| Success Criteria | Result |
| --- | --- |
| SC-1: Task Events Published | 4/4 PASS |
| SC-2: Notification Service | 4/4 PASS |
| SC-3: Audit Service | 4/4 PASS |
| SC-4: End-to-End Flow | 3/3 PASS |

The implementation matches the specification.

What You Built

This capstone demonstrated the spec-driven development pattern for event-driven systems:

| Phase | Activity | Outcome |
| --- | --- | --- |
| Specification | Define intent, constraints, success criteria | Clear requirements document |
| Implementation | Compose patterns from previous lessons | Working code matching spec |
| Validation | Verify each success criterion | Evidence of correctness |

The system you built includes:

  • Event schema with factory methods for type-safe event creation
  • Reliable publisher with idempotent producer and proper lifespan management
  • FastAPI integration publishing events from API endpoints
  • Notification service consuming specific event types with manual offset commits
  • Audit service with deduplication for immutable event logging

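These patterns compose into production event-driven architectures. New consumers can join as additional consumer groups without any change to the producer, and increasing the partition count (with events still keyed by task_id) lets each group scale out while preserving per-task ordering.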

Try With AI

Use AI to extend and refine your capstone implementation.

Prompt 1: Add Schema Registry Integration

I have a working event-driven notification system with JSON events.
I want to add Avro schemas with Schema Registry for type safety.

Here's my current TaskEvent schema (Python dataclass):
[paste TaskEvent class]

Help me:
1. Create an Avro schema that matches this structure
2. Modify the publisher to use AvroSerializer
3. Modify the consumers to use AvroDeserializer
4. Handle schema evolution (what if I add a new field later?)

Show the code changes needed for each component.

What you're learning: Schema Registry integration adds type safety and evolution management. This is the pattern production systems use to prevent schema drift between producers and consumers.
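As a starting point for that conversation, an Avro record matching the JSON event might look like the sketch below. The record names and the namespace are assumptions for illustration. The optional `owner_id` is modeled as a nullable union with a `null` default, a common way to keep an optional field compatible as the schema evolves.

```python
import json

TASK_EVENT_AVRO_SCHEMA = {
    "type": "record",
    "name": "TaskEvent",
    "namespace": "com.example.tasks",  # assumed namespace, pick your own
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "occurred_at", "type": "string"},
        {
            "name": "data",
            "type": {
                "type": "record",
                "name": "TaskData",
                "fields": [
                    {"name": "task_id", "type": "string"},
                    {"name": "title", "type": "string"},
                    {"name": "status", "type": "string"},
                    # Nullable union: "null" comes first so the default can be null.
                    {"name": "owner_id", "type": ["null", "string"], "default": None},
                ],
            },
        },
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "EventMetadata",
                "fields": [
                    {"name": "correlation_id", "type": "string"},
                    {"name": "source", "type": "string", "default": "task-api"},
                ],
            },
        },
    ],
}

# Avro schemas are exchanged as JSON text.
schema_str = json.dumps(TASK_EVENT_AVRO_SCHEMA, indent=2)
```

The field order mirrors the Event Schema section of the specification, so the JSON and Avro forms stay easy to compare side by side.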


Prompt 2: Add Dead Letter Queue

My notification service might fail to process some events
(e.g., invalid task data, external service timeout).

Help me implement a dead letter queue pattern:
1. Catch processing failures in the consumer
2. Publish failed events to a 'task-events-dlq' topic
3. Include original event, error message, and retry count
4. Create a simple DLQ processor that logs failed events

Show me the modified NotificationService and the DLQ processor.

What you're learning: Production consumers need failure handling. Dead letter queues capture problematic events for investigation without blocking the main consumer.
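Item 3 of the prompt asks for the original event, the error message, and a retry count. One possible shape for that dead-letter record is sketched below; the field names and the `build_dlq_record` helper are hypothetical, and publishing the result to `task-events-dlq` is left out.

```python
import json
from datetime import datetime, timezone


def build_dlq_record(event: dict, error: Exception, retry_count: int) -> str:
    """Wrap a failed event with the context needed to investigate it."""
    return json.dumps(
        {
            "original_event": event,
            "error": f"{type(error).__name__}: {error}",
            "retry_count": retry_count,
            "failed_at": datetime.now(timezone.utc).isoformat(),
        }
    )


failed_event = {"event_id": "evt-001", "event_type": "task.created", "data": {}}
record = json.loads(
    build_dlq_record(failed_event, ValueError("missing title"), retry_count=3)
)
```

Keeping the original event intact inside the envelope means it can be replayed onto the main topic once the underlying problem is fixed.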


Prompt 3: Improve the Specification

Review my event-driven notifications specification and suggest improvements:

[paste the specification from Phase 1]

Consider:
1. Are the success criteria specific enough to automate testing?
2. What edge cases are missing (network failures, duplicate events)?
3. Should the architecture include error handling components?
4. What monitoring requirements should be added?

Propose an enhanced specification with these improvements.

What you're learning: Specifications evolve through iteration. Reviewing your spec after implementation reveals gaps and improvements for future systems.


Safety Note: When testing event-driven systems, always verify consumer offsets are committed correctly before stopping services. Uncommitted offsets can cause duplicate processing on restart. Use kafka-consumer-groups.sh --describe to check consumer group state before any planned maintenance.