Authentication and Security
Your TaskManager chat works—users can create tasks, agents respond conversationally, history persists. But there's a critical problem: anyone can access any conversation. User A could read User B's sensitive task data. There's no authentication, no access control, no security boundary between tenants.
This isn't just a technical gap—it's a business blocker. SaaS applications require multi-tenant isolation. Healthcare, finance, legal domains demand strict data segregation. Without authentication, your ChatKit server is a prototype, not a product.
In this lesson, you'll secure your ChatKit server using RequestContext validation, JWT/JWKS authentication, and thread ownership enforcement. More importantly, you'll extract these patterns into a chatkit-auth-security skill—reusable intelligence you can apply across projects.
The Security Problem
Current State: Unprotected Access
# chatkit_server.py (INSECURE - current state)
from chatkit.server import ChatKitServer
from chatkit.types import ThreadMetadata, UserMessageItem
class TaskManagerServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any, # ⚠️ Context not validated
) -> AsyncIterator[ThreadStreamEvent]:
# ⚠️ No user identity check
# ⚠️ No thread ownership check
# ⚠️ Anyone can access any thread_id
thread_metadata = await self.store.load_thread(thread.id, context)
# Responds to ANY request with thread data
...
Threat Model:
| Threat | Attack Vector | Impact |
|---|---|---|
| Unauthorized Thread Access | User A sends request with User B's thread_id | User A reads User B's private tasks |
| Thread Hijacking | Attacker iterates thread IDs, finds valid ones | Mass data exfiltration across tenants |
| Metadata Tampering | Client sends {"user_id": "admin"} in context | Privilege escalation, impersonation |
| Missing Authentication | No token validation on requests | Anonymous access, no identity verification |
| Cross-Tenant Leakage | Thread IDs not scoped to users | User data visible across organizational boundaries |
Target State: Defense in Depth
# chatkit_server.py (SECURE - target state)
class TaskManagerServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext, # ✓ Typed as RequestContext
) -> AsyncIterator[ThreadStreamEvent]:
# ✓ Layer 1: Validate user identity from context
user_id = self._validate_user(context)
# ✓ Layer 2: Verify thread ownership
await self._verify_thread_access(thread.id, user_id)
# ✓ Layer 3: Enforce tenant isolation via RequestContext
thread_metadata = await self.store.load_thread(
thread.id,
context # Contains user_id for access control
)
...
Security Layers:
- Identity Verification: Validate JWT token, extract user claims
- Access Control: Check thread belongs to requesting user
- Tenant Isolation: Scope all data queries to authenticated user
RequestContext Validation
Understanding RequestContext
RequestContext is ChatKit's identity container. It flows from frontend → HTTP proxy → ChatKit server:
// Frontend (React)
const { useChatKit } = require("@openai/chatkit-react");
function ChatUI() {
const kit = useChatKit({
baseUrl: "/api/chatkit",
// Custom fetch injects auth headers + page context
fetch: async (url, options) => {
const token = await getAccessToken(); // From auth provider
return fetch(url, {
...options,
headers: {
...options.headers,
Authorization: `Bearer ${token}`,
"X-Page-Context": JSON.stringify({ page: "tasks" }),
},
});
},
});
return <ChatKitUI chatKit={kit} />;
}
Output:
POST /api/chatkit/respond HTTP/1.1
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
X-Page-Context: {"page":"tasks"}
Content-Type: application/json
{"thread_id": "thread_abc123", "message": "Create task..."}
Backend: Extracting Identity
# chatkit_server.py
from chatkit.server import ChatKitServer
from chatkit.types import RequestContext
from typing import Any
import jwt
from jwt import PyJWKClient
class TaskManagerServer(ChatKitServer):
def __init__(self, jwks_url: str, **kwargs):
super().__init__(**kwargs)
# JWKS client for public key retrieval
self.jwks_client = PyJWKClient(jwks_url)
def _validate_user(self, context: RequestContext) -> str:
"""
Extract and validate user identity from RequestContext.
Args:
context: RequestContext with headers/metadata from request
Returns:
user_id: Validated user identifier
Raises:
PermissionError: If token invalid or missing
"""
# Extract Authorization header
auth_header = context.request_headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise PermissionError("Missing or invalid Authorization header")
token = auth_header.removeprefix("Bearer ")
try:
# Get signing key from JWKS endpoint
signing_key = self.jwks_client.get_signing_key_from_jwt(token)
# Verify and decode JWT
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
options={"verify_exp": True, "verify_aud": False}
)
# Extract user ID from claims
user_id = payload.get("sub") # 'sub' = subject claim
if not user_id:
raise PermissionError("Token missing 'sub' claim")
return user_id
except jwt.ExpiredSignatureError:
raise PermissionError("Token expired")
except jwt.InvalidTokenError as e:
raise PermissionError(f"Invalid token: {e}")
Output (successful validation):
user_id = _validate_user(context)
# user_id = "user_abc123" (extracted from JWT 'sub' claim)
Output (failed validation):
try:
user_id = _validate_user(context)
except PermissionError as e:
# Log: "Invalid token: Signature verification failed"
# Return 401 Unauthorized to client
What's Happening
- Extract Token: Get
Authorization: Bearer <token>from request headers - Fetch Public Key: Retrieve signing key from JWKS URL (e.g.,
https://auth.example.com/.well-known/jwks.json) - Verify Signature: Ensure token signed by trusted auth server (Better Auth, Auth0, etc.)
- Check Expiration: Reject expired tokens (
expclaim) - Extract Identity: Get user ID from
sub(subject) claim
Why JWKS? Public key rotation support. Auth server can rotate signing keys without server reconfiguration.
Thread Ownership Enforcement
Database Schema for Ownership
# models.py
from sqlmodel import SQLModel, Field
from datetime import datetime
class Thread(SQLModel, table=True):
id: str = Field(primary_key=True)
user_id: str = Field(index=True) # ← Owner of this thread
created_at: datetime = Field(default_factory=datetime.utcnow)
metadata: dict = Field(default_factory=dict)
class ThreadItem(SQLModel, table=True):
id: str = Field(primary_key=True)
thread_id: str = Field(foreign_key="thread.id", index=True)
user_id: str # ← Redundant but fast for queries
role: str # "user" | "assistant" | "tool"
content: str
created_at: datetime = Field(default_factory=datetime.utcnow)
Key Design:
user_idcolumn onThreadtable for ownershipuser_idcolumn onThreadItemfor scoped queries (denormalized for performance)- Index on
user_idfor fast filtering
Access Control Check
# chatkit_server.py
class TaskManagerServer(ChatKitServer):
async def _verify_thread_access(self, thread_id: str, user_id: str):
"""
Verify authenticated user owns the requested thread.
Args:
thread_id: Thread being accessed
user_id: Authenticated user from JWT
Raises:
PermissionError: If user doesn't own thread
"""
async with self.db.session() as session:
result = await session.exec(
select(Thread).where(
Thread.id == thread_id,
Thread.user_id == user_id # ← Ownership check
)
)
thread = result.first()
if not thread:
# Thread doesn't exist OR user doesn't own it
raise PermissionError(
f"Thread {thread_id} not found or access denied"
)
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
# Layer 1: Validate identity
user_id = self._validate_user(context)
# Layer 2: Verify access BEFORE loading data
await self._verify_thread_access(thread.id, user_id)
# Layer 3: Scoped data retrieval
thread_items = await self._get_user_thread_items(thread.id, user_id)
# Safe to proceed - user owns this thread
async for event in self._generate_response(thread_items, input):
yield event
Output (authorized access):
# User "user_abc" accessing their own thread "thread_123"
await _verify_thread_access("thread_123", "user_abc")
# ✓ Check passes, continues execution
Output (unauthorized access):
# User "user_abc" trying to access "user_xyz"'s thread
try:
await _verify_thread_access("thread_789", "user_abc")
except PermissionError as e:
# ❌ "Thread thread_789 not found or access denied"
# Return 403 Forbidden to client
Why This Works
Attack Scenario: User A tries to access User B's thread
POST /api/chatkit/respond
Authorization: Bearer <user_a_token>
{"thread_id": "thread_belonging_to_user_b"}
Defense in Depth:
- JWT validated → extracts
user_id = "user_a" - Access check queries:
SELECT * FROM threads WHERE id = 'thread_b' AND user_id = 'user_a' - Query returns 0 rows (thread exists but different owner)
PermissionErrorraised → 403 Forbidden response- User A never sees User B's data
Metadata Validation
The Tampering Threat
// MALICIOUS CLIENT (attacker's code)
const kit = useChatKit({
fetch: async (url, options) => {
return fetch(url, {
...options,
headers: {
Authorization: "Bearer <valid_user_token>",
// ⚠️ Attacker injects fake metadata
"X-User-Context": JSON.stringify({ role: "admin", user_id: "admin_user" }),
},
});
},
});
Problem: If backend trusts metadata from headers without validation, attacker escalates privileges.
Secure Pattern: Trust Only JWT Claims
class TaskManagerServer(ChatKitServer):
def _validate_user(self, context: RequestContext) -> dict:
"""
Extract user identity AND roles from VERIFIED JWT only.
NEVER trust client-sent metadata for identity/permissions.
"""
# ... JWT validation code ...
payload = jwt.decode(token, signing_key.key, algorithms=["RS256"])
# Extract from JWT claims (signed by auth server)
return {
"user_id": payload["sub"],
"email": payload.get("email"),
"roles": payload.get("roles", []), # From token, not headers
}
def _get_page_context(self, context: RequestContext) -> dict:
"""
Extract non-sensitive metadata (page context, UI state).
OK to trust for UX, NOT for authorization.
"""
page_header = context.request_headers.get("X-Page-Context", "{}")
return json.loads(page_header)
async def respond(self, thread, input, context):
# Identity from JWT (trusted)
user_info = self._validate_user(context)
user_id = user_info["user_id"]
# UI context from headers (untrusted for auth, OK for UX)
page_context = self._get_page_context(context)
# Use user_id from JWT for access control
await self._verify_thread_access(thread.id, user_id)
# Use page_context for agent instructions (UX enhancement)
agent_instructions = f"User is on {page_context.get('page', 'unknown')} page"
...
Output (attack mitigated):
# Attacker sends: X-User-Context: {"role": "admin"}
# Backend ignores header, uses JWT:
user_info = _validate_user(context)
# user_info = {"user_id": "regular_user", "roles": ["user"]}
# Access check uses "regular_user", not attacker's fake "admin"
Security Rule: Identity/authorization from JWT only. UI context from headers OK for UX, never for access control.
Extract Your chatkit-auth-security Skill
You've implemented JWT validation, thread ownership enforcement, and metadata validation. These security patterns will recur across every production ChatKit server. Time to harvest this intelligence.
Layer 3 trigger: Pattern recurs 2+ times across projects → Create reusable intelligence
Step: Extract the Skill
Copy and paste this prompt:
Using your skill-creator skill, study the authentication and security
implementation code I just wrote in this lesson and extract it into a
reusable chatkit-auth-security skill.
Encapsulate all 5 patterns: JWKS public key fetching, claim extraction,
error handling, Better Auth integration, and multi-tenant isolation.
Claude will:
- Review the 5 security patterns from this lesson
- Use
skill-creatorto generate the skill structure - Extract patterns: JWKS, claims, errors, Better Auth, isolation
- Create reference docs with threat mitigation strategies
- Follow canonical format from building-chat-interfaces
Your skill appears at .claude/skills/chatkit-auth-security/.
Integration: Secure TaskManager Server
# chatkit_server.py (production-ready with auth)
from chatkit.server import ChatKitServer
from chatkit.types import RequestContext, ThreadMetadata, UserMessageItem
from typing import AsyncIterator, Any
from jwt import PyJWKClient
import jwt
from sqlmodel import select
class TaskManagerServer(ChatKitServer):
def __init__(self, jwks_url: str, db_url: str, **kwargs):
super().__init__(**kwargs)
self.jwks_client = PyJWKClient(jwks_url)
self.db_url = db_url
def _validate_user(self, context: RequestContext) -> str:
"""Extract and validate user ID from JWT."""
auth_header = context.request_headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise PermissionError("Missing Authorization header")
token = auth_header.removeprefix("Bearer ")
signing_key = self.jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(token, signing_key.key, algorithms=["RS256"])
user_id = payload.get("sub")
if not user_id:
raise PermissionError("Token missing 'sub' claim")
return user_id
async def _verify_thread_access(self, thread_id: str, user_id: str):
"""Verify authenticated user owns the thread."""
async with self.db.session() as session:
result = await session.exec(
select(Thread).where(
Thread.id == thread_id,
Thread.user_id == user_id
)
)
if not result.first():
raise PermissionError("Thread not found or access denied")
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
# Security layers
user_id = self._validate_user(context)
await self._verify_thread_access(thread.id, user_id)
# Scoped data retrieval
thread_items = await self._get_user_thread_items(thread.id, user_id)
# Agent execution (secure - user owns this thread)
agent = Agent(
name="TaskManager",
instructions=f"User ID: {user_id}. Help manage their tasks."
)
result = Runner.run_streamed(agent, input.content)
async for event in stream_agent_response(context, result):
yield event
Output (successful auth flow):
1. Client sends: Authorization: Bearer <valid_token>
2. Backend validates JWT → extracts user_id = "user_abc"
3. Backend checks: Thread "thread_123" owned by "user_abc"? → Yes
4. Backend loads thread items for "user_abc" only
5. Agent responds with scoped data
Output (blocked attack):
1. Attacker sends: Authorization: Bearer <user_a_token>
2. Attacker requests: thread_id = "user_b_thread"
3. Backend validates JWT → user_id = "user_a"
4. Backend checks: Thread "user_b_thread" owned by "user_a"? → No
5. Backend returns: 403 Forbidden "Thread not found or access denied"
6. Attacker never sees user_b's data
Security Threat Mitigation Summary
| Threat | Mitigation | Verification |
|---|---|---|
| Unauthorized Thread Access | _verify_thread_access() checks ownership before loading data | Test: User A cannot access User B's thread (403 response) |
| Thread Hijacking | Thread IDs scoped to user_id in all queries | Test: Iterate thread IDs, all return 403 except owned threads |
| Metadata Tampering | Identity from JWT sub claim only, ignore client headers | Test: Send fake user_id in header, access still scoped to JWT user |
| Missing Authentication | _validate_user() required for all requests | Test: Request without token → 401 Unauthorized |
| Cross-Tenant Leakage | user_id column on Thread + ThreadItem tables | Test: Query returns 0 rows for other users' threads |
Safety Note
CRITICAL: Never store sensitive data (passwords, API keys, PII) in thread metadata without encryption. Metadata is JSON-serialized and may be logged. Use metadata for UI state (page context, preferences), not secrets.
Recommended: Store sensitive associations (user_id → encrypted_secrets) in separate table with row-level encryption. Reference by ID in thread metadata if needed.
Try With AI
Apply your chatkit-auth-security skill to real security challenges. These prompts build your authentication pattern recognition.
Prompt 1: Implement Rate Limiting
Setup: Open your chatkit_server.py
Prompt:
Using the chatkit-auth-security skill patterns, add rate limiting:
- Max 10 requests per minute per user_id
- Return 429 Too Many Requests if exceeded
- Store counters in Redis or in-memory dict
- Reset counters every 60 seconds
Show the middleware implementation.
What you're learning: Security isn't just authentication—it's also abuse prevention. Rate limiting by authenticated user prevents DoS attacks and ensures fair resource allocation.
Expected: Middleware that extracts user_id from JWT, checks request count, enforces limit.
Prompt 2: Audit Logging
Setup: Review your _verify_thread_access() function
Prompt:
Add security audit logging for thread access:
- Log every thread access attempt (user_id, thread_id, timestamp, success/failure)
- Include IP address from RequestContext
- Write to structured log file (JSON format)
- Log suspicious patterns (rapid thread ID iteration)
Show the logging integration.
What you're learning: Audit trails are critical for security forensics. When an attack happens, logs tell you what data was accessed and by whom.
Expected: Logging calls that capture access events with structured data for analysis.
Prompt 3: Role-Based Access Control (RBAC)
Setup: Consider multi-role scenarios (admin, user, guest)
Prompt:
Extend the authentication pattern to support roles:
- Extract 'roles' claim from JWT (e.g., ["admin", "user"])
- Admin users can access ANY thread (support/debugging)
- Regular users can only access their own threads
- Guest users have read-only access
Modify _verify_thread_access() to handle role-based permissions.
What you're learning: Authorization isn't binary. Real systems have nuanced permissions (admin overrides, read-only access, hierarchical roles). RBAC patterns scale to enterprise complexity.
Expected: Permission logic that checks role before enforcing ownership constraints.
Next Lesson: Integrate your secure ChatKit server with React using useChatKit, building a production-ready UI with authentication, streaming responses, and error handling.