Skip to main content

给你的 AI Agent 装上神经系统:90 分钟速成课

15 个概念,覆盖约 80% 的真实用法:感知(触发器)、反射(持久化执行)、平衡(流量控制)。

你已经造出了一个能干活的 agent。问题在于:它只在你盯着它的时候才干活。你打开 Claude Code 或 OpenCode,敲下指令,它回复。一旦你走开,它就停了。这门课要解决的,就是弥合这道鸿沟:从一个你亲手操作的 agent,变成一个能自己运转的 worker。

弥合这道鸿沟的不是一个更聪明的 agent。你的 agent 已经具备干活所需的一切:一个负责思考的 LLM,用来行动的工具和 MCP 服务器,以及它已经掌握的那些 skill。它缺的是一套神经系统。

想想你自己的身体。你的大脑负责思考,你的肌肉负责行动。但还有第二套系统在底层运转,不需要你操心:你的心跳、你的反射、那些在你睡着时维持你生命的信号。哪怕你不再留意,你的心脏照样跳动。agent 却没有这样的东西。所以一旦你不再驱动它,它就停了。

神经系统能自己闭环,每一轮都无需人参与。它感知世界,在有事发生时唤醒 agent。它在某一步失败时凭反射做出反应,在等一个人或一个慢速 API 时把进度保持几个小时。当五百个请求同时涌来时,它让 agent 保持稳定。这就是一个你操作的 agent 和一个自己运转的 FTE 之间的区别。你给你的 agent 加上这套神经系统,而不是重写 agent。这就是整门课唯一立足的那个想法。

📚 教学辅助

打开完整幻灯片

查看完整演示文稿 —— AI Agent 神经系统


这个工具有一个技术名字:持久化执行引擎(durable execution engine)。我们用的这一个叫 Inngest。同样的模式在 Temporal、Restate 和 Dapr Agents 里也成立。而这并不只是一个教学比喻:Day AI,一家面向 AI 原生公司的 CRM,把 Inngest 称为他们产品的「神经系统」。Inngest 免费的 Hobby 套餐是最容易上手的地方:不需要信用卡,一条命令就能起一个开发服务器,还有一个你边搭边看的仪表盘。

在一切开始之前,先用一张图看清整套结构:

  1.  an EVENT happens   (e.g. a customer emails)
|
v
2. the INNGEST ENGINE catches it
(you do NOT build this. it runs your agent for you:
retries, waits, remembers every step, shows a dashboard)
|
| it reaches your code over a thin web wire (FastAPI)
v
3. YOUR AGENT runs
(the only part you write. it thinks and acts.)

这就是整个模型:两个程序。 引擎(你不用写它)接住事件并运行你的 agent(你要写它),通过一根细细的 web 线缆触达它 —— 这也是这门课里之所以会出现一个 web 服务器(FastAPI)的唯一原因。你在快速上手环节里把两者都启动起来,看着引擎驱动你的 agent。

这个例子刻意做得很小:一个客户支持 agent,它查询几位样例客户,起草一份回复,并且只在一个人批准之后才发起退款。agent 不是难点,所以我们把它做小,把精力花在围绕它的神经系统上。你会在这里从零把它搭出来。它接续在更早的 Digital FTE 课程结束的地方,不过 D0 会从零搭起一个最小化的 worker,以防你跳过了那门课。它以 Python 为先,基于 inngest-py:你用平实的英语指挥你的通用 agent,由它来写代码。

下面说说这门课是怎么搭建的,好让你以正确的方式来读。这次搭建是脊柱。 你在快速上手里把环境一次性配置好,然后第 4 部分用七个简短的提示词把整个 worker 搭出来,一次加一层神经系统。那是主线,亲手做才是模型落地的方式。第 1 至 3 部分里的十五个概念是这次搭建所依据的参考:每个概念讲一个想法,是你即将加上的那一层底下的「为什么」。有两条不错的路径。如果你喜欢先有想法再上手,就先读第 1 至 3 部分。或者直接进入快速上手和第 4 部分,每当某一层让你产生「可它为什么是这样工作的?」时,再翻回对应的概念。无论哪条路,第 4 部分都是你动手搭建的地方。

agent 及其神经系统。左侧,「世界」通过四种信号伸进来:cron、webhook、event,以及一次直接调用。中间,「AI WORKER」内含神经系统(Inngest,自主层)包裹着 agent。神经系统有三个编号的层:1 感知(触发器),2 反射(持久化执行:step.run、记忆化、重试),3 平衡(流量控制:并发、限流、replay)。右侧,agent(负责思考与行动,保持不变)持有 OpenAI Agents SDK、Skills、一个 MCP 服务器、Neon Postgres 和一个沙箱。不变量在于:agent 从不导入 Inngest,所以这套神经系统在 Inngest、Temporal、Restate 和 Dapr 之间随时可换。

agent 从不导入这套神经系统,所以你可以把 Inngest 换成 Temporal 或 Restate,而 agent 原封不动。

为什么 AI agent 需要一套神经系统(四个属性)

单个 agent 在任务中途崩溃只是让人烦。但一支五十个 agent 的队伍在没有神经系统托底的情况下处理面向客户的工作,则根本不可能:你要么采用一个能给你这套能力的平台,要么花六个月自己造一个更差的版本。有四个属性让这套神经系统对 agent 尤其重要:

  1. 每一步都花真金白银。 崩溃后的朴素重试会为那些已经成功的步骤重新付钱;步骤记忆化(概念 7)只付一次。
  2. 工作流会复合失败。 一个六步的 agent,若每步可靠性为 95%,那么它在某处失败的概率是 26%。步骤记忆化加上有针对性的重试,可以把整体可靠性提升到约 99.7%。
  3. 副作用是真实世界里的。 agent 会给客户发邮件、刷卡扣款、往 Slack 发消息。步骤记忆化加上 provider 层面的幂等键,让这些动作安全。
  4. agent 在高风险时刻需要人工审批。 没有 step.wait_for_event(概念 15),你就得自己搭一套审批队列:数据库表、轮询、超时处理、审计轨迹。那是一个项目,而不是一个特性。

Day AI,这家面向 AI 原生公司的 CRM,把这门课讲的每一个原语都用在了它的产品上:持久化的 LLM 工作流、wait-for-event 协调、失败时 replay、防抖加限流加并发,以及多租户公平性。他们的两位创始工程师各自独立地用了同一个「神经系统」的比喻。这是生产环境里的语言,不是课程的包装术语。

这门课在 Agent Factory 论点中的位置

Agent Factory 论点描述了任何生产级 agent 系统都必须满足的七大不变量。你在这里搭建的 worker 满足不变量 4(一个引擎)和不变量 5(一个记录系统,这里是一份小型审计轨迹)。这门课又补上两个,外加不变量 1 的一部分:

  • 不变量 7:世界来调用系统。 触发器(计划任务、webhook、入站 API 调用、来自其他 worker 的事件)唤醒 worker。Inngest 是其中一种实现。
  • 不变量 1 的一部分:人是主体。 审批关卡是被授权的意图重新进入运行时的地方。step.wait_for_event 是任何平台上最干净的表达:agent 挂起,一个人发出它所等待的事件,agent 恢复运行。
  • 持久化执行作为论点中隐含的不变量。 审计回答「发生了什么?」;持久化回答「从中断处再做一遍」。可 replay、可重试、可在失败后恢复。

15 个概念一览。 它们对应神经系统的三项工作:感知(触发器唤醒 worker)、反射(持久化执行在出问题时保持正确),以及平衡(流量控制在高负载下保持健康)。这是初见版本,每个概念配一句话的要点。当搭建过程中某处出问题时,文末的快速参考有一张「症状到概念」的诊断表,把你指回这次失败所属的那个概念。

15 个概念每个一句话(展开看完整地图)
#概念一句话要点
感知(触发器)世界如何触达 worker
1事件 vs 请求请求是同步的,有人在等;事件是异步的,世界已经往前走了。
2Cron 触发器一个时间表唤醒函数。一行:TriggerCron(cron="0 9 * * *")
3Webhook 触发器一个入站 HTTP 负载变成一个命名事件;你的函数对这个名字做出反应。
4幂等性与事件语义事件 ID 和步骤名让重复的事件(或重试)变成无操作。
5扇出与子 agent 委派一个事件,N 个订阅函数;或一个父函数触发 N 个子事件。
反射(持久化执行)在出问题时保持 worker 正确
6step.run 与持久化函数模型每个 step.run 都是一个检查点;函数可以在两步之间崩溃然后恢复。
7记忆化,底层的机制已完成的步骤返回存储的输出,而不是重新执行。
8step.sleepstep.wait_for_event两者都让函数持久地挂起,等一段时长或等一个事件。
9重试、错误处理、死信自动退避重试;N 次之后,失败的运行会保留下来供 replay。
10在 Python 中用 step.run 处理 AI 调用把 OpenAI 调用包进 step.runstep.ai.infer 卸载推理(step.ai.wrap 仅限 TypeScript)。
平衡与恢复高负载下的流量控制、恢复,以及人工关卡
11并发与限流concurrency 限制活跃运行数;throttle 限制每秒启动数。
12优先级与公平性优先级给队列排序;按键并发让每个租户拿到公平的份额。
13批处理把事件攒成一次批量函数调用,廉价地处理批量工作。
14Replay 与批量取消用新代码 replay 失败的运行;批量取消你不再想要的运行。
15step.wait_for_event 做 HITL 关卡函数挂起直到一个人批准,然后带着决定恢复。

前置条件。 这门课假定你已经做过 从 Agent 到 Digital FTE。如果做过,下面这些你已经全部满足,而且你手上有一个值得包裹的 worker:第 4 部分的神经系统正对着它,你也可以跳过 D0 里的从零搭建。如果没做过,先做那门课,或者干脆继续往下读:D0 会从零搭起一个最小化的 worker,让这门课其余部分仍然能独立成立。无论哪种情况,你都需要四样东西。

  1. 你会驱动一个通用 agent。 Claude Code 或 OpenCode,已安装并已认证。计划模式、规则文件、先读后写的工作流:如果这套节奏你熟悉,那你就校准好了。如果不熟悉,Agentic Coding 速成课会带你过一遍。
  2. 一个 OPENAI_API_KEY(或另一个你的通用 agent 能用的模型密钥),以及一个 Neon 账号,用作 worker 的 Postgres 记录系统。worker 会运行一个真实的模型,并在 Neon 里读写它的客户和审计轨迹。Neon 是免费的(无需信用卡),你在配置过程中点一下浏览器就能授权;如果你还没有账号,在 neon.com 上花大约一分钟注册即可。Inngest 开发服务器本身不需要任何账号。
  3. 可用的 Node.js 20+,尽管 worker 是 Python 写的。Inngest 开发服务器以 Node CLI 形式分发(npx inngest-cli@latest dev)。
  4. 一个可用的「事件驱动」vs「请求/响应」心智模型。 如果「世界发出一个事件,零个、一个或多个函数对它做出反应」读起来很熟悉,那你就校准好了。如果不熟悉,概念 1 会给你这个轮廓。
第一遍该怎么读这一页

两遍法。 第一遍把神经系统的模型,也就是它的三层,装进你脑子里;第二遍,在第 4 部分手放到键盘上,是你动手搭建的地方。如果你更喜欢先搭、让模型边搭边成形,那也行:从快速上手开始,跑完第 4 部分,把每个概念当作某一层引出「为什么」时你才翻开的参考。展开任何标着 「完成的标志」「该留意什么」 的内容:这些是可运行的行为,用来对照你的预测。在第 4 部分里,你第一遍读时可以略读那些承重的代码片段;每段周围的文字会告诉你这一层做什么,而你真正动手时由你的 agent 来写代码。「Try with AI」段落是可选的延伸提示词。每个概念以一个 预测(在往下读之前先给出答案)或一个 快速检查(检验你刚读到的规则)收尾;两者都是为了让你停一下,不是给你打分。每个术语都在它首次出现的语境里定义。

时效

截至 2026 年 5 月有效。 整个第 4 部分的搭建都针对一台真实的 Inngest 开发服务器和一个真实模型做了端到端运行,环境为 inngest 0.5.18、openai-agents 0.17.x(在 0.17.3 和 0.17.4 上分别搭建并再次验证)、fastapi 0.136.3、Python 3.12,以及 Inngest CLI。第 4 部分里的每一段代码都出自那次可运行的搭建,不是凭记忆写的。这门课所教的架构不会因为 SDK 变化而变化;SDK 只是今年触达它的接口。唯一一处 openai-agents 的小版本变动可能咬到你的,是 D5 的 resume 细节(运行状态序列化如何处理自定义上下文),所以那个 Decision 直接给出了实时文档的链接。如果某个实时文档页面和这一页在某个语法细节上出现分歧,以文档为准: 锁定你的版本,搭建时去查 Inngest Python 快速开始OpenAI Agents SDK 文档

选好你的工具,页面随之切换

在 Claude Code 和 OpenCode 之间有分歧的小节带有一个切换器;选一个,页面会在你多次访问之间保持同步。


十五分钟快速上手:搭好底座,看见那道反射

在那 15 个解释这套机制 为什么 成立的概念之前,先把这门课所运行的环境搭好,看一个任务在崩溃中存活下来。这套配置你只做一次;第 4 部分会在同一个底座上搭出真正的 worker。做完之后你将拥有:

  • 在你的通用 agent 里打开的底座,其 skill 和工具已替你配置好,
  • 一个全新的 Neon 数据库,里面有你的 agent 创建的两张表(customersaudit_log),
  • 一个运行中的小 worker,以及一个你可以盯着看的仪表盘,
  • 一次你看着它在等待时进入休眠的运行,全程消耗 零算力
  • 一次你故意弄坏的运行,然后看着系统重试:它保留了已经完成的工作,只重跑出问题的那部分,
  • 以及那同一个函数,里面换成一个 真实的 agent 在持久化步骤内写问候语,于是你收尾时看到的是一个 AI worker 在运行,而不仅仅是一个计时器。

最后那两条才是重点:重试就是整门课所讲的那道反射,而在它内部运行的 agent,就是这道反射所服务的那个承诺。这是 一坐就做完 的事,不是完整的第 4 部分搭建,所以先做完它,再回来看那些概念。

现在你启动 开篇里的那两个程序:你的 worker(你的代码)和 Inngest 开发服务器(在它旁边运行的引擎,仪表盘在 http://127.0.0.1:8288,其中 /runs 列出每一次运行)。它们通过一个小小的常驻 web 层 FastAPI 连接,那是开发服务器敲门以启动一次运行的门。整个循环一句话:一个 事件 到达,开发服务器通过那道门触达你的 worker,你的持久化函数一步一步地运行,每一步都被记录到仪表盘里。你的通用 agent 替你写好并启动两者;你的活儿是看。

还有一条边界很重要,和 Digital FTE 课程画的是同一条。你的 worker 把它的客户和它做过什么的记录保存在一个 Neon 数据库里,而这个数据库会以两种不同的方式被触碰。在你 搭建 时,你的通用 agent 用平实的英语替你伸进 Neon,去创建表、检查行。在 worker 运行 时,它通过它自己的一条普通连接与同一个数据库对话。那个搭建期用的工具永远不会接进运行中的 worker;Neon 自己的文档说得很直白,它是用来搭建和检查的,不是用于生产的。Neon 点一下就免费;Inngest 开发服务器则完全不需要账号。

拿到底座并打开它

下载底座,在你的通用 agent 里打开这个文件夹。agent 会按照下面的提示词自己完成配置。你 只配置一次ai-agent-nervous-system/ 是你整门课的文件夹,快速上手和第 4 部分共用。你永远不需要重新下载或重新解压。

下载 ai-agent-nervous-system-base.zip

cd ai-agent-nervous-system
claude

这个底座假定你有一个能干的通用 agent(Claude Code,或运行 Claude Sonnet/Opus、GPT-5 或类似模型的 OpenCode)。更小的模型会在搭建提示词上跑偏;如果它的第一份计划看起来含糊而不具体,在继续之前换一个更强的。

准备底座(约 3 分钟)

底座在 AGENTS.md 里附带了它的规则,以及它的 MCP 接线;Skills、你的密钥和 Neon 授权接下来才弄。让你的 agent 自己把自己配置好。粘贴这段:

Read AGENTS.md, then get this base ready: install the Skills it lists for whichever agent you are, copy .env.example to .env for me, and tell me exactly what you need from me to bring the Neon and Context7 MCP servers online.

该留意什么: agent 安装那四个 Inngest Skill 和 neon-postgres Skill(你会看到安装的运行过程和 Installed 确认),创建 .env,然后向你要两样东西:你的 OPENAI_API_KEY 以粘进 .env,以及点一下浏览器通过 OAuth 授权 Neon。Neon 是免费的;如果你还没有账号,在 neon.com 上花大约一分钟注册,或者直接在授权界面里建一个。INNGEST_DEV=1 已经在 .env 里了,所以 SDK 会以本地开发模式运行,不需要签名密钥。安装和接线都完成后,agent 会让你启动开发服务器(下一步),然后重启它,因为新的 Skill 和 inngest-dev MCP 不会在会话中途加载。

完成的标志: Skill 都装好了,.env 里有你的密钥,Context7 可达,Neon 已授权。inngest-dev MCP 会在开发服务器运行起来后上线,那就是下一步。

启动开发服务器,并确认 agent 能触达它(约 2 分钟)

这门课加了两条你的 agent 通过 MCP 去触达的边界:一个它搭建并检查的 Neon 数据库,以及一个它向其发送事件并观察的 运行中的开发服务器。所以在搭任何东西之前,把这两者都拉起来,确认它们是活的。

在它自己的终端里启动 Inngest 开发服务器(它是一个 Node CLI;让它一直运行):

npx inngest-cli@latest dev

仪表盘会在 http://127.0.0.1:8288 起来,开发服务器在 /mcp 暴露它的 MCP 端点。现在 重启你的通用 agent(退出后在 ai-agent-nervous-system 文件夹里重新启动),好让刚装的 Skill 和 inngest-dev MCP 都加载进来。然后粘贴这段:

List the Neon tools and the inngest-dev tools you can see.

该留意什么: 两份真实的列表。Neon 工具(创建项目、运行 SQL、描述表、获取连接字符串等等)是你的 agent 在数据库上的那只手。inngest-dev 工具(list_functionssend_eventinvoke_functionget_run_status 等等)是它在运行中的开发服务器上的那只手。下面的一切都骑在这两者之上。

关卡通过: 回复同时列出了真实的 Neon 工具名 真实的 inngest-dev 工具名。如果 Neon 工具不见了: OAuth 没走完;从准备步骤重做 Neon 授权。如果 inngest-dev 工具不见了: 开发服务器没在跑(去启动它),或者你跳过了重启(退出,在这个文件夹里重启,再问一次)。

搭好存储,并拿到它的连接字符串(约 3 分钟)

现在通过 Neon MCP 创建 worker 的记录系统,然后把它以后触达它所需的那一样东西交给 worker:一个连接字符串。你在第 4 部分搭的 worker 会在这里读它的客户、写它的审计轨迹。粘贴这段:

Paste this to your general agent. Plan first; execute on approval.

On a fresh Neon project, create two tables: customers (id, email, tier) and audit_log (a record of every action the worker takes). Then call the Neon tool that returns the connection string and write that URL into my .env as DATABASE_URL. Use the Neon tools for all of it; don't write SQL for me to run.

该留意什么: agent 调用 Neon MCP 工具 来创建项目和那两张表(你看到的是那些工具调用,而不是你敲的 SQL),然后把 DATABASE_URL 写进 .env。那个字符串就是交接物:Neon MCP 把存储开通好了,而你的 worker 用的是那个字符串,不是 MCP 服务器。

完成的标志: 一个全新的 Neon 项目存在,里面有一张 customers 表和一张 audit_log 表,.env 里有一个 DATABASE_URL。打开 console.neon.tech,选 agent 刚建的那个项目,打开 Tablescustomersaudit_log 就在那里,眼下还是空的。等 worker 在 D0 里运行时,你会看到行出现。(一张表就是一个电子表格:每行一件事,每列一个细节。)

搭出第一个持久化函数,并从你的 agent 来驱动它(约 3 分钟)

现在用你刚装的那些 Skill,搭出最小的持久化函数。Inngest 的 Skill 在它们的示例里是 TypeScript 优先的,所以你的 agent 从它们那里拿 模式(什么是一步、一个持久化函数长什么样),而从文档(开发服务器 MCP 的 grep_docs/read_doc,或 Context7)确认具体的 Python 签名,而不是凭记忆。粘贴这段:

Using the Inngest Skills, write one tiny Inngest durable function (call it greet-customer, triggered by a demo/greet event) that composes a greeting in one step.run, sleeps fifteen seconds with step.sleep, then composes a farewell in a second step.run and returns both. Serve it from a FastAPI host in local dev mode, and start the host on port 8000 with auto-reload on, so edits I make later are picked up without a manual restart.

它写出来的形状,好让你一眼认出:函数是普通的 async def,两个 step.run 调用包住应当被记忆化的工作,而中间的 step.sleep 让运行持久地挂起。进程可以在那次休眠期间崩溃、重启或重新部署,等计时器触发时,运行仍会在下一行恢复。有一个细节要在 agent 的代码里确认:Inngest 客户端是用 is_production=False 构造的,或者它读取了你 .env 里已有的 INNGEST_DEV=1。两者缺一,SDK 就会悄悄默认走 Cloud,你的函数永远不会在本地注册。

完成的标志: FastAPI host(前面说的那道门)在 8000 端口运行,开发服务器(上一步已经在跑)自动发现了它。让你的 agent 用 inngest-dev 的 list_functions 工具确认(或者打开 http://127.0.0.1:8288,点 Functions,看到 greet-customer 列在那里)。从这里起,你从你的 agent 发送事件,并在仪表盘里观察运行。

触发它,看一步以零算力休眠(由你驱动)

从你的 agent 发送触发事件。粘贴这段:

Send a demo/greet event with name Sara using the inngest-dev send_event tool.

(更喜欢用仪表盘?在 http://127.0.0.1:8288 里点 Events,再点 Send event,粘贴下面的负载,点 Send。两种方式启动的是同一次运行。)

{
"name": "demo/greet",
"data": { "name": "Sara" }
}

现在看那次持久化休眠,你大约有十五秒能现场抓到它。两种方式,选一个:

  • 让 agent 轮询(agent 原生的方式):「Poll get_run_status on that run until it finishes.」 休眠途中 agent 把运行报告为 Running,还没有结束时间,你的 host 终端全程空闲;然后它翻成 Completed,带着输出字典,以及一个大约十五秒的起止间隔。那个间隔就是休眠。
  • 盯仪表盘看: 立刻打开 http://127.0.0.1:8288Runs → 最新那次运行。第一步已经完成,sleep 那一步显示 Sleeping 并带一个恢复时间;十五秒后它自己恢复并翻成 Completed,返回的字典在 Output 面板里。

无论哪种方式,那十五秒里你的代码什么都不在跑:开发服务器持有恢复时间,host 闲着。这就是重点 —— 一次持久化的等待消耗零算力。(在它结束 之后 才打开运行,你就只会看到 Completed 和输出,现场的休眠已经过去了;重发一次再看快一点,或者让 agent 轮询。)

弄坏一步,看重试跳过它已经做完的工作(回报所在)

现在故意让一步失败,好让你看着记忆化把已完成的工作带过这次重试。把这段粘给你的 agent:

Make the farewell step raise an error on purpose, so I can watch a run fail. Keep everything else the same.

再发一次同样的 demo/greet 事件,然后读那次失败运行的 仪表盘里的逐步轨迹Runs → 最新)。回报就在这里,而且就在这 一次失败的运行 里:问候步骤显示 一次已完成的尝试,而告别步骤显示 若干次 Attempts,每次都带退避重试(Inngest 默认重试若干次),然后运行落到 Failed。仔细体会那个尝试次数的含义:已完成的问候步骤只付一次钱,不是每次重试都付。这就是你用自己的眼睛能看见的持久化执行。为什么 已完成的步骤会瞬间返回而不是重跑,是你将在概念 7 里遇到的机制;眼下,看着它发生就好。

运行这一步时有两件事要预期:

  • 逐步的证据在仪表盘里,不在 agent 里。 你的 agent 触发事件,能报告运行层面的状态,但开发服务器 MCP 的 get_run_status 返回的是运行摘要,里面 steps: null;它不会展开逐步的尝试。那些 就是 记忆化证据的尝试次数(问候停在一次,告别一路爬升)住在仪表盘的 Runs 视图里。这是快速上手里唯一一处你要去开浏览器、而不是用 agent 的地方。
  • 走到 Failed 要几分钟。 在默认重试和指数退避下,运行会对告别步骤持续重试好几分钟(一次真实运行大约四分半)才翻成 Failed。你不必等到底:记忆化证据从第一次重试起就显现,问候稳稳停在一次尝试,而告别累加更多。看个两三次尝试,就往下走。

(这个开发服务器搭建也不会显示一个单独的「memoized」徽章。记忆化 就是 那个尝试次数:已完成的步骤停在一次尝试,而坏掉的步骤一路爬升,这正是「从 memo 返回,没有重跑」在这里的样子。)

现在修好它:

Now revert the farewell step to the working version.

host 会自动重载(这就是 --reload 给你换来的;如果你跳过了它,手动重启 host)。再发一个新的 demo/greet 事件,整个函数现在会在修好的代码上干净地跑到 Completed。有一件关于恢复的事容易让人栽跟头。仪表盘的 Rerun 按钮会用你当前的代码 从头开始一次全新的运行,每一步都从零重新执行。那是事故恢复的正确工具:一次糟糕的部署弄坏了一批运行,于是你发布一个修复并把它们 rerun。但它 不是 那个保留 memo 的恢复。保留 memo 的恢复,是你刚才在失败运行内部看到的那次自动重试,已完成的步骤稳稳不动。

把它变成一个真实的 AI worker(通往第 4 部分的桥)

到目前为止这个函数只在摆弄字符串,这是刻意的:当别的东西都不挡道时,持久化更容易看清。现在让问候语来自一个真正的 agent,于是你看着同一套神经系统托起一次真实的 AI 调用。一条提示词把硬编码的问候换成一个小 agent;休眠、持久化和仪表盘全都原样不动。粘贴这段:

Replace the hardcoded greeting with a one-line call to a minimal hello-world agent built on the OpenAI Agents SDK (it just writes the greeting), still inside the same step.run. Keep the step.sleep and the farewell unchanged. Then fire a demo/greet event and show me the run.

唯一变的,是填进问候步骤的东西:不再是一个 f-string,而是一个模型来写它。而因为那次调用就坐在你已经证明过持久的同一个 step.run 里面,它白白获得了记忆化和崩溃安全,不需要任何新接线。像之前那样观察运行(从 agent 轮询,或在仪表盘里打开它):同样的三步轨迹,同样的零算力休眠,只是第一步的输出现在来自一个 agent。你的 OPENAI_API_KEY 在准备步骤就已经在 .env 里了,所以没有任何新东西要配置。

完成的标志: 一次 demo/greet 运行完成,输出里的问候语来自 agent,而不是一个硬编码的字符串。仔细体会你正在看的东西,因为它就是整门课的一句话总结:一个 AI agent,被一个事件唤醒,在一套神经系统内部持久地运行,并在一次崩溃中存活下来。第 4 部分会把这个 hello-world agent 换成一个真实的客户支持 worker,并用完整的神经系统包裹它(一个真实的事件触发器、一个会扇出的 cron、流量控制、一道人工审批关卡),但你此刻屏幕上的这个形状,就是那个形状。


你刚刚搭好了整门课的环境,并用自己的眼睛看到了神经系统在工作: Skill 都装好了,你的 Neon 存储已开通且 .env 里有 DATABASE_URL,开发服务器 MCP 是活的,你运行了一个持久化函数,看着一步在不消耗算力的情况下休眠,弄坏一步并看着自动重试把已完成的步骤从 memo 返回、只重跑坏掉的那一步,然后看着一个真实的 agent 在那同一个持久化步骤内部生成问候语。这就是这门课所讲的架构。课程的其余部分把它扩展开来:真实的感知(cron、webhook、扇出)、更强的反射(step.run 内部的 agent 调用)、高负载下真实的平衡,以及那道把「agent 可能搞砸这件事」变成「agent 起草、一个人批准、动作发出」的人工审批关卡。

如果有什么没成,下面四个问题几乎覆盖了全部情况:

  1. 开发服务器无法触达函数 host:确认 host 在 8000 端口运行。
  2. 客户端处于 Cloud 模式:agent 漏掉了 is_production=False,而 .env 里又缺 INNGEST_DEV=1,于是函数永远不会在本地注册。让它设上其中一个(显式的 is_production 值优先于环境变量)。
  3. 函数在仪表盘里不见了:host 没有重载;重启它。
  4. 一次运行卡住,没有报错也没有进展:一个失去同步的 host 会悄无声息地停滞;把 host 和开发服务器一起重启,并让一个 host 对一个开发服务器。(一个隐蔽的原因:如果 :8288 被占了、开发服务器起在 8289+,那么仅仅重新指向 inngest-dev MCP 的 URL 是不够的;host 仍在和 :8288 对话。在 host 上设 INNGEST_BASE_URL=http://127.0.0.1:<port>,让它跟着开发服务器去新端口。)

如果你碰到上面任何一个,那个万能的恢复动作在这里同样管用:「Something didn't work. Read the error, tell me in plain language what you see, and propose one fix I can approve.」

你搭出了什么,以及它往哪里长

环境配好了:底座已打开,Skill 都装好了,三个 MCP 服务器全部接好(Neon、Context7、inngest-dev),你的 Neon 存储里有它的 customersaudit_log 表且 .env 里有 DATABASE_URL,开发服务器在跑。你还用自己的眼睛看到了整门课立足的那一个想法 —— 持久化执行那道反射,并看着一个真实的 agent 在它内部运行。第 4 部分会把那个 hello-world agent 换成客户支持 worker,在同一个底座、同一个文件夹里:它读那些客户、写那些审计行,然后用完整的神经系统包裹整件事 —— 一个真实的事件触发器、一个会扇出的每日 cron、流量控制,以及退款上那道持久化的人工审批关卡。第 4 部分把这个 step.run-加-step.sleep 的骨架扩展成一个在你的 Neon 存储上干真实活的 worker。如果这次快速上手成了,前面的那些概念会解释每一块为什么是这个形状。


第 1 部分:感知,世界如何触达 worker

从这里起,第 1 至 3 部分是这次搭建背后的那排参考书架:十五个概念,每个一个想法,按神经系统所做的三项工作分组。你可以从头读到尾,也可以在第 4 部分某一层让你产生「它为什么这样工作」时,再翻进某一个里去。这第一组是感知。

一个你亲手调用的 AI agent,在你调用它时才运行。一个真正的 AI Worker 有感知:它在 世界 触达它时才运行。一位客户发来邮件,一个 webhook 到达,一个 cron 在每天 09:00 触发,另一个 worker 把活儿交接过来。这些每一个都是进来的一个信号,而触发器就是 agent 感受到它的方式。第 1 部分的五个概念正是这些感知:事件驱动的心智模型、世界伸进来的三种方式(cron、webhook、event)、防止重复处理的语义,以及让一个信号唤醒许多 worker 的扇出模式。

概念 1:事件 vs 请求,那个持久化的心智转变

这门课后面的一切都立足于一个心智转变:从请求到事件。

请求是一场同步对话。有人调用;你处理;你返回;他们继续。一个连接一直开着;一个人或一个服务在 等待。如果你崩溃了,调用方收到一个错误。一个你在提示符处与之聊天的 agent 就是一次请求:你敲字,它流式回传,对话属于你的那个终端会话。

事件是一条异步消息。世界里发生了某件事(一位客户注册了、一封邮件到了、一笔付款结清了),发起方发出一条命名的、记录该事实的记录。零个、一个或多个 函数各自独立地对这个事件做出反应。没有连接一直开着。发起方不知道谁在听,不等结果,也不被阻塞。世界已经往前走了。

# A request: I'm here, waiting, blocking
result = await agent.handle_customer_message(text=user_input)
print(result) # I unblock when the agent finishes

# An event: I fire-and-forget
await inngest_client.send(events=[
inngest.Event(
name="customer/email.received",
data={"customer_id": "c-4429", "body": email_body, "subject": subject},
),
])
# I return immediately. Somewhere else, one or more Inngest
# functions react to this event on their own schedule.

请求与事件对比。请求模型(上方,红色)是同步、阻塞且脆弱的:生产者发出一个请求,在一条开着的连接上等待,消费者大约运行八秒;一次崩溃丢失这次工作,而每分钟吸收五十个请求需要大约七个并行的处理器。事件模型(下方,绿色)是异步、持久且隔离的:生产者发出一个事件,在大约十毫秒内返回,写入一个持久化的事件日志,该日志扇出到独立的消费者(一个 agent、一个分析计数器、一个 VIP 检测器);让一个消费者崩溃,事件会在日志里等待并重试。事件就是那个信号;一旦存储,worker 就按自己的时间做出反应。

请求让生产者等待;事件把它解放出来,而存储下来的事件能在崩溃中存活。

这个转变听起来很小。其实不然。一旦你以事件来思考,持久性和扩展性几乎免费就掉了出来,因为:

  • 生产者不会被消费者拖慢(收邮件那一方不等 agent 起草完回复)。
  • 消费者可以崩溃并重启而不丢失工作(事件是持久存储的;Inngest 会重新投递它)。
  • 新的消费者可以加进来而不必改动生产者(第二个函数,比如一个分析计数器,可以订阅 customer/email.received,而收邮件那一方毫不知情)。
  • 背压变成一条流量控制策略,而不是一次代码改动(Inngest 限制并发;生产者照样触发;事件排队)。

预测。 你的客户支持 worker 回复一封邮件要 8 秒:三秒用于 agent 的推理,四秒用于两次 MCP 工具调用,一秒用于数据库写入。在峰值负载下你每分钟收到 50 封邮件。如果你用请求模型(邮件解析器阻塞到 agent 完成),这意味着到你邮件解析器的多少条并行 HTTP 连接?如果你用事件模型(邮件解析器发出一个事件并立即返回),又是多少?信心 1-5。

答案:请求模型需要 大约 7 个并发解析器(50/分钟 × 8 秒约等于 6.7 个并行处理器,再加点余量)。事件模型需要 一个解析器。它发出事件并在约 10 毫秒内返回,事件队列吸收掉这 50/分钟的尖峰,而 Inngest 函数以你允许的任意并发度消费这个队列。

那道差距就是全部重点。事件成了「世界里发生了什么」和「worker 对此做什么」之间一条持久的边界,而所有好处都从这一步流出来:生产者从不等待,崩溃的消费者从存储的事件上重试,新的消费者无需触碰生产者就能接上来。事件是你不再拥有工作时序的方式。

Try with AI
Walk me through three scenarios. For each, classify it as REQUEST-MODEL
or EVENT-MODEL, and explain which one fits better:

A) A user clicks "Submit refund request" in the support portal and
expects to see "Refund issued: $30" within 2 seconds.

B) A nightly cron job at 02:00 runs a customer-health-check across
all 5,000 customers and writes a report to Slack.

C) A customer sends an email to support@; we want a draft response
ready within 60 seconds for the on-call agent to review and send.

For each, name (a) what the human's expectation of timing is and
(b) what failure looks like if the model crashes mid-execution.

概念 2:Cron 触发器,因为时间流逝而运行的工作

最简单的触发器是时钟。一个 AI Worker 所做的许多事并不是对外部事件的反应;它们是计划内的工作:每日健康报告、每周清理、每小时重算。Inngest 的 cron 触发器只有一行代码。

import inngest

@inngest_client.create_function(
fn_id="daily-customer-health-check",
trigger=inngest.TriggerCron(cron="0 9 * * *"), # 09:00 every day, UTC
)
async def daily_health_check(ctx: inngest.Context) -> dict[str, int]:
"""Run a customer-health pass for every Pro/Enterprise customer."""
customers = await ctx.step.run("fetch-pro-customers", fetch_pro_customer_ids)

# fan out: one event per customer, one worker run per event
events = [
inngest.Event(name="customer/health_check.requested", data={"customer_id": cid})
for cid in customers
]
await ctx.step.send_event("fan-out", events)

return {"customers_scheduled": len(customers)}

有三点要注意:

  • 时间表就是标准的 cron 语法。 0 9 * * * 是每天 09:00 UTC;*/15 * * * * 是每 15 分钟;0 9 * * 1 是周一 09:00。Inngest 默认按 UTC 解释 cron;如果你需要别的时区,你在 cron 字符串本身前面加前缀(例如 TZ=Europe/Paris 0 12 * * 5),而不是传一个单独的参数。

  • 函数仍然用同样的持久化步骤。 不论 cron 触发还是事件触发,函数 的形状是一样的:ctx.step.run 处理副作用,ctx.step.send_event 来扇出。持久化的工作方式一样。流量控制的工作方式一样。触发器只是函数 怎么 开始。

  • cron 的产出是一次普通的 Inngest 函数运行。 它出现在仪表盘里,有运行 ID,有轨迹,支持 replay。如果你周一早上的 cron 运行在第 3 步失败,周二的 cron 会照常运行,而周一的失败仍可供你修好 bug 之后 replay。

如果 cron 触发时你的服务正好宕了会怎样? 这正是把持久化调度器和脆弱调度器区分开来的那个问题。Inngest 的 cron 运行在时间表触发的那一刻就被持久记录了。如果你的函数端点不可达,Inngest 会带退避重试,直到成功或撞到重试上限。09:00 触发的 cron 不会因为你 09:00 正在滚动部署就「错过」;运行等着,你完成部署,运行就完成了。开发中的 cron 触发器有一个值得知道的怪癖:本地开发服务器只在它运行时才触发 cron。生产环境在 Inngest 始终运行的基础设施上运行它们。

快速检查。 三个论断。每个标 True 或 False。(a) 如果一个 cron 函数跑 45 分钟、每 15 分钟调度一次,任意时刻都会有三个并发实例在运行。(b) 你可以在一个 cron 触发的函数里用 step.sleep 把工作摊到一天之中。(c) 一个 cron 触发的函数也可以从仪表盘手动调用以做测试。

答案:(a) 取决于并发策略:默认情况下 Inngest 会把重叠的运行排队;如果你设 concurrency=1 它们串行,如果你设 concurrency=10 它们并行。默认值是合理的。(b) True,而且这是「把每日工作摊到几小时以平滑负载」的常见模式。(c) True:Inngest 仪表盘让你按需调用任何函数来测试,不论它的触发器是什么。

Try with AI
With my AI coding assistant connected to the Inngest dev server MCP,
write a cron-triggered Inngest function in Python that:

1. Runs every Monday at 09:00 UTC.
2. Queries the audit_log table for all conversations resolved in the
prior week (status='resolved' in that window).
3. Computes per-agent metrics: total conversations resolved, average
resolution time, count of escalations, count of refunds issued.
4. Returns the metrics as a JSON object.

After you write the function, test it now instead of waiting for
Monday: trigger it on demand from the Inngest dev dashboard (the
Invoke button), since the dev server only fires crons while it is
running. Confirm the audit query is correct by running the SQL
directly against the database and checking the rows it returns;
grep_docs can confirm your step.run pattern matches Inngest's
examples, but only running the query proves the SQL itself.

概念 3:Webhook 触发器,当外部世界打进来

第一个触发器是时钟。第二个是 HTTP:你系统之外的某个东西(Stripe、你的邮件服务商、你站点上的一个表单、一个 GitHub 事件)想触达你的 worker。

要精确地说清哪一部分难,因为它不是你会猜的那一部分。接收 POST 很容易:像 FastAPI 这样的 web 框架三行就给你 @app.post(...)。难的是 POST 落地 之后 的一切:把那次调用排队、失败时重试、在工作中途崩溃后存活、拒绝重复处理一次重新投递、运行 agent、保持一个四小时的审批、从仪表盘 replay 任意一次运行。门很便宜;门后的厨房才是活儿,而那间厨房就是 Inngest。

所以这条路由保持很小。它的全部工作就是接收 POST,把事件交给 Inngest,并快速回 200。持久化的工作在它后面的 Inngest 函数里运行。如果你反过来在请求处理器 内部 干那活儿,你会撞上那些经典的 webhook bug:发送方超时并在你还在干活时重发,一次重启丢掉这个任务,一次重新投递给客户退了两次款。(Inngest 的托管选项甚至能铸造一个公开的 inn.gs/e/... URL,让你连路由都不用写。)

现在到了让所有人困惑的部分。你的应用最后有了 两道门,而它们朝着相反的方向:

  DOOR 1: the webhook door  (you write it, or use the hosted URL)
Stripe knocks here with DATA -> it just calls send() and is done

DOOR 2: /api/inngest (auto-made by inngest.fast_api.serve)
the ENGINE knocks here to RUN YOUR CODE, one step at a time
it speaks Inngest's own protocol, so a raw Stripe POST here is rejected

这两道门从不直接互相对话。它们只通过事件连接:门 1 把一个事件丢进去,引擎捡起它,再从门 2 回来运行你的函数。自动创建门 2(快速上手已经做了)对门 1 毫无帮助;门 1 是你仍要自己写的那一道。

那么 webhook 那道门究竟 调用 什么?只调用 send()。整条路由就这么小:

@app.post("/webhooks/stripe")
async def stripe_webhook(request: fastapi.Request):
payload = await request.json()
# verify the signature, reshape Stripe's envelope, then hand it off:
await inngest_client.send(
inngest.Event(name="stripe/charge.refund.failed", data=reshape(payload)),
)
return {"ok": True} # ack Stripe in milliseconds

那个 send() 把事件丢进 Inngest 的流里,路由就结束了。它不调用你的函数,也不调用 /api/inngest。Inngest 处理那一半:它把事件名匹配到 on_refund_failed,再从门 2 回来运行函数的那些步骤。端到端:

Stripe → 门 1(webhook)→ send() → Inngest → 门 2(/api/inngest)→ 你的函数

@inngest_client.create_function(
fn_id="handle-stripe-refund-failed",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def on_refund_failed(ctx: inngest.Context) -> dict[str, str]:
"""Triggered by Stripe webhook → Inngest event → this function."""
charge_id = ctx.event.data["charge_id"]

# Find the support ticket this refund belongs to
ticket = await ctx.step.run(
"find-ticket-for-refund", lookup_ticket_by_charge, charge_id,
)

# Hand the support worker the full context.
# step.run takes (step_id, handler, *args): pass args positionally, not as kwargs.
await ctx.step.run(
"notify-support-agent",
notify_support_agent_of_refund_failure,
ticket["id"], charge_id,
)

return {"ticket": ticket["id"], "action": "notified"}

那就是门后的函数:Inngest 把事件匹配给它并运行它,查出工单、通知支持 worker,而排队、重试和幂等都替你处理好了。Webhook 的工作几乎总是像这样 异步 的:函数在那次快速 ack 之后 在后台运行,从不在请求期间运行。

有两个值得起名的模式:

  • 通用 JSON webhook。 发送方不必是某个知名厂商。把任何能 POST JSON 的服务指向同一种 URL,事件名由你自己选。vendor/event.subtype 这种风格只是约定,但当你照着来时仪表盘会把事件归类得很整齐。
  • Webhook transform。 厂商的负载又大又嵌套,而且一个厂商常常把许多种事件类型发往同一个 URL。一个 transform 是一个小的重塑函数,它在负载到达的那一刻、在它变成事件之前,运行在 Inngest 的服务器上。(即便你的 worker 是 Python,它也是用 JavaScript 写的,因为它运行在 Inngest 那一侧,不在你的应用里。)它做两件事:挑选你的事件名,并把负载压平成你真正用到的那几个字段。你的函数代码就此摆脱了厂商专属的 JSON。

预测。 一个 Stripe webhook 在和你的客户支持 worker 调用 inngest_client.send 发出一个 不同的、名为 customer/refund.investigation_needed 的事件的同一毫秒,触发了 stripe/charge.refund.failed。两个事件同时到达系统;上面那个函数只在 Stripe 事件上触发。这个函数会运行一次还是两次?信心 1-5。

答案:一次。 一个函数只为它注册的那个事件名触发。stripe/charge.refund.failedcustomer/refund.investigation_needed 是不同的名字,所以它们唤醒不同的函数(或谁都不唤醒),不管它们是不是在同一瞬间落地。事件名就是那个路由键。

这也是为什么命名不是装饰。一个错字,把 customer/email_received 写成函数监听的 customer/email.received,函数就会悄无声息地永远不运行。什么都不报错;活儿只是没发生。仪表盘是你的安全网:匹配不上任何函数的事件,会出现在一条你可以盯着看的、单独的未匹配流里。

在本地,没有 URL 可粘。 上面的一切都是 生产 路径。在你的笔记本上你没有公开 URL,Stripe 也触达不了 localhost。所以你搭建时,自己扮演 webhook 的角色:send_event(或开发仪表盘的「Send to Dev Server」按钮)注入一个真实 webhook 本会产生的那个事件。这就是为什么下面的动手环节用 send_event 测试,从不碰 Stripe。

这道分割值得记牢:

事件如何进来
生产Stripe POST 到你线上的 webhook URL;它在你的流里变成一个事件
本地开发(你)你用 send_event 注入那个已经成形的事件

你的函数代码两种方式下都 完全一样;它只对事件名做出反应,永远不知道事件来自一个真实 webhook 还是你的 send_event

Try with AI
I need to handle three webhook sources for my customer-support worker:

A) Stripe: refund failed, charge disputed
B) Postmark (email service): bounced email, complaint
C) My internal admin UI: manual "investigate this ticket" button

For each, decide:

1. What event names you'd use (vendor/event.subtype format).
2. Whether the function reacting to it should run synchronously (the
caller is waiting) or asynchronously (fire and continue).
3. Whether you'd write a webhook transform to reshape the payload, or
consume it raw.

Then write the Inngest function for the Stripe refund-failed case in
Python, using the MCP's grep_docs to find the current syntax for
TriggerEvent and the dev-server MCP's send_event tool to test it.

概念 4:幂等性,当同一个事件到达两次

同一个事件有时会到你这儿两次。一位客户点了「Issue refund」,页面很慢,于是这次点击触发了两次;又或者请求过去了,但回给调用方的确认丢了,于是调用方重试。无论哪种,你的 worker 现在为一笔真实退款看到了两个 customer/refund.requested 事件。如果它每个都发起退款,这位客户就被退了两次款。

这是事件系统里最常见的 bug,不是什么罕见边角情况。发送方会一直重试直到收到确认(网络丢包、服务器重启、端点超时),所以你被承诺的是 至少一次 投递,从不是 恰好一次。解药是让第二份副本无害:对第一份动作,认出重复的那份,跳过它。这个属性有个名字。当某件事运行两次和运行一次结果相同时,它就是 幂等的

Inngest 内建了两层这种保护。

第 1 层:事件 ID 在源头播种。 当你自己发送一个事件(而不是从 webhook 接收)时,你可以附上一个幂等键:

await inngest_client.send(events=[
inngest.Event(
name="customer/refund.requested",
data={"order_id": "o-4429", "amount_cents": 5000},
id=f"refund-request-{order_id}", # idempotency key: identical on every retry
),
])

如果在去重窗口内(默认 24 小时)发送了第二个 id 相同的事件,Inngest 会丢掉这个重复。同一个逻辑事件、同一个 id,只有一次函数运行。这个键在每个重复上都必须 相同,这正是关键所在。用请求中稳定的某样东西来构造它(这里是订单 id),绝不要用时间戳或随机值,那些每次发送都变,会悄无声息地让去重失效。

这也是你如何从一开始就驯服本节开头那个被重发的 webhook。你不直接在 webhook 事件上设 id,而是由把 POST 变成事件的那一方(托管的 transform,或你自己的接收路由)从 provider 自己的事件 id 来设它。Stripe 在每个事件上盖一个唯一 id,重试时原封不动地重发,所以重新投递的 webhook 携带同一个 id,去重得和一个自发事件一模一样。

第 2 层:步骤级幂等。 在一个函数内部,每个 step.run 由它的名字标识。如果一个函数在第 3 步和第 4 步之间崩溃,重试会从头重跑函数代码,但对于第 1、2、3 步,Inngest 返回 存储的 输出而不重新执行步骤体。第 4 步第一次正常运行。这就是让一个函数「持久」的东西:已完成步骤的副作用不会在重试时再次发生。

@inngest_client.create_function(
fn_id="issue-customer-refund",
trigger=inngest.TriggerEvent(event="customer/refund.requested"),
)
async def issue_refund(ctx: inngest.Context) -> dict[str, str]:
# Step 1: look up the order. If the function retries, this returns
# the SAME order data it computed the first time, from Inngest's memo.
order = await ctx.step.run(
"lookup-order", lookup_order_by_id, ctx.event.data["order_id"],
)

# Step 2: call Stripe. If the function retries AFTER this step
# succeeded, the Stripe call does NOT happen again. The refund is
# issued exactly once even if the function runs three times.
refund = await ctx.step.run(
"issue-stripe-refund",
lambda: call_stripe_refund_api(
charge_id=order["stripe_charge_id"],
amount=ctx.event.data["amount_cents"],
),
)

# Step 3: write the audit row. Same property: runs at most once.
await ctx.step.run(
"audit-refund",
lambda: write_audit_refund_issued(order_id=order["id"], refund=refund),
)

return {"refund_id": refund["id"]}

如果这个函数在第 3 步期间崩溃,重试会重新进入第 1 步(拿到缓存的订单数据,无数据库调用)、重新进入第 2 步(拿到缓存的退款数据,无 Stripe 调用)、真正运行第 3 步、返回。客户的卡只被扣一次,哪怕函数运行了三遍。 这是最要紧的那个属性。它正是让 Inngest 与「一个带重试循环的队列」在本质上不同的东西。

(第 1 步把它唯一的参数按位置传入。第 2 步和第 3 步则把它们的调用包进一个 lambda,因为 step.run 只转发位置参数,所以 lambda 是你把一个用关键字参数的调用交给步骤的方式。两种形式都管用,而且 lambda 还把步骤体变成一个 Inngest 能记忆化的自包含单元。)

外部边界处的恰好一次需要两层

记忆化从函数的视角给你恰好一次的步骤 完成:一旦一步被记录为成功,它就永不重跑。但有一个狭窄的窗口。如果一步调用了 Stripe,而进程在 Stripe 扣款 之后、Inngest 记录结果 之前 死掉,重试会再次调用 Stripe,因为在 Inngest 看来这步从未完成。修法是把步骤记忆化和 provider 自己的幂等键配对(Stripe 的 Idempotency-Key 头,或你其他 provider 暴露的任何去重 id)。两者互补,而非互相替代:step.run 让你函数的 内部逻辑 恰好一次;provider 的键让 外部副作用 恰好一次。

快速检查。 True 还是 False。(a) step.run 只在它内部的函数也幂等时才让步骤幂等。(b) 一个在去重窗口外携带重复 ID 的事件,会被当作一个新事件。(c) 如果 step.run 在执行途中失败(步骤代码抛出异常),Inngest 存下这次失败并在下一次尝试时重试该步骤,而不重跑之前的步骤。

答案:(a) Falsestep.run 单凭自身就给 步骤 提供「成功即至多一次」;它不需要里面的代码幂等。一旦一步被记录为成功,它的步骤体在重试时永不重跑。唯一的例外是上面那条注释里的窗口:如果进程在 Stripe 扣款之后、Inngest 记录该步骤之前死掉,重试会重新调用 Stripe,这正是为什么用一个 provider 幂等键来托底。但你函数的 内部 逻辑,你从不必手动让它幂等。(b) True:Inngest 的去重窗口默认 24 小时;那个窗口之后同 ID 的事件被当作新的。(c) True:自动重试是记忆化的;Inngest 知道第 3 步在第 1 次尝试失败,于是第 2 次尝试只重试第 3 步。之前成功的步骤不重新执行。(这是运行内重试,不是仪表盘的 Replay 按钮,后者是一次全新运行,见概念 14。)

Try with AI
Here are three scenarios. For each, decide: idempotency PROBLEM or
NO PROBLEM, and if it's a problem, what's the fix:

A) Stripe sends the same charge.refund.failed webhook three times
in 90 seconds (because their first two attempts timed out at
your endpoint). Your function emails the customer.

B) A customer clicks "Issue refund" three times because the page
was slow. Your function calls Stripe and writes audit_log.

C) Your nightly cron at 09:00 sends a customer-health-check event
to each Pro customer. If two crons fire at the same time (a deploy
bug), what happens?

For each problem case, propose ONE specific fix: event ID seed
inside the function, idempotency key in inngest_client.send, or
function-level deduplication on the trigger.

概念 5:扇出与子 agent 委派,一个事件多个 worker

一个事件常常需要在 许多 处触发工作。Stripe 的 charge.refund.failed 事件可能需要:通知支持 agent、写审计、更新客户的风险评分、提醒财务运营、往 Slack 发消息。五个反应,各自独立,全都来自一个事件。

Inngest 的模式:让多个函数订阅同一个事件。 没有扇出代码;只是多个带相同 TriggerEvent@inngest_client.create_function 装饰器。每个函数独立运行,有自己的重试,有自己的步骤轨迹,与其他函数互不影响地失败。

@inngest_client.create_function(
fn_id="refund-failed-notify-support",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def notify_support(ctx: inngest.Context) -> dict[str, str]:
# ... runs the customer-support worker to draft a response ...
return {"status": "drafted"}


@inngest_client.create_function(
fn_id="refund-failed-update-risk-score",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def update_risk_score(ctx: inngest.Context) -> dict[str, float]:
# ... runs the risk-scoring worker ...
return {"new_risk_score": 0.42}


@inngest_client.create_function(
fn_id="refund-failed-post-slack",
trigger=inngest.TriggerEvent(event="stripe/charge.refund.failed"),
)
async def post_to_slack(ctx: inngest.Context) -> None:
# ... posts a Slack notification ...
return None

一个 Stripe webhook 到达。Inngest 创建一个事件。三个函数触发,各在自己的运行里。如果 post_to_slack 因为 Slack 宕了而失败,另外两个不受影响,正常完成。失败的那次运行留在仪表盘里,等 Slack 恢复后供 replay。这是多 worker 协调的核心,也是你未来那一层管理者(后面一门课)将在规模上去组合的架构模式。

另一个扇出模式:父函数触发 N 个子。 有时扇出是动态的。你的每日 cron 需要给每位 Pro 客户触发一个客户健康事件,视那一周而定可能是 500 个或 5,000 个。父函数发送 N 个事件:

async def fan_out_per_customer_events(
ctx: inngest.Context,
customers: list[str],
run_day: str, # pinned by the caller (the cron's scheduled date), never date.today()
) -> int:
events = [
inngest.Event(
name="customer/health_check.requested",
data={"customer_id": cid},
id=f"daily-health-{cid}-{run_day}", # stable id: identical on every retry
)
for cid in customers
]
# ctx.step.send_event memoizes the send, so a retry of this function
# does not re-fire the fan-out (and even if it did, the stable ids dedup).
await ctx.step.send_event("fan-out", events)
return len(events)

那 5,000 个事件在一个 send_event 步骤里发出去(底层会把一个大列表分块成几次批量调用,而不是字面上的一次 HTTP 请求)。5,000 次函数运行触发,每次带自己的 customer_id,每个隔离,每个可独立重试。流量控制(概念 11)限制同时运行多少个,免得你熔了下游 API。cron 函数在几秒内返回;扇出以 Inngest 流量控制策略允许的任意速率运行。

子 agent 委派 是扇出的一个特例。在一次 worker 运行内部,你通过发送更多事件(await ctx.step.send_event(...),于是这次委派像其他任何步骤一样被记忆化)把子任务委派给其他 worker 类型。父不等子,除非它显式使用 step.invoke(它运行一个子函数并等待其结果)来收集它们的结果。

预测。 你有三个函数都由 customer/email.received 触发:起草回复的客户支持 agent(15 秒)、一个分析计数器(50 毫秒),以及一个检查客户是否高价值的「VIP 检测器」(200 毫秒)。当一封邮件到达时,每个函数对用户可见的延迟看起来如何?三个选项:(a) 三个加起来约 15 秒;(b) 三个并行运行,总延迟约 15 秒(最慢的那个);(c) 每个独立运行,完全没有共享延迟。信心 1-5。

答案:(c)。每个函数都是自己的一次运行,在自己的进程槽里。客户支持 agent 不阻塞分析计数器;VIP 检测器不阻塞 agent。从外面看,任一特定函数的延迟就是那个函数自己的时间。这就是扇出能扩展的原因:消费者是隔离的,如果 agent 崩溃,分析计数器不受影响。唯一的注意点,概念 11 会展开:这种隔离是 在不同函数之间。当单个函数扇出到它自身的成千上万次运行时,一个并发上限会刻意让靠后的那些运行排队,于是那些同函数的兄弟确实要等自己的轮次。不同函数从不互相阻塞;同一函数的许多次运行会。

Try with AI
Design the fan-out architecture for these three scenarios. For each,
sketch the event names and the functions that subscribe:

A) New customer signs up. Need to: send welcome email, create
Stripe customer, post to Slack #new-customers, write to
audit_log, schedule a 7-day follow-up.

B) Customer support email arrives. Need to: draft a reply (agent),
detect sentiment, check if VIP, update customer's "last contact"
timestamp, attach to the right ticket thread.

C) Daily cron at 09:00 needs to run customer-health-check on
~5,000 Pro customers. Each check takes ~30 seconds. We want
the whole batch to complete by 11:00 (a 2-hour window).

For each, decide: how many event types, how many subscriber
functions, what the idempotency story is, and one specific failure
mode this design protects against.

第 2 部分:反射,当某处出问题时会发生什么

第 1 部分讲的是工作如何触达 worker。第 2 部分讲的是当那份工作中途坏掉时会发生什么。

想象一个真实 worker 的一轮。它调用一个 agent,agent 调用几个工具,那些工具撞上一个数据库、一个支付 API 和一个模型。那是接连几次网络调用,其中任何一次都可能失败:一次超时、一条断掉的连接、一个宕了几秒的服务。没有保护时,一个小小的失败就把 worker 刚做的一切全扔掉,从头开始整一轮。

持久化就是那个修法,而且它说起来很简单:当某处中途失败时,已经完成的步骤保持完成,worker 从坏掉的那一点接着干,而不是从头来过。 在神经系统的图景里,这就是那道反射:它就那么发生,飞快,agent 根本不必去想。

Inngest 用一个工具 step.run 加底下一套叫记忆化的机制给你这个能力。第 2 部分讲这两者,然后是基于时间的版本(step.sleepstep.wait_for_event)、重试如何表现,以及 step.ai 这些辅助方法。

如果你在略读: 最要紧的两个是概念 6(step.run)和概念 7(记忆化)。第 2 部分其余的一切都建立在它们之上,所以慢慢读这两个。它们一旦想通,概念 8 到 10 会过得很快。

概念 6:step.run 与持久化函数模型

一个普通的 Python 函数运行一次,从上到下。如果它中途崩溃,你从头开始。如果它在崩溃前发起了三次 API 调用,下一次尝试会再发那三次,再付一遍钱,可能再给某人多扣一次款。

一个 Inngest 函数是持久的。每个你想设为检查点的操作都被包进 step.run(name, fn, ...)。然后 Inngest 一步一步地驱动这个函数。它从头运行你的处理器,当它到达一个还没做过的步骤时,它运行那一步,保存结果,再从头重新进入处理器,这一次 为每个已完成的步骤返回它存储的输出,而不是重新执行。函数「追赶」到它上次停下的地方,迈出下一步,如此往复。(所以一个函数的处理器体会运行很多次,每步一次,而不只是在出问题时。)

为什么非要重新进入处理器,而不是干脆从停下处继续?因为开篇那两个程序。引擎和你的函数是两个独立的程序。一个程序无法在另一个程序的代码中途暂停并保持它的位置。所以引擎只能用它唯一能用的方式来驱动你的函数:它通过 web 调用你的函数,运行到下一个未完成的步骤,让那一步运行,把结果拿回来。然后它把那个结果存在自己这一侧,再为下一步调用你的函数,把它已经存好的一切交回去。

  ENGINE                                   YOUR FUNCTION (host)
| call: run from the top -----------> runs to step 1, does it
| <---------------------------------- returns step 1's result
stores result 1
| call again -----------> step 1 from memo, runs step 2
| <---------------------------------- returns step 2's result
stores result 2
| call again -----------> steps 1-2 from memo, runs step 3
| ...and so on, one call per step

这就是全部机制。「从头重跑,已完成的步骤从 memo 来」不过是引擎每步调用你的函数一次、把结果留在它那一侧。而因为结果住在引擎那一侧,一个已完成的步骤即便你的 host 在运行途中崩溃并重启也能存活。

@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
customer_id = ctx.event.data["customer_id"]

# Step 1: load the customer record (one DB call)
customer = await ctx.step.run(
"load-customer", load_customer_by_id, customer_id,
)

# Step 2: load the conversation thread (one DB call)
thread = await ctx.step.run(
"load-thread", load_thread_for_customer, customer_id,
)

# Step 3: run the OpenAI Agents SDK agent (your worker).
# step.run forwards only positional args, so a call that needs keyword
# args is wrapped in a lambda (the step body becomes a no-arg callable).
response = await ctx.step.run(
"run-agent",
lambda: run_customer_support_agent(
customer=customer,
thread=thread,
email_body=ctx.event.data["body"],
),
)

# Step 4: write the draft reply to the database
await ctx.step.run(
"save-draft-reply",
lambda: save_reply(customer_id=customer_id, text=response.draft),
)

# Step 5: notify the on-call human reviewer via Slack
await ctx.step.run(
"notify-reviewer",
lambda: post_slack_for_review(response=response),
)

return {"status": "drafted", "reviewer_notified": True}

五步。每一步都独立地设了检查点。

持久化给你换来什么,用能击中这个函数的三种失败来看:

如果这里失败没有 step.runstep.run
agent 超时(第 3 步)重试重新加载客户和会话线程,从零重跑 agent,把 OpenAI token 钱付两遍第 1-2 步从 memo 回来;只有第 3 步重试,而且 Inngest 替你处理那个瞬时错误
进程在第 3 步和第 4 步之间被杀(部署、重启、OOM)agent 的回复丢了;邮件无人应答,直到有人注意到函数在重启后恢复:第 1-3 步在毫秒内从 memo 返回,第 4-5 步运行,客户拿到回复
Slack 返回 503(第 5 步)你丢了这次工作,或者你只为 Slack 手写重试加退避Inngest 带退避重试第 5 步,直到 Slack 恢复或重试预算耗尽;第 1-4 步保持完成,草稿已经存好

你不写任何重试循环、任何「我是不是已经做过这件事」的检查,也不写你自己的任何状态机。那个状态机 就是 这一串 step.run 调用。

step.run 唯一的一条规则。 一步必须 可安全重跑:如果它失败、Inngest 再次运行它,第二次运行绝不能弄坏任何东西。

  • 纯函数自动安全。
  • 幂等的 API 调用安全(Stripe 的 idempotency_key,你自己的 MCP 服务器工具):重复就是无操作。
  • 非确定性的工作仍然可安全重跑;你只是可能在重试时拿到 不同的 结果。一个新的随机 ID,或一次默认温度下的 LLM 调用,在第二次尝试时会不一样。对一个 agent 的回复来说这没关系(任何有效草稿都行)。当那个确切的值必须在重试间保持稳定时,把它钉死:传一个种子,或者在它自己更早的一步里生成一次再读回来。

快速检查。 True 还是 False。(a) 每当 Inngest 推进到下一步时,函数体都从头重新执行,不只是在重试时,会重跑你的 step.run 调用之间的普通代码(变量赋值、分支)。(b) 如果一步要 30 秒完成、函数在第 25 秒崩溃,重试会从第 25 秒接着那一步继续。(c) step.run 的输出存在 Inngest 的基础设施里,不在你的应用里。

答案:(a) True,而且这让人意外:Inngest 在每一步都从头重新进入你的处理器,从 memo 跳过已完成的步骤。所以在一次干净的运行里,step.run 之外 的代码会运行很多次,不只是在重试时。一步 之内 的代码运行一次,然后从 memo 返回。(模块级的 import 无论如何只加载一次;只有处理器体会重跑。)这才是把工作放进 step.run 内部的真正理由。(b) Falsestep.run 是那个原子单元;如果一步被打断,重试会重跑整个步骤。如果你的某一步长到不能允许它重启,你就把它拆成更小的步骤。(c) True:步骤输出存储是 Inngest 的一部分,不是你的数据库。这就是为什么即便你的数据库 schema 已经变了,你还能 replay 那些运行。

Try with AI
With my AI coding assistant connected to the Inngest dev server MCP,
shape a customer-support worker into an Inngest durable function.
Take a Runner.run call that processes a customer email and wrap each
of these inside its own step.run:

1. Load the customer record
2. Load the related conversation thread
3. Run the agent (the OpenAI Agents SDK Runner)
4. Persist the draft reply
5. Notify the on-call reviewer

Use grep_docs to find the current Python SDK syntax. Use
invoke_function to test it with a synthetic email payload. Then
deliberately raise an exception in step 4 and use get_run_status
to confirm steps 1-3 don't re-execute on retry.

概念 7:记忆化,可恢复性底下的那个机制

概念 6 说过「已经完成的步骤返回它们存储的输出,而不是重新执行」。那个机制就是 记忆化(memoization),值得仔细看一看,因为每个别的 Inngest 原语都建立在它之上。

当你调用 await ctx.step.run("load-customer", load_customer_by_id, "c-4429") 时,Inngest 维护一个以 (run_id, step_name) 为键的 memo 存储。同一行代码的行为取决于那个键是不是已经填好了:

  • 第一次尝试: memo 是空的,所以 load_customer_by_id 真的运行,Inngest 在把结果交还给你之前保存它的返回值。
  • 之后每一次重放(Inngest 在迈向下一步时重新进入处理器,在任何重试时也是):memo 里已经有了 load-customer,所以 load_customer_by_id 运行,那次数据库调用从不发生,保存的值在毫秒内回来。

这就是为什么重试便宜(昂贵的工作已经缓存了),为什么持久化是正确的(昂贵的工作从不发生两次),以及为什么「函数体从上到下重跑」尽管听着浪费却没关系:步骤内部的工作其实不会重跑;重跑的只是步骤之间的编排代码。

跨两次尝试的步骤记忆化。第 1 次尝试从左到右运行五步:load-customer、load-thread 和 run-agent 各自完成并存储其输出,然后 save-draft 崩溃,notify 从未到达。第 2 次尝试,也就是重试,从头重跑函数,但 load-customer、load-thread 和 run-agent 以零成本从 memo 存储返回(昂贵的 run-agent 步骤不再付费);save-draft 现在真正运行,notify 完成。一个以 (run_id, step_name) 为键的 memo 存储持有那些存储的输出。三个属性:重试便宜、副作用只发生一次、步骤名就是 memo 键。

已完成的步骤只付一次钱,不是每次重试都付。

让新用户意外的那个推论。 step.run 之外 的代码在 Inngest 每次重新进入处理器时都运行,那是每步一次,不只是在重试时。如果你这样写:

async def handle_email(ctx: inngest.Context) -> dict[str, str]:
# ANTI-PATTERN: this re-runs every time Inngest advances a step. Don't do this.
expensive_thing: dict = await fetch_expensive_data(ctx.event.data["id"])

await ctx.step.run("do-something", do_something_with, expensive_thing)
return {"status": "done"}

fetch_expensive_data 会在函数迈出的每一步上再次运行,哪怕没有任何失败。这个单步例子在一次干净的运行里就已经调用它两次(每次处理器重新进入一次),而你每加一步就多一次调用。所以按每次调用 $0.10 算,它在任何东西坏掉之前就已经在浪费钱,而一次重试又把这一切重付一遍。修法是把那个昂贵的东西包进它自己的一步:

async def handle_email(ctx: inngest.Context) -> dict[str, str]:
expensive_thing: dict = await ctx.step.run(
"fetch-expensive-data", fetch_expensive_data, ctx.event.data["id"],
)
await ctx.step.run("do-something", do_something_with, expensive_thing)
return {"status": "done"}

现在 fetch_expensive_data 被记忆化了;重试不会再为它付钱。

步骤名就是 memo 键。 Python SDK 不会在重复的名字上冲突;它按调用顺序自动编号(load-customer,然后 load-customer:1,然后 load-customer:2),于是每个拿到自己的 memo 槽。但别依赖这个:那些自动编号不带任何含义,所以仪表盘轨迹里显示的 load-customer:7 关于 哪位 客户什么都没告诉你,而插入或删除一步会移动后面所有的编号。给每次调用一个稳定的、由数据派生的名字,比如在循环里写 step.run(f"load-customer-{customer_id}", ...),让 memo 键和数据绑定,而不是和调用顺序绑定。

预测。 你的函数有三步。第 1 步(load-customer)的数据库调用花 $0.01,耗时 100 毫秒。第 2 步(run-agent)的 OpenAI token 花 $0.20,耗时 12 秒。第 3 步(save-draft)的数据库调用花 $0.005,耗时 50 毫秒。第 2 步因 OpenAI 限速有 30% 的概率失败;Inngest 带退避重试。(a) 把三步都包进 step.run 和 (b) 只把第 2 步包进 step.run,两者的成本差是多少?信心 1-5。

答案:用 (a),第 2 步一次重试只花第 2 步本身的成本($0.20);第 1 步被记忆化跳过,第 3 步还没运行。用 (b),第 1 步在 step.run 之外,所以它在第 2 步每次重试时都重新执行:每次重试约 $0.21(第 1 步 $0.01 加第 2 步 $0.20)。这里第 3 步不是成本所在,它只运行一次,在第 2 步终于成功之后;重点在于,任何在失败步骤 之前 的工作,除非你把它包起来,否则都会重跑。在一千封邮件、30% 重试率下,那大约是 $3 的第 1 步数据库调用被白白浪费,而真正的危险比钱更大:如果第 1 步有一个副作用(一次写入、一次扣款),把它留在 step.run 之外会让那个副作用在每次重试时再次发生。把你不想重新执行的一切都包进 step.run 一旦你理解了这个机制,它就不是可选项。

Try with AI
With my AI coding assistant: review the Inngest function we built
in Concept 6's Try-with-AI and identify any code BETWEEN step.run
calls that should be wrapped in its own step but isn't. Common
candidates:

- Computed values (timestamps, IDs, formatting) that we want to be
stable across retries
- Calls to logging or metrics services
- Reads from Redis, environment variables, secret managers

Then propose a refactor that moves each of these into its own step
with a meaningful name. For each, explain whether the side effect
is one you want to happen once (use step.run) or every retry
(leave it outside).

概念 8:step.sleepstep.wait_for_event,穿过时间的持久化

有些工作必须等待。一条欢迎邮件流水线立刻发一封邮件,然后等三天,再发一封跟进。一次退款调查需要等一个人批准。一条试用转化流程在 7 天内盯着「用户升级为付费」,并根据它看到什么发不同的邮件。

在一个普通的 Python 函数里,「等三天」意味着把一个进程开着三天。那不可行:你的进程会重启,你的托管商会按 72 小时的闲置算力向你收费,你的计时器会丢失。在 Inngest 里,「等三天」是一行:

from datetime import timedelta

@inngest_client.create_function(
fn_id="trial-welcome-series",
trigger=inngest.TriggerEvent(event="user/trial.started"),
)
async def welcome_series(ctx: inngest.Context) -> dict[str, str]:
user_id = ctx.event.data["user_id"]

await ctx.step.run("send-welcome-email", send_welcome_email, user_id)

# Wait three days. The function gets paged out of memory. Nothing
# is consuming compute. Three days later, Inngest pages it back in
# and resumes execution at the next line.
await ctx.step.sleep("wait-three-days", timedelta(days=3))

await ctx.step.run("send-followup", send_followup_email, user_id)

return {"status": "completed"}

step.sleep 是持久的,是休息中的神经系统。函数挂起;Inngest 存下恢复时间;你等待时没有任何东西消耗算力;函数在正确的时间恢复,所有之前的步骤输出仍然被记忆化着。step.sleep(以及 step.sleep_until)在付费套餐上最多可等一年,在免费的 Hobby 套餐上最多七天(Inngest 用量限制)。七天的 Hobby 上限对这门课用到的每一次休眠都足够宽。

更强的那个兄弟是 step.wait_for_event。不等时间,而是等 另一个事件。函数挂起,直到一个匹配的事件到达,或直到你设的一个超时过期。这正是让 Inngest 成为 HITL(概念 15)和 agent 间协调模式最干净表达的东西:

@inngest_client.create_function(
fn_id="refund-with-approval",
trigger=inngest.TriggerEvent(event="customer/refund.requested"),
)
async def refund_with_approval(ctx: inngest.Context) -> dict[str, str]:
request = ctx.event.data
request_id = request["request_id"]

# If amount is over $100, require approval before issuing
if request["amount_cents"] >= 10_000:
# Notify a human via Slack/email/whatever
await ctx.step.run("notify-approver", notify_human_approver, request)

# Wait for an approval event. Up to 24 hours; expires otherwise.
approval = await ctx.step.wait_for_event(
"wait-for-approval",
event="refund/approval.decided",
timeout=timedelta(hours=24),
if_exp=f"async.data.request_id == '{request_id}'",
)

if approval is None or not approval.data.get("approved"):
return {"status": "rejected_or_timeout"}

# Either it was under $100, or it was approved
refund = await ctx.step.run(
"issue-stripe-refund", call_stripe_refund_api, request,
)
return {"status": "issued", "refund_id": refund["id"]}

从上到下发生了什么:

  the function reaches wait_for_event   ->  it SUSPENDS  (zero compute)
|
| a human sees the Slack note, clicks Approve in your admin UI
| the UI sends a refund/approval.decided event
v
Inngest matches that event to THIS waiting run (if_exp picks the right one)
|
v
the function RESUMES, with the event as the `approval` value
|
v
the refund step runs -> Stripe refund happens, after the human approved

唯一微妙的部分是中间那次匹配:if_exp 是让那个批准事件唤醒 这一个 请求的运行、而不是别人的那个东西。

step.sleepstep.wait_for_event 是你不必为之付钱的超时。在你的代码里函数看起来是同步的(「等三天,然后发邮件」),但运行时语义是异步且持久的。这是 Inngest 出名的两件事之一(另一件是持久化重试)。没有它,替代方案是一个队列加一个状态机加一个数据库加一个轮询器,你会写一千行而不是三行。

快速检查。 三个论断。每个标 True 或 False。(a) 如果 step.sleep 设了 30 天、你的服务在这 30 天里被重新部署五次,那么在付费套餐上这次休眠会不间断地继续。(b) 如果 step.wait_for_event 超时,函数会抛出一个异常。(c) 同一个函数里的两个 step.wait_for_event 调用可以同时等待同一个事件。

答案:(a) 在付费套餐上为 True:休眠存在 Inngest 的基础设施里,不在你服务的内存里,所以重新部署不会丢失它们。注意套餐上限:30 天的休眠在付费套餐上没问题,但超过了免费 Hobby 套餐七天的休眠上限。(b) False:超时时,wait_for_event 返回 None。你的代码检查它并决定做什么(拒绝、升级、默认批准,看策略而定)。(c) 在正常的顺序代码里为 False:一个函数撞上一个 wait_for_event,挂起,只有在第一个恢复 之后 才到达下一个,所以这两次等待按顺序进行,而一个匹配的事件恢复的是当前挂起的那一个等待。它们只有在你把它们作为并行步骤发起时才会重叠,那是一个超出本课的模式。日常的规则:一个事件恢复一个等待点。

Try with AI
Build a delayed-investigation flow with my AI coding assistant.
Specification:

1. Triggered by event 'customer/refund.failed'.
2. Immediately notify the on-call human via Slack with the refund
details and a "Investigate" button.
3. Wait for the human to click the button (which fires
'customer/refund.investigation_started') for up to 4 hours.
4. If the click arrives in time: run the agent to draft an
investigation summary.
5. If 4 hours pass without a click: escalate to a senior reviewer
by firing 'customer/refund.escalated'.

Use the dev-server MCP's send_event tool to simulate the
human-click event during testing. Use get_run_status to inspect
how the suspended function shows up in the dashboard. Before
writing, use list_docs to scan the Inngest documentation tree
for the right page on wait_for_event semantics, then
read_doc on the page you find to get the exact syntax for
the if_exp filter expression.

概念 9:重试、错误处理、死信

这是那道反射的特写。默认情况下,Inngest 重试失败的步骤。默认值是合理的:约 4 次重试加指数退避,每次尝试之间从几秒到几分钟不等。最后一次重试失败后,运行进入 failed 状态,并留在那里以供检查和(可选的)replay。你可以按函数调它:retries=10,或 retries=0 永不重试。要为 某个特定 失败跳过重试(一张被拒的卡、一个 401),从步骤内部抛出 inngest.NonRetriableError,就像下面的例子那样。

@inngest_client.create_function(
fn_id="charge-customer",
trigger=inngest.TriggerEvent(event="order/checkout.completed"),
retries=2, # transient Stripe errors (503, timeout) retry twice
)
async def charge_customer(ctx: inngest.Context) -> dict[str, str]:
try:
charge = await ctx.step.run(
"call-stripe", call_stripe_charge, ctx.event.data,
)
return {"status": "charged", "charge_id": charge["id"]}
except inngest.NonRetriableError as e:
# call_stripe_charge raises NonRetriableError on a declined card, which
# tells Inngest NOT to retry the step (a decline will not become an
# approval on attempt 2). So we land here on the FIRST failure, with no
# wasted retries, mark the order, and kick off the dunning flow.
await ctx.step.run(
"mark-failed",
lambda: mark_order_failed(ctx.event.data["order_id"], reason=str(e)),
)
await ctx.step.run(
"emit-dunning-event", emit_dunning, ctx.event.data["order_id"],
)
return {"status": "card_declined"}

三个模式要紧。

模式 1:瞬时失败 vs 永久失败。 Inngest 默认重试一切,但有些错误不是瞬时的。Stripe 的卡被拒错误重试时还会被拒。你下游 API 的 401-未授权不会因为你等一等就变成 200。你的函数应当专门捕获这些并处理它们:写你的数据库、发出一个下游事件、干净返回,免得它们把重试预算浪费在没希望的尝试上。Inngest 的 NonRetriableError 显式告诉 Inngest 为一个被抛出的异常跳过重试。

模式 2:步骤级 vs 函数级错误。 一个抛出的步骤会被重试。步骤级重试耗尽后,函数失败。有时你想让一个函数 一个失败的步骤下 存活:记录失败、把工作标为「部分完成」、继续。把 step.run 包进 try/except。步骤照样拿到它的重试;如果所有重试都失败,异常传播到你的 catch 块,在那里你可以决定做什么。

模式 3:死信与 replay。 一个彻底失败的函数不会消失;它落在仪表盘的「failed runs」视图里,带着它完整的轨迹、步骤输出和异常,旁边是一个 Replay 按钮。修好 bug,发布它,replay,不必写任何死信处理器。(Replay 是从头开始的一次全新运行,不是保留 memo 的恢复,所以让有副作用的步骤保持幂等;概念 14 会完整讲它。)

预测。 你的函数在第 2 步调用 Stripe,在第 4 步调用你的客户数据服务。Stripe 在第 2 步第一次尝试时返回 503(服务不可用,瞬时)。第 2 步带指数退避重试 4 次(约 1 秒、2 秒、5 秒、12 秒);第 4 次重试时 Stripe 回来了,扣款成功。现在第 4 步运行,数据服务带一个 500 宕了。Inngest 会重试整个函数,还是只重试第 4 步?多少次?信心 1-5。

答案:只重试第 4 步,而且它拿到自己的重试预算。 步骤不共享重试。第 2 步的四次重试和第 4 步的彼此独立。Inngest 会重试第 4 步(默认约 4 次),如果数据服务回来了,第 4 步完成,函数成功。第 2 步的 Stripe 扣款 不会 重新发起,因为第 2 步的输出在它那次成功的重试之后被记忆化了。客户恰好被扣一次款,哪怕函数在重试间花了 20 秒。

Try with AI
With my AI coding assistant: extend the customer-support worker
function from Concept 6 with explicit retry and failure handling.
Specification:

1. The OpenAI Agents SDK call should retry 3 times on transient
failures (rate limit, timeout), but NOT retry on a content-policy
refusal from the model.
2. The Slack notification should retry up to 10 times (Slack is
often flaky; don't lose the notification).
3. The Postgres write should retry once; if it fails again, log the
failure and continue (don't fail the whole function over a
transient DB blip).

For each step, decide what's transient vs permanent and structure
the try/except accordingly. Use grep_docs to find the Python SDK's
NonRetriableError equivalent.

概念 10:在 Python 中用 step.run 处理 AI 调用(step.ai.wrap 仅限 TypeScript)

概念 6-9 对任何有副作用的代码都适用:数据库写入、API 调用、文件写入、agent 调用,其中也包括你的 LLM 调用。所以这里先把 Python 里 AI 调用的标题摆出来:你继续用 ctx.step.run。Inngest 确实提供了 AI 专用的 step.ai 原语,但在 Python 里它们要么不可用、要么很小众,而去伸手抓它们正是这个概念存在以防你犯的那个常见错路。

先把重要的 Python-vs-TypeScript 注意点摆在前面。 Inngest 的 step.ai 模块有两个方法,它们的语言支持不同。step.ai.infer() 在 TypeScript 和 Python 里都可用(Python SDK v0.5+):它把推理卸载到 Inngest 的基础设施并追踪那次调用。step.ai.wrap()TypeScript 专属:今天没有 Python 对应物。对 Python 项目(比如这门课的 worker),包裹一次 OpenAI Agents SDK 调用的正确模式是 ctx.step.run(...),它已经给你被包裹步骤的输入和输出的完整持久化、重试和可观测性。你只是拿不到 TypeScript 的 step.ai.wrap 额外加上的那些 LLM 专属的提示词/响应遥测。(截至 2026 年 5 月,对照 AI Inference 文档验证。)

step.run 包裹的是 agent 运行,而不是一次裸的模型调用。 在这门课里你的 worker 是一个 OpenAI Agents SDK agent,所以是 agent 发起 LLM 和工具调用,不是你。你把整个 agent 运行 包进 ctx.step.run(...)。Inngest 不关心步骤里面是什么;你的 agent 只是你交给它的那个函数。它记录步骤的输入和 agent 的结果,在瞬时失败时重试该步骤,并在成功时记忆化它,于是后面的步骤永远不再为 agent 的成本重新付钱。

@inngest_client.create_function(
fn_id="summarize-customer-thread",
trigger=inngest.TriggerEvent(event="customer/thread.summary_requested"),
)
async def summarize_thread(ctx: inngest.Context) -> dict[str, str]:
thread = await ctx.step.run(
"load-thread", load_thread, ctx.event.data["thread_id"],
)

# The agent makes the model and tool calls internally. You wrap the whole
# AGENT RUN in step.run, so Inngest sees it as one step: it records the
# input and the agent's result, retries on a transient failure, and
# memoizes on success so later steps do not re-pay the agent's cost.
result = await ctx.step.run(
"run-agent",
lambda: run_support_agent(thread=thread),
)

return {"summary": result.summary}

仪表盘把这次运行显示为 load-thread 然后 run-agent,各带它的输入和输出。和 TypeScript 的 step.ai.wrap 相比,你唯一拿不到的,是仪表盘 AI 视图里拆出来的 LLM 专属遥测(token 数、模型名);Agents SDK 自己的追踪覆盖了那一块。

agent 运行是一步。 因为你包裹了整个 agent,它 内部 的模型和工具调用不是单独的 Inngest 步骤。如果 agent 运行中途失败、Inngest 重试 run-agent整个 agent 会从头重跑,为它在那次尝试上已经花掉的 token 重新付钱。这通常没关系:一份 agent 草稿重做起来便宜,任何有效草稿都行。当一次 agent 运行昂贵到你不想整个重做时,把工作拆成更小的片段,每个自己一个 step.run(加载和检索各自一步,然后一次更短的 agent 调用),于是一次重试只重做失败的那一片。

步骤轨迹与客户数据

因为 step.run 把每步的输入和输出记录到 Inngest 的可观测性存储,你穿过一步传递的 内容 会被存储,并在仪表盘里可见。如果你的提示词包含 PII(姓名、邮箱、地址)、密钥(API 密钥、内部令牌)、合同或财务数据,或受监管的内容(HIPAA、GDPR 范围内的数据、PCI),不要把原始内容传进步骤体。脱敏、哈希、摘要,或传一个引用(一个 customer_idticket_id,而不是完整的工单文本),然后在步骤体 内部 从你的权威存储重新加载敏感内容,那里的保留和访问控制由你配置。如果你启用了 OpenAI Agents SDK 自己的追踪,同样的纪律适用。把步骤轨迹当作任何生产日志来对待:默认有用,按策略受监管。

step.ai.infer(Python 支持,但很小众)。 你很少会去伸手抓它;step.run 是这门课里每次 AI 调用的默认。它唯一的用途:不从你的进程调用 OpenAI,而是请 Inngest 的基础设施去发起那次调用,于是你的进程可以在请求在途时释放资源。在按在途时间计费的无服务器平台上,以及对长推理(Deep Research、大批量 embedding)而言,那省真金白银;对一台常驻服务器上的亚秒级调用,它只是徒增延迟。如果你真要用它,从 AI Inference 文档按你的版本拉确切的签名;它住在实验性的 inngest.experimental.ai 命名空间里,这门课的搭建没有动它。

快速检查。 True 还是 False。(a) 在 Python 里,把你的 agent 运行包进 ctx.step.run("run-agent", run_support_agent, ...),会让它持久、在瞬时失败时重试、在成功时记忆化。(b) 在 Python 里用 OpenAI Agents SDK 搭配 Inngest,step.ai.infer 是硬性要求。(c) 把单次 OpenAI 调用的 step.run 换成 step.ai.infer,总会让函数运行起来更便宜。

答案:(a) True:这是推荐的 Python 模式。agent 运行进入步骤体;Inngest 把整个步骤当作工作单元。(b) Falsestep.run 对大多数情况就够了。step.ai.infer 是针对无服务器算力成本的一种优化,不是要求。这个工作示例里的 OpenAI Agents SDK 集成用的是普通的 step.run(c) Falsestep.ai.infer 只在 (i) 你在一个按在途时间计费的无服务器平台上 并且 (ii) 调用长到请求卸载省下的钱压过额外编排开销时才省钱。对常驻服务器上的亚秒级调用,普通的 step.run 胜出。

Try with AI
With my AI coding assistant: take a customer-support agent
invocation and produce TWO versions of the Inngest function that
calls it:

Version A: The normal pattern. Wrap the Runner.run call (the whole
agent run) in step.run: durable, retried on transient failures,
memoized, with the standard step trace.

Version B: The niche exception, for comparison. step.ai.infer can
only offload ONE model call, not a whole agent, so write a SEPARATE
small function that makes a single direct OpenAI completion via
step.ai.infer (the Python-supported primitive that hands that one
call to Inngest's infrastructure to save serverless compute cost).
This is the one place you call the model directly instead of letting
the agent do it.

For each version, explain (a) what the dashboard trace shows for a
successful run, (b) what happens when the OpenAI call hits a 429
rate limit, and (c) on which kind of deployment (always-on server
vs serverless) Version B's offload saves real money.

第 3 部分:平衡与恢复,生产规模

第 1 和第 2 部分让你的 worker 跑起来并在崩溃中存活。第 3 部分讲的是让它以真实规模运行:不让一个忙碌的 worker 压垮它周围的一切,并在大批量出问题时快速恢复。这五个概念,用大白话说:

  • 并发与限流(概念 11):限制同时发生多少次运行,以及新的多快启动,这样一阵事件洪流就不会打开一千条数据库连接,或在一秒内冲破你的 OpenAI 限速。
  • 优先级与公平性(概念 12):确保一位发 500 封邮件的客户不会把其他所有人推到队伍末尾。
  • 批处理(概念 13):把 10,000 个事件当作约 100 个分组的运行来处理,而不是 10,000 个单独的。
  • Replay 与取消(概念 14):一次糟糕的部署之后,在修好的代码上重跑那些失败的运行;或者取消你不再想让它发生的工作。
  • 人工审批关卡(概念 15):在一个高风险动作之前暂停 agent 并等一个人,比如一笔大额退款。

合起来,它们把一个能跑的 worker,变成一个你可以放心摆在付费客户面前的 worker。

概念 11:并发与限流

你的原型每分钟处理几封邮件,没问题。然后一个繁忙的早晨一下发来 1,000 封,你的 worker 试图同时运行全部 1,000 个,于是它在同一瞬间打开 1,000 个 OpenAI 调用和 1,000 条数据库连接,把两者都耗尽。这是原型和生产之间最常见的鸿沟,而修法是两个小小的限制,每个一行:

  • 并发 是有多少次运行可以 同时 执行。
  • 限流 是允许新的运行多快 启动
from datetime import timedelta

@inngest_client.create_function(
fn_id="customer-support-conversation",
trigger=inngest.TriggerEvent(event="customer/email.received"),
concurrency=[inngest.Concurrency(limit=10)],
throttle=inngest.Throttle(limit=100, period=timedelta(minutes=1)),
)
async def handle_email(ctx: inngest.Context) -> dict[str, str]:
...

concurrency=10 是说:任意时刻这些函数最多有 10 个在运行。第 11 个事件在队列里等,直到那 10 个里有一个完成。throttle=100/minute 是说:每分钟最多启动 100 次 运行。第 101 个事件要等,哪怕还有并发余量。

为什么你通常两个都想要。 并发保护你的下游系统不被 同时 太多的调用淹没(上面那个 1,000 连接的问题)。限流保护它们不被一阵 突发 淹没:如果 500 封邮件在 9:00 整落地,你不想 500 次运行在同一秒里启动,哪怕你有并发余量;限流把这些启动摊开。

那个微妙之处,也是为什么单凭一个并发上限并不总是够:并发限制有多少次运行 在途,而不是新的多快 启动。如果你的运行很快,一个空出来的槽在某个运行完成的瞬间就被填上。所以 concurrency=10 仍然可以每秒发起几百次启动,远远足以冲破一个「每分钟 30 个请求」的限制,哪怕同时只有 10 个在运行。所以把旋钮对准你要保护的那个限制:一个 计数 限制(一个 20 连接的数据库池)要 并发;一个 速率 限制(OpenAI 的每分钟 30 个)要 限流。当运行很慢时,并发作为副作用也限住了速率,你可能不需要限流;当运行很快时,只有限流能控住速率。

按键并发。 单个 concurrency 限制全局应用于函数。一个更有意思的模式是按键并发:按事件的某个属性来限。你传一个 列表 的上限,而不是一个:

concurrency=[
inngest.Concurrency(limit=10), # global cap
inngest.Concurrency(limit=2, key="event.data.customer_id"), # per-customer cap
],

这是说:全局最多 10 个函数运行,并且 每位客户同时最多 2 个。如果单个客户一分钟内发 100 封邮件,只有 2 封被同时处理;其余 98 封排在后面。与此同时,其他客户的邮件正常流动;他们不被那个话痨客户阻塞。这是两行代码里的多租户公平性。概念 12 会进一步展开这个模式。

想象 9 点突发下的整套策略:限流放慢运行启动的速度,并发上限把同时运行的数量稳住,而按客户的键让一股洪流不至于占掉每一个槽,与此同时别的一切在一个持久化队列里等待。

负载突发下的流量控制,从左到右。第 1 阶段,来自五位客户的一阵 500 封邮件突发到达,其中一位客户(玫红色)在刷屏。第 2 阶段,一个设为每分钟 100 的限流限住运行启动的速度,平滑掉尖峰。第 3 阶段,一个 10 的并发上限约束同时运行什么,而一个每客户 2 的上限意味着那位刷屏的客户在十个槽里最多占两个,其余的继续流动。第 4 阶段,其余的在一个持久化队列里等待,随着槽空出来逐渐排空,于是什么都不丢。

什么都不丢;工作排队。三个旋钮决定运行什么、它多快启动,以及谁来等。

快速检查。 三个论断,True 还是 False。(a) 如果你设 concurrency=10、1,000 个事件一下到达,其中 990 个会被丢弃。(b) 限流和并发限制两者都会降低 吞吐量。(c) 按键并发需要一个能从事件数据确定地得出的键。

答案:(a) False:事件不会被丢弃;它们排队。Inngest 的队列是持久的;那 990 个事件等到并发槽空出来。(b) False。 限流限制启动速率;并发限制在途运行。两者都不丢工作;两者都塑造工作 何时 执行。在一段长窗口上,如果你的平均负载低于限制,吞吐量不变。在一个峰值上,吞吐量被塑造:突发被队列吸收。(c) True:键表达式在事件数据上求值;它必须为同一个逻辑范围产出一个稳定的字符串(customer_id 没问题;current_timestamp 不行)。

Try with AI
With my AI coding assistant: design the concurrency and throttling
policy for the customer-support worker. Constraints:

- OpenAI rate limit: 30 requests per minute, hard cap.
- Postgres connection pool: 20 max connections (the worker takes 1 per run).
- Some customers send bursts of 30+ emails in a minute (an angry
customer); these shouldn't starve other customers.
- We expect ~1,000 emails per day, with peaks around 9am and 2pm.

Propose:
1. A global concurrency value
2. A per-customer concurrency value
3. A throttle (limit and period)

For each, explain what production failure it protects against and
what the cost is (in queue latency at peak).

概念 12:优先级与公平性,多租户扩展

并发限制管用。按键并发加上了基本的公平性。生产级的多租户系统需要更多:优先级(Enterprise 客户不该为同样的算力排在业余玩家后面)和 公平份额调度(哪怕在自己的并发上限内,单个租户也不能垄断系统)。

优先级。 Inngest 在每个事件上求一个优先级表达式;优先级更高的运行插到优先级更低的运行前面。它只是概念 11 那个 create_function 上多一个参数:

priority=inngest.Priority(
# Higher number wins (range -600..600). The producer puts the tier's
# priority on the event directly: Enterprise = 100, Pro = 0, Free = -100.
run="event.data.tier_priority",
),

当并发队列里有 50 个运行在等时,Enterprise 客户的运行先走,然后是 Pro,再是 Free。同一档位内部,按 FIFO 顺序。优先级不覆盖并发或限流限制;它只是决定 哪个 等待中的运行拿到下一个空槽。一个 Enterprise 客户仍要等一个槽空出来;他们只是拿到下一个。

公平份额调度。 当你有几百个租户争抢同一个全局并发池时,FIFO 加优先级还不够。单个发突发的租户仍能占住大多数槽好几分钟。公平份额调度通过并发上的 key 参数加上深思熟虑的容量配置来实现,给每个租户一个有保障的切片:

concurrency=[
inngest.Concurrency(limit=50), # global pool
inngest.Concurrency(limit=3, key="event.data.tenant_id"), # max 3 per tenant
],

有了这个:50 个总槽,没有租户拿超过 3 个。如果 20 个租户活跃,那就是最多请求 60 个槽但只有 50 个可用。公平份额把它们轮转过去,每个租户拿到一些份额,没人被关在门外。

预测。 你有一个客户支持函数,concurrency=10、按客户 concurrency=2。你还配了优先级:Enterprise = 高,Free = 低。在上午 9:00,队列里有:来自客户 A(Free)的 5 个事件、来自客户 B(Enterprise)的 5 个事件,以及来自单个新客户 C(Free,刚买了他们的第一个套餐)的 10 个事件。它们以什么顺序执行?信心 1-5。

答案:它分三趟解析,按这个顺序。

  1. per-customer cap (2 each)  ->  eligible pool = 2 from A, 2 from B, 2 from C   (6 runs)
2. priority sorts the pool -> B's 2 first (Enterprise), then A's 2 and C's 2 (Free, FIFO)
3. fill the 10 global slots -> all 6 fit, so 6 run now; the rest wait

随着每个运行完成,那位客户的下一个排队事件变得有资格(第 1 趟),下一个空槽给优先级最高的等待者(第 2 趟)。正是那个按客户的上限,挡住了客户 C 的十个事件占掉整个队列。

哪些能在本地验证,哪些需要 Cloud

流量控制是这门课里唯一一处「跑起来盯着看」不完全成立的地方。概念 11 和 12 的四个旋钮里,只有 并发 能在本地开发服务器上观察到:发一阵突发,你会看到只有 N 个同时运行。另外三个,你在本地配置并推理,然后在 Inngest Cloud(或一个分支部署)里确认效果:

  • 限流 是开发服务器不强制执行的速率限制,所以在本地你的运行能多快启动就多快,不管那个限制。配置是对的;速率只在 Cloud 里咬人。
  • 优先级公平份额 只在 持续的多租户争用 下才显现,也就是一个满的队列加许多个租户竞争。一小撮测试事件从不制造那种情形,所以即便配置正确,它们在本地也保持不可见。

所以对这三个,「已验证」意味着配置被接受、函数运行,而你能推理那个行为。别从一台安静的开发服务器得出「什么都没被强制执行」的结论;去 Cloud 里在负载下确认真实效果。

Try with AI
With my AI coding assistant: extend the customer-support worker
configuration with a priority and fair-share scheme. Requirements:

1. Three customer tiers: Enterprise, Pro, Free.
2. Enterprise customers should never wait more than 5 seconds at
peak load.
3. Free tier customers should get fair access: no Free customer
should be starved for more than 60 seconds, even when the
global queue is full.
4. A single noisy customer (regardless of tier) should not occupy
more than 3 slots.

Write the concurrency + priority configuration. For each line of
config, explain which requirement it satisfies.

概念 13:批处理,划算的批量处理

有些工作天生是批量的。你不会把 10,000 段客户对话各自独立地摘要;你用一批 50 段来调用 LLM。你不会把 10,000 行审计一行一行地写;你用一次批量插入来写。Inngest 的批处理触发器让你攒起事件,用这一批作为输入去调用单个函数。

@inngest_client.create_function(
fn_id="batch-embed-tickets",
trigger=inngest.TriggerEvent(event="ticket/resolved"),
batch_events=inngest.Batch(
max_size=50, # invoke when 50 events accumulated, OR
timeout=timedelta(seconds=30), # invoke when 30 seconds pass, whichever first
),
)
async def batch_embed_resolved_tickets(ctx: inngest.Context) -> dict[str, int]:
# ctx.events (plural) instead of ctx.event
ticket_ids = [e.data["ticket_id"] for e in ctx.events]

tickets = await ctx.step.run(
"load-tickets", load_tickets_by_ids, ticket_ids,
)

# One embedding call for 50 tickets, not 50 calls for 1 ticket each
embeddings = await ctx.step.run(
"embed-batch", embed_texts_batch,
[t["text"] for t in tickets],
)

await ctx.step.run(
"store-embeddings", store_embeddings_batch,
ticket_ids, embeddings,
)

return {"batched": len(ctx.events)}

变了什么:ctx.events 是一个列表,不是单个事件。函数每批运行一次,而不是每个事件一次。OpenAI embedding API 用一批 50 段文本来调用,而不是 50 次单段文本调用,这便宜得多(你按 token 付费,但每次请求的开销没了)也快得多(一次 API 往返而不是 50 次)。

批处理是正确工具,当 工作天生可批量(embedding、批量数据库写入、批量邮件)而且你能容忍最多一个超时时长的延迟才让工作发生。它是错误工具,当每个事件需要交互式响应,或者当事件之间的顺序以不可预测的方式重要时。

快速检查。 True 还是 False。(a) 批处理函数仍然拿到重试和记忆化;整批作为一个整体被持久地记忆化。(b) 如果批处理超时在只攒了 3 个事件时过期,函数会等到接下来 47 个到达才运行。(c) 你可以把 batch_eventsconcurrency 结合起来,限制多少批并行运行。

答案:(a) True:批就是那个工作单元;重试用它所有的事件仍在范围内的方式重放整批。(b) False:那正是超时的全部意义。30 秒后函数用攒到的任何东西运行,哪怕只是 1 个事件。(c) True:这是生产模式。批处理加并发一起,把你的下游负载控得很漂亮。

Try with AI
With my AI coding assistant: write a batched Inngest function that
embeds resolved support tickets, converting a per-ticket event
handler into one batched call.

Triggers: 'ticket/resolved' event, batched at 50 events or 30 seconds.

The function should:
1. Load the ticket bodies in one query
2. Call OpenAI embeddings API with a 50-text batch (faster + cheaper)
3. Store the embeddings
4. Emit a 'ticket/embedded' event per ticket for downstream consumers

Use grep_docs to find the OpenAI batch-embedding pattern.

概念 14:Replay 与批量取消,生产恢复

有时一切同时出错。你发布了一个 bug;过去六小时里有一千次运行失败。又或者你的下游 API 宕了 30 分钟;那个窗口里所有试图调用它的运行都死了。又或者你发现了一个逻辑错误,想在修好之后把一天的工作重做一遍。

首先,那个绊倒所有人的区别。 Inngest 给你 两种 方式让一个失败的步骤重跑,而它们表现不同:

  • 自动重试(在同一次运行内)。 当一步抛出时,Inngest 带退避重试函数,从头重新进入。已完成的步骤从 memo 返回, 重新执行;只有失败的那一步再次运行。这是那个保留 memo 的恢复,就是你在快速上手里看到的那个,也是让「在第 3 步花的 $0.20 不会重花」这个属性成立的那个。它是自动的,发生在原来的运行内部。
  • Replay / Rerun(仪表盘按钮,跨许多次运行)。 这用你当前部署的代码,从头开始一次全新的运行,每一步都从零重新执行(一次 rerun 拿到一个新的运行 id,重跑第一步,而不是旧运行的恢复)。所以实际上旧运行的 memo 在这里救不了你。它是为事故恢复用的,不是为跳过已完成的工作。

把这两者分清楚就是整个概念。memo 的回报住在自动重试里;Replay 是从头开始。下面两行是同样的五步在两条路径下的样子:

重试与 replay 对比,同一个五步函数在两条路径下展示。上行,同一次运行内的自动重试(run_id A1):第 1 到 3 步(load-customer、load-thread、run-agent)以零成本从 memo 返回,只有失败的 save-draft 步骤重跑,然后 notify 运行。下行,作为全新运行的 replay(run_id A2):全部五步在当前代码上重新执行,你为整个函数再次付费,所以挡住第二次真实退款的是一个幂等键,而不是 memo。memo 在一次运行内保护你;幂等键跨 rerun 保护你。

memo 在一次运行内保护你;跨 rerun 保护你的是一个幂等键,而不是 memo。

两个相反的恢复原语。 Replay 说 「这份工作失败了,我想让它在修好的代码上再跑一遍。」 批量取消说 「这份工作排了队,但我不再想让它发生。」 同一个仪表盘界面,相反的意图。大多数团队在跑真实流量的头三个月内两个都用得上。

Replay 是那个恢复原语。失败的运行连同它们完整的步骤历史、输入事件,以及失败步骤的异常一并保留。在仪表盘里你打开 Functions 视图,过滤到一个有失败运行的函数,选一个时间窗口和一个失败模式(任何特定错误消息,或干脆「所有失败」),点 Replay。Inngest 把每个调度成一次 从头开始的全新运行,跑在现在部署的任意代码上。

关于 replay 要理解三件事。

  • Replay 用你当前部署的代码。 如果你在运行失败和你 replay 它们之间部署了一个修复,被 replay 的运行用新代码。这正是重点:拿一群死在某个 bug 上的运行,发布修复,全部免手动重跑。
  • Replay 重新执行每一步;它 复用旧运行的 memo。 一次被 replay 的运行是一次新运行,所以每一步在修好的代码上从零重跑。从成本上,按每次被 replay 的运行 整个函数 的成本来规划,不是只算失败的那一步。挡住一次 replay 发出 第二次 真实世界副作用(一次重复退款、一封重复邮件)的,不是 memo,而是那个副作用上的一个 幂等键(概念 4):你从请求派生出一个稳定的键(对退款来说,类似 (order_id, request_id)),provider 把重复当作无操作。这门课里那个最小化的 worker 为了简短省掉了那个键,它的退款按客户匹配并无条件写入,所以一个生产版本会在任何真钱流动之前加上一个。
  • Replay 是显式选择的。 失败的运行待在仪表盘里直到你对它们动手。它们不会永远重试;它们不会消失。它们等你。

批量取消 是反面。有时你有成千上万个 排队的休眠中的 运行,你不再想要它们:一个营销活动被取消了,一位客户流失了你不再想给他们发跟进邮件,一个特性被回滚了。在仪表盘里你选一个函数和一个时间窗口或事件过滤器,点 Cancel。匹配的运行干净地终止:它们的 step.sleepstep.wait_for_event 调用不再恢复,排队的运行不再启动,在途的运行检查取消标志并在下一个步骤边界退出。取消尊重步骤边界;一个在途的 step.run 在终止前会先把它当前所在的那一步做完,所以你不会得到半完成的 Stripe 扣款或撕裂的数据库写入。

Replay vs 取消,作为一个决定。 当一群运行出了岔子时,问一个问题:我想让这份工作成功,还是想让它不发生? 如果工作应当成功(bug 修复恢复),replay。如果工作不应发生(取消的活动、流失的客户、回滚的特性),取消。如果你不确定(比如失败的运行里既有你想恢复的,也有本就不该触发的),把你的仪表盘查询过滤得更窄,让每个子集得到正确的处理。

这在实践中带来三个模式:

  • 「我们发布了一个 bug」的恢复。 找到那次糟糕部署时间窗口里的失败运行,修好 bug,发布修复,replay 那些失败。客户体验:他们的邮件一小时没得到回复,但最终得到了,而你不必写任何恢复代码。
  • 「活动取消」的回滚。 一个欢迎系列在 14 天里发三封跟进邮件;客户在第 4 天流失。你不想发第 7 天和第 14 天的跟进。批量取消匹配的 wait-for-eventsleep 运行。
  • 「schema 迁移」的 replay。 你改了 agent 格式化摘要的方式;你想用新格式重新摘要昨天的工单。找到那些运行(成功或不成功)并 replay 它们;因为一次 replay 是从头开始的全新运行,agent 在新代码上重跑每一步,这正是你这里想要的。让你有副作用的步骤保持幂等,这样重跑它们不会重复扣款或重复发送。

开发服务器 MCP 让恢复不必离开你的通用 agent 就能进行。开发期间你可以让 AI 用 get_run_status 检查一次失败的运行,然后通过在修好的代码上重新触发事件来恢复工作(给它一个 新的 事件 id,因为用 相同的 id 重新触发会被概念 4 的幂等语义去重成一个无操作)。仪表盘的 Rerun 按钮是等效的一键路径。无论哪种方式,你拿到的都是当前代码上的一次全新运行,不是保留 memo 的恢复。

快速检查。 True 还是 False。(a) 一次仪表盘 Replay 在 部署的代码上重跑工作。(b) 一次仪表盘 Replay 从 memo 返回原运行成功的步骤,只重跑失败的那个。(c) 一次失败运行 内部 的自动重试从 memo 返回已完成的步骤,只重跑失败的步骤。(d) 批量取消一个在途的函数,会在步骤中途中止当前正在执行的 step.run 以更快终止。

答案:(a) True:一次 replay 是在现在部署的任意代码上、从头开始的全新运行,这就是它为什么是 bug 修复恢复的工具。(b) False:这是那个陷阱。一次 replay 是一次从头重新执行每一步的新运行,所以旧运行的 memo 不会带过来。挡住一次被 replay 的副作用第二次触发的,是幂等键,不是 memo。(c) True:这是那条保留 memo 的路径,也是你在快速上手里看到的那个。已完成的步骤停在一次尝试,而失败的步骤重试。(d) False:取消尊重步骤边界;当前的 step.run 在运行终止前先完成(或失败)。这防止撕裂的写入。

Try with AI
Walk through a recovery scenario with my AI coding assistant:

Yesterday at 14:00 we deployed a change to the worker's agent step.
A bug in the new code made the agent step throw on every run.
From 14:00 to 18:00, 47 customer-support runs failed at that step.

At 18:30 we noticed, fixed the bug, and re-deployed.

Use the dev-server MCP's grep_docs to find Inngest's replay docs,
then:

1. Outline the exact dashboard steps to identify the 47 failed runs.
2. Explain what a dashboard Replay does for one of those runs: is it
a fresh run from the top on the fixed code, or a resume that
reuses the old run's memo? What does that mean for the cost of
replaying all 47?
3. Confirm whether the customers will see one reply or several if a
replayed run re-sends the email, and name the mechanism that
keeps it to one (hint: it is not memo).
4. Identify ONE scenario in this story where you'd prefer to
bulk-cancel instead of replay, and explain why.

概念 15:用 step.wait_for_event 做 HITL 关卡,运行时里的不变量 1

有些动作太重要,不能让 agent 自己拿。发起一笔 $500 的退款、发出一份法律通知、关闭一个账户:你想让 agent 调查并 提议 那个动作,但要一个人在它真正发生之前批准它。那次为人停下的暂停就是一道 审批关卡,而它是整个系统里唯一一处 worker 停下来等某个人的地方。(用 Agent Factory 的话说这是 不变量 1,人是主体:在一个高风险决定上,运行的是那个人的判断,不是 agent 的。)

Inngest 的 step.wait_for_event(概念 8)让这件事变干净。agent 一直运行到决策点,然后挂起并等一个批准事件。一个人审阅它(在 Slack、一个管理界面,或邮件里)并点批准或拒绝;那次点击触发事件,函数带着裁决醒来,并行动。你的代码控制的是 允许 agent 做什么,不是它如何推理。

@inngest_client.create_function(
fn_id="refund-with-hitl-gate",
trigger=inngest.TriggerEvent(event="customer/refund.investigated"),
concurrency=[inngest.Concurrency(limit=5)],
)
async def refund_with_gate(ctx: inngest.Context) -> dict[str, str]:
request_id = ctx.event.data["request_id"]
amount_cents = ctx.event.data["amount_cents"]

# Step 1: the agent's analysis (your worker, run durably).
# Keyword-arg calls are wrapped in a lambda; step.run forwards only positional args.
analysis = await ctx.step.run(
"agent-investigates",
lambda: run_refund_investigation_agent(request_id=request_id),
)

# Step 2: if the agent thinks refund is warranted AND amount > $100,
# gate behind human approval
needs_approval = analysis.recommends_refund and amount_cents >= 10_000

if needs_approval:
await ctx.step.run(
"notify-approver",
lambda: send_slack_approval_request(
request_id=request_id,
analysis=analysis,
amount_cents=amount_cents,
),
)

# === THE HITL GATE ===
approval = await ctx.step.wait_for_event(
"wait-for-human-approval",
event="refund/approval.decided",
timeout=timedelta(hours=24),
if_exp=f"async.data.request_id == '{request_id}'",
)

if approval is None:
# Timeout: no human responded in 24h. Escalate.
await ctx.step.run(
"escalate-timeout",
lambda: escalate_to_senior_reviewer(request_id=request_id),
)
return {"status": "escalated_timeout"}

if not approval.data["approved"]:
await ctx.step.run(
"notify-rejected",
lambda: notify_customer_rejected(request_id=request_id),
)
return {"status": "rejected_by_human"}

# Either it was approved, or it didn't need approval
refund = await ctx.step.run(
"issue-refund",
lambda: call_stripe_refund(request_id=request_id, amount_cents=amount_cents),
)

await ctx.step.run(
"audit-approved-refund",
lambda: audit_refund(
request_id=request_id,
refund=refund,
approved_by="human" if needs_approval else "auto",
),
)

return {"status": "issued", "refund_id": refund["id"]}

代码里你看到的:一串步骤,中间有一个 wait_for_event。运行时发生的:

  • agent 运行(第 1 步,持久地)。
  • 函数决定关卡是否适用(代码内逻辑,没有副作用)。
  • 如果设关卡:一个 Slack 通知触发(第 2 步,持久)。函数挂起最多 24 小时。
  • 一个人在 Slack 里点批准或拒绝。管理后端用 refund/approval.decided 和那个 request_id 调用 inngest_client.send
  • Inngest 把事件匹配到那个挂起的函数(if_exp 过滤器确保只有匹配的 request ID 才匹配)。函数在下一行恢复。
  • 函数用那个人的决定要么发起退款,要么通知拒绝。两条路径都审计这个决定和审批人。

这正是让 Inngest 与「队列加状态机」在本质上不同的东西。HITL 模式是一个原语。函数的代码从上读到下,关卡内联其中。没有回调,没有状态恢复,没有 if state == waiting_for_approval: ... 的分派。运行时处理挂起/恢复的机制;你的代码表达 策略

运行时的人工审批关卡,分三个阶段。第 1 阶段:agent 运行、调查、起草动作、请求批准,然后暂停。第 2 阶段:函数在 step.wait_for_event 上挂起,以零算力等任意长的时间,与此同时一位人类审阅者在 Slack 或一个管理界面里读它并点批准或拒绝。第 3 阶段:一个批准事件恢复函数,它分支:批准发起退款,拒绝记录一笔被阻止的退款,超时也记录一笔被阻止的退款。每条分支都往 audit_log 写一行。人是主体:agent 提议,一个人决定。

agent 提议,一个人决定,而那次等待什么都不花。

后面一门课会从架构上展开不变量 1:被授权的意图、spec 驱动的工作流,以及那一层「worker 的管理者」,它决定哪些关卡适用于哪些动作。这门课给你那个 运行时原语。当那一层管理者到来时,它实现的关卡正是这个 wait_for_event 模式,只是在机队规模上组合起来。现在懂了这个原语,意味着后面那个架构模式读起来像「一次合理的组合」,而不是「魔法」。

这是你在第 4 部分的 Decision 5 里搭的那块拱顶石:退款审批,做成持久的。这里的概念是形状;那个工作示例把它接到一个真实的 needs_approval 工具上,并证明退款恰好触发一次。

预测。 你有一道设了 timeout=timedelta(hours=24) 的 HITL 关卡。一位客户的退款请求在周五 17:00 进来。整个周末没有人在线。关卡的超时在周六 17:00 触发。你的超时处理器记录一笔被阻止的退款。审阅者在周一 9:00 读到这个请求。把时间线走一遍:周末里有多少次函数运行是活跃的?Inngest 收了多少算力的费?信心 1-5。

答案:周末里 零次活跃的函数运行。函数 挂起 了:Inngest 存下它的状态,把函数换出内存,等事件或超时。Inngest 不为挂起的时间计费。当周六 17:00 到来、超时触发时,函数恢复几百毫秒去写那行被阻止退款的审计,然后完成。审阅者直到周一才看,对 worker 这一侧不花一分钱。Inngest 上 HITL 工作流的经济性,和那些为每一秒「批准了没?」的轮询都向你收费的基于轮询的队列截然不同。

Try with AI
With my AI coding assistant: design a durable refund-approval gate.
Specification:

1. The agent investigates and decides a refund is warranted, but the
refund tool needs human approval before it runs.
2. The gate should:
- Notify the on-call reviewer with the agent's recommendation
- Wait up to 4 hours for the reviewer to approve or reject
- On approve: issue the refund.
- On reject: do not issue; record a blocked refund.
- On 4-hour timeout: do not issue; record a blocked refund.
3. Every branch (approve/reject/timeout) writes an audit row from a
small fixed set of action names, capturing what was decided.

Use the dev-server MCP's send_event to simulate each branch of
the reviewer's decision during testing.

第 4 部分:工作示例,一个客户支持 AI Worker

这是这门课的脊柱:你真正动手搭建的地方。在这之前的一切都是模型和参考。从这里起你组装出那个真实的 worker。先是 worker(一条提示词),然后是围绕它的神经系统,每条提示词一层。每一层都点名它所依据的概念,所以如果一层引出一个「为什么」,第 1 至 3 部分里的那个概念就是要翻开的那一页。你用简短、平实的英语提示词指挥你的通用 agent,由它来写代码。下面展示的代码片段是每一层的那几行承重的代码,不是文件本身。完整的实现都针对一台真实的开发服务器和一个真实模型做了端到端运行,所以你看到的形状就是会运行的东西。如果一个签名看起来不熟悉,你的 agent 会查当前的文档。

你即将搭建的完整流程,一封邮件从头到尾:

  a customer emails
|
v
the INNGEST ENGINE catches the event and drives your worker,
one step at a time, storing each result as it goes:

1. audit: "message received"
2. load the customer from Neon
3. YOUR AGENT drafts a reply (the thinking part; D1 makes it durable)
4. is it a refund? PAUSE for a human (waits hours, survives crashes; D5)
5. on approve: issue the refund; on reject: record it
6. audit: "reply sent"

if a step crashes, the engine re-runs only that step, never the
finished ones (D6). the same worker also wakes on a daily cron
and runs under flow-control caps (D3, D4).

还是开篇那张两个程序的图,引擎驱动你的 agent,现在是那个真实的 worker。你一次搭一层:

形状:七条提示词,在你已经配好的底座上。

  • D0 搭出 worker 本身,独立运行。
  • D1 让 agent 运行持久化。
  • D2 让一个事件唤醒它。
  • D3 加一个会扇出的每日 cron。
  • D4 加流量控制。
  • D5 是拱顶石:退款上一道持久化的人工审批关卡。
  • D6 证明 worker 在一个坏掉的步骤中存活:重试而不重做已完成的工作,然后恢复。

第 4 部分一次搭一层地建出神经系统。D0(左)是 agent,只搭一次:它思考和行动,带 OpenAI Agents SDK、两个工具、Neon Postgres 和一份审计轨迹;此后它从不改变。然后从外面加上六层:D1 反射(让它持久,把运行包进 step.run),D2 感知(在客户邮件事件上醒来),D3 感知(一个按客户扇出的每日 cron),D4 平衡(流量控制:并发与限流),D5 关卡(拱顶石,一道持久化的审批关卡,心智在这里重新进入),以及 D6 证明(弄坏一步并恢复)。感知唤醒它,反射让它保持正确,平衡让它保持健康,而关卡让一个人来决定。

agent 在 D0 之后从不改变;每一层都是神经系统,从外面加上去。

开始之前。 你的环境从快速上手起就已经配好了:打开那同一个 ai-agent-nervous-system 文件夹,Inngest 和 neon-postgres Skill 已安装,你的 OPENAI_API_KEY 和你的 Neon DATABASE_URL.env 里,你的 customersaudit_log 表已开通,三个 MCP 服务器(Neon、Context7、inngest-dev)都接好了。只有两条提醒:

  • 开发服务器在运行。 如果你关了它就再启动一次:在它自己的终端里 npx inngest-cli@latest dev。仪表盘在 http://127.0.0.1:8288。(等你以后部署到 Inngest Cloud 时,免费的 Hobby 套餐是 $0、无需信用卡;它的上限在第 5 部分。)
  • 下面那些 MCP 调用的一条大小写注意点。 开发服务器的工具名是 snake_casesend_eventget_run_statusinvoke_function),但它们的 参数是 camelCaseget_run_status 接受 runIdinvoke_function 接受 functionId)。Python SDK 全程 snake_case;只有 MCP 调用的参数是 camelCase

任务简介

你搭一个小型 客户支持 worker 并给它一套神经系统。这个 worker 从 Neon customers 表(id、email、tier)读它的样例客户,给一封进来的邮件起草一份温暖的回复,只在有人工审批时才能发起退款,并为每个动作往 Neon audit_log 表写一行审计,动作名取自它所选的一个小的固定集合(一个封闭集合,于是一个错字变成一个响亮的错误,而不是一行悄无声息的坏数据)。然后那七条提示词在它周围加上 Inngest:一个事件唤醒它,agent 调用持久地运行,一个每日 cron 给每位符合条件的客户扇出一次健康检查,流量控制限住并发与限流,退款在一道持久化的人工关卡上暂停,一条 replay 路径恢复失败的运行。

关于下面这些提示词的一条说明。 每一条都按你真会对一个通用 agent 说的方式写:简短、平实,信任它去处理细节。它们冷粘进去就能用,而如果你先让 agent 定位(「read the project and tell me what you see, then ask me anything unclear before you start」),随着文件堆起来会更好。提示词是目的地;先定位是上匝道。


D0:搭出 worker,独立运行

你在哪里:底座已打开,开发服务器在运行,你的 Neon 存储已开通,但还没有 worker 存在。这个 Decision 搭出那个独立的 worker;到结束时它在一封样例邮件上运行,并往 Neon 写一行审计。

底座已经附带了一个你的 agent 在打开时读过的 AGENTS.md,所以它了解这个项目。这就是为什么这些提示词保持简短。其中唯一值得你自己也记在脑子里的那条规则,是整门课的架构不变量:worker 自己的代码从不从 inngest 导入。 agent 和它的工具保持纯 Python;神经系统从外面包裹它们。那道分隔,agent 和神经系统彼此分开,正是让你以后能把 Inngest 换成 Temporal 或 Restate 而 worker 原封不动的东西。

你的 Neon 记录系统从快速上手起就已经开通了:customersaudit_log 表存在,DATABASE_URL 在你的 .env 里。所以 worker 从一开始就读写那个数据库。现在搭 worker。粘贴这段:

Build me a minimal customer-support agent with the OpenAI Agents SDK, running in a local sandbox. It reads the sample customers from my Neon customers table (each row has an id, email, and tier), drafts a warm reply to an incoming customer email, and can issue a refund, but the refund tool needs human approval before it runs. When an email reports a duplicate charge, an overcharge, or a failed order, the agent must actually call the refund tool, not just promise a refund in prose. Write an audit row into my Neon audit_log table for every action, using a small fixed set of action names and the DATABASE_URL in .env. Seed the customers table with five sample rows first if it is empty. Keep it small; it exists to be wrapped, not shipped. Then run it on a sample email and show me the reply.

worker 通过 DATABASE_URL 触达 Postgres,从不通过 Neon MCP(那只是你搭建期用的工具)。agent 写出来的代码里有一行对这门课其余部分是承重的,那个退款工具的装饰器:

@function_tool(needs_approval=True)
def issue_refund(order_id: str, amount_cents: int, reason: str) -> str:
...

needs_approval=True 让 agent 暂停 而不是发起退款:运行带着待定的退款返回,等一个人来决定。它是 D5 那块拱顶石挂上去的那个钩子。(这个底座给每笔退款都设关卡,好让拱顶石保持简单;在生产里你只会给超过某个阈值的设关卡,也就是概念 15 那个超过 $100 的模式。接线一样。)有一件事要保持拆分,因为 D5 依赖它:把 agent 和它的沙箱运行配置作为分开的两块来搭,这样 D5 能在恢复时重建 agent 并重新供给沙箱。

完成的标志: agent 在一封样例邮件上运行并打印一份简短回复,Neon audit_log 表里有一行新行(在控制台里查它,或让你的 agent 通过 Neon 工具把它读回来)。如果邮件描述的是一笔退款,运行会在退款工具处暂停而不发起它;那次暂停就是整个重点,而 D5 让它持久。

这里你通用 agent 的模型很重要

这一部分里的提示词假定一个前沿级别的通用 agent(Claude Sonnet 或 Opus、一个 GPT-5 级别的模型,或 Gemini 2.5 Pro)。你在学的 Inngest 架构(事件、步骤、记忆化、流量控制)是 SDK 级别的,不管什么模型驱动你的 agent 都成立。但 搭建体验 靠的是强大的指令遵循能力,尤其是 D5 那块拱顶石。在更弱的模型上,预期要把一条提示词反复迭代不止一次,并把文件名写明白。架构没坏;只是提示需要更多脚手架。


D1:让 agent 运行持久化

你在哪里:一个只在你调用它时运行、运行中途崩溃就丢掉一切的 worker。这个 Decision 把 agent 调用包进 step.run;到结束时一次完成的运行在仪表盘里显示 agent 步骤被记忆化了。

神经系统从这里开始:把整个 agent 调用包进单个 step.run,让它持久且被记忆化。粘贴这段:

Wrap the agent run in an Inngest durable function so it survives crashes and retries transient failures. The whole agent call goes inside a single step.run so it is memoized. Run it in local dev mode against the Inngest dev server, with a FastAPI host. Confirm a completed run shows the agent step memoized in the dashboard.

agent 调用是昂贵的那部分(模型 token、好几秒)。在 step.run 内部它的结果被记忆化,于是当后面一步失败、运行重试时,agent 不会再次运行。那就是「每次重试都重付钱、重行动的 worker」和「每件昂贵的事只做一次的 worker」之间的区别。让 agent 以一次普通的(非流式)运行被调用;D5 的持久化恢复建立在它之上。

它作为两个进程运行:FastAPI host,以及指向它的 Inngest 开发服务器。你的 agent 把两者都启动。

完成的标志: 仪表盘列出了这个函数,一次完成的运行显示 agent 步骤。(你在 D2 里用一个真实事件唤醒它;现在,可被发现就够了。)


D2:在一个事件上触发它

你在哪里:持久化函数存在了,但你还是手动触发它,而且什么都没被记录。这个 Decision 在一个真实事件上唤醒它,并在 agent 两侧各写一行审计。

这是开篇那张图第一次真正运行起来。不再是你调用 worker,而是一个 customer/email.received 事件 到达,引擎 接住它,引擎调用你的 worker 来运行。你也开始记录发生了什么:agent 之前一行审计,agent 之后一行审计。粘贴这段:

Make the worker wake on a customer/email.received event instead of being run by hand. Add an ingress audit step before the agent and a reply audit step after it. Send a test event and show me the run completing with both audit rows.

要在本地测试它,自己用开发服务器 MCP 的 send_event 发送那个事件(一个携带邮件文本和客户 id 的 customer/email.received 事件),不需要 webhook。在生产里你会改为把你的邮件服务商指向一个 Inngest webhook URL,那是一个仪表盘设置,不是代码。

完成的标志: 一个测试事件驱动一次运行,它带着三个按顺序的步骤(审计、agent、审计)完成,Neon audit_log 表里有两行新行,agent 之前一行、agent 之后一行。

为什么是两步,而不是一步。 每次审计写入都是它自己的 step.run,所以每个各自被记忆化。如果回复步骤失败、运行重试,入口那行不会被写两次,agent 也不会运行两次,于是审计轨迹在重试间保持恰好一次(D6 会证明的那个属性)。


D3:一个会扇出的每日 cron

你在哪里:一个世界一次唤醒它一封邮件的 worker。这个 Decision 加一个每日 cron,给每位符合条件的客户扇出一个事件;到结束时每位客户拿到自己的一次持久化子运行。

加上计划内的工作:一个每日 cron,给每位 Pro 和 Enterprise 客户触发一个健康检查事件,每个事件触发它自己的一次持久化运行。粘贴这段:

Add a daily cron that fans out one customer/health_check.requested event per Pro and Enterprise customer, each one idempotency-keyed so a re-delivered cron run never double-fires. Each child event triggers its own durable run that writes one audit row. Invoke the cron manually and show me one child run per eligible customer.

两件事撑起这个 Decision。扇出在一步内部进行(step.send_event,不是一次裸的客户端发送),所以 cron 的一次重试不会重新发出重复。而每个事件拿到一个从客户和 cron tick 派生的幂等 id(类似 health-{customer}-{cron_run}):如果同一个 tick 被投递两次(一次重新部署、一次重试),重复的那个被丢掉,于是每位客户那天恰好拿到一次检查。从你的 agent 用 MCP 的 invoke_function 调用这个 cron(别等到 09:00)。一个开发期怪癖:开发服务器只在它运行时才触发 cron;生产在 Inngest 始终运行的基础设施上运行它们。

完成的标志: 父函数在几秒内完成,仪表盘显示每位符合条件的客户一次子运行,标准档位的客户被正确跳过。

为什么是扇出,而不是一个循环。 父函数自己不处理那些客户;它发送 N 个事件并返回。每个子是它自己的一次运行,隔离、可独立重试、由它自己的并发限住。一个函数内部的循环会把它们耦合起来:一位慢客户拖住其余的,而一次崩溃丢掉整批。扇出是一次计划内的唤醒变成 N 次独立持久化运行的方式。


D4:流量控制

先退一步看:到现在你已经组装出一个 worker,被三种方式触达,全都共享一个 Neon 存储。这就是 D4 要给它装上限的东西。

              INNGEST ENGINE   (routes events, runs functions, stores steps)
|
┌──────────────┼────────────────┐
v v v
an email a daily cron one run per customer
arrives fans out a the cron emitted
(D2: the check per (D3: each isolated,
email worker) customer (D3) retryable on its own)
└────────── all run in YOUR host ───────────┘
|
Neon Postgres (customers + audit_log)

每条路径内部是同一个 agent;只是世界如何触达它不同。现在你在负载下让所有这些保持健康。

你在哪里:一个能处理每封邮件、但在突发下会一下把它们全部触发的 worker。这个 Decision 加三条流量控制策略;到结束时一阵二十个事件的突发在上限下排队,没有丢失或重复的行。

当五百封邮件在 9 点落地时,worker 不该一下发起五百次模型调用:那会冲破限速,并让排在那位刷屏客户后面的每个人挨饿。加一个全局并发上限、一个按客户上限,以及一个限流。粘贴这段:

Add flow control to the email handler: a global concurrency cap, a per-customer concurrency key so one noisy customer can't starve the rest, and a throttle to protect the OpenAI rate limit. Fire a burst of twenty events across five customers and show me they queue under the cap and all complete with no dropped or duplicated audit rows.

三个旋钮做三件事:一个全局并发上限(同时执行多少次运行)、一个按客户并发键(让一个吵闹的账户最多占一两个槽,从不让其余的挨饿),以及一个限流(每分钟 启动 多少次运行)。把限流对准你真实的下游限制:任务简介里 OpenAI 的上限约为每分钟 30,所以用 30,不是一个泛泛的 100。(一个函数最多带两条并发策略;全局加按键这一对是常见的形状。)

并发上限保护两道天花板:模型的限速和你的 Neon 连接预算。一份你的 worker 的单个运行副本已经把它自己的数据库连接限住了,因为副本里的每次运行共享一个连接池。并发上限是让你 同时 运行 好几份 副本时总数保持理智的东西:十份副本每份上限 10,约等于 100 条连接,你按 Neon 的预算来定它的大小。连接池约束一份副本;上限约束整支机队。

从你的 agent 发出那阵突发:用 send_event 跨五位客户发二十个 customer/email.received 事件。

完成的标志: 突发在上限下排队(运行计数保持在全局上限或以下、在按客户上限或以下),每次运行都完成,审计轨迹每个事件恰好一行进、一行出,没有丢失的运行、没有重复、没有 Neon 连接错误。

为什么这些是策略,而不是代码。 这些没有一样住在你的函数体里;它是运行时强制执行的配置。没有这些上限,一阵突发要么熔了一个下游系统,要么让一个租户垄断 worker。手写同样的公平性是一个队列加一个调度器加一个限速器,几百行。在这里它是三个装饰器参数。


D5:退款上一道持久化的人工审批关卡(拱顶石)

你在哪里:早在 D0,你的 agent 就已经在退款前暂停,但那次暂停只活在内存里。这个 Decision 让它在一次崩溃、一次部署,或一位拖了几小时的审阅者面前存活,于是当他们终于批准时退款仍然恰好触发一次。

先把整个想法摆在任何代码之前。你的 agent 判定一笔退款是合理的,但它不能在一个人说「行」之前发起它。D0 的暂停只把那个决定保存在运行中的进程里,所以一次崩溃或一位慢审阅者就把它丢了。D5 把那次暂停变成一次 持久的等待:函数进入休眠(什么都不花),只在决定到达时才醒来。

  the agent decides a refund is warranted
|
v
it PAUSES and asks a human (it does NOT issue the refund yet)
|
v
the function SLEEPS, waiting for the decision
(minutes or hours; free while it waits; survives a crash,
a deploy, a reviewer who goes to lunch)
|
v
a human clicks Approve or Reject -> sends the decision event
|
v
the function WAKES and finishes:
approved -> issue the refund (exactly once)
rejected -> no refund; record it
no answer in 4h -> no refund; record a timeout

粘贴这段:

Right now the agent pauses before a refund, but that pause is lost if the worker crashes or the reviewer takes hours. Make the pause survive that: when the agent stops for approval, save where it stopped, then wait up to four hours for a human's approve-or-reject for this customer. When the decision comes in, pick up exactly where the agent left off and finish, so the refund happens at most once per run. On a rejection, the reply to the customer must say the refund was declined, never that it was issued. Then prove it for me: drive a refund, show the run waiting, send an approval, and show exactly one refund row. Do it again with a rejection and show a blocked row and no refund.

那整幅图就是一行代码。函数在 wait_for_event 处停下,在决定事件出现时再次启动:

decision = await ctx.step.wait_for_event(
"await-refund-approval",
event="refund/approval.decided", # what we are waiting for
timeout=datetime.timedelta(hours=4), # give up after 4 hours
if_exp=f"async.data.customer_id == '{customer_id}'", # only THIS customer's decision
)

# no decision came in 4 hours -> write a blocked-refund row and stop
# approved or rejected -> pick the agent back up and finish

那一个调用就是整道关卡。你不写队列、不写轮询循环、不写要手动检查的「批准了没?」标志。运行时替你持有那次暂停。你的代码只说要等什么、拿到答案后做什么。不过有三件事容易搞错,每一件都会悄无声息地弄坏这道关卡:

  • if_exp 把决定关联到 这一位 客户,于是给一位客户的批准永不恢复另一位客户的运行。这里 customer_id 管用,因为这个演示里每位客户最多有一笔待定退款;如果一位客户曾经可能同时有两笔退款在途,就改成关联一个唯一的 request_id(概念 8 和 15 用的那个键)或运行 id,否则一个批准可能恢复了错误的运行。
  • 当 agent 恢复时,把你保存的状态交还给它,而不是一段全新的对话。 如果你忘了会出什么岔子:一段全新的对话不记得它已经请求过批准,于是恢复后的 agent 又撞上退款、又请求批准,永远循环下去。重建 agent 并重新供给它的运行配置,然后只喂给它保存的状态。(这就是为什么 D0 把 agent 的构建和它的运行配置拆开放;这是那一个漏掉就让恢复失败的细节。)
  • 保存状态会悄无声息地丢掉你的自定义上下文,所以要手动把它放回去。 这是那个不报错却失败的陷阱。当 Agents SDK 序列化那个暂停的运行时,它 带过来一个自定义运行上下文(你的退款工具从中读取客户 id 和幂等键的那个对象);它保存一个空的,而且只发一条警告。所以恢复时你必须自己重新供给那个上下文,用 RunState.from_string(agent, saved_state, context_override=your_context)。漏了它,被批准的退款工具就在没有上下文的情况下运行:它悄无声息地没写退款行,而运行却仍然报告成功。你看到的是「批准了,但没有 refund_issued 行」,却没有任何东西能解释它。(在 openai-agents 0.17.x 上验证;确切的序列化规则是那种会在小版本之间变动的 beta 细节,所以搭建时去对照当前的 Agents SDK run-state 文档 确认。)

从你的 agent 来驱动它:发送一个描述退款的 customer/email.received 事件,看运行在关卡处挂起(仪表盘显示它 WAITING 在零算力),然后 send_event 一个携带那位客户 {"approved": true, ...}refund/approval.decided。再用 {"approved": false} 做一遍。

完成的标志: 批准时,挂起的运行恢复,Neon audit_log 表里恰好有 refund_issued。拒绝时,运行恢复,审计里有一行 refund_blocked没有 refund_issued,agent 的回复解释了这次拒绝。

这道关卡给你的是 单次运行内 的恰好一次,而那道边界值得说清。如果 同一笔 退款被驱动穿过 两次 运行(一个重发的事件、一次手动 replay),这里没有任何东西单凭自身挡住第二次退款;那是概念 4 那个稳定幂等键(或 provider 自己的键)的活儿,以请求为键,正如那里那个退款例子所示。这个最小化的 worker 为了保持小巧把那个键省掉了,所以对着一次运行证明「恰好一次」,并在一笔真实退款可能被驱动两次的那一刻去拿概念 4 那个键。

为什么这是拱顶石。 每个别的层(感知、反射、平衡)都各自让 worker 保持正确或健康。这一个,是人的心智在一个高风险动作上重新进入循环的地方,持久地,要多久就多久。


D6:证明持久化在一个坏掉的步骤中存活

你在哪里:一个每一层都包裹好了的完整 worker。这个 Decision 证明那个为所有这一切辩护的属性;到结束时你已经看着一个坏掉的运行把它失败的步骤重试许多次,而它已完成的审计步骤恰好运行一次,然后在一次全新的运行上恢复了那份工作。

最后要证明的属性,就是为所有这一切辩护的那个,概念 7 的记忆化机制。你在那里理解了它;现在在你自己的 worker 里证明它。粘贴这段:

Deliberately break the agent step so it fails, fire an event, and show me Inngest retrying it while the earlier audit step stays memoized, so the failing run writes its ingress audit row exactly once across all the agent retries. Then fix the step and recover the work, and show me the recovery completing.

故意弄坏 agent 步骤,发几个 customer/email.received 事件,读每次运行的轨迹。证据在每个 失败的 运行里面:入口审计步骤显示 一次 已完成的尝试(它的行写了一次),而 agent 步骤显示 若干次 尝试,它带退避重试然后失败,回复步骤从不运行。审计步骤停在一次尝试、而 agent 步骤一路爬升,这就是概念 7 的记忆化,现在在你自己的 worker 里:失败的运行把它的入口行写一次,无论 agent 重试多少次。

然后还原那个破坏,并通过在修好的代码上重新触发事件来恢复工作(或者,对一次真实的糟糕部署批次,用仪表盘的 Rerun 按钮;两者都从头开始一次全新的运行,见概念 14)。这里有个让人意外、但它是对的、不是 bug 的部分:恢复是一次 全新的运行,所以它写它 自己 的入口行。一次「弄坏再恢复」之后,那位客户名正言顺地有了 行入口,一行来自失败的运行,一行来自恢复。记忆化是一个运行内的保证;它从不跨越两个独立的运行。

完成的标志: 在失败运行的轨迹里,入口步骤停在 一次尝试 并写了一行,而 agent 步骤累积了若干次尝试并失败(那个「N 次重试却只有一次尝试」就是 记忆化),然后恢复运行在修好的代码上完成。诊断是按运行的,不是按客户的:打开 单个 运行的轨迹,确认入口步骤显示一次尝试。跨两个独立运行有两行入口是对的;入口步骤 在一次运行内 运行两次才是那个 bug(通常是一个不唯一的步骤名)。

为什么这是那条分界线。 一个在糟糕部署中丢掉客户工作的 worker,只是一个你调用的 agent。一个接住同一个糟糕部署、响亮地失败、重试坏掉的步骤而不重做它已经完成的工作、并在修复之后在一次全新运行上干净恢复的 worker,是一个 AI Worker。

做过 Digital FTE 课程?

把这同一套神经系统指向你自己的 SandboxAgent worker,而不是这个最小化的底座;包裹方式完全一样。而这个 step.wait_for_event 审批替换掉那门课可选 Decision 10 里那张手写的运行状态表:你刚搭的这道持久化关卡 就是 那个持久化层,所以你可以删掉那张表。


刚刚发生了什么

你搭了一个小型客户支持 worker,并一次一层地给它装上了一套神经系统。worker 的内部在 D0 之后从不改变:同样的 SandboxAgent、同样的两个工具、同样的 Neon Postgres 审计轨迹。改变的是它 周围 的一切。它现在在一个 customer/email.received 事件上以及一个按符合条件的客户扇出的每日 cron 上醒来,持久地运行(agent 调用在 step.run 里),遵守流量控制(全局与按客户并发、一个限流),把退款挡在一道持久化的人工审批后面(step.wait_for_event),并通过 replay 失败的运行从一次糟糕部署中恢复,而审计轨迹显示:在任何单次运行内每一步恰好触发一次,无论那次运行重试了多少回。

agent 代码是一样的;它的触达范围不是。你从一个你操作的 agent 起步,提示它、看它、再提示它。你现在有一个自己运转的 worker:世界唤醒它,它的反射带它穿过失败,它在负载下保持平衡,而一个人只在风险要求时介入。那就是开篇画的那条线,在 一个你操作的 agent一个自己运转的 FTE 之间,而你刚刚跨过了它。

剩下的关切是规模上的可观测性、多 worker 协调,以及那一层决定哪些 worker 处理哪些流量的管理者。那是这条赛道的下一门课。这门课覆盖的是生产就绪执行的那个单元;下一门课把这些单元组合成一支劳动力。


第 5 部分:这门课在哪里收尾

一个 AI Worker 的成本形状

有两个成本面要紧:基础设施成本(Inngest,以及你运行 worker 所在的任意存储和算力)和 推理成本(模型 token)。基础设施随负载增加大致保持平坦;推理线性增长。下面这套方法才是要学的;任何具体的金额在它发布的那一周就过时了,所以把数字当作示意,在你把一个数字写进预算之前查当前的定价页。

Inngest 定价。 Inngest 按执行次数收费:每次函数运行,加上每次步骤级重试,各算一次执行。

套餐价格执行次数 / 月并发步骤值得注意的
Hobby$050,00053 个用户、50 个实时连接、无需信用卡
Pro起价 $75 / 月1,000,000100+1000+ 个实时连接、15+ 个用户、7 天轨迹保留
Enterprise定制定制500-50,000SAML / RBAC、90 天轨迹保留、专属支持

注意 Inngest 计量两样不同的东西。一个是 执行(上表):一次函数运行加上每次步骤重试。另一个是 事件(你发进去的东西):每天前 1-5M 个事件包含在内,超过之后超额从每个事件约 $0.000050 起,量更大时递减。在 Pro 上,超过 100 万执行上限后,每额外 100 万执行加收 $50。

这里要紧的 Hobby 套餐上限。 5 个并发步骤的上限意味着,哪怕你在代码里声明 concurrency=Concurrency(limit=10),平台账号级别的上限也把你按在 5。你的代码对生产是正确的;免费套餐上观察到的并发是 5。step.sleepstep.sleep_until 也受套餐约束:免费 Hobby 套餐上最多七天,付费套餐上最多一年(Inngest 用量限制)。

推理成本占大头。 一次典型的客户支持运行每次对话用几千到一万个模型 token。把你的每 token 价格乘以你每封邮件的 token 数再乘以你每天的邮件数,你就得到那条要紧的线;对大多数 worker 来说它让其他一切相形见绌。这才是你要优化的。 别的都是凑整误差。两个最高价值的杠杆:保持一个稳定的缓存提示词前缀(让模型把重复的那部分按更便宜的缓存费率计费,而不是每次调用都按全价),以及把容易的轮次路由到一个更便宜的模型。

三个 Inngest 专属的成本杠杆,一旦你进入优化区:

  • 别把纯函数包进 step.run 如果一个函数没有副作用,它不需要持久化;包裹它会徒增一笔 step-run 费用却毫无好处。把 step.run 留给 I/O 和副作用。
  • 批量路径用 batch_events 一批 50 个事件是一次函数运行,不是 50 次。
  • step.sleepstep.wait_for_event 廉价地挂起。 挂起的函数不为挂起时间计费。一次 3 天的延迟跟进和一次 3 秒的成本一样。

规模上的形状: 推理是随流量增长的那张账单;Inngest、你的数据存储和算力相对保持平坦。在你真实的量上做那道乘法,而不是相信这里印的某个数字。


替换指南:神经系统是不变量,平台不是

这门课在每一层都点名 Inngest。那是因为一个教学示例需要具体的答案,而不是「用任何你喜欢的编排器」。但这套架构在任何合规的替代品上都成立。课程的设计明确预期的五种替换:

  • 触发面:Inngest 事件 → Temporal 信号、Restate 处理器、AWS EventBridge + Lambda。 每个平台都有一种方式来表达「这段代码在这个命名的东西发生时运行」。事件名、负载形状和幂等纪律全都可迁移。变的是:SDK 的装饰器语法和仪表盘。

  • 持久化执行:Inngest step.run → Temporal activities、Restate 处理器、自定义的 Postgres 支撑的状态机。 每个都给你「记忆化这次有副作用的调用、在瞬时失败时重试、在崩溃后恢复」的语义。Temporal 是最接近的类比,也是更老、更经企业检验的选项。Restate 是最新的,带更偏函数式编程的风味。自定义状态机是那些无法采用一个托管平台的团队会写的东西;通常 1,000 到 10,000 行代码,重造出约 70% 的 Inngest 免费给你的东西。

  • HITL 原语:step.wait_for_event → Temporal 的 await Workflow.execute_activity(approval_signal)、Restate 的 awakeables、自定义的 Redis/Postgres 审批队列。 模式是一样的:函数挂起,外部信号恢复它,审计捕获决定。Inngest 的表达写起来最干净;Temporal 的更啰嗦但在大规模上久经沙场。

  • Cron 调度:Inngest cron 触发器 → Kubernetes CronJobs + 队列、GitHub Actions 计划、AWS EventBridge 计划。 Cron 触发器是大路货。Inngest 的优势不在于 cron;在于 cron 触发的函数自动获得和事件触发的函数一样的持久化/replay/流量控制。其他平台让你自己接线。

  • 流量控制:Inngest 并发 + 限流 → 带 worker 并发的 Temporal 任务队列、Redis 支撑的限速器、AWS SQS 消息可见性超时。 其他平台能做这些;Inngest 用我们见过的那种配置密度(一个装饰器参数)来做。

Dapr 作为生产规模上的开源伙伴。 一个值得点名的更有抱负的替代品:Dapr Agents 作为生产规模上 Inngest 的结构性伙伴,就像 OpenCode 之于 Claude Code 那样。Dapr Agents 在 2026 年 3 月 23 日于 CNCF 治理下达到 v1.0 GA(CNCF 公告Dapr Agents 核心概念)。DurableAgent 是那个生产就绪的类;更老的 Agent 类已弃用。当 Kubernetes 原生部署和多语言 SDK 比 Inngest 的本地开发体验更重要时,选 Dapr。Inngest 是更好的学习工具(仪表盘让心智模型可见);Dapr 是更好的规模工具,当你撞上了 Inngest 的套餐上限,或需要 K8s 原生的多语言部署时。

Inngest 也是开源的(github.com/inngest/inngest;1.0 版本在 2024 年 9 月加入了自托管支持),并可通过 Helm + KEDA 自托管。规模上要紧的轴是治理、支持和成熟度:Inngest 由单一厂商治理,自托管的故事还年轻;Dapr 由 CNCF 治理,有更长的生产履历。

这门课的概念Inngest 原语Dapr 生产对应物教学注解
计划内的工作TriggerCronCron 输入绑定 / Dapr Scheduler同一个想法:时间唤醒 worker。Dapr 通常需要组件配置。
Webhook/事件入口Inngest webhook 端点 → 事件HTTP 端点、输入绑定,或 pub/sub 入口Inngest 藏起更多管道;Dapr 给你基础设施控制权。
内部事件inngest_client.send()Dapr pub/sub同一个事件驱动心智模型;Dapr 里 broker 可插拔。
扇出一个事件触发许多函数一个 topic/事件被许多服务消费同一架构;Dapr 用 broker/topic/subscriber 组合。
持久化步骤step.run() + 记忆化Dapr Workflows + activities相似的生产目的,不同的开发者模型。
无算力等待step.sleep()持久化工作流计时器两者都避免在等待时把进程开着。
人工审批关卡step.wait_for_event()工作流外部事件/信号、pub/sub、actorsInngest 的表达更简单;Dapr 更可组合。
重试函数/步骤重试工作流/activity 重试 + 韧性策略Dapr 把韧性既做成运行时策略也做成工作流行为。
死信 / 失败的运行Inngest 仪表盘失败运行 + replayBroker DLQ + 工作流状态/重启/手动工具Inngest 这里更开箱即用;Dapr 更偏基础设施原生。
流量控制并发、限流、优先级、批处理Kubernetes 扩缩、应用并发、broker 控制、韧性策略、批量 pub/subDapr 能做,但不是一个装饰器参数。Inngest 更稠密。
有状态协调wait_for_event、事件键、步骤状态Actors + 状态存储 + 工作流Dapr Actors 在长生命周期的身份/有状态协调上更强。
Agent 运行时你的 agent 在 Inngest 函数里DurableAgent / Dapr Agents v1.0 GADapr Agents 显式地让 agent 由工作流支撑且可恢复。

这张表是一份 翻译指南,不是 API 完全相同的断言。Inngest 用一种紧凑的开发者体验教那个生产模式:触发器、步骤、等待、replay 和流量控制在一个产品界面里。Dapr 通过分布式系统的构建块实现同一套生产架构:绑定、pub/sub、工作流、actors、状态、韧性,以及 Kubernetes 原生操作。概念直接迁移;实现风格改变。截至 2026 年 5 月,对照 Dapr 的绑定概览和 Dapr Agents 核心概念验证。

在生产规模上去拿 Dapr 的三个理由:

  • CNCF 治理,按章程厂商中立:没有单一厂商控制平台或你对它的依赖。
  • 多语言且 Python 一等公民。 Dapr Agents 是 Python 优先的;同样的 agent 代码可以和用 JavaScript、Go、.NET、Java 或 PHP 写的服务并肩运行,谁都不必学第二个框架。
  • 设计上在 Kubernetes 上水平扩展。 跑在你自己的集群里、一个托管产品里(Diagrid Catalyst),或本地通过 dapr init。扩展的故事在每个环境里都是同一套架构。

诚实的注意点:Dapr 不是一个入门级平台。在生产里运行它意味着 Kubernetes、状态存储、pub/sub broker、placement 服务、可观测性、YAML 组件、sidecar。当你的目标仍是学这些模式时,那是一大片操作面,这正是为什么这门课从 Inngest 起步:一条命令,仪表盘就出现。等这些模式落地、问题转向在你掌控的基础设施上以组织规模运行时,再去拿 Dapr。

先在 Inngest 和 OpenAI Agents SDK 上学这些概念:快速的反馈循环、最少的基础设施、聚焦于模式。当你到达 Kubernetes 治理、多语言团队,或厂商中立成为不可妥协的那个规模时,同样的架构模式凭上面那张翻译表作为你的钥匙,会移植到 Dapr 上。模式迁移;底座改变;你在这门课里学到的,仍然是那块承重的知识。


这门课(目前)没覆盖的

你搭的 worker 满足论点提出的七大不变量里的四个。具体来说:它运行在一个引擎上(不变量 4,SandboxAgent),对着一个记录系统(不变量 5,审计轨迹),世界能够调用它(不变量 7,你加的那些触发器),并在一个设关卡的决定上以人为主体(不变量 1,部分:运行时机制在这里,更广的架构模式在后面)。剩下的三个不变量,以及那个把 worker 们变成一支劳动力的更广架构,是后面的课程。每个一句话:

  • 不变量 2:每个人都需要一个代理人。 一个在边缘的个人 agent,它持有你的上下文、代表你的判断,并把工作中介给劳动力。论点点名 OpenClaw 为当前的实现。
  • 不变量 3:劳动力需要一个管理者。 一个编排器,它分派工作、强制执行预算、审计执行,并把招聘暴露为一个可调用的能力。论点点名 Paperclip
  • 不变量 6:劳动力按策略可扩张。 一个元层,一个被授权的 agent 在其中生成一条提示词、开通一个运行时,并注册一个新 worker,无需唤醒一个人。Claude Managed Agents 是其中一种实现。

一个在事件上醒来、持久地运行、并在人那里设关卡的单个 worker,是这门课所教架构里最小的那个单元。下一门课把那个 worker 扩展成一支劳动力:由一个管理者协调的多个 worker,按需可扩张,由触发器唤醒,由 spec 治理。同样的 OpenAI Agents SDK 基础,同样的审计习惯,同样的 Inngest 神经系统。架构是不变量。


怎样才真的把这个练好

读这门速成课不会让你擅长搭 AI Worker。用它才会。你从搭那个 worker 起步,在包裹它时感受那些摩擦,让每一处摩擦教会你它属于哪个概念。

这门课的对应关系:

  • 「为什么事件到达时我的函数不触发?」→ 事件名拼错或命名空间不匹配(概念 3)。把你 TriggerEvent 里的事件名字符串和 inngest_client.send 里的逐字节对比。
  • 「为什么我的函数为同一个逻辑事件触发了两次?」→ 缺幂等键(概念 4)。给事件加一个带确定性种子的 id=
  • 「为什么我的函数在一次部署后『丢了工作』?」→ 在 step.run 之外的代码在干活(概念 7)。把 I/O 和副作用包进命名的步骤。
  • 「为什么客户被扣了两次款?」→ Stripe 调用在 step.run 之外,或步骤名不唯一(概念 6 和 7)。把那次调用移进一个命名的 step.run;让步骤名在函数内全局唯一。
  • 「为什么 OpenAI 在 9 点峰值返回 429 错误?」→ 缺限流(概念 11)。加 throttle=Throttle(limit=N, period=timedelta(minutes=1))
  • 「为什么一位客户的突发让其他客户挨饿?」→ 缺按键并发(概念 12)。加第二个 Concurrency(limit=2, key="event.data.customer_id")
  • 「为什么我的 HITL 关卡在周末悄无声息地触发了?」→ 缺一个往审计写入的超时处理器(概念 15)。在 approval is None 上分支,并显式写那行审计。

一次搭一块地建出这套架构。 这就是为什么第 4 部分是七条提示词,不是一条。搭那个 worker(D0)。把 agent 包进 step.run(D1),看你故意在运行中途崩溃时有什么变化。在一个事件上唤醒它(D2)。加上 cron 扇出(D3),然后是流量控制(D4),在你真撞过一次限速之后,然后是那道持久化审批关卡(D5),在一个高风险动作真需要一个人的时候。每一层都是它自己的一份学习。合成一次大重写,它们就是一堵墙。

这门课所教的纪律(在事件上醒来、持久地运行、在人那里设关卡、在 bug 上 replay)就是那个架构不变量。无论什么平台实现它,那个四属性的契约才是你真正承诺的东西。这是 Lindy 押注:你建在那些已经持久的部分上 —— 纯函数、SQL、一门有类型的语言、一个事件总线 —— 而不是这一季的某个包装。产品可替换;纪律不可。


快速参考

一道把叙述式课程和搭建期参考分开的分隔线。下面这些小节是用来搜的,不是从头读到尾的。每个概念的一句话要点在引言那张折叠的速查表里;本节是搭建期的诊断、两棵决策树,以及文件布局。

决策树:选触发面

当世界里有一件新事发生时,那次唤醒从哪儿来?

  • 一个外部系统给我们发了一个 HTTP 请求。 → Webhook 触发器。在 Inngest 仪表盘里配置源;用 transform 重塑负载;消费由此产生的事件。
  • 一个时间表说时候到了。 → Cron 触发器。TriggerCron(cron="...")。用 UTC;生产的 cron 即便你的服务正在部署中途也照样触发。
  • 另一个 Inngest 函数在它运行期间发出了一个事件。 → 事件触发器。TriggerEvent(event="ns/name.subtype")。让一个或多个函数订阅同一个名字。
  • 一个交互式用户在等一个即时响应。 → 不是 Inngest 触发器。把请求/响应留在你正常的 web 端点里;如果响应涉及重活儿,就在请求内部触发一个事件并立即返回,让 Inngest 异步处理那份工作。

决策树:选步骤原语

给定一个函数正在运行、你需要做 某件事,你伸手抓哪个 step.* 调用?

  • 一次有副作用的调用(API、数据库、文件写入、agent 调用)。ctx.step.run("name", fn, ...)。默认选项。成功即记忆化,瞬时失败即重试。
  • 一台按在途时间计费的无服务器平台上一次长时间运行的 OpenAI 调用。ctx.step.ai.infer(...)。把推理卸载到 Inngest 的基础设施,让你的函数进程可以释放。
  • 在继续之前等一段固定时长。ctx.step.sleep("name", timedelta(...))。持久;等待时零算力(免费套餐最多七天,付费最多一年)。
  • 等一个外部事件(人工审批、兄弟函数完成)。ctx.step.wait_for_event("name", event="...", timeout=..., if_exp=...)。持久;在事件到达时恢复,或超时时返回 None
  • 纯确定性计算(格式化一个字符串、算一个日期)。 → 直接写代码。不需要 step.run;不收费。

文件位置速查

一个扁平的项目,四个文件,没有 src/ 嵌套:

ai-agent-nervous-system/
├── .claude/
│ └── skills/ # the four Inngest skills (installed in the Quick Win)
│ ├── inngest-setup/SKILL.md
│ ├── inngest-events/SKILL.md
│ ├── inngest-steps/SKILL.md
│ └── inngest-durable-functions/SKILL.md
├── db.py # Neon Postgres access: pooled asyncpg, load_customers, record (closed-vocabulary audit) (D0)
├── worker.py # the worker: SandboxAgent + 2 tools (D0)
├── inngest_app.py # the nervous system: Inngest functions + FastAPI host (D1-D5)
├── .env # OPENAI_API_KEY, DATABASE_URL, INNGEST_DEV=1
└── AGENTS.md # the base's rules file (read on open)

这些文件名是一种合理的布局,不是一项要求;你的 agent 可能落在 agent.pymain.py 上,那也没关系。要紧的是那道边界,不是名字:worker 代码从不导入 inngest,而恰好一个文件在上面接起那套神经系统。有了那个布局,客户和审计轨迹住在你的 Neon 数据库里(在快速上手里开通,在 D0 里填种子数据),不在本地文件里;worker 文件在 D0 之后从不改变,而每个神经系统层(D1 到 D5)都编辑那一个 Inngest 文件。

诊断表,症状 → 根因 → 概念

症状首先怀疑重读哪个概念
期望的事件到达时函数从不触发事件名拼错、命名空间不匹配C3(webhook)、C5(扇出)
函数为同一个逻辑事件触发两次缺幂等键C4(幂等性)
函数在部署后「丢了工作」step.run 之外的代码在干活C7(记忆化)
Cron 计划在一次部署期间没触发仅本地开发服务器,生产跑在 Inngest 基础设施上C2(cron)
客户为一笔退款被扣两次款Stripe 调用在 step.run 之外,或步骤名不唯一C6(step.run)、C7(记忆化)
9 点峰值时 OpenAI 限速错误缺限流C11(并发 + 限流)
一位客户的突发让其他客户挨饿缺按键并发C12(优先级 + 公平性)
函数永远挂起,从不恢复wait_for_event 里的事件名和被发送的事件不匹配C8(wait_for_event)、C15(HITL)
HITL 超时在周末悄无声息地触发了缺一个往审计写入的超时处理器D5(持久化退款关卡)、C15(HITL)
昨天失败的运行从仪表盘里消失了运行会保留直到被手动 replay 或保留窗口之后C14(replay)
Replay 重新给客户扣了款Replay 是一次重新执行每一步的全新运行;那次扣款没有幂等键C4(幂等性)、C14(replay 是一次全新运行)
函数轨迹不显示 OpenAI 提示词步骤轨迹显示函数输入/输出但没有 LLM 专属的提示词/token 遥测C10(Python 用 step.run;LLM 专属遥测需要你自己的 OpenAI 客户端追踪;step.ai.wrap 的提示词级轨迹仅限 TypeScript)

附录:可选的渊源与一份 Inngest 速查表

你不需要 Digital FTE 课程才能做第 4 部分:D0 从零搭出 worker。两条简短的说明作为背景。

A.1:如果你来自 Digital FTE 课程

从 Agent 到 Digital FTE 课程搭出一个更丰富的客户支持 worker:可移植的 Skill、一个 Postgres 记录系统,以及一个自定义 MCP 服务器。如果你做过它,你已经有一个 SandboxAgent worker 躺在磁盘上,你可以跳过 D0 的最小化底座:把这套神经系统(D1 起)指向你自己的 worker。包裹方式完全一样。一个额外好处:你在 D5 里搭的那道持久化退款关卡(step.wait_for_event)替换掉那门课可选 Decision 10 里那张手写的运行状态表,所以你可以删掉它。如果你 做过那门课,忽略这一切;D0 给你你需要的一切。

A.2:这门课用到的 Inngest 专属要点

如果下面有任何东西感觉不熟悉,在扎进第 4 部分之前先扫一遍对应的文档页。

  • Inngest 客户端实例化。 每个 Python 项目一个 inngest.Inngest(app_id=...) 实例,从一个模块导出,在你装饰函数的任何地方导入。Python 快速开始
  • 函数装饰。 @inngest_client.create_function(fn_id=..., trigger=...)。触发器可以是 TriggerEventTriggerCron,或为多触发器函数提供两者的一个列表。
  • ctx.step.runctx.step.sleepctx.step.wait_for_eventctx.step.ai.infer 这四个步骤原语构成你在 Python 里要写的内容的 90%。(TypeScript 有第五个,step.ai.wrap,用于 LLM 专属的追踪;Python 项目用 step.run 处理 AI 调用。)
  • inngest_client.send(events=[...]) 从你代码的任何地方发出事件(在函数内部、在 agent 工具内部、从 CLI 脚本)。用一个 id= 做幂等。
  • 开发服务器启动。 npx inngest-cli@latest dev。跑在 :8288。仪表盘在 http://127.0.0.1:8288。MCP 在 http://127.0.0.1:8288/mcp。如果 :8288 被占了它用 8289+;那就在 host 上设 INNGEST_BASE_URL=http://127.0.0.1:<port>,让它跟上,而不只是 MCP URL。

A.3:真正难的那两个转变

这门课最难的东西不是 Inngest 的语法。是 从请求到事件的心智转变(概念 1)和 从进程内执行到持久化执行的转变(概念 6)。一旦那两个落地,语法就是机械的了。如果别的什么感觉比它该有的更难,先重读概念 1 和 6。

闪卡学习辅助

知识检查

一道对你刚跑过的那些想法的快速、带关卡的自检。

Checking access...