Inngest 工作流
🌐 Inngest workflow
Inngest 是一个开发者平台,用于构建和运行后台工作流程,无需管理基础设施。
有关带有高级流程控制功能的完整示例,请参见 Inngest 工作流示例。
🌐 For a complete example with advanced flow control features, see the Inngest workflow example.
Inngest 如何与 Mastra 协作Direct link to Inngest 如何与 Mastra 协作
🌐 How Inngest works with Mastra
Inngest 和 Mastra 通过对齐它们的工作流模型来实现集成:Inngest 将逻辑组织为由多个步骤组成的函数,而使用 createWorkflow() 和 createStep() 定义的 Mastra 工作流可以直接映射到这一范式。每个 Mastra 工作流都会成为一个具有唯一标识符的 Inngest 函数,工作流中的每个步骤都会映射到一个 Inngest 步骤。
🌐 Inngest and Mastra integrate by aligning their workflow models: Inngest organizes logic into functions composed of steps, and Mastra workflows defined using createWorkflow() and createStep() map directly onto this paradigm. Each Mastra workflow becomes an Inngest function with a unique identifier, and each step within the workflow maps to an Inngest step.
serve() 函数通过将 Mastra 工作流注册为 Inngest 函数,并设置执行和监控所需的事件处理程序,从而实现两个系统之间的桥接。
🌐 The serve() function bridges the two systems by registering Mastra workflows as Inngest functions and setting up the necessary event handlers for execution and monitoring.
当某个事件触发工作流时,Inngest 会逐步执行该工作流,并记录每一步的结果。这意味着如果工作流被重试或恢复,已完成的步骤将被跳过,从而确保执行的高效性和可靠性。Mastra 中的控制流原语,例如循环、条件语句和嵌套工作流,会无缝地转化为相同的 Inngest 函数/步骤模型,同时保留高级工作流功能,如组合、分支和暂停。
🌐 When an event triggers a workflow, Inngest executes it step by step, memoizing each step's result. This means if a workflow is retried or resumed, completed steps are skipped, ensuring efficient and reliable execution. Control flow primitives in Mastra, such as loops, conditionals, and nested workflows are seamlessly translated into the same Inngest's function/step model, preserving advanced workflow features like composition, branching, and suspension.
通过 Inngest 的发布-订阅系统和仪表板,可以实现实时监控、暂停/恢复以及步骤级的可观察性。每个步骤执行时,其状态和输出都会使用 Mastra 存储进行跟踪,并可以根据需要恢复。
🌐 Real-time monitoring, suspend/resume, and step-level observability are enabled via Inngest's publish-subscribe system and dashboard. As each step executes, its state and output are tracked using Mastra storage and can be resumed as needed.
设置Direct link to 设置
🌐 Setup
安装所需的包:
🌐 Install the required packages:
- npm
- pnpm
- Yarn
- Bun
npm install @mastra/inngest@latest inngest @inngest/realtime
pnpm add @mastra/inngest@latest inngest @inngest/realtime
yarn add @mastra/inngest@latest inngest @inngest/realtime
bun add @mastra/inngest@latest inngest @inngest/realtime
构建 Inngest 工作流Direct link to 构建 Inngest 工作流
🌐 Building an Inngest workflow
本指南讲解了如何使用 Inngest 和 Mastra 创建工作流,并演示了一个计数器应用,它会将数值递增直到达到 10。
🌐 This guide walks through creating a workflow with Inngest and Mastra, demonstrating a counter application that increments a value until it reaches 10.
Inngest 初始化Direct link to Inngest 初始化
🌐 Inngest initialization
初始化 Inngest 集成以获取与 Mastra 兼容的工作流辅助工具。createWorkflow() 和 createStep() 函数用于创建与 Mastra 和 Inngest 兼容的工作流和步骤对象。
🌐 Initialize the Inngest integration to obtain Mastra-compatible workflow helpers. The createWorkflow() and createStep() functions are used to create workflow and step objects that are compatible with Mastra and inngest.
开发中:
🌐 In development:
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime/middleware";
export const inngest = new Inngest({
id: "mastra",
baseUrl: "http://localhost:8288",
isDev: true,
middleware: [realtimeMiddleware()],
});
在生产中:
🌐 In production:
import { Inngest } from "inngest";
import { realtimeMiddleware } from "@inngest/realtime/middleware";
export const inngest = new Inngest({
id: "mastra",
middleware: [realtimeMiddleware()],
});
创建步骤Direct link to 创建步骤
🌐 Creating steps
定义将组成你的工作流程的各个步骤:
🌐 Define the individual steps that will compose your workflow:
import { z } from "zod";
import { inngest } from "../inngest";
import { init } from "@mastra/inngest";
// Initialize Inngest with Mastra, pointing to your local Inngest server
const { createWorkflow, createStep } = init(inngest);
// Step: Increment the counter value
const incrementStep = createStep({
id: "increment",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
execute: async ({ inputData }) => {
return { value: inputData.value + 1 };
},
});
创建工作流程Direct link to 创建工作流程
🌐 Creating the workflow
将这些步骤使用 dountil 循环模式组合成一个工作流。createWorkflow() 函数在 Inngest 服务器上创建一个可调用的函数。
🌐 Compose the steps into a workflow using the dountil loop pattern. The createWorkflow() function creates a function on Inngest server that is invocable.
// workflow that is registered as a function on inngest server
const workflow = createWorkflow({
id: "increment-workflow",
inputSchema: z.object({
value: z.number(),
}),
outputSchema: z.object({
value: z.number(),
}),
}).then(incrementStep);
workflow.commit();
export { workflow as incrementWorkflow };
配置 Mastra 实例Direct link to 配置 Mastra 实例
🌐 Configuring the Mastra instance
在 Mastra 上注册工作流并配置 Inngest API 端点:
🌐 Register the workflow with Mastra and configure the Inngest API endpoint:
import { Mastra } from "@mastra/core";
import { serve } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import { PinoLogger } from "@mastra/loggers";
export const mastra = new Mastra({
workflows: { incrementWorkflow },
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => {
return serve({ mastra, inngest });
},
},
],
},
logger: new PinoLogger({ name: "Mastra", level: "info" }),
});
运行工作流Direct link to 运行工作流
🌐 Running workflows
本地运行Direct link to 本地运行
🌐 Running locally
-
运行
npx mastra dev在本地端口 4111 启动 Mastra 服务器 -
启动 Inngest 开发服务器。在一个新的终端中运行:
npx inngest-cli@latest dev -u http://localhost:4111/api/inngestnote
-u 后的 URL 告诉 Inngest 开发服务器在哪里可以找到你的 Mastra /api/inngest 端点
:::
-
在 http://localhost:8288 打开 Inngest 仪表板,并转到侧边栏的 Apps 部分以验证你的 Mastra 工作流是否已注册
-
通过转到 Functions,选择你的工作流,然后点击 Invoke 并使用以下输入来调用工作流:
{
"data": {
"inputData": {
"value": 5
}
}
} -
在 运行 标签中监控工作流执行,以查看逐步执行进度
在生产环境中运行Direct link to 在生产环境中运行
🌐 Running in production
在开始之前,请确保你已经拥有:
🌐 Before you begin, make sure you have:
- 已安装 Vercel 账户和 Vercel CLI (
npm i -g vercel) - Inngest 账户
- Vercel 令牌
-
在你的环境中设置 Vercel 令牌:
.envexport VERCEL_TOKEN=your_vercel_token -
将
VercelDeployer添加到 Mastra 实例src/mastra/index.tsimport { VercelDeployer } from "@mastra/deployer-vercel";
export const mastra = new Mastra({
deployer: new VercelDeployer({
teamSlug: "your_team_slug",
projectName: "your_project_name",
// you can get your vercel token from the vercel dashboard by clicking on the user icon in the top right corner
// and then clicking on "Account Settings" and then clicking on "Tokens" on the left sidebar.
token: process.env.VERCEL_TOKEN,
}),
}); -
构建 mastra 实例
npx mastra build -
部署到 Vercel
cd .mastra/output
vercel login
vercel --prod -
通过点击 与 Vercel 同步新应用 并按照指示操作,与 Inngest 仪表板 同步
-
通过进入 Functions,选择
workflow.increment-workflow,点击 所有操作 > 调用,并提供以下输入来调用工作流:{
"data": {
"inputData": {
"value": 5
}
}
} -
在 运行 选项卡中监控执行情况,以查看逐步进度
添加自定义 Inngest 函数Direct link to 添加自定义 Inngest 函数
🌐 Adding custom Inngest functions
你可以通过在 serve() 中使用可选的 functions 参数,将额外的 Inngest 函数与你的 Mastra 工作流一起使用。
🌐 You can serve additional Inngest functions alongside your Mastra workflows by using the optional functions parameter in serve().
创建自定义函数Direct link to 创建自定义函数
🌐 Creating custom functions
首先,创建你的自定义 Inngest 函数:
🌐 First, create your custom Inngest functions:
import { inngest } from "../inngest";
// Define custom Inngest functions
export const customEmailFunction = inngest.createFunction(
{ id: "send-welcome-email" },
{ event: "user/registered" },
async ({ event }) => {
// Custom email logic here
console.log(`Sending welcome email to ${event.data.email}`);
return { status: "email_sent" };
},
);
export const customWebhookFunction = inngest.createFunction(
{ id: "process-webhook" },
{ event: "webhook/received" },
async ({ event }) => {
// Custom webhook processing
console.log(`Processing webhook: ${event.data.type}`);
return { processed: true };
},
);
通过工作流提供自定义功能Direct link to 通过工作流提供自定义功能
🌐 Serving custom functions with workflows
更新你的 Mastra 配置以导入并包含自定义函数。高亮显示的行显示了新增内容:
🌐 Update your Mastra configuration to import and include the custom functions. The highlighted lines show the additions:
import { Mastra } from "@mastra/core";
import { serve } from "@mastra/inngest";
import { incrementWorkflow } from "./workflows";
import { inngest } from "./inngest";
import {
customEmailFunction,
customWebhookFunction,
} from "./inngest/custom-functions";
import { PinoLogger } from "@mastra/loggers";
export const mastra = new Mastra({
workflows: { incrementWorkflow },
server: {
host: "0.0.0.0",
apiRoutes: [
{
path: "/api/inngest",
method: "ALL",
createHandler: async ({ mastra }) => {
return serve({
mastra,
inngest,
functions: [customEmailFunction, customWebhookFunction],
});
},
},
],
},
logger: new PinoLogger({ name: "Mastra", level: "info" }),
});
函数注册Direct link to 函数注册
🌐 Function registration
当你包含自定义函数时:
🌐 When you include custom functions:
- Mastra 工作流会自动转换为带有类似
workflow.${workflowId}的 ID 的 Inngest 函数 - 自定义函数保留其指定的 ID(例如
send-welcome-email、process-webhook) - 所有功能都在同一个
/api/inngest端点上提供
这允许你将 Mastra 的工作流编排与现有的 Inngest 功能结合起来。
🌐 This allows you to combine Mastra's workflow orchestration with your existing Inngest functions.
与其他框架一起使用Direct link to 与其他框架一起使用
🌐 Using with other frameworks
默认的 serve 函数内部使用 Hono。如果你使用的是 Express、Fastify 或 Koa 等其他 web 框架,请使用带有相应 Inngest 适配器的 createServe 工厂函数。
🌐 The default serve function uses Hono internally. If you're using a different web framework like Express, Fastify, or Koa, use the createServe factory function with the appropriate Inngest adapter.
ExpressDirect link to Express
import express from "express";
import { createServe } from "@mastra/inngest";
import { serve as expressAdapter } from "inngest/express";
import { mastra, inngest } from "./mastra";
const app = express();
// Body parsing middleware required for Inngest
app.use(express.json());
const handler = createServe(expressAdapter)({ mastra, inngest });
app.use("/api/inngest", handler);
app.listen(3000);
FastifyDirect link to Fastify
import Fastify from "fastify";
import { createServe } from "@mastra/inngest";
import { serve as fastifyAdapter } from "inngest/fastify";
import { mastra, inngest } from "./mastra";
const fastify = Fastify();
// JSON parsing is handled by Fastify's default content-type parser
const handler = createServe(fastifyAdapter)({ mastra, inngest });
fastify.route({
method: ["GET", "POST", "PUT"],
url: "/api/inngest",
handler,
});
fastify.listen({ port: 3000 });
考阿Direct link to 考阿
🌐 Koa
import Koa from "koa";
import Router from "@koa/router";
import bodyParser from "koa-bodyparser";
import { createServe } from "@mastra/inngest";
import { serve as koaAdapter } from "inngest/koa";
import { mastra, inngest } from "./mastra";
const app = new Koa();
const router = new Router();
// Body parsing middleware required for Inngest
app.use(bodyParser());
const handler = createServe(koaAdapter)({ mastra, inngest });
router.all("/api/inngest", handler);
app.use(router.routes());
app.use(router.allowedMethods());
app.listen(3000);
Next.jsDirect link to Next.js
import { createServe } from "@mastra/inngest";
import { serve as nextAdapter } from "inngest/next";
import { mastra, inngest } from "@/mastra";
const handler = createServe(nextAdapter)({ mastra, inngest });
export { handler as GET, handler as POST, handler as PUT };
可用适配器Direct link to 可用适配器
🌐 Available adapters
createServe 函数可以与任何 Inngest 适配器一起使用。请参阅 Inngest serve 文档 获取完整的可用适配器列表,包括 AWS Lambda、Cloudflare Workers 等。
🌐 The createServe function works with any Inngest adapter. See the Inngest serve documentation for a complete list of available adapters including AWS Lambda, Cloudflare Workers, and more.
流程控制Direct link to 流程控制
🌐 Flow control
Inngest 工作流支持包括并发限制、速率限制、节流、防抖和优先队列在内的流程控制功能。这些选项在 createWorkflow() 调用中配置,有助于在大规模下管理工作流执行。
🌐 Inngest workflows support flow control features including concurrency limits, rate limiting, throttling, debouncing, and priority queuing. These options are configured in the createWorkflow() call and help manage workflow execution at scale.
并发Direct link to 并发
🌐 Concurrency
控制可以同时运行的工作流实例数量:
🌐 Control how many workflow instances can run simultaneously:
const workflow = createWorkflow({
id: "user-processing-workflow",
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ result: z.string() }),
steps: [processUserStep],
// Limit to 10 concurrent executions, scoped by user ID
concurrency: {
limit: 10,
key: "event.data.userId",
},
});
速率限制Direct link to 速率限制
🌐 Rate limiting
在特定时间段内限制工作流执行次数:
🌐 Limit the number of workflow executions within a time period:
const workflow = createWorkflow({
id: "api-sync-workflow",
inputSchema: z.object({ endpoint: z.string() }),
outputSchema: z.object({ status: z.string() }),
steps: [apiSyncStep],
// Maximum 1000 executions per hour
rateLimit: {
period: "1h",
limit: 1000,
},
});
限流Direct link to 限流
🌐 Throttling
确保工作流执行之间的最短时间间隔:
🌐 Ensure minimum time between workflow executions:
const workflow = createWorkflow({
id: "email-notification-workflow",
inputSchema: z.object({ organizationId: z.string(), message: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
steps: [sendEmailStep],
// Only one execution per 10 seconds per organization
throttle: {
period: "10s",
limit: 1,
key: "event.data.organizationId",
},
});
防抖Direct link to 防抖
🌐 Debouncing
延迟执行,直到在时间窗口内没有新的事件到来:
🌐 Delay execution until no new events arrive within a time window:
const workflow = createWorkflow({
id: "search-index-workflow",
inputSchema: z.object({ documentId: z.string() }),
outputSchema: z.object({ indexed: z.boolean() }),
steps: [indexDocumentStep],
// Wait 5 seconds of no updates before indexing
debounce: {
period: "5s",
key: "event.data.documentId",
},
});
优先级Direct link to 优先级
🌐 Priority
设置工作流的执行优先级:
🌐 Set execution priority for workflows:
const workflow = createWorkflow({
id: "order-processing-workflow",
inputSchema: z.object({
orderId: z.string(),
priority: z.number().optional(),
}),
outputSchema: z.object({ processed: z.boolean() }),
steps: [processOrderStep],
// Higher priority orders execute first
priority: {
run: "event.data.priority ?? 50",
},
});
组合流控制选项Direct link to 组合流控制选项
🌐 Combining flow control options
多种流程控制选项可以在单个工作流中组合使用:
🌐 Multiple flow control options can be combined in a single workflow:
const workflow = createWorkflow({
id: "comprehensive-workflow",
inputSchema: z.object({
userId: z.string(),
organizationId: z.string(),
priority: z.number().optional(),
}),
outputSchema: z.object({ result: z.string() }),
steps: [comprehensiveStep],
concurrency: {
limit: 5,
key: "event.data.userId",
},
rateLimit: {
period: "1m",
limit: 100,
},
throttle: {
period: "10s",
limit: 1,
key: "event.data.organizationId",
},
priority: {
run: "event.data.priority ?? 0",
},
});
所有流程控制选项都是可选的。如果未指定,工作流程将使用 Inngest 的默认行为运行。更多信息,请参见 Inngest 流程控制文档。
🌐 All flow control options are optional. If not specified, workflows run with Inngest's default behavior. For more information, see the Inngest flow control documentation.
Cron 调度Direct link to Cron 调度
🌐 Cron scheduling
Inngest 工作流可以使用 cron 表达式按计划自动触发。这使你能够以固定间隔运行工作流,例如每日报告、每小时数据同步或维护任务。
🌐 Inngest workflows can be automatically triggered on a schedule using cron expressions. This allows you to run workflows at regular intervals, such as daily reports, hourly data syncs, or maintenance tasks.
基本的 cron 调度Direct link to 基本的 cron 调度
🌐 Basic cron scheduling
通过添加 cron 属性来配置工作流以按计划运行:
🌐 Configure a workflow to run on a schedule by adding a cron property:
const workflow = createWorkflow({
id: "daily-report-workflow",
inputSchema: z.object({ reportType: z.string() }),
outputSchema: z.object({ generated: z.boolean() }),
steps: [generateReportStep],
// Run daily at midnight
cron: "0 0 * * *",
});
Cron 调度格式Direct link to Cron 调度格式
🌐 Cron schedule format
cron 属性接受格式为 minute hour day month dayOfWeek 的标准 cron 表达式
🌐 The cron property accepts standard cron expressions in the format: minute hour day month dayOfWeek
- 分钟:0-59
- 小时:0-23
- 日:1-31
- 月份:1-12 或 JAN-DEC
- dayOfWeek:0-6(星期天 = 0)或 SUN-SAT
常见的 cron 模式:
🌐 Common cron patterns:
// Every 15 minutes
cron: "*/15 * * * *"
// Every hour at minute 0
cron: "0 * * * *"
// Every 6 hours
cron: "0 */6 * * *"
// Daily at midnight
cron: "0 0 * * *"
// Daily at 9 AM
cron: "0 9 * * *"
// Every weekday at 9 AM
cron: "0 9 * * 1-5"
// First day of every month at midnight
cron: "0 0 1 * *"
// Every Monday at 8 AM
cron: "0 8 * * 1"
为计划运行提供输入数据Direct link to 为计划运行提供输入数据
🌐 Providing input data for scheduled runs
你可以提供将在每次计划执行中使用的静态输入数据:
🌐 You can provide static input data that will be used for each scheduled execution:
const workflow = createWorkflow({
id: "scheduled-data-sync",
inputSchema: z.object({
source: z.string(),
destination: z.string(),
}),
outputSchema: z.object({ synced: z.boolean() }),
steps: [syncDataStep],
cron: "0 */6 * * *", // Every 6 hours
// Input data provided to each scheduled run
inputData: {
source: "production-db",
destination: "analytics-warehouse",
},
});
为计划运行提供初始状态Direct link to 为计划运行提供初始状态
🌐 Providing initial state for scheduled runs
你还可以为计划的工作流运行设置初始状态:
🌐 You can also set an initial state for scheduled workflow runs:
const workflow = createWorkflow({
id: "scheduled-aggregation",
inputSchema: z.object({ date: z.string() }),
outputSchema: z.object({ aggregated: z.boolean() }),
stateSchema: z.object({
processedCount: z.number(),
lastProcessedDate: z.string(),
}),
steps: [aggregateDataStep],
cron: "0 0 * * *", // Daily at midnight
inputData: {
date: new Date().toISOString().split("T")[0], // Today's date
},
initialState: {
processedCount: 0,
lastProcessedDate: "",
},
});
将 cron 与流程控制结合Direct link to 将 cron 与流程控制结合
🌐 Combining cron with flow control
Cron 调度可以与流程控制选项结合使用:
🌐 Cron scheduling can be combined with flow control options:
const workflow = createWorkflow({
id: "scheduled-api-sync",
inputSchema: z.object({ endpoint: z.string() }),
outputSchema: z.object({ synced: z.boolean() }),
steps: [syncApiStep],
cron: "*/30 * * * *", // Every 30 minutes
inputData: {
endpoint: "https://api.example.com/data",
},
// Limit concurrent executions even for scheduled runs
concurrency: {
limit: 5,
},
// Rate limit scheduled executions
rateLimit: {
period: "1h",
limit: 100,
},
});
Cron 功能如何运作Direct link to Cron 功能如何运作
🌐 How cron functions work
当你使用 cron 属性配置工作流时:
🌐 When you configure a workflow with a cron property:
- 一个独立的 Inngest 函数会自动创建,ID 为
workflow.${workflowId}.cron - 此函数已在 Inngest 中注册,并将在指定的时间表触发
- 每次计划执行都会使用提供的
inputData和initialState创建一个新的工作流运行 - 当你调用
serve()时,cron 函数和主工作流程函数会同时被调用
你可以在 Inngest 仪表板的 函数 和 运行 部分监控计划执行。cron 函数将作为一个独立的函数显示在你的主工作流函数旁边。
🌐 You can monitor scheduled executions in the Inngest dashboard under Functions and Runs sections. The cron function will appear as a separate function alongside your main workflow function.
有关 cron 调度的更多信息,请参阅 Inngest cron 文档。
🌐 For more information on cron scheduling, see the Inngest cron documentation.