错误处理
🌐 Error Handling
Mastra 工作流通过执行后的结果状态检查、针对暂时性故障的重试策略以及用于集中错误记录或报警的生命周期回调来支持错误处理。
🌐 Mastra workflows support error handling through result status checks after execution, retry policies for transient failures, and lifecycle callbacks for centralized error logging or alerting.
处理工作流程结果Direct link to 处理工作流程结果
🌐 Handling workflow results
当你运行工作流时,结果对象会包含状态以及发生的任何错误。
🌐 When you run a workflow, the result object contains the status and any errors that occurred.
检查结果状态Direct link to 检查结果状态
🌐 Checking the result status
import { mastra } from "./mastra";
const workflow = mastra.getWorkflow("myWorkflow");
const run = await workflow.createRun();
const result = await run.start({ inputData: { value: "test" } });
switch (result.status) {
case 'success':
console.log('Workflow completed:', result.result);
break;
case 'failed':
console.error('Workflow failed:', result.error);
break;
case 'suspended':
console.log('Workflow suspended, waiting for resume');
break;
}
结果对象结构Direct link to 结果对象结构
🌐 Result object structure
结果对象包含:
🌐 The result object contains:
status- 工作流状态:'success'、'failed'、'suspended'或'tripwire'result- 工作流输出(当状态为'success'时)error- 错误详情(当状态为'failed'时)steps- 各步骤结果及其状态和输出
访问步骤结果Direct link to 访问步骤结果
🌐 Accessing step results
你可以检查每个步骤的结果,以了解故障发生的位置:
🌐 You can inspect individual step results to understand where a failure occurred:
const result = await run.start({ inputData: { value: "test" } });
if (result.status === 'failed') {
// Find which step failed
for (const [stepId, stepResult] of Object.entries(result.steps)) {
if (stepResult.status === 'failed') {
console.error(`Step ${stepId} failed:`, stepResult.error);
}
}
}
生命周期回调Direct link to 生命周期回调
🌐 Lifecycle callbacks
在需要处理工作流完成但不需等待结果的场景中——例如后台任务、即发即忘的工作流或集中日志记录——你可以使用生命周期回调。
🌐 For scenarios where you need to handle workflow completion without awaiting the result—such as background jobs, fire-and-forget workflows, or centralized logging—you can use lifecycle callbacks.
onFinishDirect link to onFinish
当工作流以任何状态(成功、失败、暂停或触发器)完成时调用:
🌐 Called when a workflow completes with any status (success, failed, suspended, or tripwire):
import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const orderWorkflow = createWorkflow({
id: 'order-processing',
inputSchema: z.object({ orderId: z.string() }),
outputSchema: z.object({ orderId: z.string(), status: z.string() }),
options: {
onFinish: async (result) => {
if (result.status === 'success') {
await db.updateOrderStatus(result.result.orderId, result.status);
}
await analytics.track('workflow_completed', {
workflowId: 'order-processing',
status: result.status,
});
},
},
});
onFinish 回调接收:
🌐 The onFinish callback receives:
status- 工作流状态result- 工作流输出(成功时)error- 错误详情(失败时)steps- 各步骤结果tripwire- 触发器信息(如果状态为'tripwire')runId- 此工作流运行的唯一标识符workflowId- 工作流的标识符resourceId- 可选资源标识符(如果在创建运行时提供)getInitData<any>()- 返回初始输入数据的函数mastra- Mastra 实例(如果工作流已在 Mastra 注册)requestContext- 请求范围的上下文数据logger- 工作流的日志记录实例state- 工作流的当前状态对象
onErrorDirect link to onError
仅在工作流失败时调用(状态为 'failed' 或 'tripwire'):
🌐 Called only when a workflow fails (status is 'failed' or 'tripwire'):
import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const paymentWorkflow = createWorkflow({
id: 'payment-processing',
inputSchema: z.object({ amount: z.number() }),
outputSchema: z.object({ transactionId: z.string() }),
options: {
onError: async (errorInfo) => {
await alertService.notify({
channel: 'payments-alerts',
message: `Payment workflow failed: ${errorInfo.error?.message}`,
});
await errorTracker.capture(errorInfo.error);
},
},
});
onError 回调接收:
🌐 The onError callback receives:
status- 要么是'failed',要么是'tripwire'error- 错误详情steps- 各步骤结果tripwire- 触发器信息(如果状态为'tripwire')runId- 此工作流运行的唯一标识符workflowId- 工作流的标识符resourceId- 可选资源标识符(如果在创建运行时提供)getInitData<any>()- 返回初始输入数据的函数mastra- Mastra 实例(如果工作流已在 Mastra 注册)requestContext- 请求范围的上下文数据logger- 工作流的日志记录实例state- 工作流的当前状态对象
同时使用回调Direct link to 同时使用回调
🌐 Using both callbacks
你可以同时使用两个回调:
🌐 You can use both callbacks together:
import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const pipelineWorkflow = createWorkflow({
id: 'data-pipeline',
inputSchema: z.object({ source: z.string() }),
outputSchema: z.object({ recordsProcessed: z.number() }),
options: {
onFinish: async (result) => {
// Always log completion
await logger.info('Pipeline completed', { status: result.status });
},
onError: async (errorInfo) => {
// Alert on failures
await pagerDuty.alert('Data pipeline failed', errorInfo.error);
},
},
});
回调中的错误处理Direct link to 回调中的错误处理
🌐 Error handling in callbacks
回调内部抛出的错误会被捕获和记录——它们不会影响工作流程的结果或导致工作流程失败。这确保了回调问题不会在生产环境中破坏你的工作流程。
🌐 Errors thrown inside callbacks are caught and logged—they will not affect the workflow result or cause it to fail. This ensures that callback issues don't break your workflows in production.
options: {
onFinish: async (result) => {
// If this throws, it's logged but the workflow result is unchanged
await externalService.notify(result);
},
}
重试Direct link to 重试
🌐 Retries
Mastra 对因暂时性错误而失败的工作流或步骤具有重试机制,例如当步骤与可能暂时不可用的外部服务或资源交互时。
🌐 Mastra has a retry mechanism for workflows or steps that fail due to transient errors, for example when steps interact with external services or resources that may be temporarily unavailable.
工作流级别使用 retryConfigDirect link to workflow-level-using-retryconfig
🌐 Workflow-level using retryConfig
你可以在工作流级别配置重试,这将适用于工作流中的所有步骤:
🌐 You can configure retries at the workflow level, which applies to all steps in the workflow:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
export const testWorkflow = createWorkflow({
retryConfig: {
attempts: 5,
delay: 2000
}
})
.then(step1)
.commit();
使用 retries 的步骤级Direct link to step-level-using-retries
🌐 Step-level using retries
你可以使用 retries 属性为单独的步骤配置重试。这会覆盖该步骤特定的工作流级别重试配置:
🌐 You can configure retries for individual steps using the retries property. This overrides the workflow-level retry configuration for that specific step:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
execute: async () => {
const response = await fetch('example-url');
if (!response.ok) {
throw new Error('Error');
}
return {
value: ""
};
},
retries: 3
});
条件分支Direct link to 条件分支
🌐 Conditional branching
你可以使用条件逻辑根据前一步骤的成功或失败创建替代工作流程路径:
🌐 You can create alternative workflow paths based on the success or failure of previous steps using conditional logic:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
execute: async () => {
try {
const response = await fetch('example-url');
if (!response.ok) {
throw new Error('error');
}
return {
status: "ok"
};
} catch (error) {
return {
status: "error"
};
}
}
});
const step2 = createStep({...});
const fallback = createStep({...});
export const testWorkflow = createWorkflow({})
.then(step1)
.branch([
[async ({ inputData: { status } }) => status === "ok", step2],
[async ({ inputData: { status } }) => status === "error", fallback]
])
.commit();
检查上一步结果Direct link to 检查上一步结果
🌐 Check previous step results
使用 getStepResult() 检查上一步的结果。
🌐 Use getStepResult() to inspect a previous step’s results.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({
execute: async ({ getStepResult }) => {
const step1Result = getStepResult(step1);
return {
value: ""
};
}
});
使用 bail() 提前退出Direct link to exiting-early-with-bail
🌐 Exiting early with bail()
在某个步骤中使用 bail() 可以提前以成功结果退出。这会将提供的有效负载作为步骤输出返回,并结束工作流执行。
🌐 Use bail() in a step to exit early with a successful result. This returns the provided payload as the step output and ends workflow execution.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: 'step1',
execute: async ({ bail }) => {
return bail({ result: 'bailed' });
},
inputSchema: z.object({ value: z.string() }),
outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
.then(step1)
.commit();
使用 Error() 提前退出Direct link to exiting-early-with-error
🌐 Exiting early with Error()
在一个步骤中使用 throw new Error() 以错误方式退出。
🌐 Use throw new Error() in a step to exit with an error.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
id: 'step1',
execute: async () => {
throw new Error('error');
},
inputSchema: z.object({ value: z.string() }),
outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
.then(step1)
.commit();
使用 stream() 监控错误Direct link to monitor-errors-with-stream
🌐 Monitor errors with stream()
你可以使用 stream 来监控工作流中的错误:
🌐 You can monitor workflows for errors using stream:
import { mastra } from "../src/mastra";
const workflow = mastra.getWorkflow("testWorkflow");
const run = await workflow.createRun();
const stream = await run.stream({
inputData: {
value: "initial data",
},
});
for await (const chunk of stream.stream) {
console.log(chunk.payload.output.stats);
}
相关Direct link to 相关
🌐 Related