工作流流式处理
🌐 Workflow streaming
Mastra 中的工作流流式处理使工作流在执行过程中可以发送增量结果,而无需等到完成。这使你能够将部分进度、中间状态或渐进数据直接呈现给用户或上游代理和工作流。
🌐 Workflow streaming in Mastra enables workflows to send incremental results while they execute, rather than waiting until completion. This allows you to surface partial progress, intermediate states, or progressive data directly to users or upstream agents and workflows.
流可以通过两种主要方式进行写入:
🌐 Streams can be written to in two main ways:
- 在工作流步骤内部:每个工作流步骤都会接收一个
writer参数,它是一个可写流,你可以在执行过程中用它来推送更新。 - 来自代理流:你也可以将代理的
stream输出直接传递到工作流步骤的写入器中,这使得将代理的响应链接到工作流结果变得容易,无需额外的辅助代码。
通过将可写工作流流与代理流结合,你可以对中间结果如何流经系统并进入用户体验进行细粒度控制。
🌐 By combining writable workflow streams with agent streaming, you gain fine-grained control over how intermediate results flow through your system and into the user experience.
使用 writer 参数Direct link to using-the-writer-argument
🌐 Using the writer argument
writer 参数会传递给工作流步骤的 execute 函数,并可用于向活动流中发送自定义事件、数据或值。这使得工作流步骤在执行过程中仍然可以提供中间结果或状态更新。
🌐 The writer argument is passed to a workflow step's execute function and can be used to emit custom events, data, or values into the active stream. This enables workflow steps to provide intermediate results or status updates while execution is still in progress.
你必须 await 对 writer.write(...) 的调用,否则你将锁定流并收到 WritableStream is locked 错误。
🌐 You must await the call to writer.write(...) or else you will lock the stream and get a WritableStream is locked error.
import { createStep } from "@mastra/core/workflows";
export const testStep = createStep({
execute: async ({ inputData, writer }) => {
const { value } = inputData;
await writer?.write({
type: "custom-event",
status: "pending"
});
const response = await fetch(...);
await writer?.write({
type: "custom-event",
status: "success"
});
return {
value: ""
};
},
});
检查工作流流数据Direct link to 检查工作流流数据
🌐 Inspecting workflow stream payloads
写入流的事件会包含在发出的数据块中。这些数据块可以被检查,以访问任何自定义字段,例如事件类型、中间值或特定步骤的数据。
🌐 Events written to the stream are included in the emitted chunks. These chunks can be inspected to access any custom fields, such as event types, intermediate values, or step-specific data.
const testWorkflow = mastra.getWorkflow("testWorkflow");
const run = await testWorkflow.createRun();
const stream = await run.stream({
inputData: {
value: "initial data",
},
});
for await (const chunk of stream) {
console.log(chunk);
}
if (result!.status === "suspended") {
// if the workflow is suspended, we can resume it with the resumeStream method
const resumedStream = await run.resumeStream({
resumeData: { value: "resume data" },
});
for await (const chunk of resumedStream) {
console.log(chunk);
}
}
恢复中断的工作流程Direct link to 恢复中断的工作流程
🌐 Resuming an interrupted workflow stream
如果工作流流由于任何原因被关闭或中断,你可以使用 resumeStream 方法恢复它。这将返回一个新的 ReadableStream,你可以用它来观察工作流事件。
🌐 If a workflow stream is closed or interrupted for any reason, you can resume it with the resumeStream method. This will return a new ReadableStream that you can use to observe the workflow events.
const newStream = await run.resumeStream();
for await (const chunk of newStream) {
console.log(chunk);
}
使用代理的工作流程Direct link to 使用代理的工作流程
🌐 Workflow using an agent
将代理的 textStream 输出传入工作流步骤的 writer。这会流式传输部分输出,Mastra 会自动将代理的使用情况汇总到工作流运行中。
🌐 Pipe an agent's textStream to the workflow step's writer. This streams partial output, and Mastra automatically aggregates the agent's usage into the workflow run.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
export const testStep = createStep({
execute: async ({ inputData, mastra, writer }) => {
const { city } = inputData;
const testAgent = mastra?.getAgent("testAgent");
const stream = await testAgent?.stream(`What is the weather in ${city}$?`);
await stream!.textStream.pipeTo(writer!);
return {
value: await stream!.text,
};
},
});