Skip to main content

工作流程

🌐 Workflows

旧版工作流功能已被移除。

🌐 Legacy workflow features have been removed.

已更改
Direct link to 已更改

🌐 Changed

getWorkflowslistWorkflows
Direct link to getworkflows-to-listworkflows

🌐 getWorkflows to listWorkflows

mastra.getWorkflows() 方法已重命名为 mastra.listWorkflows()。此更改与整个 API 中的命名约定保持一致,其中复数获取方法使用 list 前缀。

🌐 The mastra.getWorkflows() method has been renamed to mastra.listWorkflows(). This change aligns with the naming convention used across the API where plural getter methods use the list prefix.

要迁移,请将所有对 mastra.getWorkflows() 的调用替换为 mastra.listWorkflows()

🌐 To migrate, replace all calls to mastra.getWorkflows() with mastra.listWorkflows().

- const workflows = mastra.getWorkflows();
+ const workflows = mastra.listWorkflows();
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的导入:

🌐 You can use Mastra's codemod CLI to update your imports automatically:

npx @mastra/codemod@latest v1/mastra-plural-apis .

RuntimeContextRequestContext 在步骤上下文中
Direct link to runtimecontext-to-requestcontext-in-step-context

🌐 RuntimeContext to RequestContext in step context

工作流步骤执行上下文中参数名称 runtimeContext 已更改为 requestContext。此更改与全局重命名保持一致,以提高清晰度。

🌐 The parameter name runtimeContext has been changed to requestContext in workflow step execution context. This change aligns with the global rename for clarity.

要迁移,请在步骤执行函数中将引用从 runtimeContext 更新为 requestContext

🌐 To migrate, update references from runtimeContext to requestContext in step execution functions.

  createStep({
- execute: async ({ runtimeContext } ) => {
- const userTier = context.runtimeContext.get('userTier');
+ execute: async ({ requestContext } ) => {
+ const userTier = requestContext.get('userTier');
return { result: userTier };
},
});
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的导入:

🌐 You can use Mastra's codemod CLI to update your imports automatically:

npx @mastra/codemod@latest v1/runtime-context .

createRunAsynccreateRun
Direct link to createrunasync-to-createrun

🌐 createRunAsync to createRun

createRunAsync() 方法已被重命名为 createRun()。此更改通过移除多余的“Async”后缀简化了 API,因为所有运行创建都是异步的。

🌐 The createRunAsync() method has been renamed to createRun(). This change simplifies the API by removing the redundant "Async" suffix since all run creation is asynchronous.

要迁移,请将方法调用从 createRunAsync 重命名为 createRun

🌐 To migrate, rename method calls from createRunAsync to createRun.

- await workflow.createRunAsync({ input: { ... } });
+ await workflow.createRun({ input: { ... } });
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的代码:

🌐 You can use Mastra's codemod CLI to update your code automatically:

npx @mastra/codemod@latest v1/workflow-create-run-async .

runCountretryCount(已弃用)
Direct link to runcount-to-retrycount-deprecated

🌐 runCount to retryCount (deprecated)

runCount 参数在工作流步骤执行中已被弃用,建议使用 retryCount。此更改提供了更清晰的命名,更好地描述了重试行为。旧的 runCount 仍然可以使用,但会显示弃用警告。

🌐 The runCount parameter has been deprecated in favor of retryCount in workflow step execution. This change provides clearer naming that better describes retry behavior. The old runCount still works but shows deprecation warnings.

要进行迁移,请在步骤执行函数中将 runCount 重命名为 retryCount

🌐 To migrate, rename runCount to retryCount in step execution functions.

  createStep({
execute: async (inputData, context) => {
- console.log(`Step run ${context.runCount} times`);
+ console.log(`Step retry count: ${context.retryCount}`);
},
});
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的代码:

🌐 You can use Mastra's codemod CLI to update your code automatically:

npx @mastra/codemod@latest v1/workflow-run-count .

getInitData 返回未知
Direct link to getinitdata-returns-unknown

🌐 getInitData returns unknown

在 execute 函数中,getInitData 函数现在返回 unknown 而不是 any。你需要自己为它添加类型。 要简单迁移,将 getInitData() 改为 getInitData<any>()

  createStep({
execute: async ({ getInitData }) => {
- const initData = getInitData();
- if (initData.key === 'value') {}
+ const initData = getInitData<any>();
+ if (initData.key === 'value') {}
},
});
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的代码:

🌐 You can use Mastra's codemod CLI to update your code automatically:

npx @mastra/codemod@latest v1/workflow-get-init-data .

getWorkflowRunslistWorkflowRuns
Direct link to getworkflowruns-to-listworkflowruns

🌐 getWorkflowRuns to listWorkflowRuns

getWorkflowRuns() 方法已重命名为 listWorkflowRuns()。此更改符合 list* 方法返回集合的惯例。

🌐 The getWorkflowRuns() method has been renamed to listWorkflowRuns(). This change aligns with the convention that list* methods return collections.

要迁移,请将方法调用从 getWorkflowRuns 重命名为 listWorkflowRuns

🌐 To migrate, rename method calls from getWorkflowRuns to listWorkflowRuns.

- const runs = await workflow.getWorkflowRuns({ fromDate, toDate });
+ const runs = await workflow.listWorkflowRuns({ fromDate, toDate });
Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的代码:

🌐 You can use Mastra's codemod CLI to update your code automatically:

npx @mastra/codemod@latest v1/workflow-list-runs .

输入默认情况下会被验证
Direct link to 输入默认情况下会被验证

🌐 Inputs are validated by default

以前,输入默认不会被验证。validateInputs 标志决定是否验证工作流输入。这个布尔值已被切换为 true。如果你想要旧的行为,或者有不需要验证模式的工作流,请设置 validateInputs: false

🌐 Previously, inputs weren't validated by default. The validateInputs flag determines whether to validate the workflow inputs or not. This boolean has been flipped to true. If you want the old behavior or have workflows whose schemas don't need to be validated, set validateInputs: false.

createWorkflow({
+ options: {
+ validateInputs: false
+ }
})

步骤 suspendPayload 验证
Direct link to step-suspendpayload-validation

🌐 Step suspendPayload validation

步骤 suspendPayload 现在针对已定义 suspendSchema 的步骤进行了验证。这也使用了 validateInputs 标志来决定是否验证 suspendPayload

🌐 Step suspendPayload is now validated for steps that have a suspendSchema defined. This also uses the validateInputs flag to determine whether to validate the suspendPayload or not.

  createStep({
id: "suspend-resume-step",
// ... other step properties
suspendSchema: z.object({
reason: z.string(),
otherReason: z.string()
}),
execute: async ({ suspend, resumeData}) => {
if (!resumeData) {
- return suspend({ reason: "Suspension reason" }); // Missing otherReason
+ return suspend({ reason: "Suspension reason", otherReason: "Other reason" });
}
},
});

分支结果字段现在是可选的
Direct link to 分支结果字段现在是可选的

🌐 Branch result fields are now optional

.branch() 方法现在返回一个模式,其中所有分支输出字段都是可选的。这反映了运行时行为,即每个分支只有在其条件为真时才会执行,因此任何分支的输出都可能未定义。

🌐 The .branch() method now returns a schema where all branch output fields are optional. This reflects the runtime behavior where each branch only executes if its condition is truthy, so outputs from any branch may be undefined.

要进行迁移,请更新任何使用分支输出的代码,以处理可选值。

🌐 To migrate, update any code that consumes branch outputs to handle optional values.

  const workflow = createWorkflow({...})
.branch([
[condition1, stepA], // outputSchema: { result: z.string() }
[condition2, stepB], // outputSchema: { data: z.number() }
])
- // Previously: stepA.result typed as string, stepB.data typed as number
+ // Now: stepA.result typed as string | undefined, stepB.data typed as number | undefined
.then(nextStep);

如果你的代码依赖于非可选类型,请在访问分支输出时添加运行时检查或提供默认值。

🌐 If your code depends on non-optional types, add runtime checks or provide default values when accessing branch outputs.

Run.start()Run.timeTravel() 中的 writableStreamoutputWriter
Direct link to writablestream-to-outputwriter-in-runstart--runtimetravel

🌐 writableStream to outputWriter in Run.start() & Run.timeTravel()

Run.start()Run.timeTravel() 中的 writableStream 参数已被 outputWriter 取代。现在,不再传递 WritableStream,而是传递一个异步回调函数,该函数直接接收每个工作流事件块。

🌐 The writableStream parameter in Run.start() and Run.timeTravel() has been replaced with outputWriter. Instead of passing a WritableStream, you now pass an async callback function that receives each workflow event chunk directly.

此更改简化了 API——不再创建 WritableStream 封装器,而是在回调中直接处理数据块。

🌐 This change simplifies the API - rather than creating a WritableStream wrapper, you handle chunks directly in the callback.

示例: 将工作流事件流式传输到 HTTP 响应(SSE):

  const run = await workflow.createRun();

- const stream = new WritableStream({
- write(chunk) {
- response.write(`data: ${JSON.stringify(chunk)}\n\n`);
- }
- });
- await run.start({ inputData, writableStream: stream });

+ await run.start({
+ inputData,
+ outputWriter: async (chunk) => {
+ response.write(`data: ${JSON.stringify(chunk)}\n\n`);
+ },
+ });
note

传递给步骤 execute 函数的 writer 参数不受此更改影响。它仍然是一个扩展了 WritableStream<unknown> 并提供 .write().custom() 方法的 ToolStream:

🌐 The writer parameter passed to step execute functions is not affected by this change. It remains a ToolStream that extends WritableStream<unknown> and provides .write() and .custom() methods:

createStep({
id: 'my-step',
execute: async ({ writer }) => {
// This API is unchanged
await writer.write({ data: 'some output' });
await writer.custom({ type: 'custom-event', payload: {} });
},
});

setState() 现在是异步的,并且传递的数据已被验证
Direct link to setstate-is-now-async-and-the-data-passed-is-validated

🌐 setState() is now async and the data passed is validated

setState() 函数现在是异步的。传递的数据现在会根据步骤中定义的 stateSchema 进行验证。状态数据验证也使用 validateInputs 标志来确定是否验证状态数据。此外,在调用 setState() 时,你现在可以只传递正在更新的状态数据,而不需要再添加之前的状态扩展 (...state)

🌐 The setState() function is now async. The data passed is now validated against the stateSchema defined in the step. The state data validation also uses the validateInputs flag to determine whether to validate the state data or not. Also, when calling setState(), you can now pass only the state data being updated, instead of adding the previous state spread (...state).

要迁移,请将 setState() 函数更新为异步。

🌐 To migrate, update the setState() function to be async.

- setState({ ...state, sharedCounter: state.sharedCounter + 1 });
+ await setState({ sharedCounter: state.sharedCounter + 1 });
+ // await setState({ ...state, sharedCounter: state.sharedCounter + 1 });
+ // this also works, as the previous state spread remains supported

已移除
Direct link to 已移除

🌐 Removed

streamVNextresumeStreamVNextobserveStreamVNext 方法
Direct link to streamvnext-resumestreamvnext-and-observestreamvnext-methods

🌐 streamVNext, resumeStreamVNext, and observeStreamVNext methods

实验性的 streamVNext()resumeStreamVNext()observeStreamVNext() 方法已被移除。这些方法现在是具有更新事件结构和返回类型的标准实现。

🌐 The experimental streamVNext(), resumeStreamVNext(), and observeStreamVNext() methods have been removed. These methods are now the standard implementation with updated event structures and return types.

要进行迁移,请使用标准的 stream()resumeStream()observeStream() 方法。更新事件类型检查以使用带有工作流前缀的名称,并直接访问流属性。

🌐 To migrate, use the standard stream(), resumeStream(), and observeStream() methods. Update event type checks to use workflow-prefixed names and access stream properties directly.

详情请参见Run.stream()Run.resumeStream()Run.observeStream()

🌐 See Run.stream(), Run.resumeStream(), and Run.observeStream() for details.

Codemod

你可以使用 Mastra 的 codemod CLI 来自动更新你的代码:

🌐 You can use Mastra's codemod CLI to update your code automatically:

npx @mastra/codemod@latest v1/workflow-stream-vnext .

suspendsetState 在步骤条件函数参数中不可用
Direct link to suspend-and-setstate-are-not-available-in-step-condition-functions-parameters

🌐 suspend and setState are not available in step condition functions parameters

suspendsetState 函数在步骤条件函数参数中不可用。

🌐 The suspend and setState functions are not available in step condition functions parameters.

要迁移,请在步骤执行函数中使用 suspend 函数。

🌐 To migrate, use the suspend function in the step execute function instead.

.dowhile(step, async ({ suspend, state, setState }) => {
- setState({...state, updatedState: "updated state"})
- await suspend({ reason: "Suspension reason" });
+ // Use the suspend/setState in the step execute function instead
});

对于 dountilbranch 条件函数的参数也是一样的。

🌐 This is the same for dountil and branch condition functions parameters.

旧版工作流导出
Direct link to 旧版工作流导出

🌐 Legacy workflows export

./workflows/legacy 导出路径已从 @mastra/core 中移除。不再支持旧有工作流程。

🌐 The ./workflows/legacy export path has been removed from @mastra/core. Legacy workflows are no longer supported.

要进行迁移,请使用新的工作流 API。旧工作流没有直接迁移路径。

🌐 To migrate, use the new workflow API. There is no direct migration path from legacy workflows.

- import { LegacyWorkflow } from '@mastra/core/workflows/legacy';
+ // Legacy workflows are no longer supported
+ // Migrate to the new workflow API

WorkflowRunOutput 中的 pipeThroughpipeTo 方法
Direct link to pipethrough-and-pipeto-methods-from-workflowrunoutput

🌐 pipeThrough and pipeTo methods from WorkflowRunOutput

WorkflowRunOutput 上的 pipeThrough()pipeTo() 方法已被弃用。这些方法仍然可用,但会在控制台显示警告。

🌐 The pipeThrough() and pipeTo() methods on WorkflowRunOutput are deprecated. These methods still work but show console warnings.

要进行迁移,请使用 fullStream 属性,而不是直接在运行输出上调用方法。

🌐 To migrate, use the fullStream property instead of calling methods directly on the run output.

  const run = await workflow.createRun({ input: { ... } });
- await run.pipeTo(writableStream);
- const transformed = run.pipeThrough(transformStream);
+ await run.fullStream.pipeTo(writableStream);
+ const transformed = run.fullStream.pipeThrough(transformStream);

观看事件 API
Direct link to 观看事件 API

🌐 Watch events API

传统的监视事件已被移除,并在 v2 事件 API 上进行了整合。watch() 方法及相关的监视端点不再可用。

🌐 Legacy watch events have been removed and consolidated on the v2 events API. The watch() method and related watch endpoints are no longer available.

要进行迁移,请使用工作流事件 API 或流式传输,而不是使用监听事件。

🌐 To migrate, use the workflow events API or streaming instead of watch events.

- const workflow = mastraClient.getWorkflow('my-workflow');
- const run = await workflow.createRun();
- await run.watch((event) => {
- console.log('Step completed:', event);
- });

+ const workflow = mastraClient.getWorkflow('my-workflow');
+ const run = await workflow.createRun();
+ const stream = await run.stream({ inputData: { ... } });
+ for await (const chunk of stream) {
+ console.log('Step completed:', chunk);
+ }

waitForEvent API
Direct link to waitforevent-api

waitForEvent API 已从工作流中移除。请改用挂起/恢复 API。

🌐 The waitForEvent API has been removed from workflows. Use the suspend/resume API instead.

要进行迁移,请使用挂起/恢复 API 等待工作流执行的里程碑。

🌐 To migrate, use suspend/resume API for waiting on workflow execution milestones.

- workflow.waitForEvent('step-complete', step1).commit();
+ workflow.then(step1).commit();
+ // Use suspend/resume API instead, in step1 execute function
createStep({
- execute: async (inputData, context) => {
- // ... execution logic
- }
+ execute: async (inputData, context) => {
+ if (!context.resumeData) {
+ return context.suspend({})
+ }
+ }
});
+
+ // after workflow is suspended, you can resume it
+ const result = await run.start({ inputData: { ... } });
+ if (result.status === 'suspended') {
+ const resumedResult = await run.resume({
+ resumeData: {
+ event: 'step-complete',
+ },
+ step: 'step1',
+ });
+ }

sendEvent API
Direct link to sendevent-api

sendEvent API 已从工作流中移除。请改用挂起/恢复 API。

🌐 The sendEvent API has been removed from workflows. Use the suspend/resume API instead.