时间旅行
🌐 Time Travel
时间旅行允许你从任何特定步骤重新执行工作流,可以使用存储的快照数据或你提供的自定义上下文。这对于调试失败的工作流、使用不同输入测试各个步骤,或在不重新运行整个工作流的情况下从错误中恢复非常有用。 你还可以使用时间旅行执行尚未运行的工作流,从任何特定步骤开始。
🌐 Time travel allows you to re-execute a workflow starting from any specific step, using either stored snapshot data or custom context you provide. This is useful for debugging failed workflows, testing individual steps with different inputs, or recovering from errors without re-running the entire workflow. You can also use time travel to execute a workflow that has not been run yet, starting from any specific step.
时间旅行是如何运作的Direct link to 时间旅行是如何运作的
🌐 How time travel works
当你在工作流运行上调用 timeTravel() 时:
🌐 When you call timeTravel() on a workflow run:
- 工作流程会从存储中加载现有快照(如果可用)
- 目标步骤之前的步骤结果是从快照或提供的上下文中重建的
- 执行从指定步骤开始,使用提供的或重建的输入数据
- 从那时起,工作流程将继续直到完成
时间旅行需要配置存储,因为它依赖于持久化的工作流快照。
🌐 Time travel requires storage to be configured since it relies on persisted workflow snapshots.
基本用法Direct link to 基本用法
🌐 Basic usage
使用 run.timeTravel() 从特定步骤重新执行工作流:
🌐 Use run.timeTravel() to re-execute a workflow from a specific step:
import { mastra } from "./mastra";
const workflow = mastra.getWorkflow("myWorkflow");
const run = await workflow.createRun();
const result = await run.timeTravel({
step: "step2",
inputData: { previousStepResult: "custom value" },
});
指定目标步骤Direct link to 指定目标步骤
🌐 Specifying the target step
你可以使用步骤引用或步骤 ID 指定目标步骤:
🌐 You can specify the target step using either a step reference or a step ID:
使用步骤引用Direct link to 使用步骤引用
🌐 Using step reference
const result = await run.timeTravel({
step: step2,
inputData: { value: 10 },
});
使用步骤IDDirect link to 使用步骤ID
🌐 Using step ID
const result = await run.timeTravel({
step: "step2",
inputData: { value: 10 },
});
嵌套工作流步骤Direct link to 嵌套工作流步骤
🌐 Nested workflow steps
对于嵌套工作流中的步骤,使用点标记法、步骤 ID 数组或步骤引用数组:
🌐 For steps inside nested workflows, use dot notation, an array of step IDS or an array of step references:
// Using dot notation
const result = await run.timeTravel({
step: "nestedWorkflow.step3",
inputData: { value: 10 },
});
// Using array of step IDs
const result = await run.timeTravel({
step: ["nestedWorkflow", "step3"],
inputData: { value: 10 },
});
// Using array of step references
const result = await run.timeTravel({
step: [nestedWorkflow, step3],
inputData: { value: 10 },
});
提供执行上下文Direct link to 提供执行上下文
🌐 Providing execution context
在时间旅行时,你可以提供上下文来指定前一步的状态:
🌐 You can provide context to specify the state of previous steps when time traveling:
const result = await run.timeTravel({
step: "step2",
context: {
step1: {
status: "success",
payload: { value: 0 },
output: { step1Result: 2 },
startedAt: Date.now(),
endedAt: Date.now(),
},
},
});
上下文对象包含按步骤 ID 键控的步骤结果。每个步骤结果包括:
🌐 The context object contains step results keyed by step ID. Each step result includes:
status:步骤的执行状态(success、failed、suspended)payload:传递给该步骤的输入数据output:步骤的输出数据(针对成功的步骤)startedAt:步骤开始的时间戳endedAt:步骤结束的时间戳(针对已完成的步骤)suspendPayload:传递给suspend()的数据(用于暂停步骤)resumePayload:传递给resume()的数据(用于恢复步骤)
重新运行失败的工作流Direct link to 重新运行失败的工作流
🌐 Re-running failed workflows
时间旅行对于调试和从失败的工作流执行中恢复特别有用:
🌐 Time travel is particularly useful for debugging and recovering from failed workflow executions:
const workflow = mastra.getWorkflow("myWorkflow");
const run = await workflow.createRun();
// Initial run fails at step2
const failedResult = await run.start({
inputData: { value: 1 },
});
if (failedResult.status === "failed") {
// Re-run from step2 with corrected input
const recoveredResult = await run.timeTravel({
step: "step2",
inputData: { step1Result: 5 }, // Provide corrected input
});
}
带有挂起工作流的时间旅行Direct link to 带有挂起工作流的时间旅行
🌐 Time travel with suspended workflows
你可以通过时间旅行从之前的步骤恢复挂起的工作流程:
🌐 You can time travel to resume a suspended workflow from an earlier step:
const run = await workflow.createRun();
// Start workflow - suspends at promptAgent step
const initialResult = await run.start({
inputData: { input: "test" },
});
if (initialResult.status === "suspended") {
// Time travel back to an earlier step with resume data
const result = await run.timeTravel({
step: "getUserInput",
resumeData: {
userInput: "corrected input",
},
});
}
流时间旅行结果Direct link to 流时间旅行结果
🌐 Streaming time travel results
在时间旅行执行期间使用 timeTravelStream() 接收流事件:
🌐 Use timeTravelStream() to receive streaming events during time travel execution:
const run = await workflow.createRun();
const stream = run.timeTravelStream({
step: "step2",
inputData: { value: 10 },
});
for await (const event of stream.fullStream) {
console.log(event.type, event.payload);
}
const result = await stream.result;
if (result.status === "success") {
console.log(result.result);
}
具有初始状态的时间旅行Direct link to 具有初始状态的时间旅行
🌐 Time travel with initial state
在时间旅行时,你可以提供初始状态以设置工作流级别状态:
🌐 You can provide initial state when time traveling to set workflow-level state:
const result = await run.timeTravel({
step: "step2",
inputData: { value: 10 },
initialState: {
counter: 5,
metadata: { source: "time-travel" },
},
});
错误处理Direct link to 错误处理
🌐 Error handling
时间旅行在特定情况下会出错:
🌐 Time travel throws errors in specific situations:
运行工作流Direct link to 运行工作流
🌐 Running workflow
你不能时间旅行到正在运行的工作流:
🌐 You cannot time travel to a workflow that is currently running:
try {
await run.timeTravel({ step: "step2" });
} catch (error) {
// "This workflow run is still running, cannot time travel"
}
无效的步骤IDDirect link to 无效的步骤ID
🌐 Invalid step ID
如果工作流中不存在目标步骤,时间旅行会抛出异常:
🌐 Time travel throws if the target step doesn't exist in the workflow:
try {
await run.timeTravel({ step: "nonExistentStep" });
} catch (error) {
// "Time travel target step not found in execution graph: 'nonExistentStep'. Verify the step id/path."
}
输入数据无效Direct link to 输入数据无效
🌐 Invalid input data
当启用 validateInputs 时,时间旅行会根据步骤的架构验证输入数据:
🌐 When validateInputs is enabled, time travel validates the input data against the step's schema:
try {
await run.timeTravel({
step: "step2",
inputData: { invalidField: "value" },
});
} catch (error) {
// "Invalid inputData: \n- step1Result: Required"
}
嵌套工作流上下文Direct link to 嵌套工作流上下文
🌐 Nested workflows context
在时间旅行进入嵌套工作流时,你可以为父工作流和嵌套工作流步骤提供上下文:
🌐 When time traveling into a nested workflow, you can provide context for both the parent and nested workflow steps:
const result = await run.timeTravel({
step: "nestedWorkflow.step3",
context: {
step1: {
status: "success",
payload: { value: 0 },
output: { step1Result: 2 },
startedAt: Date.now(),
endedAt: Date.now(),
},
nestedWorkflow: {
status: "running",
payload: { step1Result: 2 },
startedAt: Date.now(),
},
},
nestedStepsContext: {
nestedWorkflow: {
step2: {
status: "success",
payload: { step1Result: 2 },
output: { step2Result: 3 },
startedAt: Date.now(),
endedAt: Date.now(),
},
},
},
});
用例Direct link to 用例
🌐 Use cases
调试失败步骤Direct link to 调试失败步骤
🌐 Debugging failed steps
使用相同或修改后的输入重新运行失败的步骤以诊断问题:
🌐 Re-run a failed step with the same or modified input to diagnose issues:
const result = await run.timeTravel({
step: failedStepId,
context: originalContext, // Use context from the failed run
});
在新的工作流运行上测试步骤逻辑Direct link to 在新的工作流运行上测试步骤逻辑
🌐 Testing step logic on new workflow run
在新的工作流运行中使用特定输入测试各个步骤,适用于在不从头开始执行工作流的情况下测试步骤逻辑。
🌐 Test individual steps with specific inputs on a new workflow run, useful for testing a step logic without starting workflow execution from the beginning.
const result = await run.timeTravel({
step: "processData",
inputData: { testData: "specific test case" },
});
从短暂故障中恢复Direct link to 从短暂故障中恢复
🌐 Recovering from transient failures
重新运行因临时问题(网络错误、速率限制)而失败的步骤:
🌐 Re-run steps that failed due to temporary issues (network errors, rate limits):
// After fixing the external service issue
const result = await run.timeTravel({
step: "callExternalApi",
inputData: savedInputData,
});
相关Direct link to 相关
🌐 Related