给你的 AI Agent 一套神经系统
15 个概念,覆盖约 80% 的真实用法:感官(triggers)、反射(durable execution)和平衡(flow control)。
你已经构建了一个能工作的 agent。但它也只在你盯着它的时候才工作。你打开 Claude Code 或 OpenCode,输入内容,它回复。你一走开,它就停下来。从「一个由你操作的 agent」到「一个自己运转的 worker」之间的这道鸿沟,正是这门课的全部主题。
让人意外的是,填平这道鸿沟的并不是一个更聪明的 agent。你的 agent 已经具备了完成工作所需的一切:一个负责思考的 LLM,用来行动的 tools 和 MCP servers,以及它已经掌握的 workflows 所对应的 skills。它缺少的,是一套神经系统。想想你自己的身体:你的大脑负责思考,肌肉负责行动,但底层还有第二套系统在你毫不知情的情况下运转,那就是你的心跳和反射,是那些在你睡着时也维持你生命的信号。你停止注意,心脏依然跳动;而 agent 没有任何与之对应的东西,所以你一停止驱动它,它就停下来。神经系统是那层把闭环自己合上的连接组织,不需要人类去驱动每一个回合:它感知世界,在事情发生时唤醒 agent;它在某一步失败时凭反射做出反应(并在等待一个人或一个慢速 API 时,把位置守住数小时);当五百个请求同时落下时,它让 agent 保持平衡。这就是「一个由你操作的 agent」和「一个自己运转的 FTE」之间的分界线。你为 agent 配上这套神经系统;你不去重写 agent。这就是整门课围绕的那一个核心想法。
为 agent 配上神经系统的那个工具有一个技术名字,叫 durable execution engine,本课用的是其中一个,叫 Inngest。这套模式同样适用于 Temporal、Restate 和 Dapr Agents。这不只是一个教学比喻。Day AI 是一家为 AI-native 公司打造的 CRM,它把 Inngest 称为自己产品的「神经系统」,并且在本课要讲的每一个部分上运行。Inngest 免费的 Hobby tier 是最容易上手的地方:不需要信用卡,一条命令启动 dev server,还有一个 dashboard 让你边构建边查看。
例子刻意做得很薄:一个客户支持 agent,它查询几个示例客户,起草一封回复,并且只在人类批准后才发起退款。它故意做得很薄:难点不在 agent 身上,所以我们把 agent 保持得很小,把精力花在包在它外面的神经系统上。你将在这里从零构建它。它与之前的 Digital FTE 课程共享一些想法,但不以任何那门课的内容为前提。在下面的 Quick Win 中一次性把环境搭好,然后 Part 4 会用七个「粘贴并观察」的 prompts 构建出这个 worker。它以 Python 为先,基于 inngest-py:你用平实的英语指挥你的 coding agent,由它来写代码。如果你是在动手中学习的人,就略读 Part 1-3,直接跳到 Part 4。
单个 agent 在任务中途崩溃会很烦。让五十个 agents 组成的 workforce 在没有底层神经系统的情况下承接面向客户的工作,则根本不可行:要么采用一个能为你提供这套能力的平台,要么花六个月自己造一个更差的版本。有四个性质,使得这套神经系统对 agents 尤其重要: Day AI,这家为 AI-native 公司打造的 CRM,在本课要讲的每一个原语上运行自己的产品:durable LLM workflows、wait-for-event 协调、失败后的 replay、debounce 加 throttle 加 concurrency,以及多租户公平性。他们的两位创始工程师彼此独立地想到了同样的神经系统比喻。这是生产语言,不是课程包装。 The Agent Factory thesis 描述了任何生产级 agent 系统都必须满足的 Seven Invariants。你在这里构建的 worker 满足 Invariant 4(一个 engine)和 Invariant 5(一个 system of record,这里是一个小型 audit trail)。本课再增加两个,外加 Invariant 1 的一部分:为什么 AI agent 需要一套神经系统(四个性质)
step.wait_for_event(概念 15),你就得自己做审批队列:database table、polling、timeout handling、audit trail。这是一个项目,不是一个功能。这门课在 Agent Factory 论点中的位置
step.wait_for_event 是任何平台上最清晰的表达:agent 暂停,人类发出等待中的 event,agent 恢复。
15 个概念一览。 它们映射到神经系统要做的三件事:感官(triggers 唤醒 worker)、反射(durable execution 让它在出问题时保持正确),以及平衡(flow control 让它在负载下保持健康)。这是第一遍版本,概念加一句话要点。当你在构建时遇到问题,文末的 Quick reference 里有一张「症状到概念」的诊断表,会把你指回这个失败所属的概念。15 个概念,每个一句话(展开看完整地图)
# Concept 一句话要点 Senses (Triggers) 世界如何触达 worker 1 Events vs requests Request 是同步的,有人在等;event 是异步的,世界已经继续向前。 2 Cron triggers Schedule 唤醒函数。一行代码: TriggerCron(cron="0 9 * * *")。3 Webhook triggers 入站 HTTP payload 变成一个具名 event;你的函数响应这个 name。 4 Idempotency and event semantics Event ID 和 step name 让重复 event(或 retry)变成 no-op。 5 Fan-out and sub-agent delegation 一个 event,N 个订阅函数;或者一个 parent 发出 N 个 child events。 Reflexes (Durable execution) 出问题时仍让 worker 保持正确 6 step.run and the durable function model每个 step.run 都是 checkpoint;函数可以在 steps 之间崩溃并恢复。7 Memoization, the mechanic underneath 已完成 steps 返回存储的 output,而不是重新执行。 8 step.sleep and step.wait_for_event两者都会耐久地暂停函数:等待一段时间,或等待一个 event。 9 Retries, error handling, dead-letter 自动 backoff retries;N 次后仍失败的 run 会保留,供 replay。 10 step.run for AI calls in Python把 OpenAI calls 包进 step.run;step.ai.infer 可外包 inference(step.ai.wrap 仅 TypeScript)。Balance (Flow control) 负载下让 worker 保持健康 11 Concurrency and throttling concurrency 限制活跃 runs;throttle 限制每秒启动数。12 Priority and fairness Priority 排队;per-key concurrency 让每个 tenant 都有公平份额。 13 Batching 把 events 聚合成一次 batched function call,用于低成本批量工作。 14 Replay and bulk cancellation 用新代码 replay 失败 runs;bulk-cancel 不再需要的 runs。 15 HITL gates with step.wait_for_event函数暂停到人类批准,再携带决策恢复执行。
先修条件。 四件事,除此之外本课可以独立成立(Part 4 会从零构建自己的 worker)。
- 你会驱动一个 coding agent。 Claude Code 或 OpenCode,已安装并已认证。Plan mode、rules files、先读后写的 workflow:如果这套节奏对你来说很熟悉,你就对齐了。如果还不熟悉,Agentic Coding Crash Course 会讲到它。
- 你有一个
OPENAI_API_KEY(或你的 coding agent 能用的另一个 model key),以及一个 Neon account,用作 worker 的 Postgres system of record。这个 worker 会运行一个真实的 model,并在 Neon 中读写它的 customers 和 audit trail。Neon 是免费的(不需要信用卡),在搭建时你只需在浏览器里点一下就能授权;如果你还没有账号,在 neon.com 注册大约一分钟即可。Inngest dev server 本身不需要任何账号。- 你有可用的 Node.js 20+,即使这个 worker 是 Python。Inngest dev server 以 Node CLI 形式分发(
npx inngest-cli@latest dev)。- 你对「event-driven」与「request/response」有一个可用的心智模型。 如果「世界发出一个 event,零个、一个或多个函数对此响应」读起来很熟悉,你就对齐了。如果不是,概念 1 会给你这个形状。
做过 From Agent to Digital FTE 了吗?那你有一个更丰富的 worker 可以来包;Part 4 结尾有一个 callout 会把神经系统对准它。这是加分项,不是门槛。
第一遍怎么读这一页,外加你会遇到的术语表
第一遍。 展开任何标有 "Done when" 或 "What to watch" 的内容:那是可运行的行为,用来对照你的预测。在 Part 4 中,第一遍阅读时你可以略读那些承重的代码片段;每一段周围的叙述会告诉你这一层做什么,等你真正构建时,你的 agent 会写代码。"Try with AI" blocks 是可选的扩展 prompts。第一遍的目标,是把神经系统模型及其三层装进脑子;第二遍把手放在键盘上,才是真正构建的时候。每个概念都以一个 Predict(在往下读之前先给出一个答案)或一个 Quick check(检验你刚读到的规则)收尾;两者都是为了让你停下来,而不是给你打分。
术语表(每个术语在首次出现处也会结合上下文解释):
- Production Worker:一个被神经系统包裹的 AI agent:唤醒它的感官(triggers)、熬过失败的反射(durable execution),以及在负载下扩展它的平衡(flow control)。
- Event:一个具名、不可变的消息,描述某件事已经发生。例如:
{"name": "customer/email.received", "data": {"customer_id": "..."}}。这是 trigger surface。 - Inngest function:用
@inngest_client.create_function装饰的 Python 函数,声明 triggers 和 steps。它是耐久工作的单位。 - Step:Inngest function 内部的一项工作,用
ctx.step.run()、ctx.step.sleep()、ctx.step.wait_for_event()或ctx.step.ai.infer()包裹。每个 step 都会独立 retry,并独立 memoize。 - Memoization:当函数崩溃并重启时,Inngest 会从顶部重新运行函数代码,但对任何结果已经缓存的
step.run返回存储 output。函数会追到崩溃点,而不重做已经完成的工作。 - Flow control:每个函数的策略:
concurrency(最大活跃 runs)、throttle(每秒最大启动数)、priority(队列顺序)、batch_events(调用前先聚合)。 - HITL (Human In The Loop):函数先暂停,等待人类审批或输入,再继续。
step.wait_for_event是这个原语。 - Replay:把失败 runs 作为全新 runs 从顶部重新运行,使用 bug 修复后的当前代码(区别于 run 内部的自动 retry,后者从 memo 恢复)。对应 dashboard 上的 Rerun 按钮。
- Dev server:Inngest 的本地开发环境,通过
npx inngest-cli@latest dev启动。Dashboard 在http://127.0.0.1:8288;MCP endpoint 在/mcp。
截至 2026 年 5 月有效。 整个 Part 4 构建过程都已在一个实时运行的 Inngest dev server 和一个真实 model 上端到端跑过,版本为 inngest 0.5.18、openai-agents 0.17.3、fastapi 0.136.3、Python 3.12 以及 Inngest CLI。Part 4 中的每个片段都来自那次可运行的构建,不是凭记忆写的。本课讲的架构不会因为 SDK 变化而改变;SDK 只是今年接触这套架构的接口。如果某个实时文档页面与本页在某个语法细节上出现分歧,以文档为准: 固定你的版本,并在构建时查阅 Inngest Python quick start 和 OpenAI Agents SDK docs。
Claude Code 和 OpenCode 之间有差异的 section 会带一个 switcher;选定一个后,页面会在你后续访问中保持同步。
十五分钟的快速成效:搭好基础底座,看见反射
在读那 15 个解释这套架构为什么成立的概念之前,先把整门课要运行的环境搭起来,并亲眼看一个 durable function 熬过一次崩溃。这个 setup 只做一次;Part 4 会在完全相同的底座上构建客户支持 worker。做完后你会拥有:
- 在你的 coding agent 里打开的底座,已安装的 Skills,以及接好的三个 MCP servers(Neon、Context7,以及 dev-server 的
inngest-dev), - 一个全新的 Neon database,里面有两张表
customers和audit_log,是你通过 MCP 创建、并在 console 里看到的,它的DATABASE_URL已写入.env,供 worker 稍后使用, - 一个极小的 durable function(一个
step.run、一个step.sleep、一个 FastAPI host),运行在 Inngest dev server 上, - 一个你触发并看着它在 sleep 处暂停、消耗 零 compute 的 run,
- 以及一个你故意弄坏的 run,然后看着 Inngest retry,它从 memo 返回那个已经完成的 step,而只有坏掉的 step 重新执行。
最后那一拍,就是整门课的全部要点的缩影:那个你能亲眼看见的反射,一个 step 失败了,而系统在不重做已经完成的工作的前提下恢复过来。这不是 Part 4 的 worked example(那是完整的 Worker,七个 prompts);这是一坐就能做完的事。 做完它,再回来读概念。
一个 Production Worker 是并排的两个进程,把它们分清楚就是这个心智模型:一个是 Python function host(你的代码,把 function 提供给 Inngest),另一个是 Inngest dev server(那套神经系统,它触发 runs、memoize steps,并把 dashboard 给你看)。你的 coding agent 把两者接好,安装那些教会它 Inngest 模式的 Skills,并通过 inngest-dev MCP 与 dev server 对话。
还有一条边界很重要,它正是 Digital FTE 课程画过的那条。你的 worker 把它的 customers 和 audit trail 放在一个 Neon Postgres database 里,而这个 database 被触达的方式有两种截然不同。你的 coding agent 用 Neon MCP 去构建和检查它:创建表、读取 rows、拉取 connection string,全部在开发时用平实的英语完成。你的 worker 用它自己的 Postgres connection(DATABASE_URL)在 runtime 读写它。Worker 从不调用 Neon MCP,而 Neon 自己的文档把原因说得很直白:MCP server 用于开发和检查,绝不接进一个正在运行的 app。Neon 只需一次 OAuth 点击即免费使用;Inngest dev server 完全不需要账号。
拿到底座并打开它
下载底座,在你的 coding agent 里打开这个文件夹。Agent 会按照下面的 prompts 自己完成 setup。你只 setup 一次:ai-agent-nervous-system/ 是你整门课的文件夹,Quick Win 和 Part 4 共用。你永远不需要重新下载或重新解压。
下载 ai-agent-nervous-system-base.zip
cd ai-agent-nervous-system
claude
这个底座假设你用的是一个有能力的通用 agent(Claude Code,或运行 Claude Sonnet 或 Opus、GPT-5 或类似模型的 OpenCode)。较小的模型会在构建 prompt 上跑偏;如果它的第一份 plan 看起来含糊而不具体,就在继续之前换一个更强的。
准备底座(约 3 分钟)
底座在 AGENTS.md 里带了它的规则,也带了它的 MCP 接线;Skills、你的 key,以及 Neon 授权是接下来的事。让你的 agent 自己 setup。粘贴这段:
Read AGENTS.md, then get this base ready: install the Skills it lists for whichever agent you are, copy
.env.exampleto.envfor me, and tell me exactly what you need from me to bring the Neon and Context7 MCP servers online.
注意观察: agent 安装那四个 Inngest Skills 和 neon-postgres Skill(你会看到安装运行过程和 Installed 确认),创建 .env,然后向你要两样东西:你的 OPENAI_API_KEY(粘进 .env),以及一次浏览器点击来通过 OAuth 授权 Neon。Neon 是免费的;如果你还没有账号,在 neon.com 注册大约一分钟,或者直接在授权界面里建一个。INNGEST_DEV=1 已经在 .env 里,所以 SDK 会以 local dev 模式运行,不需要 signing key。安装和接线完成后,agent 会让你启动 dev server(下一步),然后重启它,因为新的 Skills 和 inngest-dev MCP 不会在会话中途加载。
完成的标志: Skills 已安装,.env 里有你的 key,Context7 可达,Neon 已授权。inngest-dev MCP 会在 dev server 运行后上线,那就是下一步。
启动 dev server,并确认 agent 能连上它(约 2 分钟)
这门课新增了两条 agent 通过 MCP 触达的边界:一个它构建和检查的 Neon database,以及一个它向其发送 events 并观察的正在运行的 dev server。所以在构建任何东西之前,先把两者都拉起来,并确认它们是活的。
在它自己的 terminal 里启动 Inngest dev server(它是一个 Node CLI;让它保持运行):
npx inngest-cli@latest dev
Dashboard 会在 http://127.0.0.1:8288 出现,dev server 会在 /mcp 暴露它的 MCP endpoint。现在重启你的 coding agent(退出并在 ai-agent-nervous-system 文件夹里重新启动),让刚安装的 Skills 和 inngest-dev MCP 都加载进来。然后粘贴这段:
List the Neon tools and the inngest-dev tools you can see.
注意观察: 两份真实的列表。Neon tools(创建 project、运行 SQL、描述表、获取 connection string,诸如此类)是你的 agent 在 database 上的那只手。inngest-dev tools(list_functions、send_event、invoke_function、get_run_status 等等)是它在正在运行的 dev server 上的那只手。下面的一切都骑在这两者之上。
Gate 打开: 回复里列出了真实的 Neon tool names 以及真实的 inngest-dev tool names。如果 Neon tools 缺失: OAuth 没有完成;从准备步骤重做 Neon 授权。如果 inngest-dev tools 缺失: dev server 没在运行(启动它),或者你跳过了重启(退出,在这个文件夹里重新启动,再问一次)。
构建 store,并拿到它的 connection string(约 3 分钟)
现在通过 Neon MCP 创建 worker 的 system of record,然后把它稍后触达它所需要的那一样东西交给 worker:一个 connection string。你在 Part 4 构建的 worker 会在这里读它的 customers、写它的 audit trail。粘贴这段:
Paste this to your coding agent. Plan first; execute on approval.
On a fresh Neon project, create two tables:
customers(id, email, tier) andaudit_log(a record of every action the worker takes). Then call the Neon tool that returns the connection string and write that URL into my.envasDATABASE_URL. Use the Neon tools for all of it; don't write SQL for me to run.
注意观察: agent 调用 Neon MCP tools 来创建 project 和那两张表(你看到的是这些 tool calls,不是你手敲的 SQL),然后把 DATABASE_URL 写进 .env。那个字符串就是交接:Neon MCP provision 了这个 store,而你的 worker 会用这个字符串,而不是 MCP server。
完成的标志: 一个全新的 Neon project 存在,带一张 customers 表和一张 audit_log 表,并且 .env 里有一个 DATABASE_URL。打开 console.neon.tech,选 agent 刚建的那个 project,打开 Tables:customers 和 audit_log 就在那里,眼下是空的。当 worker 运行时,你会在 D0 看到 rows 出现。(一张表就是一个电子表格:每行一件事,每列一个细节。)
构建第一个 durable function,并从 dashboard 驱动它(约 3 分钟)
现在用你刚安装的 Skills 构建最小的 durable function。Inngest Skills 在它们的示例里是 TypeScript 优先的,所以你的 agent 从它们那里取模式(什么是 step,一个 durable function 长什么样),而从文档确认确切的 Python 签名(dev-server MCP 的 grep_docs/read_doc,或 Context7),不是凭记忆。粘贴这段:
Using the Inngest Skills, write one tiny Inngest durable function (call it
greet-customer, triggered by ademo/greetevent) that composes a greeting in onestep.run, sleeps fifteen seconds withstep.sleep, then composes a farewell in a secondstep.runand returns both. Serve it from a FastAPI host in local dev mode, and start the host on port 8000 with auto-reload on, so edits I make later are picked up without a manual restart.
它写出来的形状,方便你一看就认得:function 是普通的 async def,两个 step.run calls 包住应该 memoize 的工作,中间的 step.sleep 让 run durably 暂停(进程可以在 sleep 期间崩溃、重启或重新部署;timer 触发后,run 在下一行恢复)。在 agent 的代码里有一个细节要确认:Inngest client 用 is_production=False 构造,或者读取 .env 里已有的 INNGEST_DEV=1。两者缺一,SDK 会悄悄默认走 Cloud,你的 function 永远不会在本地注册。
完成的标志: function host 运行在 port 8000,dev server(从上一步起就在运行)已自动发现它。打开 http://127.0.0.1:8288,点 Functions,greet-customer 在列表里。其余的你从浏览器里驱动。
触发它,看一个 step 以零 compute 睡眠(由你驱动)
发送 trigger event。最简单的路径是 dashboard:在 http://127.0.0.1:8288 里点 Events,再点 Send event,粘贴这个,点 Send:
{
"name": "demo/greet",
"data": { "name": "Sara" }
}
(更想留在 agent 里?让它通过 MCP 发送 event:"Send a demo/greet event with name Sara using the inngest-dev send_event tool." 两种方式启动的是同一个 run。)
点 Runs 并打开新 run。第一个 step 完成;sleep step 显示 Sleeping 并带一个恢复时间。你代码里没有任何东西在运行,host terminal 是空闲的,而这正是要点:一次 durable wait 消耗零 compute。十五秒后,run 自己恢复,farewell step 完成,状态翻成 Completed。Output 面板显示返回的 dict。
弄坏一个 step,看 retry 跳过它已经做过的工作(回报)
现在故意让一个 step 失败,这样你就能看着 memoization 把已完成的工作带过这次 retry。把这段粘给你的 agent:
Make the farewell step raise an error on purpose, so I can watch a run fail. Keep everything else the same.
再次发送同一个 demo/greet event,然后打开 run 并读它的 trace。回报就在这一个失败的 run 里:greeting step 显示一次完成的 attempt,而 farewell step 显示好几次 Attempts,每次都带 backoff 重试(Inngest 默认重试若干次),然后 run 落到 Failed。在那个 attempt 计数上停留片刻:完成的 greeting step 只付了一次费,不是每次 retry 付一次。那就是你能亲眼看见的 durable execution。为什么那个完成的 step 立即返回而不是重新运行,是你将在概念 7 遇到的机制;现在,只管看它发生。
(这个 dev-server 构建不显示单独的 "memoized" 徽章。memo 就是那个 attempt 计数:完成的 step 停在一次 attempt,而坏掉的 step 一路往上爬,这就是「从 memo 返回,没有重新运行」在这里的样子。)
现在修好它:
Now revert the farewell step to the working version.
Host 会自动 reload(那就是 --reload 给你买到的;如果你跳过了它,就手动重启 host)。发送一个全新的 demo/greet event,整个 function 现在在修好的代码上干净地跑到 Completed。关于恢复,有一条诚实的提醒,因为它常常咬到人:dashboard 的 Rerun 按钮会用你当前的代码从顶部启动一个全新的 run,每个 step 都从零重新执行。那是 incident recovery 的正确工具(一次糟糕的部署弄坏了一批 runs;你发布修复并 rerun 它们),但它不是那个保留 memo 的恢复。保留 memo 的恢复,是你刚刚在那个失败 run 内部看到的自动 retry,那里完成的 step 留在原地。
你刚刚搭好了整门课的环境,并亲眼看见了神经系统工作: Skills 已安装,你的 Neon store 已 provision 且 .env 里有 DATABASE_URL,dev-server MCP 是活的,你运行了一个 durable function,看着一个 step 在不消耗 compute 的情况下睡眠,然后弄坏一个 step,看着自动 retry 从 memo 返回那个完成的 step,而只有坏掉的那个重新运行。那就是这门课所讲的架构。课程其余部分把它放大:真实的感官(cron、webhook、fan-out)、更强的反射(step.run 内部的 agent invocation)、负载下真实的平衡,以及把「agent 可能搞砸这件事」变成「agent 起草,人类批准,动作发出」的人工审批 gate。
如果有东西没成功,四个问题几乎覆盖全部:
- dev server 连不上 function host:确认 host 运行在 port 8000。
- client 处于 Cloud 模式:agent 丢了
is_production=False,而.env缺INNGEST_DEV=1,所以 functions 永远不会在本地注册。让它设其中一个(显式的is_production值优先于 env var)。 - function 没出现在 dashboard 里:host 没有 reload;重启它。
- 一个 run 挂住,既无错误也无进展:一个失同步的 host 会静默停顿;把 host 和 dev server 一起重启,并让一个 host 对一个 dev server。(一个隐蔽的成因:如果
:8288被占用而 dev server 起在了8289+,仅仅重新指向inngest-devMCP URL 是不够的;host 仍然在跟:8288说话。在 host 上设INNGEST_BASE_URL=http://127.0.0.1:<port>,让它跟随 dev server 到新端口。)
如果你撞上其中任何一个,那个万能的恢复动作在这里也管用:"Something didn't work. Read the error, tell me in plain language what you see, and propose one fix I can approve."
你构建了什么,它往哪里生长
环境搭好了:底座已打开,Skills 已安装,三个 MCP servers 全部接好(Neon、Context7、inngest-dev),你的 Neon store 有它的 customers 和 audit_log 表且 .env 里有 DATABASE_URL,dev server 在运行。你也亲眼看见了整门课所依赖的那一个想法,durable execution 的反射。Part 4 会在这同一个底座、同一个文件夹里构建客户支持 worker:它读那些 customers、写那些 audit rows,然后把整件事包进完整的神经系统,一个真实的 event trigger、一个会扇出的每日 cron、flow control,以及退款上那个 durable 的人工审批 gate。Part 4 把这个 step.run-加-step.sleep 的骨架放大成一个在你的 Neon store 上做真实工作的 worker。如果这个 Quick Win 成功了,接下来的概念会解释每一块为什么是这个形状。
第 1 部分:感官,世界如何触达 worker
一个你手动调用的 AI agent,只在你调用它时运行。一个真正的 Production Worker 有感官:它在世界触达它时运行。一个客户发来邮件,一个 webhook 到达,一个 cron 每天 09:00 触发,另一个 worker 把工作交过来。这每一个都是一个进来的信号,而 trigger 就是 agent 感知它的方式。第 1 部分的五个概念就是这些感官:事件驱动的心智模型、世界伸进来的三种方式(cron、webhook、event)、防止重复处理的语义,以及让一个信号唤醒许多 workers 的 fan-out 模式。
概念 1:Events vs requests,那一次 durable 的心智转变
这门课接下来的一切,都建立在一次心智转变之上:从 requests 到 events。
Request 是一次同步对话。有人调用;你处理;你返回;他们继续。一个连接保持打开;一个人或一个服务正在等待。如果你崩溃,调用方会收到错误。一个你在 prompt 处与之聊天的 agent 就是一个 request:你输入,它流式返回,这场对话属于你的 terminal session。
Event 是一条异步消息。世界里发生了某件事(客户注册、邮件到达、付款清算),发起方发出一条描述这个事实的具名记录。零个、一个或多个函数会各自独立地对这个 event 做出反应。没有连接保持打开。发起方不知道谁在监听,不等待结果,也不被阻塞。世界已经继续向前。
# A request: I'm here, waiting, blocking
result = await agent.handle_customer_message(text=user_input)
print(result) # I unblock when the agent finishes
# An event: I fire-and-forget
await inngest_client.send(events=[
inngest.Event(
name="customer/email.received",
data={"customer_id": "c-4429", "body": email_body, "subject": subject},
),
])
# I return immediately. Somewhere else, one or more Inngest
# functions react to this event on their own schedule.
这个转变听起来很小。它并不小。一旦你用 events 思考,durability 和 scale 几乎就免费地掉了出来,因为:
- producer 不会被 consumer 拖慢(email-receiver 不会等 agent 写完回复草稿)。
- consumer 可以崩溃并重启,而不丢掉工作(event 被 durably 存储;Inngest 会重新投递它)。
- 可以添加新的 consumers 而不改 producers(第二个函数,比如一个 analytics counter,可以订阅
customer/email.received,而 email-receiver 不需要知道)。 - Backpressure 变成一个 flow-control policy,而不是一次代码改动(Inngest 限制 concurrency;producer 继续发;events 排队)。
Predict。 你的客户支持 Worker 回复一封邮件需要 8 秒:三秒用于 agent 的推理,四秒用于两次 MCP tool calls,一秒用于 database write。高峰负载下你每分钟收到 50 封邮件。如果你用 request 模型(email parser 阻塞直到 agent 完成),这意味着到你的 email parser 有多少个并行 HTTP connections?如果你用 event 模型(email parser 发出一个 event 并立即返回),又是多少?Confidence 1-5。
答案:request 模型需要约 7 个 concurrent parsers(50/min × 8 秒 = 约 6.7 个并行 handlers,再加一点 headroom)。Event 模型只需要一个 parser(它发出 event 后约 10ms 返回;event queue 吸收 50/min 的峰值;Inngest functions 按你允许的 concurrency 消费这个 queue)。Event 模型把生产速率和消费速率解耦。这不只是一个扩展性事实;它是一个架构事实。Event 成为「世界里发生了什么」与「Worker 对此做什么」之间一道 durable boundary。让 consumer 在处理中途崩溃,event 仍然在那里供 retry。再加三种 consumer 类型,producer 也不会察觉。Events 是你不再亲自拥有工作时序的方法。Try with AI
Walk me through three scenarios. For each, classify it as REQUEST-MODEL
or EVENT-MODEL, and explain which one fits better:
A) A user clicks "Submit refund request" in the support portal and
expects to see "Refund issued: $30" within 2 seconds.
B) A nightly cron job at 02:00 runs a customer-health-check across
all 5,000 customers and writes a report to Slack.
C) A customer sends an email to support@; we want a draft response
ready within 60 seconds for the on-call agent to review and send.
For each, name (a) what the human's expectation of timing is and
(b) what failure looks like if the model crashes mid-execution.
概念 2:Cron triggers,因为时间流逝而运行的工作
最简单的 trigger 是时钟。Production Worker 做的很多事并不是对外部 events 的反应;它们是 scheduled work:每日健康报告、每周清理、每小时重新计算。Inngest 的 cron trigger 是一行代码。
import inngest
@inngest_client.create_function(
fn_id="daily-customer-health-check",
trigger=inngest.TriggerCron(cron="0 9 * * *"), # 09:00 every day, UTC
)
async def daily_health_check(ctx: inngest.Context) -> dict[str, int]:
"""Run a customer-health pass for every Pro/Enterprise customer."""
customers = await ctx.step.run("fetch-pro-customers", fetch_pro_customer_ids)
# fan out: one event per customer, one Worker run per event
await ctx.step.run("fan-out", fan_out_per_customer_events, customers)
return {"customers_scheduled": len(customers)}
注意三件事:
-
Schedule 就是标准 cron 语法。
0 9 * * *是每天 09:00 UTC;*/15 * * * *是每 15 分钟;0 9 * * 1是每周一 09:00。Inngest 按 UTC 计算 cron;如果你需要别的 timezone,那是一个 function parameter,不是另一个概念。 -
函数仍然用
ctx.step.run。 不管是 cron-triggered 还是 event-triggered,函数的形状完全一样。Steps 一样工作。Durability 一样工作。Flow control 一样工作。Trigger 只是函数如何启动。 -
Cron 的 output 是一个普通的 Inngest function run。 它出现在 dashboard 里,有 run ID,有 trace,支持 replay。如果你周一早上的 cron run 在 step 3 失败,周二的 cron 会正常运行,而周一的失败会一直保留,等你修好 bug 后 replay。
如果 cron 触发时你的 service 正好 down 了,会发生什么? 这个问题区分了一个 durable scheduler 和一个脆弱的 scheduler。Inngest 的 cron runs 在 schedule 触发的那一刻就被 durably 记录;如果你的 function endpoint 不可达,Inngest 会按 backoff retry,直到成功或撞上 retry ceiling。在 09:00 触发的 cron 不会因为你 09:00 正在滚动部署而「错过」;run 会等待,你完成部署,run 完成。开发中的 cron triggers 有一个值得知道的小怪癖:本地 dev server 只在它运行时才触发 crons。生产环境在 Inngest 的基础设施上运行它们,那是一直运行的。
Quick check。 三个论断。每个标 True 或 False。(a) 如果一个 cron function 需要 45 分钟运行,而它每 15 分钟调度一次,那么任意时刻都会有三个 concurrent instances 在运行。(b) 你可以在一个 cron-triggered function 内部用
step.sleep,把工作摊到一天中。(c) 一个 cron-triggered function 也可以从 dashboard 手动 invoke 来测试。
答案:(a) 取决于 concurrency policy:默认情况下 Inngest 会把重叠的 runs 排队;如果你设 concurrency=1 它们会串行;如果你设 concurrency=10 它们会并行。默认值是合理的。(b) True,而且这是「把每日工作摊到数小时以平滑负载」的常见模式。(c) True:Inngest dashboard 允许你按需 invoke 任意 function 来测试,与它的 trigger 无关。Try with AI
With my AI coding assistant connected to the Inngest dev server MCP,
write a cron-triggered Inngest function in Python that:
1. Runs every Monday at 09:00 UTC.
2. Queries the audit_log table for all conversations resolved in the
prior week (status='resolved' in that window).
3. Computes per-agent metrics: total conversations resolved, average
resolution time, count of escalations, count of refunds issued.
4. Returns the metrics as a JSON object.
After you write the function, use the MCP's `invoke_function` tool to
test it manually (instead of waiting for Monday). Confirm the audit
SQL is correct by using `grep_docs` to search Inngest's docs for
"step.run" examples.
概念 3:Webhook triggers,当外部世界打进来
第二个 trigger surface 是 HTTP。一个外部系统(Stripe、你的 email provider、一个 customer-portal 表单、一个 GitHub webhook)想调用你的 Worker。没有 Inngest,你就得:立起一个 HTTPS endpoint、解析 payload、验证来源、写入一个 queue、写一个从 queue 消费的 worker、处理 retries、处理 idempotency、发 telemetry。每一项都是一周的基础设施工作。
有了 Inngest,endpoint 是现成的。你在 Inngest dashboard 里配置一个 webhook,URL 形如 https://inn.gs/e/<your-key>,让 Stripe(或别的什么)指向那个 URL,webhook payload 就变成你 event stream 里的一个 event。任何带有匹配 event-name trigger 的函数现在都会触发。
@inngest_client.create_function(
fn_id="handle-stripe-refund-failed",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def on_refund_failed(ctx: inngest.Context) -> dict[str, str]:
"""Triggered by Stripe webhook → Inngest event → this function."""
charge_id = ctx.event.data["charge_id"]
customer_id = ctx.event.data["customer_id"]
# Look up which support ticket originated this refund
ticket = await ctx.step.run(
"find-ticket-for-refund", lookup_ticket_by_charge, charge_id,
)
# Wake the customer-support Worker with the full context
await ctx.step.run(
"notify-support-agent",
notify_support_agent_of_refund_failure,
ticket_id=ticket["id"], charge_id=charge_id,
)
return {"ticket": ticket["id"], "action": "notified"}
流程是:Stripe 退款一笔 charge 失败 → Stripe POST 到 Inngest webhook URL → Inngest 创建一个名为 stripe/charge.refund.failed 的 event → 上面那个函数(匹配那个 event name)触发 → 函数用 steps 查找 ticket 并通知 support agent。没有任何 HTTP 管道是你要写的。没有 endpoint,没有 parser,没有 queue,没有 consumer。
有两个相邻的模式值得点名:
- 通用 JSON webhooks。 如果来源不是一个已知的 vendor,你就把任何发 JSON 的服务指向同一类 endpoint,并自己挑 event name。Slash-namespaced names(
vendor/event.subtype)是惯例;没人强制它,但你遵循它时 dashboard 会排得很整齐。 - Webhook transforms。 如果进来的 payload 不符合你想要的形状,Inngest 允许你定义一个 "transform" 函数,它在接收时于服务端运行,在 event 进入你的 event stream 之前重塑它。这让你的函数代码不必沾上 provider-specific 字段。
Predict。 一个 Stripe webhook 在恰好同一毫秒发出
stripe/charge.refund.failed,而你的客户支持 Worker 也正在调用inngest_client.send发出另一个名为customer/refund.investigation_needed的 event。两个 events 同时到达系统;上面那个函数只在 Stripe event 上触发。这个函数会运行一次还是两次?Confidence 1-5。
答案:一次。这个函数注册时只在 stripe/charge.refund.failed 上触发;customer/refund.investigation_needed event 有一个不同的 name,匹配一个不同的函数(如果你写了的话,否则不匹配任何函数)。一个 event 的 name 就是它的 routing key。两个 name 不同的 events,永远不会因为同一瞬间到达就意外触发同一个函数。这是命名纪律重要的原因之一:一个 event name 里的 typo(customer/email_received vs customer/email.received)意味着函数永远不会触发,而症状是静默的。Inngest 的 dashboard 帮你抓这个:未匹配的 events 出现在一个你可以审计的单独 stream 里。Try with AI
I need to handle three webhook sources for my customer-support Worker:
A) Stripe: refund failed, charge disputed
B) Postmark (email service): bounced email, complaint
C) My internal admin UI: manual "investigate this ticket" button
For each, decide:
1. What event names you'd use (vendor/event.subtype format).
2. Whether the function reacting to it should run synchronously (the
caller is waiting) or asynchronously (fire and continue).
3. Whether you'd write a webhook transform to reshape the payload, or
consume it raw.
Then write the Inngest function for the Stripe refund-failed case in
Python, using the MCP's grep_docs to find the current syntax for
TriggerEvent and the dev-server MCP's send_event tool to test it.
概念 4:Idempotency 和 event 语义,同一个 event 触发两次
Webhooks 不是 exactly-once。它们是 at-least-once:如果发送方没收到 acknowledgment,它就 retry。网络丢包,服务重启,你的 endpoint 超时而发送方 retry,哪怕你其实已经成功了。没有 idempotency,每个 webhook system 最终都会给某个人重复扣款、重复发邮件或重复退款。这不是一个理论上的顾虑;它是 event systems 里最常见的生产 bug。
两层防御,两层都内置在 Inngest 里。
第 1 层:源头上的 Event ID seeds。 当你自己发送一个 event(而不是从 webhook 接收它)时,你可以附上一个 idempotency key:
await inngest_client.send(events=[
inngest.Event(
name="customer/refund.requested",
data={"order_id": "o-4429", "amount_cents": 5000},
id=f"refund-request-{order_id}-{request_timestamp}", # idempotency key
),
])
如果第二个带有同一个 id 的 event 在 dedup window 内(默认 24 小时)被发送,Inngest 会丢弃这个 duplicate。同一个逻辑 event,同一个 id,只有一个 function run。
第 2 层:Step-level idempotency。 在一个函数内,每个 step.run 由它的 name 标识。如果函数在 step 3 和 step 4 之间崩溃,retry 会从顶部重新运行函数代码,但对于 steps 1、2、3,Inngest 返回存储的 outputs,而不重新执行 step body。Step 4 第一次正常运行。这就是让一个函数 "durable" 的东西:已完成 steps 的副作用不会在 retry 时再次发生。
@inngest_client.create_function(
fn_id="issue-customer-refund",
trigger=inngest.TriggerEvent(event="customer/refund.requested"),
)
async def issue_refund(ctx: inngest.Context) -> dict[str, str]:
# Step 1: look up the order. If the function retries, this returns
# the SAME order data it computed the first time, from Inngest's memo.
order = await ctx.step.run(
"lookup-order", lookup_order_by_id, ctx.event.data["order_id"],
)
# Step 2: call Stripe. If the function retries AFTER this step
# succeeded, the Stripe call does NOT happen again. The refund is
# issued exactly once even if the function runs three times.
refund = await ctx.step.run(
"issue-stripe-refund", call_stripe_refund_api,
charge_id=order["stripe_charge_id"],
amount=ctx.event.data["amount_cents"],
)
# Step 3: write the audit row. Same property: runs at most once.
await ctx.step.run(
"audit-refund", write_audit_refund_issued,
order_id=order["id"], refund=refund,
)
return {"refund_id": refund["id"]}
如果这个函数在 step 3 期间崩溃,retry 会重新进入 step 1(拿到 cached order data,没有 DB call),重新进入 step 2(拿到 cached refund data,没有 Stripe call),真正运行 step 3,然后返回。客户的卡只被扣一次,即使函数运行了三次。 这是杀手级特性。它让 Inngest 与一个带 retry loop 的 queue 在本质上不同。
从函数的视角看,Inngest 的 memoization 给你 exactly-once 的step completion:一旦 step.run 把一个 step 记录为成功,它就不会重新执行。但有一个很窄的窗口。如果你 step 的 body 调用 Stripe(副作用发生在 Stripe 的服务器上),然后在 Inngest 记录结果之前崩溃,retry 会重新调用 Stripe。从 Inngest 的视角看,这个 step「没有完成」。从 Stripe 的视角看,这笔 charge 已经发生。生产级的模式是 Inngest step memoization 加上 provider-level idempotency keys:Stripe 的 Idempotency-Key header、Postmark 的 MessageID 复用、你自己的 MCP server 的 idempotency contract。把 step.run 和 provider idempotency keys 当作互补,而不是互相替代:step.run 让你函数的内部逻辑 exactly-once;provider 的 idempotency key 让外部副作用 exactly-once。
Quick check。 True or false。(a)
step.run只有在它内部的函数也 idempotent 时,才让 step idempotent。(b) 一个在 dedup window 之外、带 duplicate ID 的 event,会被当作一个新 event。(c) 如果step.run在执行中途失败(step 的代码抛出一个异常),Inngest 会存储这个失败,并在下一次 attempt 上 retry 这个 step,而不重新运行之前的 steps。
答案:(a) False:step.run 让step 的 invocation idempotent(成功后它最多运行一次),但如果内部的函数不是 idempotent 的(比如调用 Stripe),这个 at-most-once 保证恰恰是你想要的。整个要点就是你不必自己让调用 Stripe 这件事 idempotent。(b) True:Inngest 的 dedup window 默认是 24 小时;那个 window 之后,相同 ID 的 events 被当作新的。(c) True:自动 retry 是 memoized 的;Inngest 知道 step 3 在 attempt 1 失败,并在 attempt 2 只 retry step 3。之前成功的 steps 不会重新执行。(这是 run 内的 retry,不是 dashboard 的 Replay 按钮,后者是一个全新的 run,见概念 14。)Try with AI
Here are three scenarios. For each, decide: idempotency PROBLEM or
NO PROBLEM, and if it's a problem, what's the fix:
A) Stripe sends the same charge.refund.failed webhook three times
in 90 seconds (because their first two attempts timed out at
your endpoint). Your function emails the customer.
B) A customer clicks "Issue refund" three times because the page
was slow. Your function calls Stripe and writes audit_log.
C) Your nightly cron at 09:00 sends a customer-health-check event
to each Pro customer. If two crons fire at the same time (a deploy
bug), what happens?
For each problem case, propose ONE specific fix: event ID seed
inside the function, idempotency key in inngest_client.send, or
function-level deduplication on the trigger.
概念 5:Fan-out 和 sub-agent delegation,一个 event 唤醒多个 Workers
一个 event 常常需要在许多地方触发工作。Stripe 的 charge.refund.failed event 可能需要:通知 support agent、写 audit、更新客户的 risk score、提醒 finance ops、发 Slack。五个反应,全都独立,全都来自一个 event。
Inngest 的模式:让许多函数订阅同一个 event。 不需要 fan-out 代码;只要多个 @inngest_client.create_function decorators 用同一个 TriggerEvent。每个函数独立运行,有自己的 retries,有自己的 step trace,与其他函数独立地失败。
@inngest_client.create_function(
fn_id="refund-failed-notify-support",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def notify_support(ctx: inngest.Context) -> dict[str, str]:
# ... runs the customer-support Worker to draft a response ...
return {"status": "drafted"}
@inngest_client.create_function(
fn_id="refund-failed-update-risk-score",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def update_risk_score(ctx: inngest.Context) -> dict[str, float]:
# ... runs the risk-scoring Worker ...
return {"new_risk_score": 0.42}
@inngest_client.create_function(
fn_id="refund-failed-post-slack",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def post_to_slack(ctx: inngest.Context) -> None:
# ... posts a Slack notification ...
return None
一个 Stripe webhook 到达。Inngest 创建一个 event。三个函数触发,每个在自己的 run 里。如果 post_to_slack 因为 Slack down 而失败,另外两个不受影响,正常完成。那个失败的 run 留在 dashboard 里,等 Slack 恢复后 replay。这是多 Worker 协调的核心,也是你未来的 manager layer(后续课程)将在规模上组合的架构模式。
另一种 fan-out 模式:parent 发出 N 个 children。 有时 fan-out 是动态的。你的每日 cron 需要为每个 Pro customer 发出一个 customer-health event,那可能是 500 个,也可能是 5,000 个,取决于这一周。parent 函数发送 N 个 events:
from datetime import date
async def fan_out_per_customer_events(
customers: list[str],
) -> int:
events = [
inngest.Event(
name="customer/health_check.requested",
data={"customer_id": cid},
id=f"daily-health-{cid}-{date.today().isoformat()}", # idempotency
)
for cid in customers
]
await inngest_client.send(events=events)
return len(events)
5,000 个 events 在一次 send call 里发出。5,000 个 function runs 触发,每个带自己的 customer_id,每个隔离,每个可独立 retry。Flow control(概念 11)限制同时运行多少个,让你不会熔毁下游 APIs。Cron 函数在几秒内返回;fan-out 按 Inngest flow-control policies 允许的速率运行。
Sub-agent delegation 是 fan-out 的一个特例。在一次 Worker run 内部,你可以调用 await inngest_client.send(...),把 sub-tasks 委派给其他 Worker types。parent 不会等待 children,除非它显式用 step.invoke 同步运行它们并收集结果。
Predict。 你有三个函数都由
customer/email.received触发:起草回复的客户支持 agent(15 秒)、一个 analytics counter(50ms),以及一个检查客户是否高价值的 "VIP detector"(200ms)。当一封邮件到达时,每个函数的 user-visible latency 是什么样?三个选项:(a) 三者相加约 15 秒;(b) 三者并行运行,总 latency 约 15 秒(最慢的那个);(c) 各自独立运行,完全没有共享 latency。Confidence 1-5。
答案:(c)。每个函数都是它自己的 run,在它自己的 process slot 里。客户支持 agent 不阻塞 analytics counter;VIP detector 不阻塞 agent。从外部看,任何一个特定函数的 latency 就只是那个函数自己的时间。没有任何函数会等待一个兄弟函数。这就是 fan-out 能扩展的原因:consumers 是隔离的。如果 agent 崩溃,analytics counter 不受影响。Try with AI
Design the fan-out architecture for these three scenarios. For each,
sketch the event names and the functions that subscribe:
A) New customer signs up. Need to: send welcome email, create
Stripe customer, post to Slack #new-customers, write to
audit_log, schedule a 7-day follow-up.
B) Customer support email arrives. Need to: draft a reply (agent),
detect sentiment, check if VIP, update customer's "last contact"
timestamp, attach to the right ticket thread.
C) Daily cron at 09:00 needs to run customer-health-check on
~5,000 Pro customers. Each check takes ~30 seconds. We want
the whole batch to complete by 11:00 (a 2-hour window).
For each, decide: how many event types, how many subscriber
functions, what the idempotency story is, and one specific failure
mode this design protects against.
第 2 部分:反射,当某件事出问题时会发生什么
感官唤醒 worker。反射让 worker 熬过接下来发生的事。一个 worker 调用一个 agent,agent 调用几个 tools,tools 调用一个 database、一个支付 API 和一个 model:单个回合里有好几次外部调用,其中任何一次都可能失败。没有 durability,一次回合中途的瞬时失败就会让整个流程从顶部重新开始。反射是自动的:它行动得快,不需要 agent 的意识去做决定。那就是 durable execution 给你的东西。Durability 就是这个性质:当某件事在执行中途失败时,已经完成的工作保持完成,而执行从它断掉的地方恢复。 Inngest 用一个原语(step.run)和它下面的一套 memoization 机制来交付这个。第 2 部分会解释这两者,外加基于时间的变体(step.sleep、step.wait_for_event)、retry 语义,以及 step.ai 原语。
第一遍的压缩提示。 如果你在快速浏览,承重的概念是 6(
step.run)和 7(memoization)。概念 8-10 建立在它们之上。仔细读 6 和 7;一旦你脑子里有了这两个,其余的会读得很快。
概念 6:step.run 和 durable function 模型
一个普通的 Python 函数运行一次,从上到下。如果它在中途崩溃,你从顶部重新开始。如果它在崩溃前做了三次 API calls,下一次 attempt 会再做这三次调用,并为它们付费,还可能再次给某人重复扣款。
一个 Inngest 函数是 durable 的。每一个你想要被 checkpoint 的操作,都被包进 step.run(name, fn, ...)。函数在每次 attempt 仍然从上到下运行,但已经完成的 steps 返回它们存储的 outputs,而不重新执行。函数「追上」它断掉的地方,然后继续向前。
@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
customer_id = ctx.event.data["customer_id"]
# Step 1: load the customer record (one DB call)
customer = await ctx.step.run(
"load-customer", load_customer_by_id, customer_id,
)
# Step 2: load the conversation thread (one DB call)
thread = await ctx.step.run(
"load-thread", load_thread_for_customer, customer_id,
)
# Step 3: run the OpenAI Agents SDK agent (your worker)
response = await ctx.step.run(
"run-agent",
run_customer_support_agent,
customer=customer,
thread=thread,
email_body=ctx.event.data["body"],
)
# Step 4: write the draft reply to the database
await ctx.step.run(
"save-draft-reply", save_reply,
customer_id=customer_id, text=response.draft,
)
# Step 5: notify the on-call human reviewer via Slack
await ctx.step.run(
"notify-reviewer", post_slack_for_review, response=response,
)
return {"status": "drafted", "reviewer_notified": True}
五个 steps。每一个都被独立 checkpoint。
Durability 在这里给你买到什么,三个失败场景:
-
场景 A:agent step 抛出一个 timeout。 如果没有用
step.run包住 agent call,这个函数的下一次 retry 会重新加载 customer、重新加载 thread,并从头重新运行 agent,为 agent 已经部分完成的工作再次支付 OpenAI tokens。有了step.run,customer 和 thread 的加载被 memoized(steps 1-2 不重新执行);只有 step 3 retry。Inngest 的自动 retries 处理瞬时 OpenAI 错误,而你的代码不必知道。 -
场景 B:函数进程在 step 3 和 step 4 之间被杀掉(一次部署滚出来、一个 node 重启、container OOM 了)。没有 durability,agent 的 response 丢失,客户的邮件无人回复,直到有人注意到。有了 durability,函数在重启后恢复:steps 1、2、3 在毫秒级返回它们存储的 outputs,step 4 真正运行,step 5 真正运行,客户拿到那封起草好的回复。
-
场景 C:Slack 在 step 5 返回一个 503。 没有
step.run,你要么丢掉工作,要么专门为这个 Slack call 手写 retry-and-backoff 逻辑。有了step.run,Inngest 会用 exponential backoff 对 step 5 retry,直到 Slack 恢复;与此同时 steps 1-4 保持完成,不会重新执行。草稿回复已经在 database 里了;唯一 pending 的是那条 notification。
你不写任何 retry loops、任何「我是不是已经做过这件事了」的检查、任何 state machines。state machine 就是那一串 step.run calls。每个 step 是一个 node;每次 transition 都是 durable 的。
step.run 的那一条规则。 传给 step.run 的函数应该在给定它的 inputs 时是 deterministic 的:用相同的 arguments 调用它两次,应该产生相同的结果。
- 对 pure functions,这是自动成立的。
- 对 idempotent 的 API calls,也是自动成立的(Stripe 的
idempotency_key、你自己的 MCP server tools)。 - 对「生成一个随机 ID」或「用默认 temperature 调用一个 LLM」这类事,需要小心(一次 retry 可能产生与原 attempt 不同的 output,而有时这很重要)。
当操作不是 deterministic 时,你把它变成 deterministic:传入一个 seed、在 step 外预先生成那个随机值,或者接受 retry 可能与原 attempt 不同(对一个 agent response 来说常常没问题)。
Quick check。 True or false。(a) 函数 body 在每次 retry 都从顶部重新执行,包括所有 imports,以及
step.runcalls 之外的变量赋值。(b) 如果一个 step 需要 30 秒完成,而函数在第 25 秒崩溃,retry 会从这个 step 的第 25 秒继续。(c)step.run的 outputs 存储在 Inngest 的基础设施里,不在你的 application 里。
答案:(a) True,而这正是你把工作放在 step.run 内部的原因。step.run 外的代码在每次 retry 都重新运行;里面的代码每次 attempt 运行一次,成功后被 memoize。(b) False:step.run 是那个原子单位;如果一个 step 被打断,retry 会重新运行整个 step。如果你的 step 长到不能被允许重启,你就把它拆成更小的 steps。(c) True:step output store 是 Inngest 的一部分,不是你的 DB。这就是为什么即使你的 database schema 已经变了,你仍然能 replay runs。Try with AI
With my AI coding assistant connected to the Inngest dev server MCP,
shape a customer-support worker into an Inngest durable function.
Take a Runner.run call that processes a customer email and wrap each
of these inside its own step.run:
1. Load the customer record
2. Load the related conversation thread
3. Run the agent (the OpenAI Agents SDK Runner)
4. Persist the draft reply
5. Notify the on-call reviewer
Use grep_docs to find the current Python SDK syntax. Use
invoke_function to test it with a synthetic email payload. Then
deliberately raise an exception in step 4 and use get_run_status
to confirm steps 1-3 don't re-execute on retry.
概念 7:Memoization,resumability 底下的那套机制
概念 6 说「已经完成的 steps 返回它们存储的 outputs,而不重新执行」。那个机制就是 memoization,它的机理值得理解,因为每一个其他 Inngest 原语都用到它。
当你调用 await ctx.step.run("load-customer", load_customer_by_id, "c-4429") 时,在第一次 attempt 上发生三件事:
- Inngest 检查它的 memo store:「这个 run 里有没有 step
load-customer的存储结果?」没有。 - 函数
load_customer_by_id("c-4429")运行。它返回{"id": "c-4429", "tier": "pro", ...}。 - Inngest 把那个结果写入 memo store,key 是
(run_id, step_name="load-customer")。然后它把结果返回给你的代码。
如果函数在 step 3 之后崩溃,Inngest retry,那么在第二次 attempt 上,函数 body 从顶部重新运行。当执行到达同一行时,发生三件不同的事:
- Inngest 检查它的 memo store:「这个 run 里有没有 step
load-customer的存储结果?」有,它在 attempt 1 时被存了。 - 函数
load_customer_by_id("c-4429")不运行。那个 DB call 不发生。 - Inngest 在毫秒级把存储的结果返回给你的代码。
这就是 retries 便宜的原因:昂贵的工作已经被缓存。这就是 durability 正确的原因:昂贵的工作不会发生两次。这也是为什么「函数 body 从上到下重新运行」尽管听起来浪费却没问题:steps 内部的工作其实不重新运行;只有 steps 之间的 orchestration 代码重新运行。
让新用户意外的那个推论。 step.run 之外的代码在每次 attempt 都运行。如果你这么写:
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
# ANTI-PATTERN: this runs on every retry. Don't do this.
expensive_thing: dict = await fetch_expensive_data(ctx.event.data["id"])
await ctx.step.run("do-something", do_something_with, expensive_thing)
return {"status": "done"}
fetch_expensive_data 在每次 retry 都运行。如果它每次调用花 $0.10,而函数 retry 5 次,你就刚刚花了 $0.50 去获取同一份数据五次。修复方法是把那件昂贵的事包进它自己的 step:
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
expensive_thing: dict = await ctx.step.run(
"fetch-expensive-data", fetch_expensive_data, ctx.event.data["id"],
)
await ctx.step.run("do-something", do_something_with, expensive_thing)
return {"status": "done"}
现在 fetch_expensive_data 被 memoized 了;retries 不会再为它付费。
Step name 就是 memo key。 这就是 step names 必须在一个函数内唯一的原因。如果你在同一个函数里有两个 step.run("load-customer", ...) calls,Inngest 会把第一个的存储 output 返回给两个 calls。那几乎从来不是你想要的。如果你有一个 loop 调用一个 step N 次,给它们唯一命名(step.run(f"load-customer-{i}", ...)),让每次 iteration 都有它自己的 memo slot。
Predict。 你的函数有三个 steps。Step 1(
load-customer)在 DB calls 上花 $0.01,需要 100ms。Step 2(run-agent)在 OpenAI tokens 上花 $0.20,需要 12 秒。Step 3(save-draft)在 DB calls 上花 $0.005,需要 50ms。由于 OpenAI rate limits,Step 2 有 30% 的概率失败;Inngest 用 backoff retry。(a) 把三个都包进step.run与 (b) 只把 step 2 包进step.run,两者的成本差异是多少?Confidence 1-5。
答案:用 (a),一次 retry 只花你 step 2 的成本($0.20)。customer 和 save-draft 被 memoized;它们不重新执行。用 (b),每次 retry 都花你 steps 1 和 3 加上 step 2:每次 retry $0.215。在一千封邮件、30% retry rate 下,那是约 $4.50 的纯浪费,外加搞清楚当 step 3 运行两次时到底有什么被部分写入的运维复杂度。把你不想重新执行的一切都包进 step.run。 一旦你理解了这套机制,它就不是可选的。Try with AI
With my AI coding assistant: review the Inngest function we built
in Concept 6's Try-with-AI and identify any code BETWEEN step.run
calls that should be wrapped in its own step but isn't. Common
candidates:
- Computed values (timestamps, IDs, formatting) that we want to be
stable across retries
- Calls to logging or metrics services
- Reads from Redis, environment variables, secret managers
Then propose a refactor that moves each of these into its own step
with a meaningful name. For each, explain whether the side effect
is one you want to happen once (use step.run) or every retry
(leave it outside).
概念 8:step.sleep 和 step.wait_for_event,穿过时间的 durability
有些工作必须等待。一个 welcome-email pipeline 立即发一封邮件,然后等三天,再发一封 follow-up。一个 refund-investigation 需要等一个人来批准。一个 trial-conversion 流程会在 7 天内观察 "user upgraded to paid",并根据它看到的发送不同的邮件。
在一个普通的 Python 函数里,「等三天」意味着把一个进程打开三天。那不可行:你的进程会重启,你的 hosting 会为 72 小时的 idle compute 向你收费,你的 timer 会丢失。在 Inngest 里,「等三天」是一行:
from datetime import timedelta
@inngest_client.create_function(
fn_id="trial-welcome-series",
trigger=inngest.TriggerEvent(event="user/trial.started"),
)
async def welcome_series(ctx: inngest.Context) -> dict[str, str]:
user_id = ctx.event.data["user_id"]
await ctx.step.run("send-welcome-email", send_welcome_email, user_id)
# Wait three days. The function gets paged out of memory. Nothing
# is consuming compute. Three days later, Inngest pages it back in
# and resumes execution at the next line.
await ctx.step.sleep("wait-three-days", timedelta(days=3))
await ctx.step.run("send-followup", send_followup_email, user_id)
return {"status": "completed"}
step.sleep 是 durable 的,是休息中的神经系统。函数暂停;Inngest 存储恢复时间;等待期间没有东西消耗 compute;函数在正确的时间恢复,之前所有的 step outputs 仍然 memoized。step.sleep(以及 step.sleep_until)在 paid plans 上最多可等一年,在免费 Hobby plan 上最多七天(Inngest usage limits)。七天的 Hobby ceiling 足够覆盖本课用到的每一次 sleep。
更强大的那个 sibling 是 step.wait_for_event。不是等待时间,而是等待另一个 event。函数暂停,直到一个匹配的 event 到达,或者直到你设的一个 timeout 到期。这就是让 Inngest 成为 HITL(概念 15)和 inter-agent 协调模式最清晰表达的东西:
@inngest_client.create_function(
fn_id="refund-with-approval",
trigger=inngest.TriggerEvent(event="customer/refund.requested"),
)
async def refund_with_approval(ctx: inngest.Context) -> dict[str, str]:
request = ctx.event.data
request_id = request["request_id"]
# If amount is over $100, require approval before issuing
if request["amount_cents"] >= 10_000:
# Notify a human via Slack/email/whatever
await ctx.step.run("notify-approver", notify_human_approver, request)
# Wait for an approval event. Up to 24 hours; expires otherwise.
approval = await ctx.step.wait_for_event(
"wait-for-approval",
event="refund/approval.decided",
timeout=timedelta(hours=24),
if_exp=f"async.data.request_id == '{request_id}'",
)
if approval is None or not approval.data.get("approved"):
return {"status": "rejected_or_timeout"}
# Either it was under $100, or it was approved
refund = await ctx.step.run(
"issue-stripe-refund", call_stripe_refund_api, request,
)
return {"status": "issued", "refund_id": refund["id"]}
正在发生的是:
- 函数到达
wait_for_event。它暂停。消耗零 compute。 - 一个人看着 Slack notification,在你的 admin UI 里点 "Approve",你的 UI 调用
inngest_client.send(events=[Event(name="refund/approval.decided", data={"request_id": "...", "approved": True})])。 - Inngest 把这个 event 匹配到正在等待的函数(
if_exp确保只有属于这个 request_id 的 events 才匹配),并用这个 event 作为approval返回值恢复函数。 - 函数继续到 refund step。Stripe 退款在人批准之后才发生。
step.sleep 和 step.wait_for_event 是你不为之付费的 timeouts。函数在你的代码里看起来是同步的(「等三天,然后发邮件」),但 runtime 语义是异步且 durable 的。这是 Inngest 著名的两件事之一(另一个是 durable retries)。没有它,替代方案是一个 queue 加一个 state machine 加一个 database 加一个 poller,你会写一千行而不是三行。
Quick check。 三个论断。每个标 True 或 False。(a) 如果
step.sleep设为 30 天,而你的 service 在这 30 天里重新部署了五次,在 paid plan 上这个 sleep 会不被打断地继续。(b) 如果step.wait_for_event超时,函数会抛出一个异常。(c) 同一个函数里的两个step.wait_for_eventcalls 可以同时等待同一个 event。
答案:(a) 在 paid plan 上 True:sleeps 存在 Inngest 的基础设施里,不在你 service 的内存里,所以重新部署不会丢掉它们。注意 tier ceiling:一个 30 天的 sleep 在 paid plan 上没问题,但超过免费 Hobby plan 的七天 sleep 上限。(b) False:超时时,wait_for_event 返回 None。你的代码检查它并决定怎么办(rejection、escalation、default-approval,无论 policy 是什么)。(c) True,但值得警惕:当一个匹配的 event 到达时两者都会触发。如果这两个 wait_for_event calls 有不同的 if_exp filters,这没问题。如果它们完全相同,你大概是在看一个重构机会。Try with AI
Build a delayed-investigation flow with my AI coding assistant.
Specification:
1. Triggered by event 'customer/refund.failed'.
2. Immediately notify the on-call human via Slack with the refund
details and a "Investigate" button.
3. Wait for the human to click the button (which fires
'customer/refund.investigation_started') for up to 4 hours.
4. If the click arrives in time: run the agent to draft an
investigation summary.
5. If 4 hours pass without a click: escalate to a senior reviewer
by firing 'customer/refund.escalated'.
Use the dev-server MCP's send_event tool to simulate the
human-click event during testing. Use get_run_status to inspect
how the suspended function shows up in the dashboard. Before
writing, use list_docs to scan the Inngest documentation tree
for the right page on wait_for_event semantics, then
read_doc on the page you find to get the exact syntax for
the if_exp filter expression.
概念 9:Retries、error handling、dead-letter
这是反射的特写。默认情况下,Inngest 会 retry 失败的 steps。默认值是合理的:约 4 次 retries,采用 exponential backoff,attempts 之间从几秒到几分钟不等。最后一次 retry 失败后,run 进入 failed 状态,并停留在那里,供检查和(可选)replay。你可以按函数调它:retries=10、retries=0(完全不 retry)、不应该被 retry 的特定异常类型。
@inngest_client.create_function(
fn_id="charge-customer",
trigger=inngest.TriggerEvent(event="order/checkout.completed"),
retries=2, # only retry twice; this involves Stripe; don't keep hammering
)
async def charge_customer(ctx: inngest.Context) -> dict[str, str]:
try:
charge = await ctx.step.run(
"call-stripe", call_stripe_charge, ctx.event.data,
)
return {"status": "charged", "charge_id": charge["id"]}
except StripeCardDeclinedError as e:
# A declined card is not a transient failure. Don't retry.
# Mark the order as failed in our database and emit an event
# for the dunning flow.
await ctx.step.run(
"mark-failed", mark_order_failed,
ctx.event.data["order_id"], reason=str(e),
)
await ctx.step.run(
"emit-dunning-event", emit_dunning, ctx.event.data["order_id"],
)
return {"status": "card_declined"}
三个模式很重要。
模式 1:瞬时 vs 永久失败。 Inngest 默认 retry 一切,但有些错误不是瞬时的。来自 Stripe 的 card-declined 错误,retry 时还会被拒。你下游 API 的一个 401-unauthorized,不会因为你等一会儿就变成 200。你的函数应该专门 catch 这些并处理它们:写入你的 DB、发出一个下游 event、干净地返回,让它们不在无望的 attempts 上浪费 retry budget。Inngest 的 NonRetriableError 明确告诉 Inngest 对一个抛出的异常跳过 retries。
模式 2:step-level vs function-level 错误。 一个抛出的 step 会被 retry。step-level retries 耗尽后,函数失败。有时你想让一个函数熬过一个失败的 step:记录这个失败,把这件工作标为 "partial",继续。把 step.run 包进 try/except。这个 step 仍然得到它的 retries;如果所有 retries 都失败,异常传播到你的 catch block,你在那里决定怎么办。
模式 3:dead-letter 和 replay。 当一个函数彻底失败时,它不会消失。它进入 Inngest dashboard 的 "failed runs" view,带完整的 trace、所有 step outputs、那个异常,以及一个 Replay 按钮。在你发布一个 bug 修复之后,你可以 replay 那些失败的 runs:每一个都在修好的代码上从顶部重新运行(一个全新的 run,不是一个保留 memo 的恢复,那个区别是概念 14)。这是传统 queues 里的 "dead-letter queue" 模式,只是你不写那个 dead-letter handler。你只管修 bug 并 replay,让有副作用的 steps 保持 idempotent,这样一次重新运行不会重复动作。
Predict。 你的函数在 step 2 调用 Stripe,在 step 4 调用你的客户数据服务。Stripe 在 step 2 的第一次 attempt 返回 503(service unavailable,瞬时)。Step 2 用 exponential backoff retry 4 次(约 1s、2s、5s、12s);在第 4 次 retry,Stripe 回来了,charge 成功。现在 step 4 运行,而数据服务 down 了,返回 500。Inngest 会 retry 整个函数,还是只 retry step 4?多少次?Confidence 1-5。
答案:只 step 4,而且它得到它自己的 retry budget。 Steps 不共享 retries。Step 2 的四次 retries 与 step 4 的独立。Inngest 会 retry step 4(默认约 4 次),如果 MCP server 回来,step 4 完成,函数成功。Step 2 的那笔 Stripe charge 不会被重新发出,因为 step 2 的 output 在它成功 retry 之后被 memoized 了。客户被扣款恰好一次,即使这个函数在 retries 中花了 20 秒。Try with AI
With my AI coding assistant: extend the customer-support Worker
function from Concept 6 with explicit retry and failure handling.
Specification:
1. The OpenAI Agents SDK call should retry 3 times on transient
failures (rate limit, timeout), but NOT retry on a content-policy
refusal from the model.
2. The Slack notification should retry up to 10 times (Slack is
often flaky; don't lose the notification).
3. The Postgres write should retry once; if it fails again, log the
failure and continue (don't fail the whole function over a
transient DB blip).
For each step, decide what's transient vs permanent and structure
the try/except accordingly. Use grep_docs to find the Python SDK's
NonRetriableError equivalent.
概念 10:Python 中用 step.run 包 AI calls(step.ai.wrap 仅 TypeScript)
概念 6-9 对任何有副作用的代码都成立:DB writes、API calls、file writes、agent invocations。Inngest 也提供 AI-specific 的 step 原语,处理 LLM calls 容易出的那些模式:rate-limit retries、对 prompts 和 responses 的 observability,以及(可选的)降低 serverless compute 成本的 inference proxying。
先把重要的 Python-vs-TypeScript 区别说在前面。 Inngest 的
step.aimodule 有两个方法,它们的语言支持不同。step.ai.infer()在 TypeScript 和 Python 中都可用(Python SDK v0.5+):它把 inference 外包给 Inngest 的基础设施,并 trace 这次调用。step.ai.wrap()仅 TypeScript:今天没有 Python equivalent。对于 Python projects(比如这门课的 Worker),包裹一个 OpenAI Agents SDK call 的正确模式是ctx.step.run(...),它已经给你完整的 durability、retries,以及被包裹 step 的 inputs 和 outputs 的 observability。你只是拿不到 TypeScript 的step.ai.wrap额外加的那种 LLM-specific 的 prompt/response telemetry。 (截至 2026 年 5 月,已按 AI Inference docs 核对。)
Python 中 OpenAI calls 的 step.run(推荐模式)。 你的函数在 ctx.step.run("name", fn, ...) 内部做 OpenAI call。Inngest trace 这个 step 的 inputs 和 outputs(你传入的 arguments 以及返回的东西),在瞬时失败上 retry,并 memoize 结果,让后续 steps 的 retries 不再重复支付 OpenAI 成本。Prompt 和 response 作为这个 step 的 input/output 记录在 dashboard 里:
from openai import AsyncOpenAI
oai = AsyncOpenAI()
async def call_openai_summary(thread_text: str) -> str:
"""A normal async function. Inngest doesn't care that this is an LLM call."""
response = await oai.chat.completions.create(
model="gpt-5",
messages=[
{"role": "system", "content": "Summarize this support thread in 3 sentences."},
{"role": "user", "content": thread_text},
],
)
return response.choices[0].message.content
@inngest_client.create_function(
fn_id="summarize-customer-thread",
trigger=inngest.TriggerEvent(event="customer/thread.summary_requested"),
)
async def summarize_thread(ctx: inngest.Context) -> dict[str, str]:
thread: list = await ctx.step.run(
"load-thread", load_thread, ctx.event.data["thread_id"],
)
# The OpenAI call is wrapped in step.run. Inngest sees this as a step:
# the inputs (formatted thread text) are recorded, the output (summary
# string) is recorded, the call is memoized on success, and retries are
# automatic on transient failures.
summary: str = await ctx.step.run(
"openai-summary", call_openai_summary, format_thread(thread),
)
return {"summary": summary}
在 dashboard 里,这个 run 显示函数的 step trace(load-thread 后接 openai-summary),带每个 step 的 inputs 和 outputs。如果 OpenAI 返回一个 429(rate limited),Inngest 自动用 backoff retry openai-summary:memoization 语义与概念 7 相同,所以 retries 不会重复计费之前的 load-thread step。与 TypeScript 的 step.ai.wrap 相比你拿不到的是:自动的 LLM-specific telemetry,比如 token counts、model name,以及 dashboard 的 AI view 里按 provider 拆开的 traces。对大多数 Python 生产负载来说,标准的 step trace 加上你自己的 OpenAI client telemetry(例如 OpenAI Agents SDK 的 tracing)足以覆盖这个缺口。
因为 step.run 把每个 step 的 inputs 和 outputs 记录到 Inngest 的 observability store,你通过一个 step 传递的内容会被存储,并在 dashboard 里可见。如果你的 prompt 包含 PII(姓名、emails、addresses)、secrets(API keys、internal tokens)、合同或财务数据,或受监管的内容(HIPAA、GDPR-scoped data、PCI),不要把原始内容传进 step body。Redact、hash、summarize,或者传一个引用(一个 customer_id 和 ticket_id,而不是完整的 ticket 文本),并在 step body 内部从你的 authoritative store 重新加载敏感内容,那里的 retention 和 access controls 由你来配置。如果你启用 OpenAI Agents SDK 自己的 tracing,同样的纪律适用。把 step traces 当作你对待任何生产 log 那样:默认很有用,但受 policy 监管。
step.ai.infer:一个用于降低 serverless 成本的小众工具(Python 支持)。 你很少会用到它;step.run 是本课每一次 AI call 的默认。step.ai.infer 存在是为了一个特定情形:不是从你的函数进程调用 OpenAI,而是请求 Inngest 的基础设施去做这次调用,这样在请求 in flight 期间你的函数进程可以 deallocate。在为 in-flight time 计费的 serverless 平台(Vercel、Cloudflare Workers、AWS Lambda)上,这在等待期间省下 compute 成本。对于长时运行的 inferences(Deep Research、大型 embedding batches)这种节省是真实的。对于亚秒级的调用,它增加 latency 而没多少好处。
如果你确实要用它,从 AI Inference docs 拉取你所安装版本的确切签名:它位于 experimental 的 inngest.experimental.ai namespace 里,本课的构建没有用到它。
Quick check。 True or false。(a) 在 Python 中,
ctx.step.run("name", call_openai, ...)让 OpenAI call durable、在瞬时失败上 retry、并在成功后 memoize。(b)step.ai.infer是在 Python 中把 Inngest 与 OpenAI Agents SDK 一起用的硬性要求。(c) 把单次 OpenAI call 的step.run替换成step.ai.infer,总会让函数运行起来更便宜。
答案:(a) True:这是推荐的 Python 模式。OpenAI call 进入 step body;Inngest 把整个 step 当作工作单位。(b) False:step.run 对大多数情况足够。step.ai.infer 是对 serverless compute 成本的优化,不是要求。Worked Example 里的 OpenAI Agents SDK 集成用的是普通 step.run。(c) False:只有当 (i) 你在一个为 in-flight time 计费的 serverless 平台上,并且 (ii) 这次调用长到 request-offload 的节省盖过额外 orchestration overhead 时,step.ai.infer 才省钱。对于 always-on 服务器上的亚秒级调用,普通 step.run 胜出。Try with AI
With my AI coding assistant: take a customer-support agent
invocation and produce TWO versions of the Inngest function that
calls it:
Version A: Wrap the Runner.run call in step.run (the recommended
Python pattern: durable, retried on transient failures, memoized;
you get the standard step trace).
Version B: For comparison, write a SEPARATE small Inngest function
that calls a single OpenAI completion via step.ai.infer (the
Python-supported step.ai primitive that offloads inference to
Inngest's infrastructure to save serverless compute cost).
For each version, explain (a) what the dashboard trace shows for a
successful run, (b) what happens when the OpenAI call hits a 429
rate limit, and (c) on which kind of deployment (always-on server
vs serverless) Version B's offload saves real money.
第 3 部分:平衡与恢复,生产规模
平衡是第三层:它让 worker 在负载下保持健康,就像你的身体在你猛推它时让自己保持稳定。Concurrency 阻止 worker 熔毁下游系统。Throttling 让你远离 rate-limit 的墙。Priority 和 fairness 防止一个话多的客户饿死所有人。Batching 把「午夜的 10,000 个 events」变成「100 个可管理的 function runs」。Replay 把「昨天的 bug 让我们损失了 200 次失败的交互」变成「我们修好了;200 个对话恢复了」。人工审批 gate 让 agent 暂停,直到一个人批准。第 3 部分的五个概念给你那些生产 policies,把一个能工作的 worker 变成一个你敢放到付费客户面前的 worker。
概念 11:Concurrency 和 throttling
Concurrency 是一个函数能同时执行的最大 runs 数。Throttling 是单位时间内能启动的最大 runs 数。两者都按函数配置,各一行。两者也是团队从 prototype 走向规模时最常见的生产缺口。
from datetime import timedelta
@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
concurrency=[inngest.Concurrency(limit=10)],
throttle=inngest.Throttle(limit=100, period=timedelta(minutes=1)),
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
...
concurrency=10 的意思是:任意时刻最多 10 个这种函数在运行。第 11 个 event 在 queue 里等待,直到那 10 个里有一个完成。throttle=100/minute 的意思是:每分钟最多启动 100 个新的 runs。第 101 个 event 会等待,即使还有 concurrency headroom。
为什么实践中两者都重要。 Concurrency 保护下游系统:如果你的客户支持 Worker 跟 OpenAI 和 Postgres 说话,有 1,000 个 concurrent runs 就意味着 1,000 个同时的 OpenAI calls 和 1,000 个同时的 Postgres connections。你会耗尽你的 OpenAI rate limit、耗尽你的 connection pool,或者两者都耗尽。Throttle 防的是 bursts:如果 500 封客户邮件在 9:00am 整点到达,你不想要 500 个函数在同一秒启动;throttle 平滑那个启动速率。
Per-key concurrency。 单个 concurrency limit 全局地作用于这个函数。一个更有意思的模式是 per-key concurrency:按 event 的某个属性来限制。
@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
concurrency=[
inngest.Concurrency(limit=10), # global cap
inngest.Concurrency(limit=2, key="event.data.customer_id"), # per-customer cap
],
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
...
这意思是:全局最多 10 个函数运行,而且每个客户同时最多 2 个。如果单个客户一分钟内发 100 封邮件,只有其中 2 封被同时处理;另外 98 封在后面排队。与此同时,其他客户的邮件正常流动;它们不被那个话多的客户阻塞。这就是两行代码里的多租户公平性。概念 12 把这个模式进一步展开。
Quick check。 三个论断,True or False。(a) 如果你设
concurrency=10而 1,000 个 events 一次到达,其中 990 个被丢弃。(b) Throttling 和 concurrency limits 都会降低总吞吐量。(c) Per-key concurrency 需要一个从 event data 中 deterministic 得出的 key。
答案:(a) False:events 不被丢弃;它们排队。Inngest 的 queue 是 durable 的;那 990 个 events 等待直到 concurrency slots 打开。(b) False。 Throttling 限制启动速率;concurrency 限制 in-flight runs。两者都不丢工作;两者都塑造工作何时执行。如果你的平均负载在 limits 之下,长时间窗口里的吞吐量不变。峰值上的吞吐量被塑形:bursts 由 queue 吸收。(c) True:key 表达式在 event data 上求值;它必须为相同的逻辑 scope 产生一个稳定的字符串(customer_id 可以;current_timestamp 不行)。Try with AI
With my AI coding assistant: design the concurrency and throttling
policy for the customer-support Worker. Constraints:
- OpenAI rate limit: 30 requests per minute, hard cap.
- Postgres connection pool: 20 max connections (the Worker takes 1 per run).
- Some customers send bursts of 30+ emails in a minute (an angry
customer); these shouldn't starve other customers.
- We expect ~1,000 emails per day, with peaks around 9am and 2pm.
Propose:
1. A global concurrency value
2. A per-customer concurrency value
3. A throttle (limit and period)
For each, explain what production failure it protects against and
what the cost is (in queue latency at peak).
概念 12:Priority 和 fairness,多租户扩展
Concurrency limits 有效。Per-key concurrency 加上了基本的公平性。生产级的多租户系统需要更多:priorities(Enterprise 客户不应为同一份 compute 排在 hobbyists 后面)和 fair-share scheduling(任何单个 tenant 都不能垄断系统,即便是在它自己的 concurrency cap 之内)。
Priority。 Inngest 对每个 event 求一个 priority 表达式;priority 更高的 runs 跳到 priority 更低的 runs 前面。
@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
concurrency=[inngest.Concurrency(limit=10)],
priority=inngest.Priority(
# Enterprise tier = high priority; Pro = 0; Free = low priority
run="100 - (event.data.customer_tier_priority * 100)",
),
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
...
当 concurrency queue 里有 50 个 runs 在等待时,Enterprise 客户的 runs 先走,然后 Pro,然后 Free。同一 tier 内部,按 FIFO 顺序。Priority 不会覆盖 concurrency 或 throttle limits;它只决定等待中的 runs 里哪一个拿到下一个空 slot。一个 Enterprise 客户仍然要等一个 slot 打开;他们只是拿到下一个。
Fair-share scheduling。 当你有几百个 tenants 争夺同一个全局 concurrency pool 时,FIFO 加 priority 还不够。单个 tenant 发一个 burst 仍然能占据大部分 slots 好几分钟。Fair-share scheduling,通过 concurrency 上的 key parameter 加上深思熟虑的 sizing 来实现,给每个 tenant 一块有保证的份额:
concurrency=[
inngest.Concurrency(limit=50), # global pool
inngest.Concurrency(limit=3, key="event.data.tenant_id"), # max 3 per tenant
],
有了这个:总共 50 个 slots,没有 tenant 拿超过 3 个。如果 20 个 tenants 活跃,那最多请求 60 个 slots,但只有 50 个可用。Fair-share 轮转它们,每个 tenant 拿到一些份额,没有人被关在门外。
Predict。 你有一个客户支持函数,带
concurrency=10和 per-customerconcurrency=2。你还配了 priority:Enterprise = high,Free = low。在 9:00am,queue 里有:来自 Customer A(Free)的 5 个 events,来自 Customer B(Enterprise)的 5 个 events,以及来自一个全新的 Customer C(Free,刚买他们第一份 plan)的 10 个 events。它们以什么顺序执行?Confidence 1-5。
答案:这是一个多层决策。首先,per-customer cap 为 2 意味着每个客户最多有 2 个 events 有资格同时运行。所以候选池是:A 的 2 个、B 的 2 个、C 的 2 个:六个 runs 立即 eligible。其次,priority 决定这六个里哪些先填满头几个 slots:B 的两个先运行(Enterprise),然后 A 的两个和 C 的两个(Free,FIFO)。所以在 t=0:B 的 2 个运行,然后 A 的 2 个启动,然后 C 的 2 个启动。总共:6 个 active。每当一个完成,它那个客户的下一个排队 event 变得 eligible,下一个 slot 按 priority 填充。这种 policy 在 Inngest 里是一个功能,在你自己的代码里是一个一千行的 scheduler。Try with AI
With my AI coding assistant: extend the customer-support Worker
configuration with a priority and fair-share scheme. Requirements:
1. Three customer tiers: Enterprise, Pro, Free.
2. Enterprise customers should never wait more than 5 seconds at
peak load.
3. Free tier customers should get fair access: no Free customer
should be starved for more than 60 seconds, even when the
global queue is full.
4. A single noisy customer (regardless of tier) should not occupy
more than 3 slots.
Write the concurrency + priority configuration. For each line of
config, explain which requirement it satisfies.
概念 13:Batching,划算的批量处理
有些工作天然是成批的。你不会把 10,000 个客户对话各自独立地 summarize;你每次用一批 50 个去调用 LLM。你不会一条一条地写 10,000 行 audit;你 COPY 它们。Inngest 的 batch trigger 让你积累 events,并用这一批作为 input 调用单个函数一次。
@inngest_client.create_function(
fn_id="batch-embed-tickets",
trigger=inngest.TriggerEvent(event="ticket/resolved"),
batch_events=inngest.Batch(
max_size=50, # invoke when 50 events accumulated, OR
timeout=timedelta(seconds=30), # invoke when 30 seconds pass, whichever first
),
)
async def batch_embed_resolved_tickets(ctx: inngest.Context) -> dict[str, int]:
# ctx.events (plural) instead of ctx.event
ticket_ids = [e.data["ticket_id"] for e in ctx.events]
tickets = await ctx.step.run(
"load-tickets", load_tickets_by_ids, ticket_ids,
)
# One embedding call for 50 tickets, not 50 calls for 1 ticket each
embeddings = await ctx.step.run(
"embed-batch", embed_texts_batch,
[t["text"] for t in tickets],
)
await ctx.step.run(
"store-embeddings", store_embeddings_batch,
ticket_ids, embeddings,
)
return {"batched": len(ctx.events)}
变化在于:ctx.events 是一个 list,不是单个 event。函数每一批运行一次,而不是每个 event 运行一次。OpenAI embedding API 用一批 50-text 调用,而不是 50 次单文本调用,这显著更便宜(你按 token 付费,但 per-request overhead 没了)也更快(一次 API 往返,而不是 50 次)。
Batching 是对的工具,当这件工作天然可成批(embeddings、bulk DB writes、bulk emails),并且你能容忍在工作发生之前最多等到 timeout 那么久。它是错的工具,当每个 event 都需要交互式响应,或者跨 events 的顺序以不可预测的方式重要时。
Quick check。 True or false。(a) Batched 函数仍然得到 retries 和 memoization;整批作为一个整体被 durably memoized。(b) 如果 batch timeout 在只积累了 3 个 events 时到期,函数不会运行,直到接下来的 47 个到达。(c) 你可以把
batch_events和concurrency组合,来限制有多少批并行运行。
答案:(a) True:这一批是工作单位;retries 重放整批,它的所有 events 都还在 scope 里。(b) False:那正是 timeout 的全部要点。30 秒后函数用积累到的东西运行,即使只有 1 个 event。(c) True:这是生产模式。Batch 加 concurrency 一起,把你的下游负载限制得很好。Try with AI
With my AI coding assistant: write a batched Inngest function that
embeds resolved support tickets, converting a per-ticket event
handler into one batched call.
Triggers: 'ticket/resolved' event, batched at 50 events or 30 seconds.
The function should:
1. Load the ticket bodies in one query
2. Call OpenAI embeddings API with a 50-text batch (faster + cheaper)
3. Store the embeddings
4. Emit a 'ticket/embedded' event per ticket for downstream consumers
Use grep_docs to find the OpenAI batch-embedding pattern.
概念 14:Replay 和 bulk cancellation,生产恢复
有时一切同时出错。你发布了一个 bug;过去六小时里一千个 runs 失败了。或者你的下游 API down 了 30 分钟;那个窗口里所有试图调用它的东西都死了。或者你发现了一个逻辑错误,想在修好之后重做一天的工作。
先说那个绊倒所有人的区别。 Inngest 给你两种失败的 step 可以重新运行的方式,它们的行为不同:
- 自动 retry(在同一个 run 内)。 当一个 step 抛出时,Inngest 用 backoff retry 这个函数,从顶部重新进入。完成的 steps 从 memo 返回,不重新执行;只有失败的那个 step 再次运行。这是那个保留 memo 的恢复,是你在 Quick Win 里看到的那一个,也是让「在 step 3 花的 $0.20 不被重花」这个性质成真的那一个。它是自动的,发生在原来的 run 内部。
- Replay / Rerun(dashboard 按钮,跨许多 runs)。 这用你当前部署的代码从顶部启动一个全新的 run,每个 step 从零重新执行(一次 rerun 拿到一个新的 run id,并重新运行第一个 step,不是旧 run 的恢复)。所以实践中旧 run 的 memo 在这里帮不了你。它用于 incident recovery,不是用来跳过已完成的工作。
把这两个分清楚就是整个概念。memo 的回报存在于自动 retry 里;Replay 是一次全新的开始。
两个方向相反的恢复原语。 Replay 说*「这件工作失败了,我想让它在修好的代码上再跑一次。」* Bulk cancellation 说*「这件工作排了队,但我不再想让它发生。」* 同一个 dashboard surface,相反的意图。大多数团队在跑真实流量的头三个月里两者都需要。
Replay 是那个恢复原语。失败的 runs 带着它们完整的 step history、那个 input event,以及失败 step 的异常,被保留下来。从 dashboard,你打开 Functions view,筛选到一个有失败 runs 的函数,选一个时间窗口和一个 failure pattern(任何具体的 error message,或者只是 "all failures"),点 Replay。Inngest 把每一个调度为当前部署的代码上的一次从顶部开始的全新 run。
关于 replay 要理解三件事。
- Replay 用你当前部署的代码。 如果你在 runs 失败和你 replay 它们之间部署了一个修复,replayed runs 用新代码。这就是全部要点:拿一批死在某个 bug 上的 runs,发布修复,并不用手地把它们全部重跑。
- Replay 重新执行每个 step;它不复用旧 run 的 memo。 一个 replayed run 是一个新 run,所以每个 step 在修好的代码上从零再跑一次。从成本看,按每个 replayed run 整个函数的成本来规划,而不只是失败的那个 step。让一次 replay 不发出第二个真实世界副作用(一次重复退款、一封重复邮件)的,不是 memo,而是那个副作用上的一个 idempotency key(概念 4):你从请求中导出一个稳定的 key(对一次退款,类似
(order_id, request_id)),provider 把重复当作一个 no-op。本课里那个最小的 worker 为了简洁省掉了那个 key,它的退款按客户匹配并无条件写入,所以一个生产版本会在任何真钱动起来之前加上一个。Memo 在一个 run 内保护;idempotency key 跨重跑保护。 - Replay 是 opt-in 的。 失败的 runs 坐在 dashboard 里,直到你对它们采取行动。它们不会永远 retry;它们不会消失。它们等你。
Bulk cancellation 是反过来的。有时你有成千上万个排队中或睡眠中的 runs,你不再想要它们:一个 campaign 被取消了,一个客户流失了而你不再想给他们发 follow-up emails,一个 feature 被回滚了。从 dashboard,你选一个函数和一个时间窗口或 event filter,点 Cancel。匹配的 runs 干净地终止:它们的 step.sleep 和 step.wait_for_event calls 不恢复,排队的 runs 不启动,in-flight 的 runs 在下一个 step 边界检查 cancellation 并退出。Cancellation 尊重 step 边界;一个 in-flight 的 step.run 在终止前先完成它正处于的那个 step,所以你不会得到半完成的 Stripe charges 或撕裂的 DB writes。
把 replay vs cancellation 当作一个决策。 当一批 runs 出了问题,问一个问题:我想让这件工作成功,还是想让它不发生? 如果工作应该成功(bug-fix 恢复),replay。如果工作不应该发生(取消的 campaign、流失的客户、回滚的 feature),cancel。如果你不确定(比如失败的 runs 里有些你想恢复、有些本就不该触发),把你的 dashboard query 筛得更窄,让每个子集得到对的处理。
实践中它支持三个模式:
- 「我们发布了一个 bug」的恢复。 在那次糟糕部署的时间窗口里找到失败的 runs,修 bug,发布修复,replay 那些失败。客户体验:他们的邮件有一个小时没收到回复,但最终收到了,而你没写任何恢复代码。
- 「campaign 取消了」的回滚。 一个 welcome series 在 14 天里发三封 follow-up emails;客户在第 4 天流失。你不想发第 7 天和第 14 天的 follow-ups。Bulk-cancel 匹配的
wait-for-event和sleepruns。 - 「schema migration」的 replay。 你改了 agent 格式化 summaries 的方式;你想用新格式重新 summarize 昨天的 tickets。找到那些 runs(成功的或不成功的)并 replay 它们;因为一次 replay 是一次从顶部开始的全新 run,agent 在新代码上重跑每个 step,这正是你这里想要的。让你那些有副作用的 steps 保持 idempotent,这样重跑它们不会重复扣款或重复发送。
dev-server MCP 让恢复在不离开你的 coding agent 的情况下可达。开发期间你可以让 AI 用 get_run_status 检查一个失败的 run,然后通过在修好的代码上重新发出那个 event 来恢复这件工作(给它一个新的 event id,因为用相同的 id 重新发出会被概念 4 的 idempotency 语义去重成一个 no-op)。dashboard 的 Rerun 按钮是那个等价的一键路径。无论哪种方式,你得到的都是当前代码上的一次全新 run,不是一次保留 memo 的恢复。
Quick check。 True or false。(a) 一次 dashboard Replay 在新部署的代码上重新运行这件工作。(b) 一次 dashboard Replay 从 memo 返回原 run 成功的 steps,只重新运行失败的那个。(c) 一个失败 run 内部的自动 retry 从 memo 返回完成的 steps,只重新运行失败的那个 step。(d) Bulk-canceling 一个 in-flight 的函数会 mid-step 中止当前正在执行的
step.run,以便更快终止。
答案:(a) True:一次 replay 是当前部署的代码上从顶部开始的一次全新 run,这就是它是 bug-fix 恢复工具的原因。(b) False:这是那个陷阱。一次 replay 是一个从顶部重新执行每个 step 的新 run,所以旧 run 的 memo 不带过来。阻止一个 replayed 副作用发出两次的,是那个 idempotency key,不是 memo。(c) True:这是那条保留 memo 的路径,也是你在 Quick Win 里看到的那一条。完成的 step 停在一次 attempt,而失败的 step 重试。(d) False:cancellation 尊重 step 边界;当前的 step.run 在 run 终止前先完成(或失败)。这防止撕裂的 writes。Try with AI
Walk through a recovery scenario with my AI coding assistant:
Yesterday at 14:00 we deployed a change to the worker's agent step.
A bug in the new code made the agent step throw on every run.
From 14:00 to 18:00, 47 customer-support runs failed at that step.
At 18:30 we noticed, fixed the bug, and re-deployed.
Use the dev-server MCP's grep_docs to find Inngest's replay docs,
then:
1. Outline the exact dashboard steps to identify the 47 failed runs.
2. Explain what a dashboard Replay does for one of those runs: is it
a fresh run from the top on the fixed code, or a resume that
reuses the old run's memo? What does that mean for the cost of
replaying all 47?
3. Confirm whether the customers will see one reply or several if a
replayed run re-sends the email, and name the mechanism that
keeps it to one (hint: it is not memo).
4. Identify ONE scenario in this story where you'd prefer to
bulk-cancel instead of replay, and explain why.
概念 15:用 step.wait_for_event 做 HITL gates,runtime 里的 Invariant 1
Agent Factory 的 Invariant 1 说人类是 principal:在高风险决策上,runtime 必须尊重的是 authored intent,而不是 agent 的自主判断。这是一个人的意识重新踏进 loop 的那一个地方。其他每个地方,神经系统都凭反射自己运转;在这里它暂停并等一个人。这在生产里表现为 approval gates:agent 做分析、起草动作,但在一个人批准之前不执行这个动作。
Inngest 的 step.wait_for_event(概念 8)是今天任何平台上对此最清晰的表达。Agent 运行到决策点,暂停,等待一个 approval event。人来 review(在 Slack 里、在一个 admin UI 里、在 email 里)并点 approve 或 reject。Event 触发。函数带着人的裁决恢复并据此行动。这就是 spec-driven 在 runtime 的含义:神经系统执行那个计划,哪个动作需要人、以什么顺序、用什么 timeout。它不监管 agent 的推理;它控制 agent 被允许做什么。
@inngest_client.create_function(
fn_id="refund-with-hitl-gate",
trigger=inngest.TriggerEvent(event="customer/refund.investigated"),
concurrency=[inngest.Concurrency(limit=5)],
)
async def refund_with_gate(ctx: inngest.Context) -> dict[str, str]:
request_id = ctx.event.data["request_id"]
amount_cents = ctx.event.data["amount_cents"]
# Step 1: the agent's analysis (your worker, run durably)
analysis = await ctx.step.run(
"agent-investigates",
run_refund_investigation_agent,
request_id=request_id,
)
# Step 2: if the agent thinks refund is warranted AND amount > $100,
# gate behind human approval
needs_approval = analysis.recommends_refund and amount_cents >= 10_000
if needs_approval:
await ctx.step.run(
"notify-approver",
send_slack_approval_request,
request_id=request_id,
analysis=analysis,
amount_cents=amount_cents,
)
# === THE HITL GATE ===
approval = await ctx.step.wait_for_event(
"wait-for-human-approval",
event="refund/approval.decided",
timeout=timedelta(hours=24),
if_exp=f"async.data.request_id == '{request_id}'",
)
if approval is None:
# Timeout: no human responded in 24h. Escalate.
await ctx.step.run(
"escalate-timeout",
escalate_to_senior_reviewer,
request_id=request_id,
)
return {"status": "escalated_timeout"}
if not approval.data["approved"]:
await ctx.step.run(
"notify-rejected", notify_customer_rejected,
request_id=request_id,
)
return {"status": "rejected_by_human"}
# Either it was approved, or it didn't need approval
refund = await ctx.step.run(
"issue-refund", call_stripe_refund,
request_id=request_id, amount_cents=amount_cents,
)
await ctx.step.run(
"audit-approved-refund", audit_refund,
request_id=request_id, refund=refund,
approved_by="human" if needs_approval else "auto",
)
return {"status": "issued", "refund_id": refund["id"]}
你在代码里看到的:一串 steps,中间有一个 wait_for_event。runtime 里正在发生的:
- agent 运行(step 1,durably)。
- 函数决定 gate 是否适用(代码内逻辑,没有副作用)。
- 如果被 gated:一条 Slack notification 触发(step 2,durable)。函数暂停。最多 24 小时消耗零 compute。
- Slack 里的一个人点 Approve 或 Reject。admin backend 用
refund/approval.decided和那个request_id调用inngest_client.send。 - Inngest 把这个 event 匹配到那个暂停的函数(
if_expfilter 确保只有匹配的 request IDs 匹配)。函数在下一行恢复。 - 函数用人的决定,要么发出退款,要么通知 rejection。两条路径都 audit 这个决定和这个 approver。
这就是让 Inngest 与一个 queue-plus-state-machine 在本质上不同的东西。HITL 模式是一个原语。函数的代码从上到下读,gate inline 在里面。没有 callback,没有 state restoration,没有 if state == waiting_for_approval: ... 的 dispatching。runtime 处理 suspend/resume 机制;你的代码表达policy。
后续课程会从架构上展开 Invariant 1:authored intent、spec-driven workflows,以及决定哪些 gates 适用于哪些动作的 manager-of-workers layer。这门课给你的是那个runtime 原语。当那个 manager layer 到来时,它实现的 gate 正是这个 wait_for_event 模式,只是组合到 fleet scale。现在就知道这个原语,意味着以后那个架构模式读起来像「一次合理的组合」,而不是「魔法」。
这是你在 Part 4 的 Decision 5 里构建的那块拱顶石:退款审批,做成 durable 的。这里的概念是那个形状;worked example 把它接到一个真实的 needs_approval tool,并证明退款恰好发出一次。
Predict。 你有一个 HITL gate,设
timeout=timedelta(hours=24)。一个客户的退款请求在周五 17:00 进来。整个周末没有人在线。gate 的 timeout 在周六 17:00 触发。你的 timeout handler 记录一笔被阻止的退款。reviewer 在周一 9:00am 读这个请求。走一遍时间线:周末期间有多少个 function runs 是 active 的?Inngest 为多少 compute 收费?Confidence 1-5。
答案:周末期间 零个 active function runs。函数被暂停了:Inngest 存储它的 state,把函数从内存里换出,并等待那个 event 或那个 timeout。Inngest 不为暂停的时间收费。当周六 17:00 到来、timeout 触发,函数恢复,用几百毫秒写那行 blocked-refund audit,然后完成。reviewer 直到周一才看,这从 worker 这边来说不花一分钱。Inngest 上 HITL workflows 的经济学,与那些为每一秒「批准了吗?」轮询都收费的 polling-based queues 截然不同。Try with AI
With my AI coding assistant: design a durable refund-approval gate.
Specification:
1. The agent investigates and decides a refund is warranted, but the
refund tool needs human approval before it runs.
2. The gate should:
- Notify the on-call reviewer with the agent's recommendation
- Wait up to 4 hours for the reviewer to approve or reject
- On approve: issue the refund.
- On reject: do not issue; record a blocked refund.
- On 4-hour timeout: do not issue; record a blocked refund.
3. Every branch (approve/reject/timeout) writes an audit row from a
small fixed set of action names, capturing what was decided.
Use the dev-server MCP's send_event to simulate each branch of
the reviewer's decision during testing.
第 4 部分:Worked example,一个客户支持 Production Worker
这就是你动手构建的地方。先是 worker(一个 prompt),然后是包在它外面的神经系统,一层一个 prompt。你用简短、平实的英语 prompts 指挥你的 coding agent,由它写代码;下面展示的片段是每一层那几行承重的代码,不是文件。完整实现已在一个实时运行的 dev server 和一个真实 model 上端到端跑过,所以你看到的形状就是会跑起来的东西。如果某个签名看起来陌生,你的 agent 会查当前的文档。
形状是:七个 prompts,在你已经搭好的那个底座上。
- D0 构建 worker 本身,独立的。
- D1 让 agent 的运行变得 durable。
- D2 让一个 event 能唤醒它。
- D3 加一个会扇出的每日 cron。
- D4 加 flow control。
- D5 是那块拱顶石:退款上一个 durable 的人工审批 gate。
- D6 证明 worker 熬得过一个坏掉的 step:retry 而不重做已完成的工作,然后恢复。
开始之前。 你的环境已经从 Quick Win 搭好了:打开同一个
ai-agent-nervous-system文件夹,已安装 Inngest 和neon-postgresSkills,.env里有你的OPENAI_API_KEY和你的 NeonDATABASE_URL,你的customers和audit_log表已 provision,三个 MCP servers(Neon、Context7、inngest-dev)全部接好。只有两个提醒:
- dev server 在运行。 如果你关了它,再启动一次:在它自己的 terminal 里
npx inngest-cli@latest dev。Dashboard 在http://127.0.0.1:8288。(当你以后部署到 Inngest Cloud 时,免费的 Hobby tier 是 $0、无需信用卡;它的 ceilings 在 Part 5。)- 下面 MCP calls 的一个大小写提醒。 dev-server 的 tool names 是
snake_case(send_event、get_run_status、invoke_function),但它们的参数是camelCase(get_run_status收runId,invoke_function收functionId)。Python SDK 全程是snake_case;只有 MCP call 的参数是camelCase。
The brief
你构建一个小型客户支持 worker,并给它一套 Production Worker 神经系统。这个 worker 从 Neon customers 表(id、email、tier)读它的示例客户,为一封进来的邮件起草一封热情的回复,只在人工批准下才能发起退款,并为每一个动作向 Neon audit_log 表写一行 audit,动作来自一个固定的小集合:message_received、message_sent、refund_issued、refund_blocked。然后那七个 prompts 在它外面加上 Inngest:一个 event 唤醒它,agent call 跑得 durable,一个每日 cron 为每个 eligible 客户扇出一次健康检查,flow control 限制 concurrency 和 throttle,退款在一个 durable 的人工 gate 上暂停,一条 replay 路径恢复失败的 runs。
关于接下来那些 prompts 的一个说明。 每一个都写成你真正会对一个 coding agent 说的样子:简短、平实、信任它去处理细节。它们冷粘贴也能用,而如果你先让 agent 定位一下("read the project and tell me what you see, then ask me anything unclear before you start")会更好,因为文件越堆越多。这些 prompts 是目的地;先定位是上匝道。
D0:构建 worker,独立的
你在哪:底座已打开,dev server 在运行,你的 Neon store 已 provision,但还没有 worker 存在。这个 Decision 构建那个独立的 worker;做完时它能在一封示例邮件上运行,并向 Neon 写一行 audit。
底座已经带了一个你的 agent 在打开时读过的 AGENTS.md,所以它知道这个项目;那就是这些 prompts 保持简短的原因。其中那一条值得你自己也记住的规则,是整门课的架构不变量:worker 自己的代码从不从 inngest import。 agent 和它的 tools 保持纯 Python;神经系统从外面包住它们。那种分离,agent 和神经系统分开,正是让你以后能把 Inngest 换成 Temporal 或 Restate、而 worker 原封不动的东西。
你的 Neon system of record 已经从 Quick Win provision 好了:customers 和 audit_log 表存在,DATABASE_URL 在你的 .env 里。所以这个 worker 从一开始就读写那个 database。现在构建 worker。粘贴这段:
Build me a minimal customer-support agent with the OpenAI Agents SDK, running in a local sandbox. It reads the sample customers from my Neon
customerstable (each row has an id, email, and tier), drafts a warm reply to an incoming customer email, and can issue a refund, but the refund tool needs human approval before it runs. Write an audit row into my Neonaudit_logtable for every action, using a small fixed set of action names and theDATABASE_URLin.env. Seed thecustomerstable with five sample rows first if it is empty. Keep it small; it exists to be wrapped, not shipped. Then run it on a sample email and show me the reply.
创建:worker.py 和 db.py(一个扁平项目,没有 src/ 嵌套)。D1 把 Inngest host 作为第三个文件加进来。agent 通过 DATABASE_URL 触达 Postgres,从不通过 Neon MCP server,后者只是你 build-time 的工具。
那份 seed data 小到可以留在页面上,五个示例客户跨三个 tiers,agent 会把它们插入 customers 表:
[
{ "id": "cust_001", "email": "ada@example.com", "tier": "enterprise" },
{ "id": "cust_002", "email": "grace@example.com", "tier": "pro" },
{ "id": "cust_003", "email": "linus@example.com", "tier": "pro" },
{ "id": "cust_004", "email": "edsger@example.com", "tier": "standard" },
{ "id": "cust_005", "email": "alan@example.com", "tier": "standard" }
]
你的 agent 写两个简短的 Python 文件。db.py 持有 Postgres 访问:一个基于 DATABASE_URL 的小型带池 asyncpg connection,一个 load_customers() 读,以及一个带封闭词汇表的 record() audit-write helper,任何在那四项集合之外的动作名都会抛错,把一个 typo 变成一个响亮的错误,而不是一行静默的坏数据。worker.py 是一个带两个调用进 db.py 的 tools 的 SandboxAgent。它里面只有一行对课程其余部分是承重的,退款 tool 的 decorator:
@function_tool(needs_approval=True)
def issue_refund(order_id: str, amount_cents: int, reason: str) -> str:
...
那个 needs_approval=True 让 agent 暂停而不是发出退款:run 带着 pending 的退款回来,由一个人来决定。它是整块 HITL 拱顶石(D5)挂靠的那个钩子。(这个底座对每一笔退款都设 gate,这让拱顶石保持简单;一个生产 worker 通常只对超过某个阈值的退款设 gate,即概念 15 里那个超过 $100 的模式。无论哪种,接线都完全一样。)
在 agent 写的东西里有一个结构性细节要确认,因为 D5 依赖它:把 build_agent() 和 sandbox 的 run_config() 保持为分开的函数。当 D5 恢复一个暂停的 run 时,它把 agent 重建成同样的 tool 形状,并重新传入同一个 run_config();保存的 state 不携带 sandbox session,所以恢复必须再次提供它。现在就把它们拆开,拱顶石以后就是一小步。
完成的标志: agent 在一封示例邮件上运行并打印一封简短的回复,并且 Neon audit_log 表里有一行新 row(在 console 里查它,或让你的 agent 通过 Neon tools 把它读回来)。如果那封邮件描述的是一次退款,run 会在退款 tool 处暂停,而不是发出它;那个暂停就是全部要点,而 D5 让它 durable。
这个 Part 里的 prompts 假设一个 frontier-class 的 coding agent(Claude Sonnet 或 Opus、一个 GPT-5-class model,或 Gemini 2.5 Pro)。你正在学的 Inngest 架构(events、steps、memoization、flow control)是 SDK-level 的,无论什么 model 驱动你的 agent 都成立。但构建体验依赖强的 instruction-following,尤其是 D5 那块拱顶石。在一个较弱的 model 上,预期会不止一次地迭代一个 prompt,并把文件名讲明白。架构没坏;只是 prompting 需要更多脚手架。
D1:让 agent 的运行变得 durable
你在哪:一个只在你调用它时才运行的 worker,一旦在运行中途崩溃就丢掉一切。这个 Decision 把 agent call 包进 step.run;做完时一个完成的 run 会在 dashboard 里显示 agent step 被 memoized。
神经系统从这里开始:把整个 agent call 包进单个 step.run,让它 durable 且 memoized。粘贴这段:
Wrap the agent run in an Inngest durable function so it survives crashes and retries transient failures. The whole agent call goes inside a single
step.runso it is memoized. Run it in local dev mode against the Inngest dev server, with a FastAPI host. Confirm a completed run shows the agent step memoized in the dashboard.
创建:inngest_app.py(一个 dev 模式的 Inngest client、放在一个 helper 里的 agent call,以及一个 dev server 会发现的 FastAPI host)。
要紧的形状是一个 step.run 包住 agent call:
async def handle_customer_email(ctx: inngest.Context) -> dict:
email_text = ctx.event.data["email_text"]
outcome = await ctx.step.run("run-agent", functools.partial(_run_agent, email_text))
return {"replied": outcome["status"] == "done"}
在 agent 写的东西里有两个 idiom 要确认。step handler 自己不取任何 arguments,所以 functools.partial 提前绑定 email_text,那就是你把数据传进任何 step 的方式,从这里起你会在每个 step 上看到它。而 agent helper 用的是普通的 Runner.run,不是一个 streamed runner:它是那块人工审批拱顶石(D5)所基于的路径,所以从一开始就用它,让 D5 成为一小步而不是一次重写。client 用 is_production=False 构造(来自 Quick Win 的那个 dev 模式标志)。
把它作为两个进程运行,function host 和那个找到它的 dev server:
uv run uvicorn inngest_app:app --port 8000 --reload --log-level info # terminal 1: function host (your model key is sourced here; --reload picks up the D6 break/fix edits)
npx inngest-cli@latest dev -u http://127.0.0.1:8000/api/inngest # terminal 2: dev server, auto-discovers the host
完成的标志: dashboard 列出 handle-customer-email,并且一个完成的 run 显示 run-agent step。(你在 D2 用一个 event 正经地唤醒它;现在,function 可被发现就够了。)
为什么这是那个承重的动作。 agent call 是昂贵的那部分:model tokens,好几秒。在 step.run 内部它的结果被 memoized,所以当一个后面的 step 失败、函数 retry 时,agent 不再次运行。那一次包裹,就是一个在每次 retry 都重复付费、重复动作的 worker,和一个每件昂贵的事恰好做一次的 worker,二者之间的区别。
D2:在一个 event 上触发它
你在哪:一个已经由 customer/email.received 触发的 durable function(D1 的 decorator),但没有 audit trail。这个 Decision 在 agent 两侧各加一行 audit;做完时一个真实的 event 驱动一个 run,两行 row 都被写入。
在 agent 之前加一个 audit step,在它之后加一个,然后用一个真实的 event 唤醒 worker,而不是手动运行它。粘贴这段:
Make the worker wake on a
customer/email.receivedevent instead of being run by hand. Add an ingress audit step before the agent and a reply audit step after it. Send a test event and show me the run completing with both audit rows.
编辑:inngest_app.py(函数在 agent 两侧各多一个 audit step)。
形状是围绕 agent step 多两个 step.run calls:
customer_id = ctx.event.data.get("customer_id") # bound from the event, alongside D1's email_text
await ctx.step.run("audit-received", functools.partial(
db.record, "message_received", customer_id=customer_id, detail=email_text[:80]))
outcome = await ctx.step.run("run-agent", functools.partial(_run_agent, email_text))
await ctx.step.run("audit-sent", functools.partial(
db.record, "message_sent", customer_id=customer_id, detail=(outcome["reply"] or "")[:80]))
每一行用一个来自封闭集合的动作名:进来的 message_received、出去的 message_sent,而 db.record 通过 DATABASE_URL 把它写进 Neon audit_log 表。用 dev-server MCP 的 send_event tool 从 agent 发那个 test event(name: "customer/email.received",一个带 email_text 和 customer_id 的 data 对象)。dev server 接受任何 event,所以你不配置任何 webhook 就能在本地测试;在生产里你会把你的 email provider 指向一个 Inngest webhook URL,由它把 payload 重塑成这个 event,那是一个 dashboard 设置,不是代码。
完成的标志: run 完成,trace 按顺序显示三个 steps(audit-received、run-agent、audit-sent),并且 Neon audit_log 表里为那个客户有一行 message_received 和一行 message_sent。
为什么是两个 audit steps,不是一个。 每一个都是它自己的 step.run,所以每一个都独立地被 memoized。如果 reply step 失败、函数 retry,ingress row 不会被写两次(memo hit),agent 也不会运行两次(也被 memoized)。audit trail 跨 retries 保持恰好一次,这正是 D6 将要证明的性质。
D3:一个会扇出的每日 cron
你在哪:一个世界一次一封邮件地唤醒的 worker。这个 Decision 加一个每日 cron,为每个 eligible 客户扇出一个 event;做完时每个客户都得到它自己的 durable child run。
加上 scheduled work:一个每日 cron,为每个 Pro 和 Enterprise 客户触发一个 health-check event,每个 event 触发它自己的 durable run。粘贴这段:
Add a daily cron that fans out one
customer/health_check.requestedevent per Pro and Enterprise customer, each one idempotency-keyed so a re-delivered cron run never double-fires. Each child event triggers its own durable run that writes one audit row. Invoke the cron manually and show me one child run per eligible customer.
创建:一个扇出的 cron parent,和一个处理每个 child 的 event consumer,两者都注册到 host。
两个形状撑起这个 Decision。trigger 是一个一行的 cron decorator,扇出是 N 个 events,每个带一个 idempotency key:
@inngest_client.create_function(fn_id="daily-health-check", trigger=inngest.TriggerCron(cron="0 9 * * *"))
async def daily_health_check(ctx: inngest.Context) -> dict:
# ... select Pro/Enterprise customers, then:
events = [
inngest.Event(
name="customer/health_check.requested",
data={"customer_id": c["id"]},
id=f"health-{c['id']}-{ctx.event.id}", # idempotency key per (customer, cron run)
)
for c in eligible
]
await ctx.step.send_event("fan-out-health-checks", events)
那个 idempotency key 是承重的细节:id=f"health-{customer}-{cron_run}" 意味着如果同一次 cron run 被投递两次(一次 redeploy、一次 retry),重复的 event 会被丢弃,所以每个客户每天恰好得到一次检查。consumer 是一个普通的 event-triggered 函数,写一行 audit。用 MCP 的 invoke_function tool 从 agent 那里 invoke 这个 cron(不要等到明天 09:00)。一个 dev 怪癖:dev server 只在它运行时才触发 crons;生产在 Inngest 一直运行的基础设施上运行它们。
完成的标志: parent 在几秒内完成,dashboard 为每个 eligible 客户显示一个 customer-health-check child run,而 standard-tier 的客户被正确地跳过。
为什么是扇出,不是一个 loop。 parent 自己不处理这些客户;它发 N 个 events 然后返回。每个 child 是它自己的 run,隔离、可独立 retry、由它自己的 concurrency 限制。一个函数内部的 loop 会把它们耦合起来:一个慢客户拖住其余的,一次崩溃丢掉整批。扇出,就是一次定时的唤醒如何变成 N 个独立的 durable runs。
D4:Flow control
你在哪:一个处理每封邮件、但在一个 burst 下会一次性把它们全部发出的 worker。这个 Decision 加三条 flow-control policies;做完时一个二十-event 的 burst 在 cap 之下排队,没有丢掉或重复的 rows。
当五百封邮件在 9am 落下时,worker 不应该一次发出五百次 model calls:那会冲破 rate limit,并把那个话多的客户后面的所有人饿死。加一个全局 concurrency cap、一个 per-customer cap,和一个 throttle。粘贴这段:
Add flow control to the email handler: a global concurrency cap, a per-customer concurrency key so one noisy customer can't starve the rest, and a throttle to protect the OpenAI rate limit. Fire a burst of twenty events across five customers and show me they queue under the cap and all complete with no dropped or duplicated audit rows.
编辑:inngest_app.py(email 函数上的三个 decorator arguments)。
这三个 arguments 就是这堂课,D4 的全部都在它们里面:
concurrency=[
inngest.Concurrency(limit=10), # global cap
inngest.Concurrency(limit=2, key="event.data.customer_id"), # per-customer cap
],
throttle=inngest.Throttle(limit=100, period=datetime.timedelta(minutes=1)),
三个旋钮,三件事。全局的 limit=10 限制同时执行多少个 runs,保护两个真实的天花板:model 的 rate limit,和你的 Neon connection 预算。两样东西约束你的 connections,它们在不同的尺度上工作。在单个 worker replica 内,所有 runs 共享一个 asyncpg pool,所以是那个 pool 的 max_size 让 connections 保持平稳,不管有多少 runs 在 active(一个二十-run 的 burst 在一个 host 上仍然只骑在一把手可数的 pooled connections 上)。跨 replicas,那个本地 pool 不再帮忙了,replica 二有它自己的 pool,所以是那个 concurrency cap 约束了总的 runs、从而约束了 fleet-wide 的总 connections:十个 replicas 各 limit=10 就是一百个 runs 和大约一百个 connections,你按 Neon 的预算来 size 它(免费 tier 允许几百个 pooled)。Pool 和 cap 一起才是保护:pool 约束一个 replica,cap 约束整个 fleet。两者缺一,一个五百封邮件的 burst 跨没有 pool、没有 cap 的 replicas,会打开远多于 Neon 会接受的 connections。那个 keyed 在 event.data.customer_id 上的 per-customer limit=2 意味着一个客户的 burst 最多占两个 slots,所以一个账户的洪水永远不会饿死其他人。那个 throttle 限制每分钟启动多少个 runs,把一个尖峰抹平成一个稳定的速率。一个函数最多带两条 concurrency policies;全局-加-per-key 这一对是常见的形状。从 agent 那里发那个 burst:通过 send_event 跨五个客户发二十个 customer/email.received events。
完成的标志: burst 在 cap 之下排队(running 计数保持在 10 或以下,每个客户保持在 2 或以下),每个 run 都完成,并且 Neon audit_log 表恰好有二十行 message_received 和二十行 message_sent。没有丢掉的 runs,没有重复,在 burst 下没有 Neon connection-limit 错误,在这个单一 host 上 asyncpg pool 让 connections 保持平稳(即使 burst 在跑,你也只会看到一把手可数的几个在用),而当你扩出去时,是那个 cap 会跨 replicas 让它们保持平稳。
为什么这些是 policy,不是代码。 这些没有一个住在你的函数 body 里;它是 runtime 执行的配置。没有这些 cap,一个 burst 要么熔毁一个下游系统,要么让一个 tenant 垄断 worker。手写同样的公平性,是一个 queue 加一个 scheduler 加一个 rate limiter,几百行。在这里它是三个 decorator arguments。
D5:退款上一个 durable 的人工审批 gate(拱顶石)
你在哪:一个 worker,它的退款暂停(D0 的 needs_approval=True)是短暂的,活在正在运行的进程里。这个 Decision 让那个暂停变得 durable;做完时 run 在零 compute 处暂停,等待一个真实的 approval event,然后恢复并恰好一次地发出退款。
那个短暂的暂停就是那个缺口:一次崩溃、一次部署,或者一个去忙了一下午的 reviewer,那笔 pending 的退款就没了。这是整门课的拱顶石:让那个暂停变得durable,让函数在零 compute 处暂停,要多久就多久地等待一个真实的 approval event,然后恢复那同一个 agent run。粘贴这段:
The refund approval is currently an in-process pause that a crash or a slow reviewer would lose. Make it durable: when the agent pauses on the refund, persist its serialized run state as the step's output, then suspend the whole function on
step.wait_for_eventwaiting for arefund/approval.decidedevent (give it a four-hour timeout and match it to this customer). When the decision arrives, rehydrate the state, apply approve or reject, and resume the agent so the refund fires exactly once. Drive a refund, show me the run suspended and waiting, send an approval, and show me exactly one refund audit row. Then do it again with a rejection and show me a blocked row and no refund.
编辑:inngest_app.py(agent helpers 学会暂停和恢复;email 函数多出那个 gate)。
这个 Decision 比其他的值更多的代码,因为那套暂停-再-恢复的动作就是这堂课。当 agent 暂停时,它序列化它的 run state;当决定到达时,你重新水化那个 state,应用 approve 或 reject,并恢复:
async def _run_agent(email_text: str) -> dict:
agent = worker.build_agent()
result = await Runner.run(agent, email_text, run_config=worker.run_config())
if result.interruptions: # the refund tool paused for approval
return {"status": "needs_approval", "state": result.to_state().to_string()}
return {"status": "done", "reply": result.final_output}
async def _resume_agent(state_str: str, approved: bool, rejection_message: str | None) -> dict:
agent = worker.build_agent()
state = await RunState.from_string(agent, state_str)
for item in state.get_interruptions():
if approved:
state.approve(item)
else:
state.reject(item, rejection_message=rejection_message or "Refund denied.")
db.record("refund_blocked", detail=f"args={item.arguments}")
result = await Runner.run(agent, state, run_config=worker.run_config())
return {"status": "resumed", "reply": result.final_output}
在 email 函数内部,gate 是 agent 暂停处那一个 inline 的 wait_for_event;那个决定驱动一个 resume step:
decision = await ctx.step.wait_for_event(
"await-refund-approval",
event="refund/approval.decided",
timeout=datetime.timedelta(hours=4),
if_exp=f"async.data.customer_id == '{customer_id}'",
)
# (decision is None on timeout -> write a refund_blocked row and return)
resumed = await ctx.step.run("resume-agent", functools.partial(
_resume_agent, outcome["state"], bool(decision.data.get("approved")), decision.data.get("rejection_message")))
从上到下读它:gate 是一个原本普通的函数里的一个 inline call。没有 callback,没有 state-machine dispatch,没有跨 invocations 的 if status == waiting: 分支。runtime 处理暂停和恢复;你的代码表达 policy。四个细节值得它们的位置:
result.to_state().to_string()序列化那个暂停的 run,它成为run-agentstep 的 output,所以它被 durably 存储。to_state()是同步的;to_string()返回你持久化的那个字符串。RunState.from_string(agent, s)是被 await 的(它是一个 coroutine),并直接取那个存储的字符串。然后你在state.get_interruptions()上approve或reject,并调用Runner.run(agent, state, ...)来恢复。(一次 resume 可能留下还在 pending 的 approvals,所以真正的 helper 会循环直到一个都不剩。)- 同一个
run_config()在 resume 时被重新传入,并且 agent 被重建成同样的 tool 形状。序列化的 state 不携带 sandbox session,所以 resume 必须再次提供它。这是那个一旦漏掉就会让恢复的 run 失败的细节。(D0 为的正是这个把build_agent和run_config拆开。) if_exp把那个决定匹配到这个客户(async.data.customer_id == '...'),所以对一个客户的批准永远不会恢复另一个客户的 run。
从 agent 那里驱动它:发一个邮件描述了一次退款的 customer/email.received event,看着 run 在 await-refund-approval 处暂停(dashboard 显示它 WAITING,run status 为 RUNNING 但零 compute),然后通过 send_event 发 refund/approval.decided,带 {"approved": true, "customer_id": "cust_001"}。再用 {"approved": false} 做一次。
完成的标志: 在批准时,那个暂停的 run 恢复,Neon audit_log 表恰好有一行 refund_issued。在拒绝时,run 恢复,audit 有一行 refund_blocked 且没有 refund_issued,并且 agent 的回复解释了这次拒绝。
为什么这是那块拱顶石。 其他每一层(感官、反射、平衡)都让 worker 自己保持正确或健康。这一层是人的意识在一个高风险动作上重新进入 loop 的地方,durably 地,要多久就多久,而等待期间零成本。一个 queue-加-database-加-poller 的版本是一个小项目。在这里它是一个 wait_for_event 和一个 resume。
D6:证明 durability 熬得过一个坏掉的 step
你在哪:一个每一层都包好的完整 worker。这个 Decision 证明那个为这一切正名的性质;做完时你已经看着一个坏掉的 run 把它失败的 step 重试许多次、而它那个完成的 audit step 恰好运行一次,然后在一个全新的 run 上恢复了这件工作。
最后一个要证明的性质,就是那个为这一切正名的性质,概念 7 里的 memoization 机制。你在那里理解了它;现在在你自己的 worker 里证明它。粘贴这段:
Deliberately break the agent step so it fails, fire an event, and show me Inngest retrying it while the earlier audit step stays memoized, so the failing run writes its ingress audit row exactly once across all the agent retries. Then fix the step and recover the work, and show me the recovery completing.
故意弄坏 agent step(在 _run_agent 里 raise 一个 ValueError),为不同的客户发几个 customer/email.received events,并读每个 run 的 trace。这就是那个证明,它在每个失败的 run 内部:audit-received 显示一次完成的 attempt并写它的 row 一次;run-agent 显示好几次 Attempts,它带 backoff 重试(Inngest 默认重试若干次)然后失败;audit-sent 从不运行。那个 audit step 停在一次 attempt、而 agent step 一路往上爬,就是概念 7 里的 memoization,现在在你自己的 worker 里可见了:那个失败的 run 只写一行 message_received,无论 agent step 重试多少次。
然后还原那个破坏(如果你用 --reload 跑的 host 会自动 reload;否则重启它),并通过在修好的代码上重新发出那个 event 来恢复这件工作(或者,对于一次真实的坏部署批次,用 dashboard 的 Rerun 按钮;两者都在概念 14 里讲过,都从顶部启动一个全新的 run)。这里是那个让人意外、但它是正确行为而非 bug 的部分:那次恢复是一个全新的 run,所以它再次运行 audit-received 并写它自己的一行 message_received。在一次 break-then-recover 之后,那个客户合法地有两行 message_received,一行来自失败的 run,一行来自那次恢复。Memoization 是一个 within-run 的保证;它从不跨越两个分开的 runs。
完成的标志: 在那个失败 run 的 trace 里,audit-received 停在一次 attempt并写了一行,而 run-agent 累积了好几次 attempts 并失败,那个尽管有 N 次重试却只有一次 attempt 的情况,就是那个 memoization,被证明了。然后那个恢复的 run 在修好的代码上完成 run-agent 和 audit-sent。在 console 里查 Neon audit_log(或让你的 agent 通过 Neon tools 把它读回来):一个你 broke-and-recovered 的客户会有两行 message_received(失败的 run 加上那次恢复)和一行 message_sent(只有那次恢复走到了那一步),这恰好是对的。真正的诊断是 per-run 的,不是 per-customer 的:打开单个 run 的 trace 并确认 audit-received 显示一次 attempt。如果一个 run 的 trace 显示 ingress step 运行了两次,那才是一个 memoization bug(通常是一个不唯一的 step name);跨两个分开的 runs 散布的两行 row 不是。
为什么这是那条明线。 一个在坏部署上丢掉客户工作的 worker,只是一个你调用的 agent。一个吃下同样那个坏部署、响亮地失败、重试那个坏掉的 step 而不重做它已经完成的工作(agent step 的许多次 attempts,但 ingress audit 只写一次)、并在修复后在一个全新的 run 上干净恢复的 worker,是一个 Production Worker。那个证明是那个失败 run 自己的 trace,一次 ingress attempt 对许多次 agent attempts,而不是跨 runs 的一个 row 计数。
把这同一套神经系统对准你自己的 SandboxAgent worker,而不是这个最小的底座;包裹完全一样。而这个 step.wait_for_event 审批替换了那门课可选 Decision 10 里那个手搓的 run-state 表:你刚刚构建的这个 durable gate 就是那个持久化层,所以你可以删掉那张表。
刚刚发生了什么
你构建了一个小型客户支持 worker,并一层一层地给了它一套神经系统。worker 的内部在 D0 之后再没变过:同一个 SandboxAgent、同样的两个 tools、同一个 Neon Postgres audit trail。变了的是它周围的一切。它现在在一个 customer/email.received event 上、以及在一个按 eligible 客户扇出的每日 cron 上唤醒,运行得 durable(agent call 在 step.run 内部),尊重 flow control(全局和 per-customer concurrency,一个 throttle),把退款 gate 在一个 durable 的人工审批上(step.wait_for_event),并通过 replay 失败的 runs 从一次坏部署中恢复,audit trail 显示在任何单个 run 内每个 step 恰好触发一次,无论那个 run 重试了多少次。
agent 代码是一样的;它的触达不是。你从一个你操作的 agent 开始,prompt 它、看它、再 prompt 它。你现在有一个自己运转的 worker:世界唤醒它,它的反射把它带过失败,它在负载下保持平衡,而一个人只在赌注要求时才介入。那就是开篇画的那条线,在一个你操作的 agent和一个自己运转的 FTE之间,而你刚刚跨过它构建了起来。
剩下的关切是规模上的 observability、多 worker 协调,以及决定哪些 workers 处理哪些流量的 manager layer。那是这条路线里的下一门课。这门课覆盖那个生产就绪的执行单位;下一门把那些单位组合成一支 workforce。
第 5 部分:这门课在哪里收尾
一个 Production Worker 的成本形状
两个成本面要紧:基础设施成本(Inngest,以及你跑这个 worker 所用的任何 store 和 compute)和推理成本(model tokens)。基础设施随负载增长大致保持平稳;推理线性增长。下面那套方法才是要学的;任何一个美元数字在它发布的那一周就过时了,所以把这些数字当作示意,并在你把一个数字写进预算之前查当前的定价页。
Inngest 定价。 Inngest 按 execution 收费:每一次 function run,加上每一次 step-level retry,算作一次 execution。
| Tier | 价格 | 每月 Executions | Concurrent steps | 值得注意 |
|---|---|---|---|---|
| Hobby | $0 | 50,000 | 5 | 3 个 users、50 个 realtime connections、无需信用卡 |
| Pro | 从 $75 / 月 | 1,000,000 | 100+ | 1000+ 个 realtime connections、15+ 个 users、7 天 trace 保留 |
| Enterprise | 定制 | 定制 | 500-50,000 | SAML / RBAC、90 天 trace 保留、专属支持 |
Events 定价叠加在上面:每天头 1-5M 个 events 包含在内;超过之后,超额从大约每个 event $0.000050 起步,并在更高量级下递减。Pro 在你冲破 1M cap 后,每多 1M executions 加 $50。
这里要紧的 Hobby-tier 天花板。 那个 5-concurrent-step cap 意味着,即使你在代码里声明 concurrency=Concurrency(limit=10),平台的账户级 cap 也把你压在 5。你的代码对生产是对的;免费 tier 上观察到的 concurrency 是 5。step.sleep 和 step.sleep_until 也受 tier 约束:在免费 Hobby plan 上最多七天,在 paid plans 上最多一年(Inngest usage limits)。
推理成本占主导。 一次典型的客户支持 run,每个对话用几千到一万个 model tokens。把你的 per-token 价格乘以你每封邮件的 tokens 再乘以你每天的邮件数,你就得到那条要紧的线;对大多数 workers 来说它让其他一切相形见绌。这就是你要优化的。 其他一切都是凑整误差。两个最高价值的杠杆:保持一个稳定的 cached prompt prefix(让 model 把重复的那部分按更便宜的 cached 费率计费,而不是每次调用都按全价),并把容易的回合路由到一个更便宜的 model。
三个 Inngest-specific 的成本杠杆,一旦你进入优化区:
- 不要把 pure functions 包进
step.run。 如果一个函数没有副作用,它不需要 durability;包裹它为没有好处的事加上一笔 step-run 费用。把step.run留给 I/O 和副作用。 - 对批量路径用
batch_events。 一个 50-event 的批是一次 function run,不是 50 次。 - 用
step.sleep和step.wait_for_event廉价地暂停。 暂停的函数不为暂停时间计费。一个 3 天的 delayed-followup 和一个 3 秒的花一样多。
规模上的形状: 推理是那张随流量增长的账单;Inngest、你的数据 store 和 compute 相对保持平稳。在你的真实量级上做同样那次乘法,而不是相信这里印的某个数字。
替换指南:神经系统不变,平台会变
这门课在每一层都点了 Inngest 的名。那是因为一个教学例子需要具体的答案,而不是「用任何你喜欢的 orchestrator」。但这套架构对任何合规的替代品都成立。这门课的设计明确预期到的五个替换:
-
Trigger surface:Inngest events → Temporal signals、Restate handlers、AWS EventBridge + Lambda。 每个平台都有一种方式表达「当这件具名的事发生时这段代码运行」。event names、payload 形状,以及那套 idempotency 纪律全部迁移。变的是:SDK 的 decorator 语法和那个 dashboard。
-
Durable execution:Inngest
step.run→ Temporal activities、Restate handlers、自定义的 Postgres-backed state machines。 每个都给你「memoize 这次有副作用的调用、在瞬时失败上 retry、在崩溃后恢复」的语义。Temporal 是最接近的类比,也是更老、更经过企业检验的选项。Restate 是最新的,带更偏函数式编程的味道。自定义 state machines 是那些不能采用一个 managed 平台的团队会写的东西;通常是 1,000-10,000 行代码,重新造出 Inngest 免费给你的约 70%。 -
HITL 原语:
step.wait_for_event→ Temporal 的await Workflow.execute_activity(approval_signal)、Restate 的 awakeables、自定义的 Redis/Postgres 审批队列。 模式是一样的:函数暂停,一个外部信号恢复它,audit 捕获那个决定。Inngest 的表达写起来最干净;Temporal 的更啰嗦但在大规模上久经沙场。 -
Cron scheduling:Inngest cron triggers → Kubernetes CronJobs + queue、GitHub Actions schedules、AWS EventBridge schedules。 Cron triggers 是大路货。Inngest 的优势不是拥有 cron;而是 cron-triggered 函数自动得到与 event-triggered 函数同样的 durability/replay/flow-control。其他平台让你自己去接。
-
Flow control:Inngest concurrency + throttle → 带 worker concurrency 的 Temporal task queues、Redis-backed rate limiters、AWS SQS message visibility timeouts。 其他平台能做这个;Inngest 用我们见过的那种配置密度做(一个 decorator argument)。
Dapr 作为生产规模上那个开放的同伴。 一个值得点名的、更有野心的替换:Dapr Agents 作为生产规模上 Inngest 的结构性同伴,就像 OpenCode 之于 Claude Code 那样。Dapr Agents 在 2026 年 3 月 23 日在 CNCF 治理下达到 v1.0 GA(CNCF announcement,Dapr Agents core concepts)。DurableAgent 是那个生产就绪的类;更老的 Agent 类已弃用。当 Kubernetes-native 部署和多语言 SDKs 比 Inngest 的本地开发体验更要紧时,选 Dapr。Inngest 是更好的学习工具(dashboard 让那个心智模型可见);Dapr 是更好的规模工具,当你撞上 Inngest 的 tier 天花板、或需要 K8s-native 多语言部署时。
Inngest 也是开源的(github.com/inngest/inngest;1.0 release 在 2024 年 9 月加上了 self-hosting 支持),可通过 Helm + KEDA self-host。在规模上要紧的轴是治理、支持和成熟度:Inngest 由单一 vendor 治理,self-hosting 故事还年轻;Dapr 由 CNCF 治理,有更长的生产履历。
| 这门课的概念 | Inngest 原语 | Dapr 生产对应物 | 教学说明 |
|---|---|---|---|
| Scheduled work | TriggerCron | Cron input binding / Dapr Scheduler | 同一个想法:时间唤醒 Worker。Dapr 通常需要 component 配置。 |
| Webhook/event ingress | Inngest webhook endpoint → event | HTTP endpoint、input bindings 或 pub/sub ingress | Inngest 藏了更多管道;Dapr 给基础设施控制。 |
| Internal events | inngest_client.send() | Dapr pub/sub | 同样的事件驱动心智模型;Dapr 里 broker 可插拔。 |
| Fan-out | 一个 event 触发许多 functions | 一个 topic/event 被许多 services 消费 | 同样的架构;Dapr 用 broker/topic/subscriber 组合。 |
| Durable steps | step.run() + memoization | Dapr Workflows + activities | 相似的生产目的,不同的开发者模型。 |
| 不耗 compute 地等待 | step.sleep() | Durable workflow timers | 两者都避免在等待时把一个进程开着。 |
| 人工审批 gate | step.wait_for_event() | Workflow external events/signals、pub/sub、actors | Inngest 的表达更简单;Dapr 更可组合。 |
| Retries | Function/step retries | Workflow/activity retries + resiliency policies | Dapr 把 resiliency 既做成 runtime policy 也做成 workflow 行为。 |
| Dead-letter / failed runs | Inngest dashboard failed runs + replay | Broker DLQ + workflow status/restart/manual tooling | Inngest 这里更开箱即用;Dapr 更 infrastructure-native。 |
| Flow control | Concurrency、throttling、priority、batching | Kubernetes scaling、app concurrency、broker controls、resiliency policies、bulk pub/sub | Dapr 能做,但不是一个 decorator argument。Inngest 更密。 |
| Stateful coordination | wait_for_event、event keys、step state | Actors + state store + workflows | 对长期身份/有状态协调,Dapr Actors 更强。 |
| Agent runtime | 你的 agent 在 Inngest function 内 | DurableAgent / Dapr Agents v1.0 GA | Dapr Agents 显式让 agent 由 workflow 支撑、可恢复。 |
这张表是一份翻译指南,不是一个 API 相同的声明。Inngest 用一种紧凑的开发者体验教这个生产模式:triggers、steps、waits、replay 和 flow control 在一个产品面里。Dapr 通过分布式系统的构建块实现同样的生产架构:bindings、pub/sub、workflows、actors、state、resiliency 和 Kubernetes-native 运维。概念直接迁移;实现风格变了。截至 2026 年 5 月,已按 Dapr 的 bindings overview 和 Dapr Agents core concepts 核对。
在生产规模上伸手去拿 Dapr 的三个理由:
- CNCF 治理,按章程 vendor-neutral:没有单一 vendor 控制这个平台或你对它的依赖。
- 多语言且 Python 一等公民。 Dapr Agents 是 Python-first 的;同样的 agent 代码可以与用 JavaScript、Go、.NET、Java 或 PHP 写的 services 并肩运行,而不需要任何人学第二个框架。
- 按设计在 Kubernetes 上水平扩展。 跑在你自己的 cluster 里、跑在一个 managed offering(Diagrid Catalyst)里,或通过
dapr init在本地跑。扩展故事在每个环境里都是同一套架构。
诚实的告诫:Dapr 不是一个 getting-started 平台。在生产里跑它意味着 Kubernetes、state store、pub/sub broker、placement service、observability、YAML components、sidecars。当你的目标还是学这些模式时,那是很大一片运维面,这就是为什么这门课从 Inngest 开始:一条命令,dashboard 就出现了。等这些模式落地、问题转向在你控制的基础设施上以组织规模运行时,再伸手去拿 Dapr。
先在 Inngest 和 OpenAI Agents SDK 上学这些概念:快速的反馈循环、最少的基础设施、聚焦在模式上。当你到达 Kubernetes 治理、多语言团队或 vendor-neutrality 变得不可妥协的规模时,同样那些架构模式会借上面那张翻译表作为你的钥匙提升到 Dapr 上。模式迁移;底层基质改变;你在这门课里学到的,仍然是那份承重的知识。
这门课(暂时)没覆盖什么
你构建的这个 worker 满足论点提出的 Seven Invariants 中的四个。具体说:它运行在一个 engine 上(Invariant 4,那个 SandboxAgent),对着一个 system of record(Invariant 5,那个 audit trail),世界能调用它(Invariant 7,你加的那些 triggers),并且在一个被 gate 的决策上人类是 principal(Invariant 1,部分:runtime 机制在这里,更广的架构模式在后面)。剩下的三个 Invariants,以及那套把许多 workers 变成一支 workforce 的更广架构,是后续课程。每个一句话:
- Invariant 2:每个人都需要一个 delegate。 一个在边缘的个人 agent,它持有你的 context、代表你的判断,并把工作经纪给那支 workforce。论点把 OpenClaw 点名为当前的实现。
- Invariant 3:那支 workforce 需要一个 manager。 一个 orchestrator,它分派工作、执行预算、审计执行、把 hiring 暴露为一个可调用的能力。论点把 Paperclip 点名。
- Invariant 6:那支 workforce 在 policy 之下可扩展。 一个 meta-layer,在那里一个被授权的 agent 生成一个 prompt、provision 一个 runtime、并注册一个新 Worker,而不唤醒一个人。Claude Managed Agents 是一种实现。
一个在 events 上唤醒、运行得 durable、并在人类身上设 gate 的单个 worker,是这门课所教架构的最小单位。下一门课把那个 worker 扩展成一支 workforce:由一个 manager 协调的多个 workers,按需可扩展,由 triggers 唤醒,由 spec 治理。同样的 OpenAI Agents SDK 基础,同样的 audit 习惯,同样的 Inngest 神经系统。架构不变。
怎么真正把这件事做好
读这门速成课不会让你擅长构建 Production Workers。用它才会。你从构建那个 worker 开始,在你包裹它时感受那些摩擦,并让每一处摩擦教会你它属于哪个概念。
这门课的映射:
- 「为什么 event 到达时我的函数不触发?」→ event name typo 或 namespace 不匹配(概念 3)。把你
TriggerEvent里的 event name 字符串和inngest_client.send里的逐字节对比。 - 「为什么同一个逻辑 event 我的函数触发了两次?」→ 缺 idempotency key(概念 4)。给那个 event 加一个带 deterministic seed 的
id=。 - 「为什么一次部署后我的函数『丢了工作』?」→
step.run之外的代码在做工作(概念 7)。把 I/O 和副作用包进具名的 steps。 - 「为什么客户被扣了两次款?」→ Stripe call 在
step.run之外,或 step name 不唯一(概念 6 和 7)。把那次调用移进一个具名的step.run;让那个 step name 在函数内全局唯一。 - 「为什么 OpenAI 在 9am 峰值返回 429 错误?」→ 缺 throttle(概念 11)。加
throttle=Throttle(limit=N, period=timedelta(minutes=1))。 - 「为什么一个客户的 bursts 饿死了其他客户?」→ 缺 per-key concurrency(概念 12)。加第二个
Concurrency(limit=2, key="event.data.customer_id")。 - 「为什么我的 HITL gate 在周末静默地触发了?」→ 缺一个写入 audit 的 timeout handler(概念 15)。在
approval is None上分支,并显式写那行 audit。
一次构建架构的一块。 那就是为什么 Part 4 是七个 prompts,不是一个。构建那个 worker(D0)。把 agent 包进 step.run(D1),并看着当你故意在运行中途崩溃时有什么改变。在一个 event 上唤醒它(D2)。加那个 cron 扇出(D3),然后在你真正撞上一个 rate limit 之后加 flow control(D4),再在一个高风险动作真正需要一个人时加那个 durable 的审批 gate(D5)。每一层都是它自己的一次学习。合并成一次大重写,它们就是一堵墙。
这门课教的那套纪律(在 events 上唤醒、运行得 durable、在人类身上设 gate、在 bugs 上 replay)是那个架构不变量。无论什么平台实现它,那份四性质的契约才是你真正承诺的东西。这是那个 Lindy 赌注:你建在那些经受住了时间的部分上,普通的 functions、SQL、一门有类型的语言、一个 event bus,而不是这一季的 wrapper。产品可替换;纪律不可。
Quick reference
叙述性课程和构建时参考之间的一道分隔线。下面的 section 是用来搜的,不是从头读到尾的。每个概念的一句话要点在引言那张折叠的速查表里;这个 section 是构建时的诊断、那两棵决策树,以及文件布局。
决策树:挑 trigger surface
当世界里发生一件新事,那次唤醒从哪来?
- 一个外部系统给我们发了一个 HTTP request。 → Webhook trigger。在 Inngest dashboard 里配置来源;通过 transform 重塑 payload;消费由此产生的 event。
- 一个 schedule 说时间到了。 → Cron trigger。
TriggerCron(cron="...")。用 UTC;生产 crons 即使你的 service 在部署中途也会触发。 - 另一个 Inngest 函数在它运行期间发出了一个 event。 → Event trigger。
TriggerEvent(event="ns/name.subtype")。让一个或多个函数订阅同一个 name。 - 一个交互式 user 在等一个立即的响应。 → 不是一个 Inngest trigger。把 request/response 留在你正常的 web endpoint 里;如果那个响应涉及繁重工作,就从 request 内部发一个 event 并立即返回,让 Inngest 异步处理这件工作。
决策树:挑 step 原语
给定一个函数正在运行、你需要做某件事,你伸手去拿哪个 step.* call?
- 一次有副作用的调用(API、DB、file write、agent invocation)。 →
ctx.step.run("name", fn, ...)。默认的那个。成功后 memoized,瞬时失败上 retry。 - 在一个为 in-flight time 计费的 serverless 平台上的一次长时 OpenAI call。 →
ctx.step.ai.infer(...)。把 inference 外包给 Inngest 的基础设施,让你的函数进程能 deallocate。 - 在继续之前等待一个固定的时长。 →
ctx.step.sleep("name", timedelta(...))。Durable;等待时零 compute(免费 plan 上最多七天,paid 上一年)。 - 等待一个外部 event(人工审批、兄弟函数完成)。 →
ctx.step.wait_for_event("name", event="...", timeout=..., if_exp=...)。Durable;event 到达时恢复,或超时返回None。 - 纯粹的 deterministic 计算(格式化一个字符串、算一个日期)。 → 直接写代码。不需要
step.run;不收费。
文件位置速查
一个扁平项目,四个文件,没有 src/ 嵌套:
ai-agent-nervous-system/
├── .claude/
│ └── skills/ # the four Inngest skills (installed in the Quick Win)
│ ├── inngest-setup/SKILL.md
│ ├── inngest-events/SKILL.md
│ ├── inngest-steps/SKILL.md
│ └── inngest-durable-functions/SKILL.md
├── db.py # Neon Postgres access: pooled asyncpg, load_customers, record (closed-vocabulary audit) (D0)
├── worker.py # the worker: SandboxAgent + 2 tools (D0)
├── inngest_app.py # the nervous system: Inngest functions + FastAPI host (D1-D5)
├── .env # OPENAI_API_KEY, DATABASE_URL, INNGEST_DEV=1
└── AGENTS.md # the base's rules file (read on open)
customers 和 audit trail 住在你的 Neon database 里(在 Quick Win 里 provision,在 D0 里 seed),不在本地文件里。worker(db.py、worker.py)在 D0 之后再不变。每一层神经系统(D1 到 D5)都编辑 inngest_app.py。
诊断表,症状 → 根因 → 概念
| 症状 | 第一嫌疑 | 要重读的概念 |
|---|---|---|
| 期望的 event 到达时函数从不触发 | Event name typo,namespace 不匹配 | C3(webhooks),C5(fan-out) |
| 同一个逻辑 event 函数触发两次 | 缺 idempotency key | C4(idempotency) |
| 部署后函数「丢了工作」 | step.run 之外的代码在做工作 | C7(memoization) |
| Cron schedule 在一次部署中没有触发 | 仅本地 dev server,生产在 Inngest infra 上运行 | C2(cron) |
| 一笔退款客户被扣了两次 | Stripe call 在 step.run 之外,或 step name 不唯一 | C6(step.run),C7(memoization) |
| 9am 峰值期间出现 OpenAI rate-limit 错误 | 缺 throttle | C11(concurrency + throttle) |
| 一个客户的 bursts 饿死了其他客户 | 缺 per-key concurrency | C12(priority + fairness) |
| 函数永远暂停,从不恢复 | wait_for_event 里的 event name 与正在发送的 event 不匹配 | C8(wait_for_event),C15(HITL) |
| HITL timeout 在周末静默触发 | 缺一个写入 audit 的 timeout handler | D5(durable refund gate),C15(HITL) |
| 昨天失败的 runs 从 dashboard 消失了 | Runs 保留到手动 replay 或保留窗口之后 | C14(replay) |
| Replay 重复给客户扣了款 | Replay 是一个重新执行每个 step 的全新 run;那笔扣款没有 idempotency key | C4(idempotency),C14(replay 是一个全新的 run) |
| 函数 trace 不显示 OpenAI prompt | Step trace 显示函数 inputs/outputs,但没有 LLM-specific 的 prompt/token telemetry | C10(Python 用 step.run;LLM-specific telemetry 需要你自己的 OpenAI client tracing;step.ai.wrap 的 prompt-level traces 仅 TypeScript) |
附录:可选的来龙去脉和一张 Inngest 速查表
这门课独立成立:Part 4 从零构建那个 worker,所以下面没有一样是先修条件。两条简短的说明,作为背景。
A.1:如果你从 Digital FTE 课程过来
From Agent to Digital FTE 课程构建一个更丰富的客户支持 worker:可移植的 Skills、一个 Postgres system of record,以及一个自定义 MCP server。如果你做过它,你已经有一个 SandboxAgent worker 躺在磁盘上,你可以跳过 D0 那个最小底座:把神经系统(D1 起)对准你自己的 worker。包裹完全一样。一个加分项:你在 D5 构建的那个 durable 退款 gate(step.wait_for_event)替换了那门课可选 Decision 10 里那个手搓的 run-state 表,所以你可以删掉它。如果你没做过那门课,忽略这一切;D0 给你需要的一切。
A.2:这门课用到的 Inngest-specific 要点
如果下面任何一项感觉陌生,在扎进 Part 4 之前先扫一眼对应的文档页。
- Inngest client 实例化。 每个 Python project 一个
inngest.Inngest(app_id=...)实例,从一个 module 导出,并在你装饰函数的任何地方 import。Python quick start。 - 函数装饰。
@inngest_client.create_function(fn_id=..., trigger=...)。trigger 可以是TriggerEvent、TriggerCron,或对多 trigger 函数是两者的一个 list。 ctx.step.run、ctx.step.sleep、ctx.step.wait_for_event、ctx.step.ai.infer。 这四个 step 原语构成你在 Python 里会写的 90%。(TypeScript 有第五个,step.ai.wrap,用于 LLM-specific 的 tracing;Python projects 对 AI calls 用step.run。)inngest_client.send(events=[...])。 从你代码里的任何地方发出 events(在函数内、在 agent tools 内、从 CLI scripts)。用一个id=做 idempotency。- Dev server 启动。
npx inngest-cli@latest dev。跑在:8288。Dashboard 在http://127.0.0.1:8288。MCP 在http://127.0.0.1:8288/mcp。如果:8288被占用,它用8289+;那时在 host 上设INNGEST_BASE_URL=http://127.0.0.1:<port>让它跟随,而不只是那个 MCP URL。
A.3:真正难的那两个转变
这门课最难的不是 Inngest 的语法。是从 request 到 event 的心智转变(概念 1)和从 in-process execution 到 durable execution 的转变(概念 6)。一旦那两个落地,语法就是机械的了。如果别的什么感觉比它应该的更难,先重读概念 1 和 6。