
Using AI SDK UI

AI SDK UI is a library of React utilities and components for building AI-powered interfaces. In this guide, you'll learn how to use @mastra/ai-sdk to transform Mastra's output into AI SDK-compatible formats so you can use its hooks and components in your frontend.

note

Migrating from AI SDK v4 to v5? See the migration guide.

tip

Want to see more examples? Visit Mastra's UI Dojo or the Next.js quickstart guide.

Getting Started

Use Mastra and AI SDK UI together by installing the @mastra/ai-sdk package. @mastra/ai-sdk provides custom API routes and utilities for streaming Mastra agents in AI SDK-compatible formats. This includes chat, workflow, and network route handlers, along with utilities and exported types for UI integrations.

@mastra/ai-sdk integrates with the three main AI SDK UI hooks: useChat(), useCompletion(), and useObject().

Install the required packages to get started:

npm install @mastra/ai-sdk@latest @ai-sdk/react ai

You're now ready to follow the integration guides and recipes below!

Integration Guides

Typically, you'll set up API routes that stream Mastra content in AI SDK-compatible format, and then use those routes in AI SDK UI hooks like useChat(). Below you'll find two main approaches to achieve this:

Once you have your API routes set up, you can use them in the useChat() hook.

Mastra's server

Run Mastra as a standalone server and connect your frontend (e.g. using Vite + React) to its API endpoints. You'll be using Mastra's custom API routes feature for this.

info

Mastra's UI Dojo is an example of this setup.

You can use chatRoute(), workflowRoute(), and networkRoute() to create API routes that stream Mastra content in AI SDK-compatible format. Once implemented, you can use these API routes in useChat().

This example shows how to set up a chat route at the /chat endpoint that uses an agent with the ID weatherAgent.

src/mastra/index.ts

import { Mastra } from "@mastra/core";
import { chatRoute } from "@mastra/ai-sdk";

export const mastra = new Mastra({
  server: {
    apiRoutes: [
      chatRoute({
        path: "/chat",
        agent: "weatherAgent",
      }),
    ],
  },
});

You can also use dynamic agent routing; see the chatRoute() reference documentation for more details.

Framework-agnostic

If you don't want to run Mastra's server and instead use frameworks like Next.js or Express, you can use the handleChatStream(), handleWorkflowStream(), and handleNetworkStream() functions in your own API route handlers.

They return a ReadableStream that you can wrap with createUIMessageStreamResponse().

The examples below show you how to use them with the Next.js App Router.

This example shows how to set up a chat route at the /chat endpoint that uses an agent with the ID weatherAgent.

app/chat/route.ts

import { handleChatStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';

export async function POST(req: Request) {
  const params = await req.json();
  const stream = await handleChatStream({
    mastra,
    agentId: 'weatherAgent',
    params,
  });
  return createUIMessageStreamResponse({ stream });
}

useChat()

Whether you created API routes through Mastra's server or used a framework of your choice, you can now use the API endpoints in the useChat() hook.

Assuming you set up a route at /chat that uses a weather agent, you can ask it questions as seen below. It's important that you set the correct api URL.

import { useChat } from "@ai-sdk/react";
import { useState } from "react";
import { DefaultChatTransport } from "ai";

export default function Chat() {
  const [inputValue, setInputValue] = useState("");
  const { messages, sendMessage } = useChat({
    transport: new DefaultChatTransport({
      api: "http://localhost:4111/chat",
    }),
  });

  const handleFormSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    sendMessage({ text: inputValue });
  };

  return (
    <div>
      <pre>{JSON.stringify(messages, null, 2)}</pre>
      <form onSubmit={handleFormSubmit}>
        <input value={inputValue} onChange={e => setInputValue(e.target.value)} placeholder="Name of the city" />
      </form>
    </div>
  );
}

Use prepareSendMessagesRequest to customize the request sent to the chat route, for example to pass additional configuration to the agent.
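As a minimal sketch: prepareSendMessagesRequest is an option on AI SDK v5's DefaultChatTransport that returns the request body to send. The extra data field below is illustrative, not a Mastra-defined contract.

```typescript
// Hedged sketch: a body-builder in the shape prepareSendMessagesRequest expects.
type PrepareArgs = { id?: string; messages: unknown[] };

function prepareBody({ messages }: PrepareArgs) {
  return {
    body: {
      messages,
      data: { userId: "user123" }, // hypothetical extra configuration
    },
  };
}

// Usage (sketch):
// new DefaultChatTransport({
//   api: "http://localhost:4111/chat",
//   prepareSendMessagesRequest: prepareBody,
// });
```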

useCompletion()

The useCompletion() hook handles single-turn completions between your frontend and a Mastra agent, allowing you to send a prompt and receive a streamed response over HTTP.

Your frontend could look like this:

app/page.tsx

import { useCompletion } from '@ai-sdk/react';

export default function Page() {
  const { completion, input, handleInputChange, handleSubmit } = useCompletion({
    api: '/api/completion',
  });

  return (
    <form onSubmit={handleSubmit}>
      <input
        name="prompt"
        value={input}
        onChange={handleInputChange}
        id="input"
      />
      <button type="submit">Submit</button>
      <div>{completion}</div>
    </form>
  );
}

Below are two approaches to implementing the backend:

src/mastra/index.ts

import { Mastra } from '@mastra/core/mastra';
import { registerApiRoute } from '@mastra/core/server';
import { handleChatStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';

export const mastra = new Mastra({
  server: {
    apiRoutes: [
      registerApiRoute('/completion', {
        method: 'POST',
        handler: async (c) => {
          const { prompt } = await c.req.json();
          const mastra = c.get('mastra');
          const stream = await handleChatStream({
            mastra,
            agentId: 'weatherAgent',
            params: {
              messages: [
                {
                  id: '1',
                  role: 'user',
                  parts: [
                    {
                      type: 'text',
                      text: prompt,
                    },
                  ],
                },
              ],
            },
          });

          return createUIMessageStreamResponse({ stream });
        },
      }),
    ],
  },
});

Custom UI

Custom UI (also known as Generative UI) allows you to render custom React components based on data streamed from Mastra. Instead of displaying raw text or JSON, you can create visual components for tool outputs, workflow progress, agent network execution, and custom events.

Use Custom UI when you want to:

  • Render tool outputs as visual components (e.g., display a weather card instead of raw JSON)
  • Show workflow step progress with status indicators
  • Visualize agent network execution with step-by-step updates
  • Display progress indicators or status updates during long-running operations

Data part types

Mastra streams data to the frontend as "parts" within messages. Each part has a type that determines how to render it. The @mastra/ai-sdk package transforms Mastra streams into AI SDK-compatible UI Message DataParts.

| Data part type | Source | Description |
| --- | --- | --- |
| tool-{toolKey} | AI SDK built-in | Tool calls with states: input-available, output-available, output-error |
| data-workflow | workflowRoute() | Workflow execution with step inputs, outputs, and statuses |
| data-network | networkRoute() | Agent network execution with ordered steps and outputs |
| data-tool-agent | Nested agents in tools | Agent results streamed from within a tool's execute() |
| data-tool-workflow | Nested workflows in tools | Workflow results streamed from within a tool's execute() |
| data-tool-network | Nested networks in tools | Network results streamed from within a tool's execute() |
| data-{custom} | writer.custom() | Custom events for progress indicators, status updates, and more |
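As a hedged sketch of how a frontend might branch on these part types (the type names come from the table above; the labels and fallback behavior are illustrative):

```typescript
// Hedged sketch: choose a display label for a streamed part based on its type.
type StreamPart = { type: string; data?: unknown };

function labelForPart(part: StreamPart): string {
  if (part.type.startsWith("tool-")) return "Tool call";
  switch (part.type) {
    case "data-workflow":
      return "Workflow progress";
    case "data-network":
      return "Network execution";
    case "data-tool-agent":
      return "Nested agent output";
    default:
      // Any other data-prefixed type is a custom event (e.g. data-tool-progress).
      return part.type.startsWith("data-") ? "Custom event" : "Text";
  }
}
```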

Rendering tool outputs

The AI SDK automatically creates tool-{toolKey} parts when an agent calls a tool. These parts include the tool's state and output, which you can use to render custom components.

The tool part cycles through these states:

  • input-streaming: the tool's input is being streamed (when tool call streaming is enabled)
  • input-available: the tool has been called, its input is complete, and execution is pending
  • output-available: the tool finished executing and its output is available
  • output-error: the tool execution failed
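The states above can be mapped to user-facing status text; a minimal sketch (state names from the list above, messages illustrative):

```typescript
// Hedged sketch: map a tool part state to a status label.
type ToolPartState =
  | "input-streaming"
  | "input-available"
  | "output-available"
  | "output-error";

function toolStatusText(state: ToolPartState): string {
  switch (state) {
    case "input-streaming":
      return "Preparing tool input...";
    case "input-available":
      return "Calling tool...";
    case "output-available":
      return "Done";
    case "output-error":
      return "Tool failed";
  }
}
```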

Here's an example of rendering a weather tool's output as a custom WeatherCard component.

Define a tool with an outputSchema so the frontend knows the shape of the data to render.

src/mastra/tools/weather-tool.ts

import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const weatherTool = createTool({
  id: "get-weather",
  description: "Get current weather for a location",
  inputSchema: z.object({
    location: z.string().describe("The location to get the weather for"),
  }),
  outputSchema: z.object({
    temperature: z.number(),
    feelsLike: z.number(),
    humidity: z.number(),
    windSpeed: z.number(),
    conditions: z.string(),
    location: z.string(),
  }),
  execute: async (inputData) => {
    const response = await fetch(
      `https://api.weatherapi.com/v1/current.json?key=${process.env.WEATHER_API_KEY}&q=${inputData.location}`
    );
    const data = await response.json();
    return {
      temperature: data.current.temp_c,
      feelsLike: data.current.feelslike_c,
      humidity: data.current.humidity,
      windSpeed: data.current.wind_kph,
      conditions: data.current.condition.text,
      location: data.location.name,
    };
  },
});
tip

The tool part type follows the pattern tool-{toolKey}, where toolKey is the key used when registering the tool with the agent. For example, if you register tools as tools: { weatherTool }, the part type will be tool-weatherTool.

Rendering workflow data

When using workflowRoute() or handleWorkflowStream(), Mastra emits data-workflow parts that contain the workflow's execution state, including step statuses and outputs.

Define a workflow with multiple steps; it will emit data-workflow parts as it executes.

src/mastra/workflows/activities-workflow.ts

import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";

const fetchWeather = createStep({
  id: "fetch-weather",
  inputSchema: z.object({
    location: z.string(),
  }),
  outputSchema: z.object({
    temperature: z.number(),
    conditions: z.string(),
  }),
  execute: async ({ inputData }) => {
    // Fetch weather data...
    return { temperature: 22, conditions: "Sunny" };
  },
});

const planActivities = createStep({
  id: "plan-activities",
  inputSchema: z.object({
    temperature: z.number(),
    conditions: z.string(),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, mastra }) => {
    const agent = mastra?.getAgent("activityAgent");
    const response = await agent?.generate(
      `Suggest activities for ${inputData.conditions} weather at ${inputData.temperature}°C`
    );
    return { activities: response?.text || "" };
  },
});

export const activitiesWorkflow = createWorkflow({
  id: "activities-workflow",
  inputSchema: z.object({
    location: z.string(),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
})
  .then(fetchWeather)
  .then(planActivities);

activitiesWorkflow.commit();

Register the workflow with Mastra and expose it via workflowRoute() to stream workflow events to the frontend.

src/mastra/index.ts

import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
import { activitiesWorkflow } from "./workflows/activities-workflow";

export const mastra = new Mastra({
  workflows: { activitiesWorkflow },
  server: {
    apiRoutes: [
      workflowRoute({
        path: "/workflow/activitiesWorkflow",
        workflow: "activitiesWorkflow",
      }),
    ],
  },
});

For more details on workflow streaming, see Workflow Streaming.

Rendering network data

When using networkRoute() or handleNetworkStream(), Mastra emits data-network parts that contain the agent network's execution state, including which agents were called and their outputs.

Register agents with Mastra and expose the routing agent via networkRoute() to stream network execution events to the frontend.

src/mastra/index.ts

import { Mastra } from "@mastra/core";
import { networkRoute } from "@mastra/ai-sdk";

export const mastra = new Mastra({
  agents: { routingAgent, researchAgent, weatherAgent },
  server: {
    apiRoutes: [
      networkRoute({
        path: "/network",
        agent: "routingAgent",
      }),
    ],
  },
});

For more details on agent networks, see Agent Networks.

Custom events

Use writer.custom() within a tool's execute() function to emit custom data parts. This is useful for progress indicators, status updates, or any custom UI updates during tool execution.

Custom event types must start with data- to be recognized as data parts.

warning

You must await the writer.custom() call; otherwise you may encounter a WritableStream is locked error.

Use writer.custom() inside the tool's execute() function to emit custom data- prefixed events at different stages of execution.

src/mastra/tools/task-tool.ts

import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const taskTool = createTool({
  id: "process-task",
  description: "Process a task with progress updates",
  inputSchema: z.object({
    task: z.string().describe("The task to process"),
  }),
  outputSchema: z.object({
    result: z.string(),
    status: z.string(),
  }),
  execute: async (inputData, context) => {
    const { task } = inputData;

    // Emit "in progress" custom event
    await context?.writer?.custom({
      type: "data-tool-progress",
      data: {
        status: "in-progress",
        message: "Gathering information...",
      },
    });

    // Simulate work
    await new Promise((resolve) => setTimeout(resolve, 3000));

    // Emit "done" custom event
    await context?.writer?.custom({
      type: "data-tool-progress",
      data: {
        status: "done",
        message: `Successfully processed "${task}"`,
      },
    });

    return {
      result: `Task "${task}" has been completed successfully!`,
      status: "completed",
    };
  },
});

Tool streaming

Tools can also stream data using context.writer.write() for lower-level control, or pipe an agent's stream directly to the tool's writer. For more details, see Tool Streaming.

Examples

For live examples of Custom UI patterns, visit Mastra's UI Dojo. The repository includes example implementations of these patterns.

Recipes

Stream transformations

To manually transform Mastra's streams into an AI SDK-compatible format, use the toAISdkStream() utility. See the examples for concrete usage patterns.

Loading historical messages

When loading messages from Mastra's memory to display in a chat UI, use toAISdkV5Messages() or toAISdkV4Messages() to convert them to the appropriate AI SDK format for useChat()'s initialMessages.

Passing additional data

[sendMessage()](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat#send-message) allows you to pass additional data from the frontend to Mastra. That data is then available on the server as the [RequestContext](/docs/server/request-context).

Here's an example of the frontend code:

import { useChat } from "@ai-sdk/react";
import { useState } from "react";
import { DefaultChatTransport } from 'ai';

export function ChatAdditional() {
  const [inputValue, setInputValue] = useState('');
  const { messages, sendMessage } = useChat({
    transport: new DefaultChatTransport({
      api: 'http://localhost:4111/chat-extra',
    }),
  });

  const handleFormSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    sendMessage({ text: inputValue }, {
      body: {
        data: {
          userId: "user123",
          preferences: {
            language: "en",
            temperature: "celsius"
          }
        }
      }
    });
  };

  return (
    <div>
      <pre>{JSON.stringify(messages, null, 2)}</pre>
      <form onSubmit={handleFormSubmit}>
        <input value={inputValue} onChange={e => setInputValue(e.target.value)} placeholder="Name of the city" />
      </form>
    </div>
  );
}

Here are two examples of how to implement the backend portion.

Add a chatRoute() to your Mastra configuration as shown above. Then, add a server-level middleware:

src/mastra/index.ts

import { Mastra } from "@mastra/core";

export const mastra = new Mastra({
  server: {
    middleware: [
      async (c, next) => {
        const requestContext = c.get("requestContext");

        if (c.req.method === "POST") {
          const clonedReq = c.req.raw.clone();
          const body = await clonedReq.json();

          if (body?.data) {
            for (const [key, value] of Object.entries(body.data)) {
              requestContext.set(key, value);
            }
          }
        }
        await next();
      },
    ],
  },
});
info

You can access this data in your tools via the requestContext parameter. See the Request Context documentation for more details.

Workflow suspend/resume with user approval

Workflows can suspend execution and wait for user input before continuing. This is useful for approval flows, confirmations, or any human-in-the-loop scenario.

The workflow uses:

  • suspendSchema / resumeSchema - define the shape of the suspend payload and the resume input
  • suspend() - pauses the workflow and sends the suspend data to the UI
  • resumeData - contains the user's response when the workflow resumes
  • bail() - exits the workflow early (e.g., when the user rejects)

Create a workflow step that suspends for approval. The step checks resumeData to determine whether it's resuming, and calls suspend() on first execution.

src/mastra/workflows/approval-workflow.ts

import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";

const requestApproval = createStep({
  id: "request-approval",
  inputSchema: z.object({ requestId: z.string(), summary: z.string() }),
  outputSchema: z.object({
    approved: z.boolean(),
    requestId: z.string(),
    approvedBy: z.string().optional(),
  }),
  resumeSchema: z.object({
    approved: z.boolean(),
    approverName: z.string().optional(),
  }),
  suspendSchema: z.object({
    message: z.string(),
    requestId: z.string(),
  }),
  execute: async ({ inputData, resumeData, suspend, bail }) => {
    // User rejected - bail out
    if (resumeData?.approved === false) {
      return bail({ message: "Request rejected" });
    }
    // User approved - continue
    if (resumeData?.approved) {
      return {
        approved: true,
        requestId: inputData.requestId,
        approvedBy: resumeData.approverName || "User",
      };
    }
    // First execution - suspend and wait
    return await suspend({
      message: `Please approve: ${inputData.summary}`,
      requestId: inputData.requestId,
    });
  },
});

export const approvalWorkflow = createWorkflow({
  id: "approval-workflow",
  inputSchema: z.object({ requestId: z.string(), summary: z.string() }),
  outputSchema: z.object({
    approved: z.boolean(),
    requestId: z.string(),
    approvedBy: z.string().optional(),
  }),
}).then(requestApproval);

approvalWorkflow.commit();

Register the workflow. Storage is required for suspend/resume to persist state.

src/mastra/index.ts

import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
import { LibSQLStore } from "@mastra/libsql";
import { approvalWorkflow } from "./workflows/approval-workflow";

export const mastra = new Mastra({
  workflows: { approvalWorkflow },
  storage: new LibSQLStore({
    url: "file:../mastra.db",
  }),
  server: {
    apiRoutes: [
      workflowRoute({ path: "/workflow/approvalWorkflow", workflow: "approvalWorkflow" }),
    ],
  },
});

Key points:

  • The suspend payload is available via step.suspendPayload
  • To resume, send runId, step (the step ID), and resumeData in the request body
  • Storage must be configured so workflow state persists across suspend/resume
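The resume request described in the bullets above can be sketched as follows. Field names (runId, step, resumeData) follow those bullets; the endpoint path matches the workflowRoute() example and the step ID comes from the approval workflow above, so treat both as assumptions.

```typescript
// Hedged sketch: building the body for a workflow resume request.
type ResumeBody = {
  runId: string;
  step: string;
  resumeData: { approved: boolean; approverName?: string };
};

function buildResumeBody(
  runId: string,
  approved: boolean,
  approverName?: string
): ResumeBody {
  return {
    runId,
    step: "request-approval",
    resumeData: { approved, approverName },
  };
}

// Usage (sketch):
// await fetch("/workflow/approvalWorkflow", {
//   method: "POST",
//   headers: { "Content-Type": "application/json" },
//   body: JSON.stringify(buildResumeBody(runId, true, "Alice")),
// });
```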

For a complete implementation, see the workflow-suspend-resume example in the UI Dojo.

Nested agent streams in tools

Tools can call agents internally and stream the agent's output back to the UI. This creates data-tool-agent parts that can be rendered alongside the tool's final output.

The pattern uses:

  • context.mastra.getAgent() - gets an agent instance from within the tool
  • agent.stream() - streams the agent's response
  • stream.fullStream.pipeTo(context.writer) - pipes the agent's stream into the tool's writer

Create a tool that calls an agent and pipes its stream to the tool's writer.

src/mastra/tools/nested-agent-tool.ts

import { createTool } from "@mastra/core/tools";
import { z } from "zod";

export const nestedAgentTool = createTool({
  id: "nested-agent-stream",
  description: "Analyze weather using a nested agent",
  inputSchema: z.object({
    city: z.string().describe("The city to analyze"),
  }),
  outputSchema: z.object({
    summary: z.string(),
  }),
  execute: async (inputData, context) => {
    const agent = context?.mastra?.getAgent("weatherAgent");
    if (!agent) {
      return { summary: "Weather agent not available" };
    }

    const stream = await agent.stream(
      `Analyze the weather in ${inputData.city} and provide a summary.`
    );

    // Pipe the agent's stream to emit data-tool-agent parts
    await stream.fullStream.pipeTo(context!.writer!);

    return { summary: (await stream.text) ?? "No summary available" };
  },
});

Create an agent that uses this tool.

src/mastra/agents/forecast-agent.ts

import { Agent } from "@mastra/core/agent";
import { nestedAgentTool } from "../tools/nested-agent-tool";

export const forecastAgent = new Agent({
  id: "forecast-agent",
  instructions: "Use the nested-agent-stream tool when asked about weather.",
  model: "openai/gpt-4o-mini",
  tools: { nestedAgentTool },
});

Key points:

  • Piping fullStream to context.writer emits data-tool-agent parts
  • AgentDataPart includes an id (on the part) and data.text (the agent's streamed text)
  • The tool still returns its own output after the stream completes
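As a hedged sketch of consuming those parts on the frontend, assuming each data-tool-agent part carries an id and data.text as noted above, with later parts for the same id superseding earlier ones:

```typescript
// Hedged sketch: keep only the latest streamed text for each part id.
type AgentDataPart = {
  type: "data-tool-agent";
  id: string;
  data: { text: string };
};

function latestTextById(parts: AgentDataPart[]): Map<string, string> {
  const latest = new Map<string, string>();
  for (const part of parts) {
    latest.set(part.id, part.data.text); // later parts overwrite earlier ones
  }
  return latest;
}
```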

For a complete implementation, see the tool-nested-streams example in the UI Dojo.

Streaming agent text from workflow steps

Workflow steps can stream an agent's text output in real time by piping the agent's stream to the step's writer. This lets users see the agent "thinking" while the workflow executes, rather than waiting for the step to complete.

The pattern uses:

  • writer in workflow steps - pipe the agent's fullStream to the step's writer
  • text and data-workflow parts - the frontend receives streamed text alongside step progress

Create a workflow step that streams an agent's response by piping it to the step's writer.

src/mastra/workflows/weather-workflow.ts

import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
import { weatherAgent } from "../agents/weather-agent";

const analyzeWeather = createStep({
  id: "analyze-weather",
  inputSchema: z.object({ location: z.string() }),
  outputSchema: z.object({ analysis: z.string(), location: z.string() }),
  execute: async ({ inputData, writer }) => {
    const response = await weatherAgent.stream(
      `Analyze the weather in ${inputData.location} and provide insights.`
    );

    // Pipe agent stream to step writer for real-time text streaming
    await response.fullStream.pipeTo(writer);

    return {
      analysis: await response.text,
      location: inputData.location,
    };
  },
});

const calculateScore = createStep({
  id: "calculate-score",
  inputSchema: z.object({ analysis: z.string(), location: z.string() }),
  outputSchema: z.object({ score: z.number(), summary: z.string() }),
  execute: async ({ inputData }) => {
    const score = inputData.analysis.includes("sunny") ? 85 : 50;
    return { score, summary: `Comfort score for ${inputData.location}: ${score}/100` };
  },
});

export const weatherWorkflow = createWorkflow({
  id: "weather-workflow",
  inputSchema: z.object({ location: z.string() }),
  outputSchema: z.object({ score: z.number(), summary: z.string() }),
})
  .then(analyzeWeather)
  .then(calculateScore);

weatherWorkflow.commit();

Register the workflow with a workflowRoute(). Text streaming is enabled by default.

src/mastra/index.ts

import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
import { weatherAgent } from "./agents/weather-agent";
import { weatherWorkflow } from "./workflows/weather-workflow";

export const mastra = new Mastra({
  agents: { weatherAgent },
  workflows: { weatherWorkflow },
  server: {
    apiRoutes: [
      workflowRoute({ path: "/workflow/weather", workflow: "weatherWorkflow" }),
    ],
  },
});

Key points:

  • The step's writer is available in the execute function (not via context)
  • includeTextStreamParts defaults to true on workflowRoute(), so text streams by default
  • Text parts stream in real time, while data-workflow parts update with step status

For a complete implementation, see the workflow-agent-text-stream example in the UI Dojo.

Multi-stage progress with branching workflows

For workflows with conditional branching (e.g., express vs. standard shipping), you can track progress across different branches by including an identifier in your custom events.

The UI Dojo example uses a stage field in the event data to identify which branch is executing (e.g., "validation", "standard-processing", "express-processing"). The frontend groups events by this field to show a pipeline-style progress UI.
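The grouping described above can be sketched as a small pure function. The event shape and stage values below are illustrative, not the exact types used by the UI Dojo.

```typescript
// Hedged sketch: group custom-event messages by their stage field.
type StageEvent = { type: string; data: { stage: string; message: string } };

function groupByStage(events: StageEvent[]): Record<string, string[]> {
  const groups: Record<string, string[]> = {};
  for (const event of events) {
    (groups[event.data.stage] ??= []).push(event.data.message);
  }
  return groups;
}
```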

See branching-workflow.ts (backend) and workflow-custom-events.tsx (frontend) in the UI Dojo.

Progress indicators in agent networks

When using agent networks, you can emit custom progress events from tools used by sub-agents to show which agent is currently active.

The UI Dojo example includes a stage field in the event data to identify which sub-agent is running (e.g., "report-generation", "report-review"). The frontend groups events by this field and displays the latest status for each.

See report-generation-tool.ts (backend) and agent-network-custom-events.tsx (frontend) in the UI Dojo.