Jobs API: Scheduled Tasks
Your Task API needs to clean up completed todos older than 30 days. You could run a cron job on your server, but what happens when you scale to multiple pods? Three pods means three cleanup runs. You need exactly-once scheduled execution across your distributed system.
This is what Dapr's Jobs API solves. Instead of managing cron jobs in your infrastructure, you schedule jobs through Dapr. The Scheduler service tracks when jobs should run and triggers your application at the right time. Whether you have one pod or twenty, the job runs once.
In this lesson, you'll schedule a daily cleanup job for your Task API and implement the handler that receives the trigger. You'll also learn when to use Jobs API versus bindings, since both can trigger your application on a schedule.
Jobs API vs Bindings: When to Use Each
Both Jobs API and input bindings can trigger your application on a schedule. The difference is who controls the schedule:
| Aspect | Jobs API | Input Bindings |
|---|---|---|
| Use case | Schedule future work from your code | Receive triggers from external systems |
| Storage | Dapr Scheduler stores in embedded Etcd | External system manages the schedule |
| Control | You schedule via API calls | External system triggers you |
| Examples | "Run cleanup at midnight" | "Trigger when file arrives in S3" |
| Who initiates | Your application | External system or cron binding |
Use Jobs API when:
- Your application decides when work should happen
- You need to schedule jobs dynamically (user-requested reports, delayed notifications)
- You want Dapr to manage the schedule state
Use Input Bindings when:
- An external system controls the trigger (Kafka, RabbitMQ, S3)
- You need a simple cron trigger without application logic to create it
- The schedule is static and defined in YAML
For the Task API cleanup, Jobs API makes sense because your application knows when cleanup should run and can adjust the schedule programmatically.
The Scheduler Control Plane Service
The Jobs API is backed by the Scheduler service, which runs as part of the Dapr control plane alongside the sidecar injector, operator, and sentry.
┌─────────────────────────────────────────────────────────────────┐
│ Dapr Control Plane (dapr-system namespace) │
├─────────────────────────────────────────────────────────────────┤
│ dapr-scheduler-server │
│ ├── Jobs API backend │
│ ├── Actor reminders (from v1.15) │
│ ├── Workflow reminders │
│ └── Embedded Etcd database ← Jobs stored here │
├─────────────────────────────────────────────────────────────────┤
│ dapr-sidecar-injector │ dapr-operator │ dapr-sentry │
└─────────────────────────────────────────────────────────────────┘
Key architecture facts:
-
Embedded Etcd: The Scheduler service runs its own Etcd instance by default. Your jobs are persisted here and survive restarts.
-
No leader election: Unlike some distributed systems, all Scheduler replicas are peers. They load-balance job triggers across instances.
-
Already deployed: When you installed Dapr with Helm in Lesson 3, the Scheduler service was included. Check with:
kubectl get pods -n dapr-system | grep scheduler
Output:
dapr-scheduler-server-0 1/1 Running 0 2d
Schedule Expressions
Dapr accepts two formats for job schedules:
Human-Readable Period Strings
Simple patterns for common intervals:
| Expression | Meaning |
|---|---|
@yearly or @annually | Once per year (Jan 1, midnight) |
@monthly | First day of each month (midnight) |
@weekly | Sunday at midnight |
@daily or @midnight | Every day at midnight |
@hourly | Start of every hour |
@every <duration> | Custom interval (e.g., @every 1h30m) |
Cron Expressions (6 fields)
More precise control with systemd-style cron (note: 6 fields, not 5):
┌───────────── second (0-59)
│ ┌─────────── minute (0-59)
│ │ ┌───────── hour (0-23)
│ │ │ ┌─────── day of month (1-31)
│ │ │ │ ┌───── month (1-12)
│ │ │ │ │ ┌─── day of week (0-6, 0=Sunday)
│ │ │ │ │ │
* * * * * *
Examples:
| Expression | Meaning |
|---|---|
0 0 0 * * * | Every day at midnight (0 seconds, 0 minutes, 0 hours) |
0 30 * * * * | Every hour at the 30-minute mark |
0 0 9 * * 1 | Every Monday at 9 AM |
0 0 */2 * * * | Every 2 hours |
Scheduling a Job
The Jobs API uses HTTP endpoints on the Dapr sidecar. Here's how to schedule a daily cleanup job.
Using httpx (Direct HTTP)
import httpx
import os
import json
DAPR_HTTP_PORT = os.getenv("DAPR_HTTP_PORT", "3500")
JOBS_URL = f"http://localhost:{DAPR_HTTP_PORT}/v1.0-alpha1/jobs"
async def schedule_cleanup_job():
"""Schedule a daily cleanup job for completed todos."""
job_name = "daily-todo-cleanup"
job_data = {
"schedule": "@daily", # Human-readable: every day at midnight
"data": {
"action": "cleanup-completed-todos",
"retention_days": 30
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{JOBS_URL}/{job_name}",
json=job_data
)
if response.status_code == 204:
print(f"Job '{job_name}' scheduled successfully")
else:
print(f"Failed to schedule job: {response.status_code}")
print(response.text)
Output:
Job 'daily-todo-cleanup' scheduled successfully
One-Time vs Recurring Jobs
One-time job (runs once at a specific time):
async def schedule_reminder():
"""Schedule a one-time reminder for tomorrow at 9 AM."""
job_data = {
"dueTime": "2025-01-15T09:00:00Z", # RFC3339 timestamp
"data": {
"action": "send-reminder",
"user_id": "user-123",
"message": "Review quarterly report"
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{JOBS_URL}/quarterly-reminder",
json=job_data
)
Recurring job with limited repeats:
async def schedule_hourly_sync():
"""Sync data every hour, but only 24 times."""
job_data = {
"schedule": "@hourly",
"repeats": 24, # Stop after 24 executions
"data": {
"action": "sync-external-system"
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{JOBS_URL}/hourly-sync",
json=job_data
)
Implementing the Job Handler
When a scheduled job triggers, Dapr sends a POST request to your application at /job/{job-name}. You implement this endpoint to handle the job.
FastAPI Job Handler
from fastapi import FastAPI, Request
from datetime import datetime, timedelta
app = FastAPI()
@app.post("/job/daily-todo-cleanup")
async def handle_cleanup_job(request: Request):
"""Handle the daily cleanup job trigger from Dapr Scheduler."""
job_data = await request.json()
action = job_data.get("action")
retention_days = job_data.get("retention_days", 30)
if action == "cleanup-completed-todos":
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
print(f"Cleaning up todos completed before {cutoff_date}")
# Your cleanup logic here
# deleted_count = await cleanup_old_todos(cutoff_date)
return {"status": "SUCCESS"}
return {"status": "DROPPED", "reason": f"Unknown action: {action}"}
Output (when job triggers):
Cleaning up todos completed before 2024-12-15 00:00:00
Handler Response Format
Your handler must return a response indicating success or failure:
| Response | Dapr Behavior |
|---|---|
{"status": "SUCCESS"} | Job completed, no retry |
{"status": "RETRY"} | Job failed, retry based on failure policy |
{"status": "DROPPED"} | Job failed permanently, don't retry |
Multiple Job Handlers
You can schedule multiple jobs and route them to different handlers:
@app.post("/job/daily-todo-cleanup")
async def handle_cleanup(request: Request):
job_data = await request.json()
# Cleanup logic
return {"status": "SUCCESS"}
@app.post("/job/weekly-report")
async def handle_weekly_report(request: Request):
job_data = await request.json()
# Report generation logic
return {"status": "SUCCESS"}
@app.post("/job/hourly-sync")
async def handle_sync(request: Request):
job_data = await request.json()
# Sync logic
return {"status": "SUCCESS"}
Complete Example: Task API with Scheduled Cleanup
Here's a complete FastAPI application that schedules a cleanup job on startup and handles the trigger:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from datetime import datetime, timedelta
import httpx
import os
DAPR_HTTP_PORT = os.getenv("DAPR_HTTP_PORT", "3500")
JOBS_URL = f"http://localhost:{DAPR_HTTP_PORT}/v1.0-alpha1/jobs"
async def schedule_cleanup_job():
"""Schedule daily cleanup if not already scheduled."""
job_data = {
"schedule": "@daily",
"data": {
"action": "cleanup-completed-todos",
"retention_days": 30
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{JOBS_URL}/daily-todo-cleanup",
json=job_data
)
if response.status_code == 204:
print("Cleanup job scheduled")
elif response.status_code == 400:
# Job already exists
print("Cleanup job already scheduled")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Schedule the cleanup job
await schedule_cleanup_job()
yield
# Shutdown: Nothing to clean up
app = FastAPI(lifespan=lifespan)
@app.post("/job/daily-todo-cleanup")
async def handle_cleanup_job(request: Request):
"""Handle daily cleanup trigger from Dapr Scheduler."""
job_data = await request.json()
retention_days = job_data.get("retention_days", 30)
cutoff_date = datetime.utcnow() - timedelta(days=retention_days)
print(f"Running daily cleanup for todos older than {cutoff_date}")
# In production, you'd query your database:
# deleted_count = await db.execute(
# "DELETE FROM todos WHERE completed_at < :cutoff",
# {"cutoff": cutoff_date}
# )
# print(f"Deleted {deleted_count} old todos")
return {"status": "SUCCESS"}
@app.get("/health")
async def health():
return {"status": "healthy"}
Output (on startup):
Cleanup job scheduled
Output (when job triggers at midnight):
Running daily cleanup for todos older than 2024-12-15 00:00:00
Managing Jobs
Check if a Job Exists
async def get_job(job_name: str) -> dict | None:
"""Retrieve job details."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{JOBS_URL}/{job_name}")
if response.status_code == 200:
return response.json()
return None
Delete a Job
async def delete_job(job_name: str) -> bool:
"""Cancel a scheduled job."""
async with httpx.AsyncClient() as client:
response = await client.delete(f"{JOBS_URL}/{job_name}")
return response.status_code == 204
Update a Job Schedule
To change a job's schedule, delete and recreate it, or use the overwrite parameter:
async def update_job_schedule(job_name: str, new_schedule: str):
"""Update an existing job's schedule."""
job_data = {
"schedule": new_schedule,
"overwrite": True, # Replace existing job
"data": {
"action": "cleanup-completed-todos",
"retention_days": 30
}
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{JOBS_URL}/{job_name}",
json=job_data
)
return response.status_code == 204
Key Vocabulary
| Term | Definition |
|---|---|
| Jobs API | Dapr building block for scheduling future work |
| Scheduler service | Control plane component that stores and triggers jobs |
| Due time | One-time execution at a specific timestamp |
| Schedule | Recurring execution pattern (cron or human-readable) |
| Job handler | Your endpoint that receives job triggers |
Reflect on Your Skill
You built a dapr-deployment skill in Lesson 0. Test and improve it based on what you learned.
Test Your Skill
Using my dapr-deployment skill, schedule a job that runs every hour to check for stale tasks.
What endpoint do I need to implement to receive the job trigger?
Does your skill include Jobs API patterns?
Identify Gaps
Ask yourself:
- Does my skill explain the difference between Jobs API and cron bindings?
- Does it include the job handler endpoint pattern (
/job/{name})? - Can it help me write both cron expressions and human-readable schedules?
Improve Your Skill
If you found gaps:
My dapr-deployment skill doesn't cover the Jobs API.
Update it to include:
1. Jobs API for scheduling future work (vs bindings for external triggers)
2. HTTP pattern: POST /v1.0-alpha1/jobs/{name} with schedule and data
3. Handler pattern: POST /job/{name} endpoint returns {"status": "SUCCESS"}
4. Human-readable schedules: @daily, @hourly, @every 1h30m
Try With AI
You now understand Dapr's Jobs API for scheduled tasks. Use AI to explore scheduling patterns for your domain.
Setup
Open your AI assistant with your Task API context. These prompts help you apply job scheduling to real scenarios.
Prompt 1: Schedule a Daily Cleanup
Create a Dapr job that runs daily at midnight to archive completed todos.
Use the Jobs API with an async httpx client.
Show me both the scheduling code and the FastAPI handler.
What you're learning: This prompt practices the complete Jobs API workflow: scheduling a job with a human-readable expression and implementing the handler that receives the trigger. You'll see how the job data flows from schedule time to trigger time.
Prompt 2: Jobs API vs Cron Bindings
What's the difference between Dapr's Jobs API and a cron input binding?
When should I use each?
Give me a concrete example where Jobs API is better and one where cron binding is better.
What you're learning: Understanding when to use Jobs API vs bindings prevents architectural mistakes. AI will explain that Jobs API is for application-controlled scheduling while bindings are for static schedules or external triggers.
Prompt 3: Implement a Job Handler
Create a FastAPI handler for receiving job triggers from Dapr Scheduler.
The job should:
1. Process data from the job payload
2. Return SUCCESS, RETRY, or DROPPED based on the outcome
3. Log when the job runs
Show me error handling for when the job processing fails.
What you're learning: Job handlers need proper error handling and response formatting. AI will show you how to structure handlers that correctly communicate success or failure back to Dapr.
Safety Note
The Jobs API is currently in alpha (v1.0-alpha1). While stable for production use, the API surface may change in future Dapr versions. Pin your Dapr version in production and test thoroughly when upgrading.