Skip to main content

工作流类

🌐 Workflow Class

Workflow 类使你能够为复杂操作序列创建状态机,并支持条件分支和数据验证。

🌐 The Workflow class enables you to create state machines for complex sequences of operations with conditional branching and data validation.

使用示例
Direct link to 使用示例

🌐 Usage example

src/mastra/workflows/test-workflow.ts
import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";

export const workflow = createWorkflow({
id: "test-workflow",
inputSchema: z.object({
value: z.string(),
}),
outputSchema: z.object({
value: z.string(),
}),
});

构造函数参数
Direct link to 构造函数参数

🌐 Constructor parameters

id:

string
Unique identifier for the workflow

inputSchema:

z.ZodType<any>
Zod schema defining the input structure for the workflow

outputSchema:

z.ZodType<any>
Zod schema defining the output structure for the workflow

stateSchema?:

z.ZodObject<any>
Optional Zod schema for the workflow state. Automatically injected when using Mastra's state system. If not specified, type is 'any'.

options?:

WorkflowOptions
Optional options for the workflow

WorkflowOptions
Direct link to WorkflowOptions

tracingPolicy?:

TracingPolicy
Optional tracing policy for the workflow

validateInputs?:

boolean
= true
Optional flag to determine whether to validate the workflow inputs. This also applies default values from zodSchemas on the workflow/step input/resume data. If input/resume data validation fails on start/resume, the workflow will not start/resume, it throws an error instead. If input data validation fails on a step execution, the step fails, causing the workflow to fail and the error is returned.

shouldPersistSnapshot?:

(params: { stepResults: Record<string, StepResult<any, any, any, any>>; workflowStatus: WorkflowRunStatus }) => boolean
= () => true
Optional flag to determine whether to persist the workflow snapshot

onFinish?:

(result: WorkflowFinishCallbackResult) => void | Promise<void>
Callback invoked when workflow completes with any status (success, failed, suspended, tripwire). Receives the workflow result including status, output, error, and step results. Errors thrown in this callback are caught and logged, not propagated.

onError?:

(errorInfo: WorkflowErrorCallbackInfo) => void | Promise<void>
Callback invoked only when workflow fails (failed or tripwire status). Receives error details and step results. Errors thrown in this callback are caught and logged, not propagated.

WorkflowFinishCallbackResult
Direct link to WorkflowFinishCallbackResult

传递给 onFinish 回调的结果对象。

🌐 The result object passed to onFinish callbacks.

status:

WorkflowRunStatus
The workflow status: 'success', 'failed', 'suspended', or 'tripwire'

result?:

any
The workflow output (when status is 'success')

error?:

SerializedError
Error details (when status is 'failed')

steps:

Record<string, StepResult>
Individual step results with their status and output

tripwire?:

StepTripwireInfo
Tripwire information (when status is 'tripwire')

runId:

string
The unique identifier for this workflow run

workflowId:

string
The workflow's identifier

resourceId?:

string
Optional resource identifier (if provided when creating the run)

getInitData:

() => any
Function that returns the initial input data passed to the workflow

mastra?:

Mastra
The Mastra instance (if workflow is registered with Mastra)

requestContext:

RequestContext
Request-scoped context data

logger:

IMastraLogger
The workflow's logger instance

state:

Record<string, any>
The workflow's current state object

WorkflowErrorCallbackInfo
Direct link to WorkflowErrorCallbackInfo

传递给 onError 回调的错误信息对象。

🌐 The error info object passed to onError callbacks.

status:

'failed' | 'tripwire'
The workflow status (either 'failed' or 'tripwire')

error?:

SerializedError
Error details

steps:

Record<string, StepResult>
Individual step results with their status and output

tripwire?:

StepTripwireInfo
Tripwire information (when status is 'tripwire')

runId:

string
The unique identifier for this workflow run

workflowId:

string
The workflow's identifier

resourceId?:

string
Optional resource identifier (if provided when creating the run)

getInitData:

() => any
Function that returns the initial input data passed to the workflow

mastra?:

Mastra
The Mastra instance (if workflow is registered with Mastra)

requestContext:

RequestContext
Request-scoped context data

logger:

IMastraLogger
The workflow's logger instance

state:

Record<string, any>
The workflow's current state object

以初始状态运行
Direct link to 以初始状态运行

🌐 Running with initial state

在启动工作流运行时,你可以传递 initialState 来设置工作流状态的初始值:

🌐 When starting a workflow run, you can pass initialState to set the starting values for the workflow's state:

const run = await workflow.createRun();

const result = await run.start({
inputData: { value: "hello" },
initialState: {
counter: 0,
items: [],
},
});

initialState 对象应与工作流的 stateSchema 中定义的结构相匹配。更多详细信息,请参见 工作流状态

🌐 The initialState object should match the structure defined in the workflow's stateSchema. See Workflow State for more details.

工作流程状态
Direct link to 工作流程状态

🌐 Workflow status

工作流的 status 表示其当前的执行状态。可能的取值为:

🌐 A workflow's status indicates its current execution state. The possible values are:

success:

string
All steps finished executing successfully, with a valid result output

failed:

string
Workflow encountered an error during execution, with error details available

suspended:

string
Workflow execution is paused waiting for resume, with suspended step information

tripwire:

string
Workflow was terminated by a processor tripwire. This occurs when an agent step within the workflow triggers a tripwire (e.g., content was blocked by a guardrail). The tripwire information is available on the result.

处理绊线状态
Direct link to 处理绊线状态

🌐 Handling tripwire status

当工作流包含一个触发警报的代理步骤时,工作流将返回 status: 'tripwire' 并包含警报详细信息:

🌐 When a workflow contains an agent step that triggers a tripwire, the workflow returns with status: 'tripwire' and includes tripwire details:

const run = await workflow.createRun();
const result = await run.start({ inputData: { message: "Hello" } });

if (result.status === "tripwire") {
console.log("Workflow terminated by tripwire:", result.tripwire?.reason);
console.log("Processor ID:", result.tripwire?.processorId);
console.log("Retry requested:", result.tripwire?.retry);
}

这与 status: 'failed' 不同,后者表示意外错误。触发器状态意味着处理器故意停止了执行(例如,用于内容审核)。

🌐 This is distinct from status: 'failed' which indicates an unexpected error. A tripwire status means a processor intentionally stopped execution (e.g., for content moderation).

🌐 Related