Skip to main content

运行.timeTravelStream()

🌐 Run.timeTravelStream()

.timeTravelStream() 方法可以从任意特定步骤重新执行工作流,并支持流式事件。这使你在时间旅行执行期间能够实时获取更新,同时保持对每个步骤进展的完整可见性。

🌐 The .timeTravelStream() method re-executes a workflow starting from any specific step with streaming events. This allows you to receive real-time updates during time travel execution while maintaining full visibility into each step's progress.

使用示例
Direct link to 使用示例

🌐 Usage example

const run = await workflow.createRun();

const output = run.timeTravelStream({
step: "step2",
inputData: { value: 10 },
});

// Process events as they arrive
for await (const event of output.fullStream) {
console.log(event.type, event.payload);
}

// Get the final result
const result = await output.result;

参数
Direct link to 参数

🌐 Parameters

所有参数与Run.timeTravel()相同。详细的参数文档请参见timeTravel参考

🌐 All parameters are the same as Run.timeTravel(). See the timeTravel reference for detailed parameter documentation.

返回
Direct link to 返回

🌐 Returns

output:

WorkflowRunOutput<WorkflowResult<TState, TInput, TOutput, TSteps>>
An object containing both the stream and result promise

output.fullStream:

ReadableStream<WorkflowStreamEvent>
A readable stream that emits workflow events as execution progresses

output.result:

Promise<WorkflowResult<TState, TInput, TOutput, TSteps>>
A promise that resolves to the final workflow execution result

output.traceId?:

string
The trace ID associated with this execution when Tracing is enabled

流事件
Direct link to 流事件

🌐 Stream events

在执行过程中,流会发出各种工作流事件:

🌐 The stream emits various workflow events during execution:

  • workflow-step-start:步骤开始执行时发出
  • workflow-step-finish:步骤成功完成时发出
  • workflow-step-error:当步骤遇到错误时触发
  • workflow-step-suspended:当步骤暂停时发出
  • 取决于步骤类型的附加事件(代理、工具等)

扩展使用示例
Direct link to 扩展使用示例

🌐 Extended usage examples

在时间旅行过程中处理事件
Direct link to 在时间旅行过程中处理事件

🌐 Processing events during time travel

const run = await workflow.createRun();

const output = run.timeTravelStream({
step: "step2",
inputData: { value: 10 },
});

for await (const event of output.fullStream) {
switch (event.type) {
case "workflow-step-start":
console.log(`Starting step: ${event.payload.stepName}`);
break;
case "workflow-step-finish":
console.log(`Completed step: ${event.payload.stepName}`);
break;
case "workflow-step-error":
console.error(`Error in step: ${event.payload.stepName}`, event.payload.error);
break;
}
}

const result = await output.result;
console.log("Time travel completed:", result);

带有上下文的时间旅行流
Direct link to 带有上下文的时间旅行流

🌐 Time travel stream with context

const output = run.timeTravelStream({
step: "step2",
context: {
step1: {
status: "success",
payload: { value: 0 },
output: { step1Result: 2 },
startedAt: Date.now(),
endedAt: Date.now(),
},
},
});

for await (const event of output.fullStream) {
// Handle events
console.log(event);
}

const result = await output.result;

带嵌套工作流的时间旅行流
Direct link to 带嵌套工作流的时间旅行流

🌐 Time travel stream with nested workflows

const output = run.timeTravelStream({
step: ["nestedWorkflow", "step3"],
inputData: { value: 10 },
nestedStepsContext: {
nestedWorkflow: {
step2: {
status: "success",
payload: { step1Result: 2 },
output: { step2Result: 3 },
startedAt: Date.now(),
endedAt: Date.now(),
},
},
},
});

for await (const event of output.fullStream) {
console.log(event.type, event.payload);
}

const result = await output.result;

注意
Direct link to 注意

🌐 Notes

  • 当时间旅行执行完成或遇到错误时,流会自动关闭
  • 你可以在工作流仍在执行时处理来自流的事件
  • result 承诺只有在所有步骤完成后才会解决
  • 流事件遵循与常规工作流流式处理相同的格式
  • 时间旅行流需要配置存储,因为它依赖于持久化的工作流快照

🌐 Related