Skip to main content

工具流

🌐 Tool streaming

Mastra 中的工具流功能使工具在运行时可以发送增量结果,而无需等到执行完成。这使你能够将部分进展、中间状态或逐步生成的数据直接呈现给用户或上游的代理和工作流。

🌐 Tool streaming in Mastra enables tools to send incremental results while they run, rather than waiting until execution finishes. This allows you to surface partial progress, intermediate states, or progressive data directly to users or upstream agents and workflows.

流可以通过两种主要方式进行写入:

🌐 Streams can be written to in two main ways:

  • 从工具内部:每个工具都会收到一个 context.writer 对象,它是一个可写流,你可以使用它在执行过程中推送更新。
  • 来自代理流:你也可以将代理的 stream 输出直接传入工具的写入器,这样可以轻松地将代理的响应链式连接到工具结果,而无需额外的连接代码。

通过将可写工具流与代理流结合使用,你可以精细地控制中间结果如何在系统中流动并最终呈现给用户体验。

🌐 By combining writable tool streams with agent streaming, you gain fine grained control over how intermediate results flow through your system and into the user experience.

使用工具的代理
Direct link to 使用工具的代理

🌐 Agent using tool

代理流式传输可以与工具调用结合使用,允许将工具输出直接写入代理的流式响应中。这使得可以将工具活动作为整体交互的一部分呈现出来。

🌐 Agent streaming can be combined with tool calls, allowing tool outputs to be written directly into the agent’s streaming response. This makes it possible to surface tool activity as part of the overall interaction.

import { Agent } from "@mastra/core/agent";
import { testTool } from "../tools/test-tool";

export const testAgent = new Agent({
id: "test-agent",
name: "Test Agent",
instructions: "You are a weather agent.",
model: "openai/gpt-5.1",
tools: { testTool },
});

使用 context.writer
Direct link to using-contextwriter

🌐 Using context.writer

context.writer 对象可以在工具的 execute() 函数中使用,并且可用于向活动流发出自定义事件、数据或数值。这使得工具在执行过程中仍然可以提供中间结果或状态更新。

🌐 The context.writer object is available in a tool's execute() function and can be used to emit custom events, data, or values into the active stream. This enables tools to provide intermediate results or status updates while execution is still in progress.

warning

你必须 awaitwriter.write() 的调用,否则你将锁定流并收到 WritableStream is locked 错误。

🌐 You must await the call to writer.write() or else you will lock the stream and get a WritableStream is locked error.

import { createTool } from "@mastra/core/tools";

export const testTool = createTool({
execute: async (inputData, context) => {
const { value } = inputData;

await context?.writer?.write({
type: "custom-event",
status: "pending"
});

const response = await fetch(...);

await context?.writer?.write({
type: "custom-event",
status: "success"
});

return {
value: ""
};
}
});

如果你想发送顶Laminar块,你也可以使用 writer.custom(),这在与 UI 框架集成时非常有用和相关

🌐 You can also use writer.custom() if you want to emit top level stream chunks, This useful and relevant when integrating with UI Frameworks

import { createTool } from "@mastra/core/tools";

export const testTool = createTool({
execute: async (inputData, context) => {
const { value } = inputData;

await context?.writer?.custom({
type: "data-tool-progress",
status: "pending"
});

const response = await fetch(...);

await context?.writer?.custom({
type: "data-tool-progress",
status: "success"
});

return {
value: ""
};
}
});

检查流负载
Direct link to 检查流负载

🌐 Inspecting stream payloads

写入流的事件会包含在发出的数据块中。这些数据块可以被检查,以访问任何自定义字段,例如事件类型、中间值或特定工具的数据。

🌐 Events written to the stream are included in the emitted chunks. These chunks can be inspected to access any custom fields, such as event types, intermediate values, or tool-specific data.

const stream = await testAgent.stream([
"What is the weather in London?",
"Use the testTool",
]);

for await (const chunk of stream) {
if (chunk.payload.output?.type === "custom-event") {
console.log(JSON.stringify(chunk, null, 2));
}
}

工具生命周期钩子
Direct link to 工具生命周期钩子

🌐 Tool Lifecycle Hooks

工具支持生命周期钩子,允许你在流式传输过程中监控工具执行的不同阶段。这些钩子对于日志记录或分析特别有用。

🌐 Tools support lifecycle hooks that allow you to monitor different stages of tool execution during streaming. These hooks are particularly useful for logging or analytics.

示例:使用 onInputAvailable 和 onOutput
Direct link to 示例:使用 onInputAvailable 和 onOutput

🌐 Example: Using onInputAvailable and onOutput

import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const weatherTool = createTool({
id: "weather-tool",
description: "Get weather information",
inputSchema: z.object({
city: z.string(),
}),
outputSchema: z.object({
temperature: z.number(),
conditions: z.string(),
}),
// Called when the complete input is available
onInputAvailable: ({ input, toolCallId }) => {
console.log(`Weather requested for: ${input.city}`);
},
execute: async (input) => {
const weather = await fetchWeather(input.city);
return weather;
},
// Called after successful execution
onOutput: ({ output, toolName }) => {
console.log(`${toolName} result: ${output.temperature}°F, ${output.conditions}`);
},
});

可用钩子
Direct link to 可用钩子

🌐 Available Hooks

  • onInputStart:在工具开始输入流时调用
  • onInputDelta:在每个输入片段流入时调用
  • onInputAvailable:在完整输入被解析和验证后调用
  • onOutput:在工具成功执行并输出结果后调用

有关所有生命周期钩子的详细文档,请参阅 createTool() 参考

🌐 For detailed documentation on all lifecycle hooks, see the createTool() reference.

使用代理的工具
Direct link to 使用代理的工具

🌐 Tool using an agent

将代理的 fullStream 管道到工具的 writer。这会流式传输部分输出,Mastra 会自动将代理的使用情况汇总到工具运行中。

🌐 Pipe an agent's fullStream to the tool's writer. This streams partial output, and Mastra automatically aggregates the agent's usage into the tool run.

import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const testTool = createTool({
execute: async (inputData, context) => {
const { city } = inputData;

const agent = context?.mastra?.getAgent("testAgent");
const stream = await agent?.stream(`What is the weather in ${city}?`);

await stream!.fullStream.pipeTo(context?.writer!);

return {
value: await stream!.text,
};
},
});