Skip to main content

Authentication and Security

Your TaskManager chat works—users can create tasks, agents respond conversationally, history persists. But there's a critical problem: anyone can access any conversation. User A could read User B's sensitive task data. There's no authentication, no access control, no security boundary between tenants.

This isn't just a technical gap—it's a business blocker. SaaS applications require multi-tenant isolation. Healthcare, finance, legal domains demand strict data segregation. Without authentication, your ChatKit server is a prototype, not a product.

In this lesson, you'll secure your ChatKit server using RequestContext validation, JWT/JWKS authentication, and thread ownership enforcement. More importantly, you'll extract these patterns into a chatkit-auth-security skill—reusable intelligence you can apply across projects.

The Security Problem

Current State: Unprotected Access

# chatkit_server.py (INSECURE - current state)
from chatkit.server import ChatKitServer
from chatkit.types import ThreadMetadata, UserMessageItem

class TaskManagerServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: Any, # ⚠️ Context not validated
) -> AsyncIterator[ThreadStreamEvent]:
# ⚠️ No user identity check
# ⚠️ No thread ownership check
# ⚠️ Anyone can access any thread_id

thread_metadata = await self.store.load_thread(thread.id, context)
# Responds to ANY request with thread data
...

Threat Model:

ThreatAttack VectorImpact
Unauthorized Thread AccessUser A sends request with User B's thread_idUser A reads User B's private tasks
Thread HijackingAttacker iterates thread IDs, finds valid onesMass data exfiltration across tenants
Metadata TamperingClient sends {"user_id": "admin"} in contextPrivilege escalation, impersonation
Missing AuthenticationNo token validation on requestsAnonymous access, no identity verification
Cross-Tenant LeakageThread IDs not scoped to usersUser data visible across organizational boundaries

Target State: Defense in Depth

# chatkit_server.py (SECURE - target state)
class TaskManagerServer(ChatKitServer):
async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext, # ✓ Typed as RequestContext
) -> AsyncIterator[ThreadStreamEvent]:
# ✓ Layer 1: Validate user identity from context
user_id = self._validate_user(context)

# ✓ Layer 2: Verify thread ownership
await self._verify_thread_access(thread.id, user_id)

# ✓ Layer 3: Enforce tenant isolation via RequestContext
thread_metadata = await self.store.load_thread(
thread.id,
context # Contains user_id for access control
)
...

Security Layers:

  1. Identity Verification: Validate JWT token, extract user claims
  2. Access Control: Check thread belongs to requesting user
  3. Tenant Isolation: Scope all data queries to authenticated user

RequestContext Validation

Understanding RequestContext

RequestContext is ChatKit's identity container. It flows from frontend → HTTP proxy → ChatKit server:

// Frontend (React)
const { useChatKit } = require("@openai/chatkit-react");

function ChatUI() {
const kit = useChatKit({
baseUrl: "/api/chatkit",
// Custom fetch injects auth headers + page context
fetch: async (url, options) => {
const token = await getAccessToken(); // From auth provider
return fetch(url, {
...options,
headers: {
...options.headers,
Authorization: `Bearer ${token}`,
"X-Page-Context": JSON.stringify({ page: "tasks" }),
},
});
},
});

return <ChatKitUI chatKit={kit} />;
}

Output:

POST /api/chatkit/respond HTTP/1.1
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
X-Page-Context: {"page":"tasks"}
Content-Type: application/json

{"thread_id": "thread_abc123", "message": "Create task..."}

Backend: Extracting Identity

# chatkit_server.py
from chatkit.server import ChatKitServer
from chatkit.types import RequestContext
from typing import Any
import jwt
from jwt import PyJWKClient

class TaskManagerServer(ChatKitServer):
def __init__(self, jwks_url: str, **kwargs):
super().__init__(**kwargs)
# JWKS client for public key retrieval
self.jwks_client = PyJWKClient(jwks_url)

def _validate_user(self, context: RequestContext) -> str:
"""
Extract and validate user identity from RequestContext.

Args:
context: RequestContext with headers/metadata from request

Returns:
user_id: Validated user identifier

Raises:
PermissionError: If token invalid or missing
"""
# Extract Authorization header
auth_header = context.request_headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise PermissionError("Missing or invalid Authorization header")

token = auth_header.removeprefix("Bearer ")

try:
# Get signing key from JWKS endpoint
signing_key = self.jwks_client.get_signing_key_from_jwt(token)

# Verify and decode JWT
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
options={"verify_exp": True, "verify_aud": False}
)

# Extract user ID from claims
user_id = payload.get("sub") # 'sub' = subject claim
if not user_id:
raise PermissionError("Token missing 'sub' claim")

return user_id

except jwt.ExpiredSignatureError:
raise PermissionError("Token expired")
except jwt.InvalidTokenError as e:
raise PermissionError(f"Invalid token: {e}")

Output (successful validation):

user_id = _validate_user(context)
# user_id = "user_abc123" (extracted from JWT 'sub' claim)

Output (failed validation):

try:
user_id = _validate_user(context)
except PermissionError as e:
# Log: "Invalid token: Signature verification failed"
# Return 401 Unauthorized to client

What's Happening

  1. Extract Token: Get Authorization: Bearer <token> from request headers
  2. Fetch Public Key: Retrieve signing key from JWKS URL (e.g., https://auth.example.com/.well-known/jwks.json)
  3. Verify Signature: Ensure token signed by trusted auth server (Better Auth, Auth0, etc.)
  4. Check Expiration: Reject expired tokens (exp claim)
  5. Extract Identity: Get user ID from sub (subject) claim

Why JWKS? Public key rotation support. Auth server can rotate signing keys without server reconfiguration.

Thread Ownership Enforcement

Database Schema for Ownership

# models.py
from sqlmodel import SQLModel, Field
from datetime import datetime

class Thread(SQLModel, table=True):
id: str = Field(primary_key=True)
user_id: str = Field(index=True) # ← Owner of this thread
created_at: datetime = Field(default_factory=datetime.utcnow)
metadata: dict = Field(default_factory=dict)

class ThreadItem(SQLModel, table=True):
id: str = Field(primary_key=True)
thread_id: str = Field(foreign_key="thread.id", index=True)
user_id: str # ← Redundant but fast for queries
role: str # "user" | "assistant" | "tool"
content: str
created_at: datetime = Field(default_factory=datetime.utcnow)

Key Design:

  • user_id column on Thread table for ownership
  • user_id column on ThreadItem for scoped queries (denormalized for performance)
  • Index on user_id for fast filtering

Access Control Check

# chatkit_server.py
class TaskManagerServer(ChatKitServer):
async def _verify_thread_access(self, thread_id: str, user_id: str):
"""
Verify authenticated user owns the requested thread.

Args:
thread_id: Thread being accessed
user_id: Authenticated user from JWT

Raises:
PermissionError: If user doesn't own thread
"""
async with self.db.session() as session:
result = await session.exec(
select(Thread).where(
Thread.id == thread_id,
Thread.user_id == user_id # ← Ownership check
)
)
thread = result.first()

if not thread:
# Thread doesn't exist OR user doesn't own it
raise PermissionError(
f"Thread {thread_id} not found or access denied"
)

async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
# Layer 1: Validate identity
user_id = self._validate_user(context)

# Layer 2: Verify access BEFORE loading data
await self._verify_thread_access(thread.id, user_id)

# Layer 3: Scoped data retrieval
thread_items = await self._get_user_thread_items(thread.id, user_id)

# Safe to proceed - user owns this thread
async for event in self._generate_response(thread_items, input):
yield event

Output (authorized access):

# User "user_abc" accessing their own thread "thread_123"
await _verify_thread_access("thread_123", "user_abc")
# ✓ Check passes, continues execution

Output (unauthorized access):

# User "user_abc" trying to access "user_xyz"'s thread
try:
await _verify_thread_access("thread_789", "user_abc")
except PermissionError as e:
# ❌ "Thread thread_789 not found or access denied"
# Return 403 Forbidden to client

Why This Works

Attack Scenario: User A tries to access User B's thread

POST /api/chatkit/respond
Authorization: Bearer <user_a_token>
{"thread_id": "thread_belonging_to_user_b"}

Defense in Depth:

  1. JWT validated → extracts user_id = "user_a"
  2. Access check queries: SELECT * FROM threads WHERE id = 'thread_b' AND user_id = 'user_a'
  3. Query returns 0 rows (thread exists but different owner)
  4. PermissionError raised → 403 Forbidden response
  5. User A never sees User B's data

Metadata Validation

The Tampering Threat

// MALICIOUS CLIENT (attacker's code)
const kit = useChatKit({
fetch: async (url, options) => {
return fetch(url, {
...options,
headers: {
Authorization: "Bearer <valid_user_token>",
// ⚠️ Attacker injects fake metadata
"X-User-Context": JSON.stringify({ role: "admin", user_id: "admin_user" }),
},
});
},
});

Problem: If backend trusts metadata from headers without validation, attacker escalates privileges.

Secure Pattern: Trust Only JWT Claims

class TaskManagerServer(ChatKitServer):
def _validate_user(self, context: RequestContext) -> dict:
"""
Extract user identity AND roles from VERIFIED JWT only.
NEVER trust client-sent metadata for identity/permissions.
"""
# ... JWT validation code ...

payload = jwt.decode(token, signing_key.key, algorithms=["RS256"])

# Extract from JWT claims (signed by auth server)
return {
"user_id": payload["sub"],
"email": payload.get("email"),
"roles": payload.get("roles", []), # From token, not headers
}

def _get_page_context(self, context: RequestContext) -> dict:
"""
Extract non-sensitive metadata (page context, UI state).
OK to trust for UX, NOT for authorization.
"""
page_header = context.request_headers.get("X-Page-Context", "{}")
return json.loads(page_header)

async def respond(self, thread, input, context):
# Identity from JWT (trusted)
user_info = self._validate_user(context)
user_id = user_info["user_id"]

# UI context from headers (untrusted for auth, OK for UX)
page_context = self._get_page_context(context)

# Use user_id from JWT for access control
await self._verify_thread_access(thread.id, user_id)

# Use page_context for agent instructions (UX enhancement)
agent_instructions = f"User is on {page_context.get('page', 'unknown')} page"
...

Output (attack mitigated):

# Attacker sends: X-User-Context: {"role": "admin"}
# Backend ignores header, uses JWT:
user_info = _validate_user(context)
# user_info = {"user_id": "regular_user", "roles": ["user"]}
# Access check uses "regular_user", not attacker's fake "admin"

Security Rule: Identity/authorization from JWT only. UI context from headers OK for UX, never for access control.

Extract Your chatkit-auth-security Skill

You've implemented JWT validation, thread ownership enforcement, and metadata validation. These security patterns will recur across every production ChatKit server. Time to harvest this intelligence.

Layer 3 trigger: Pattern recurs 2+ times across projects → Create reusable intelligence

Step: Extract the Skill

Copy and paste this prompt:

Using your skill-creator skill, study the authentication and security
implementation code I just wrote in this lesson and extract it into a
reusable chatkit-auth-security skill.

Encapsulate all 5 patterns: JWKS public key fetching, claim extraction,
error handling, Better Auth integration, and multi-tenant isolation.

Claude will:

  1. Review the 5 security patterns from this lesson
  2. Use skill-creator to generate the skill structure
  3. Extract patterns: JWKS, claims, errors, Better Auth, isolation
  4. Create reference docs with threat mitigation strategies
  5. Follow canonical format from building-chat-interfaces

Your skill appears at .claude/skills/chatkit-auth-security/.


Integration: Secure TaskManager Server

# chatkit_server.py (production-ready with auth)
from chatkit.server import ChatKitServer
from chatkit.types import RequestContext, ThreadMetadata, UserMessageItem
from typing import AsyncIterator, Any
from jwt import PyJWKClient
import jwt
from sqlmodel import select

class TaskManagerServer(ChatKitServer):
def __init__(self, jwks_url: str, db_url: str, **kwargs):
super().__init__(**kwargs)
self.jwks_client = PyJWKClient(jwks_url)
self.db_url = db_url

def _validate_user(self, context: RequestContext) -> str:
"""Extract and validate user ID from JWT."""
auth_header = context.request_headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise PermissionError("Missing Authorization header")

token = auth_header.removeprefix("Bearer ")
signing_key = self.jwks_client.get_signing_key_from_jwt(token)
payload = jwt.decode(token, signing_key.key, algorithms=["RS256"])

user_id = payload.get("sub")
if not user_id:
raise PermissionError("Token missing 'sub' claim")

return user_id

async def _verify_thread_access(self, thread_id: str, user_id: str):
"""Verify authenticated user owns the thread."""
async with self.db.session() as session:
result = await session.exec(
select(Thread).where(
Thread.id == thread_id,
Thread.user_id == user_id
)
)
if not result.first():
raise PermissionError("Thread not found or access denied")

async def respond(
self,
thread: ThreadMetadata,
input: UserMessageItem | None,
context: RequestContext,
) -> AsyncIterator[ThreadStreamEvent]:
# Security layers
user_id = self._validate_user(context)
await self._verify_thread_access(thread.id, user_id)

# Scoped data retrieval
thread_items = await self._get_user_thread_items(thread.id, user_id)

# Agent execution (secure - user owns this thread)
agent = Agent(
name="TaskManager",
instructions=f"User ID: {user_id}. Help manage their tasks."
)

result = Runner.run_streamed(agent, input.content)
async for event in stream_agent_response(context, result):
yield event

Output (successful auth flow):

1. Client sends: Authorization: Bearer <valid_token>
2. Backend validates JWT → extracts user_id = "user_abc"
3. Backend checks: Thread "thread_123" owned by "user_abc"? → Yes
4. Backend loads thread items for "user_abc" only
5. Agent responds with scoped data

Output (blocked attack):

1. Attacker sends: Authorization: Bearer <user_a_token>
2. Attacker requests: thread_id = "user_b_thread"
3. Backend validates JWT → user_id = "user_a"
4. Backend checks: Thread "user_b_thread" owned by "user_a"? → No
5. Backend returns: 403 Forbidden "Thread not found or access denied"
6. Attacker never sees user_b's data

Security Threat Mitigation Summary

ThreatMitigationVerification
Unauthorized Thread Access_verify_thread_access() checks ownership before loading dataTest: User A cannot access User B's thread (403 response)
Thread HijackingThread IDs scoped to user_id in all queriesTest: Iterate thread IDs, all return 403 except owned threads
Metadata TamperingIdentity from JWT sub claim only, ignore client headersTest: Send fake user_id in header, access still scoped to JWT user
Missing Authentication_validate_user() required for all requestsTest: Request without token → 401 Unauthorized
Cross-Tenant Leakageuser_id column on Thread + ThreadItem tablesTest: Query returns 0 rows for other users' threads

Safety Note

CRITICAL: Never store sensitive data (passwords, API keys, PII) in thread metadata without encryption. Metadata is JSON-serialized and may be logged. Use metadata for UI state (page context, preferences), not secrets.

Recommended: Store sensitive associations (user_id → encrypted_secrets) in separate table with row-level encryption. Reference by ID in thread metadata if needed.

Try With AI

Apply your chatkit-auth-security skill to real security challenges. These prompts build your authentication pattern recognition.

Prompt 1: Implement Rate Limiting

Setup: Open your chatkit_server.py

Prompt:

Using the chatkit-auth-security skill patterns, add rate limiting:
- Max 10 requests per minute per user_id
- Return 429 Too Many Requests if exceeded
- Store counters in Redis or in-memory dict
- Reset counters every 60 seconds

Show the middleware implementation.

What you're learning: Security isn't just authentication—it's also abuse prevention. Rate limiting by authenticated user prevents DoS attacks and ensures fair resource allocation.

Expected: Middleware that extracts user_id from JWT, checks request count, enforces limit.


Prompt 2: Audit Logging

Setup: Review your _verify_thread_access() function

Prompt:

Add security audit logging for thread access:
- Log every thread access attempt (user_id, thread_id, timestamp, success/failure)
- Include IP address from RequestContext
- Write to structured log file (JSON format)
- Log suspicious patterns (rapid thread ID iteration)

Show the logging integration.

What you're learning: Audit trails are critical for security forensics. When an attack happens, logs tell you what data was accessed and by whom.

Expected: Logging calls that capture access events with structured data for analysis.


Prompt 3: Role-Based Access Control (RBAC)

Setup: Consider multi-role scenarios (admin, user, guest)

Prompt:

Extend the authentication pattern to support roles:
- Extract 'roles' claim from JWT (e.g., ["admin", "user"])
- Admin users can access ANY thread (support/debugging)
- Regular users can only access their own threads
- Guest users have read-only access

Modify _verify_thread_access() to handle role-based permissions.

What you're learning: Authorization isn't binary. Real systems have nuanced permissions (admin overrides, read-only access, hierarchical roles). RBAC patterns scale to enterprise complexity.

Expected: Permission logic that checks role before enforcing ownership constraints.


Next Lesson: Integrate your secure ChatKit server with React using useChatKit, building a production-ready UI with authentication, streaming responses, and error handling.