Using AI SDK UI
AI SDK UI is a library of React utilities and components for building AI-powered interfaces. In this guide, you'll learn how to use @mastra/ai-sdk to transform Mastra's output into AI SDK-compatible formats so you can use its hooks and components on the frontend.
Migrating from AI SDK v4 to v5? See the migration guide.
Want to see more examples? Visit Mastra's UI Dojo or the Next.js quickstart guide.
Getting Started
Use Mastra and AI SDK UI together by installing the @mastra/ai-sdk package. @mastra/ai-sdk provides custom API routes and utilities for streaming Mastra agents in AI SDK-compatible formats. This includes chat, workflow, and network route handlers, along with utilities and exported types for UI integrations.
@mastra/ai-sdk integrates with AI SDK UI's three main hooks: useChat(), useCompletion(), and useObject().
Install the required packages to get started:
- npm
- pnpm
- Yarn
- Bun
npm install @mastra/ai-sdk@latest @ai-sdk/react ai
pnpm add @mastra/ai-sdk@latest @ai-sdk/react ai
yarn add @mastra/ai-sdk@latest @ai-sdk/react ai
bun add @mastra/ai-sdk@latest @ai-sdk/react ai
You're now ready to follow the integration guides and recipes below!
Integration Guides
Typically, you'll set up API routes that stream Mastra content in AI SDK-compatible format, and then use those routes in AI SDK UI hooks like useChat(). Below you'll find two main approaches to achieve this:
Once you have your API routes set up, you can use them in the useChat() hook.
Mastra's server
Run Mastra as a standalone server and connect your frontend (e.g. using Vite + React) to its API endpoints. You'll be using Mastra's custom API routes feature for this.
You can use chatRoute(), workflowRoute(), and networkRoute() to create API routes that stream Mastra content in AI SDK-compatible format. Once implemented, you can use these API routes in useChat().
- chatRoute()
- workflowRoute()
- networkRoute()
This example shows how to set up a chat route at the /chat endpoint that uses an agent with the ID weatherAgent.
import { Mastra } from "@mastra/core";
import { chatRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
server: {
apiRoutes: [
chatRoute({
path: "/chat",
agent: "weatherAgent",
}),
],
},
});
You can also use dynamic agent routing; see the chatRoute() reference documentation for more details.
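As a sketch of what dynamic agent routing can look like (the :agentId path parameter below is an assumption; confirm the exact options in the chatRoute() reference), one route can serve any registered agent, which matches the /chat/weatherAgent-style endpoints used later in this guide:
import { Mastra } from "@mastra/core";
import { chatRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
  server: {
    apiRoutes: [
      // Assumption: an :agentId path parameter selects the agent per request,
      // so an endpoint such as /chat/weatherAgent resolves to the matching agent.
      chatRoute({
        path: "/chat/:agentId",
      }),
    ],
  },
});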
This example shows how to set up a workflow route at the /workflow endpoint that uses a workflow with the ID weatherWorkflow.
import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
server: {
apiRoutes: [
workflowRoute({
path: "/workflow",
workflow: "weatherWorkflow",
}),
],
},
});
You can also use dynamic workflow routing; see the workflowRoute() reference documentation for more details.
When a workflow step pipes an agent's stream to the workflow writer (e.g., await response.fullStream.pipeTo(writer)), the agent's text chunks and tool calls are forwarded to the UI stream in real time, even when the agent runs inside workflow steps.
See Workflow Streaming for more details.
This example shows how to set up a network route at the /network endpoint that uses an agent with the ID weatherAgent.
import { Mastra } from "@mastra/core";
import { networkRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
server: {
apiRoutes: [
networkRoute({
path: "/network",
agent: "weatherAgent",
}),
],
},
});
You can also use dynamic network routing; see the networkRoute() reference documentation for more details.
Framework-agnostic
If you don't want to run Mastra's server and instead use frameworks like Next.js or Express, you can use the handleChatStream(), handleWorkflowStream(), and handleNetworkStream() functions in your own API route handlers.
They return a ReadableStream that you can wrap with createUIMessageStreamResponse().
The examples below show you how to use them with the Next.js App Router.
- handleChatStream()
- handleWorkflowStream()
- handleNetworkStream()
This example shows how to set up a chat route at the /chat endpoint that uses an agent with the ID weatherAgent.
import { handleChatStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';
export async function POST(req: Request) {
const params = await req.json();
const stream = await handleChatStream({
mastra,
agentId: 'weatherAgent',
params,
});
return createUIMessageStreamResponse({ stream });
}
This example shows how to set up a workflow route at the /workflow endpoint that uses a workflow with the ID weatherWorkflow.
import { handleWorkflowStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';
export async function POST(req: Request) {
const params = await req.json();
const stream = await handleWorkflowStream({
mastra,
workflowId: 'weatherWorkflow',
params,
});
return createUIMessageStreamResponse({ stream });
}
This example shows how to set up a network route at the /network endpoint that uses an agent with the ID routingAgent.
import { handleNetworkStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';
export async function POST(req: Request) {
const params = await req.json();
const stream = await handleNetworkStream({
mastra,
agentId: 'routingAgent',
params,
});
return createUIMessageStreamResponse({ stream });
}
useChat()
Whether you created API routes through Mastra's server or used a framework of your choice, you can now use those API endpoints in the useChat() hook.
Assuming you set up a route at /chat that uses a weather agent, you can ask it questions as shown below. It's important that you set the correct api URL.
import { useChat } from "@ai-sdk/react";
import { useState } from "react";
import { DefaultChatTransport } from "ai";
export default function Chat() {
const [inputValue, setInputValue] = useState("")
const { messages, sendMessage } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/chat",
}),
});
const handleFormSubmit = (e: React.FormEvent) => {
e.preventDefault();
sendMessage({ text: inputValue });
};
return (
<div>
<pre>{JSON.stringify(messages, null, 2)}</pre>
<form onSubmit={handleFormSubmit}>
<input value={inputValue} onChange={e => setInputValue(e.target.value)} placeholder="Name of the city" />
</form>
</div>
);
}
Use prepareSendMessagesRequest to customize the request sent to the chat route, for example to pass additional configuration to the agent.
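For example, a minimal sketch that reshapes the request body before it is sent (the extra data field here is illustrative; the "Passing additional data" recipe below shows how to read it on the server):
import { DefaultChatTransport } from "ai";
const transport = new DefaultChatTransport({
  api: "http://localhost:4111/chat",
  prepareSendMessagesRequest: ({ messages }) => ({
    body: {
      messages,
      // Hypothetical extra configuration forwarded to the server
      data: { userId: "user123", preferences: { temperature: "celsius" } },
    },
  }),
});
Pass this transport to useChat() in place of the inline one from the example above.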
useCompletion()
The useCompletion() hook handles single-turn completions between your frontend and a Mastra agent, allowing you to send a prompt and receive a streamed response over HTTP.
Your frontend could look like this:
import { useCompletion } from '@ai-sdk/react';
export default function Page() {
const { completion, input, handleInputChange, handleSubmit } = useCompletion({
api: '/api/completion',
});
return (
<form onSubmit={handleSubmit}>
<input
name="prompt"
value={input}
onChange={handleInputChange}
id="input"
/>
<button type="submit">Submit</button>
<div>{completion}</div>
</form>
);
}
Below are two approaches to implementing the backend:
- Mastra Server
- Next.js
import { Mastra } from '@mastra/core/mastra';
import { registerApiRoute } from '@mastra/core/server';
import { handleChatStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
export const mastra = new Mastra({
server: {
apiRoutes: [
registerApiRoute('/completion', {
method: 'POST',
handler: async (c) => {
const { prompt } = await c.req.json();
const mastra = c.get('mastra');
const stream = await handleChatStream({
mastra,
agentId: 'weatherAgent',
params: {
messages: [
{
id: "1",
role: 'user',
parts: [
{
type: 'text',
text: prompt
}
]
}
],
}
})
return createUIMessageStreamResponse({ stream });
}
})
]
}
});
import { handleChatStream } from '@mastra/ai-sdk';
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';
// Allow streaming responses up to 30 seconds
export const maxDuration = 30;
export async function POST(req: Request) {
const { prompt }: { prompt: string } = await req.json();
const stream = await handleChatStream({
mastra,
agentId: 'weatherAgent',
params: {
messages: [
{
id: "1",
role: 'user',
parts: [
{
type: 'text',
text: prompt
}
]
}
],
},
});
return createUIMessageStreamResponse({ stream });
}
Custom UI
Custom UI (also known as Generative UI) allows you to render custom React components based on data streamed from Mastra. Instead of displaying raw text or JSON, you can create visual components for tool outputs, workflow progress, agent network execution, and custom events.
Use Custom UI when you want to:
- Render tool outputs as visual components (e.g., show a weather card instead of raw JSON)
- Show workflow step progress with status indicators
- Visualize agent network execution with step-by-step updates
- Display progress indicators or status updates during long-running operations
Data part types
Mastra streams data to the frontend as "parts" within messages. Each part has a type that determines how to render it. The @mastra/ai-sdk package transforms Mastra streams into AI SDK-compatible UI message data parts.
| Data part type | Source | Description |
|---|---|---|
| tool-{toolKey} | AI SDK built-in | Tool calls with states: input-available, output-available, output-error |
| data-workflow | workflowRoute() | Workflow execution with step inputs, outputs, and statuses |
| data-network | networkRoute() | Agent network execution with ordered steps and outputs |
| data-tool-agent | Nested agent in a tool | Agent results streamed from inside a tool's execute() |
| data-tool-workflow | Nested workflow in a tool | Workflow results streamed from inside a tool's execute() |
| data-tool-network | Nested network in a tool | Network results streamed from inside a tool's execute() |
| data-{custom} | writer.custom() | Custom events for progress indicators, status updates, and more |
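As a minimal sketch of how these part types surface in a component, you can branch on part.type inside useChat()'s messages; the sections below cover each case in detail:
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
export function PartsOverview() {
  const { messages } = useChat({
    transport: new DefaultChatTransport({ api: "http://localhost:4111/chat" }),
  });
  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>
          {message.parts.map((part, index) => {
            if (part.type === "text") {
              return <p key={index}>{part.text}</p>;
            }
            if (part.type === "data-workflow" || part.type === "data-network") {
              // Raw dump for illustration; see the dedicated sections for richer rendering
              return <pre key={index}>{JSON.stringify(part.data, null, 2)}</pre>;
            }
            // tool-{toolKey} and data-{custom} parts are handled in the sections below
            return null;
          })}
        </div>
      ))}
    </div>
  );
}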
Rendering tool outputs
The AI SDK automatically creates tool-{toolKey} parts when an agent calls a tool. These parts include the tool's state and output, which you can use to render custom components.
The tool part cycles through these states:
- input-streaming: the tool input is being streamed (when tool call streaming is enabled)
- input-available: the tool has been called with complete input and is waiting to execute
- output-available: tool execution is complete and the output is available
- output-error: tool execution failed
Here's an example of rendering a weather tool's output as a custom WeatherCard component.
- Backend
- Frontend
Define a tool with an outputSchema so the frontend knows the shape of the data to render.
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
export const weatherTool = createTool({
id: "get-weather",
description: "Get current weather for a location",
inputSchema: z.object({
location: z.string().describe("The location to get the weather for"),
}),
outputSchema: z.object({
temperature: z.number(),
feelsLike: z.number(),
humidity: z.number(),
windSpeed: z.number(),
conditions: z.string(),
location: z.string(),
}),
execute: async (inputData) => {
const response = await fetch(
`https://api.weatherapi.com/v1/current.json?key=${process.env.WEATHER_API_KEY}&q=${inputData.location}`
);
const data = await response.json();
return {
temperature: data.current.temp_c,
feelsLike: data.current.feelslike_c,
humidity: data.current.humidity,
windSpeed: data.current.wind_kph,
conditions: data.current.condition.text,
location: data.location.name,
};
},
});
Check for tool-{toolKey} parts in the message and render a custom component based on the tool's state and output.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { WeatherCard } from "./weather-card";
import { Loader } from "./loader";
export function Chat() {
const { messages, sendMessage } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/chat/weatherAgent",
}),
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
// Handle user text messages
if (part.type === "text" && message.role === "user") {
return <p key={index}>{part.text}</p>;
}
// Handle weather tool output
if (part.type === "tool-weatherTool") {
switch (part.state) {
case "input-available":
return <Loader key={index} />;
case "output-available":
return <WeatherCard key={index} {...part.output} />;
case "output-error":
return <div key={index}>Error: {part.errorText}</div>;
default:
return null;
}
}
return null;
})}
</div>
))}
</div>
);
}
The tool part type follows the pattern tool-{toolKey}, where toolKey is the key used when registering the tool with the agent. For example, if you register tools as tools: { weatherTool }, the part type will be tool-weatherTool.
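For reference, here is a sketch of that registration on the agent side (the agent id, instructions, model, and import path are illustrative); the weatherTool key is what produces tool-weatherTool parts on the frontend:
import { Agent } from "@mastra/core/agent";
import { weatherTool } from "../tools/weather-tool";
// Illustrative registration: the object key "weatherTool" determines the part type.
export const weatherAgent = new Agent({
  id: "weather-agent",
  instructions: "Use the get-weather tool to answer weather questions.",
  model: "openai/gpt-4o-mini",
  tools: { weatherTool },
});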
Rendering workflow data
When using workflowRoute() or handleWorkflowStream(), Mastra emits data-workflow parts that contain the workflow's execution state, including step statuses and outputs.
- Backend
- Frontend
Define a workflow with multiple steps that will emit data-workflow parts as it executes.
import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const fetchWeather = createStep({
id: "fetch-weather",
inputSchema: z.object({
location: z.string(),
}),
outputSchema: z.object({
temperature: z.number(),
conditions: z.string(),
}),
execute: async ({ inputData }) => {
// Fetch weather data...
return { temperature: 22, conditions: "Sunny" };
},
});
const planActivities = createStep({
id: "plan-activities",
inputSchema: z.object({
temperature: z.number(),
conditions: z.string(),
}),
outputSchema: z.object({
activities: z.string(),
}),
execute: async ({ inputData, mastra }) => {
const agent = mastra?.getAgent("activityAgent");
const response = await agent?.generate(
`Suggest activities for ${inputData.conditions} weather at ${inputData.temperature}°C`
);
return { activities: response?.text || "" };
},
});
export const activitiesWorkflow = createWorkflow({
id: "activities-workflow",
inputSchema: z.object({
location: z.string(),
}),
outputSchema: z.object({
activities: z.string(),
}),
})
.then(fetchWeather)
.then(planActivities);
activitiesWorkflow.commit();
Register the workflow with Mastra and expose it via workflowRoute() to stream workflow events to the frontend.
import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
workflows: { activitiesWorkflow },
server: {
apiRoutes: [
workflowRoute({
path: "/workflow/activitiesWorkflow",
workflow: "activitiesWorkflow",
}),
],
},
});
Check for data-workflow parts and render each step's status and output, using the WorkflowDataPart type for type safety.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import type { WorkflowDataPart } from "@mastra/ai-sdk";
type WorkflowData = WorkflowDataPart["data"];
type StepStatus = "running" | "success" | "failed" | "suspended" | "waiting";
function StepIndicator({ name, status, output }: {
name: string;
status: StepStatus;
output: unknown;
}) {
return (
<div className="step">
<div className="step-header">
<span>{name}</span>
<span className={`status status-${status}`}>{status}</span>
</div>
{status === "success" && output && (
<pre>{JSON.stringify(output, null, 2)}</pre>
)}
</div>
);
}
export function WorkflowChat() {
const { messages, sendMessage, status } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/workflow/activitiesWorkflow",
prepareSendMessagesRequest: ({ messages }) => ({
body: {
inputData: {
location: messages[messages.length - 1]?.parts[0]?.text,
},
},
}),
}),
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
if (part.type === "data-workflow") {
const workflowData = part.data as WorkflowData;
const steps = Object.values(workflowData.steps);
return (
<div key={index} className="workflow-progress">
<h3>Workflow: {workflowData.name}</h3>
<p>Status: {workflowData.status}</p>
{steps.map((step) => (
<StepIndicator
key={step.name}
name={step.name}
status={step.status}
output={step.output}
/>
))}
</div>
);
}
return null;
})}
</div>
))}
</div>
);
}
For more details on workflow streaming, see Workflow Streaming.
Rendering network data
When using networkRoute() or handleNetworkStream(), Mastra emits data-network parts that contain the agent network's execution state, including which agents were called and their outputs.
- Backend
- Frontend
Register agents with Mastra and expose the routing agent via networkRoute() to stream network execution events to the frontend.
import { Mastra } from "@mastra/core";
import { networkRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
agents: { routingAgent, researchAgent, weatherAgent },
server: {
apiRoutes: [
networkRoute({
path: "/network",
agent: "routingAgent",
}),
],
},
});
Check for data-network parts and render each agent's execution step, using the NetworkDataPart type for type safety.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import type { NetworkDataPart } from "@mastra/ai-sdk";
type NetworkData = NetworkDataPart["data"];
function AgentStep({ step }: { step: NetworkData["steps"][number] }) {
return (
<div className="agent-step">
<div className="step-header">
<span className="agent-name">{step.name}</span>
<span className={`status status-${step.status}`}>{step.status}</span>
</div>
{step.input && (
<div className="step-input">
<strong>Input:</strong>
<pre>{JSON.stringify(step.input, null, 2)}</pre>
</div>
)}
{step.output && (
<div className="step-output">
<strong>Output:</strong>
<pre>{typeof step.output === "string" ? step.output : JSON.stringify(step.output, null, 2)}</pre>
</div>
)}
</div>
);
}
export function NetworkChat() {
const { messages, sendMessage, status } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/network",
}),
});
return (
<div>
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
if (part.type === "data-network") {
const networkData = part.data as NetworkData;
return (
<div key={index} className="network-execution">
<div className="network-header">
<h3>Agent Network: {networkData.name}</h3>
<span className={`status status-${networkData.status}`}>
{networkData.status}
</span>
</div>
<div className="network-steps">
{networkData.steps.map((step, stepIndex) => (
<AgentStep key={stepIndex} step={step} />
))}
</div>
</div>
);
}
return null;
})}
</div>
))}
</div>
);
}
For more details on agent networks, see Agent Networks.
Custom events
Use writer.custom() within a tool's execute() function to emit custom data parts. This is useful for progress indicators, status updates, or any custom UI updates during tool execution.
Custom event types must start with data- to be recognized as data parts.
You must await the writer.custom() call, otherwise you may encounter a WritableStream is locked error.
- Backend
- Frontend
Use writer.custom() inside the tool's execute() function to emit custom data- prefixed events at different stages of execution.
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
export const taskTool = createTool({
id: "process-task",
description: "Process a task with progress updates",
inputSchema: z.object({
task: z.string().describe("The task to process"),
}),
outputSchema: z.object({
result: z.string(),
status: z.string(),
}),
execute: async (inputData, context) => {
const { task } = inputData;
// Emit "in progress" custom event
await context?.writer?.custom({
type: "data-tool-progress",
data: {
status: "in-progress",
message: "Gathering information...",
},
});
// Simulate work
await new Promise((resolve) => setTimeout(resolve, 3000));
// Emit "done" custom event
await context?.writer?.custom({
type: "data-tool-progress",
data: {
status: "done",
message: `Successfully processed "${task}"`,
},
});
return {
result: `Task "${task}" has been completed successfully!`,
status: "completed",
};
},
});
Filter message parts for your custom event type and render a progress indicator that updates as new events arrive.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { useMemo } from "react";
type ProgressData = {
status: "in-progress" | "done";
message: string;
};
function ProgressIndicator({ progress }: { progress: ProgressData }) {
return (
<div className="progress-indicator">
{progress.status === "in-progress" ? (
<span className="spinner" />
) : (
<span className="check-icon" />
)}
<span className={`status-${progress.status}`}>{progress.message}</span>
</div>
);
}
export function TaskChat() {
const { messages, sendMessage } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/chat/taskAgent",
}),
});
// Extract the latest progress event from messages
const latestProgress = useMemo(() => {
const allProgressParts: ProgressData[] = [];
messages.forEach((message) => {
message.parts.forEach((part) => {
if (part.type === "data-tool-progress") {
allProgressParts.push(part.data as ProgressData);
}
});
});
return allProgressParts[allProgressParts.length - 1];
}, [messages]);
return (
<div>
{latestProgress && <ProgressIndicator progress={latestProgress} />}
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
if (part.type === "text") {
return <p key={index}>{part.text}</p>;
}
return null;
})}
</div>
))}
</div>
);
}
Tool streaming
Tools can also stream data using context.writer.write() for lower-level control, or pipe an agent's stream directly to the tool's writer. For more details, see Tool Streaming.
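As a rough sketch of the lower-level approach (the chunk payload shape passed to writer.write() is an assumption; consult the Tool Streaming documentation for the exact format):
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
export const lowLevelTool = createTool({
  id: "low-level-stream",
  description: "Streams raw progress chunks while it works",
  inputSchema: z.object({ task: z.string() }),
  outputSchema: z.object({ result: z.string() }),
  execute: async (inputData, context) => {
    // Assumed chunk shape: writer.write() is given a data part-like object here
    await context?.writer?.write({
      type: "data-task-progress",
      data: { status: "in-progress", message: `Working on "${inputData.task}"...` },
    });
    // ...do the actual work here...
    return { result: `Finished "${inputData.task}"` };
  },
});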
Examples
For live examples of Custom UI patterns, visit Mastra's UI Dojo. The repository includes implementations of the patterns covered in this guide.
Recipes
Stream transformations
To manually transform Mastra's streams into an AI SDK-compatible format, use the toAISdkStream() utility. See the examples for concrete usage patterns.
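A hypothetical sketch of such a transformation in a Next.js route handler (the toAISdkStream() argument and return shapes are assumptions; check the examples and reference for the exact signature):
import { toAISdkStream } from "@mastra/ai-sdk";
import { createUIMessageStreamResponse } from "ai";
import { mastra } from "@/src/mastra";
export async function POST(req: Request) {
  const { prompt }: { prompt: string } = await req.json();
  const agent = mastra.getAgent("weatherAgent");
  // Stream from the agent, then convert the Mastra stream for the AI SDK UI hooks
  const stream = await agent.stream(prompt);
  return createUIMessageStreamResponse({ stream: toAISdkStream(stream) });
}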
Loading historical messages
When loading messages from Mastra's memory to display in a chat UI, use toAISdkV5Messages() or toAISdkV4Messages() to convert them to the appropriate AI SDK format for useChat()'s initialMessages.
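A hedged sketch of loading a thread's history on the server and converting it for the UI (the memory query call and the toAISdkV5Messages() input shape are assumptions; see the reference documentation for details):
import { toAISdkV5Messages } from "@mastra/ai-sdk";
import { mastra } from "@/src/mastra";
export async function GET(req: Request) {
  const threadId = new URL(req.url).searchParams.get("threadId")!;
  const agent = mastra.getAgent("weatherAgent");
  // Assumed memory API: query the stored messages for a thread
  const memory = await agent.getMemory();
  const { messages } = await memory!.query({ threadId });
  // Convert to AI SDK v5 UIMessages for useChat()'s initialMessages
  return Response.json(toAISdkV5Messages(messages));
}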
Passing additional data
[sendMessage()](https://ai-sdk.dev/docs/reference/ai-sdk-ui/use-chat#send-message) lets you pass additional data from the frontend to Mastra. This data is then available on the server as [RequestContext](/docs/server/request-context).
Here's an example of the frontend code:
import { useChat } from "@ai-sdk/react";
import { useState } from "react";
import { DefaultChatTransport } from 'ai';
export function ChatAdditional() {
const [inputValue, setInputValue] = useState('')
const { messages, sendMessage } = useChat({
transport: new DefaultChatTransport({
api: 'http://localhost:4111/chat-extra',
}),
});
const handleFormSubmit = (e: React.FormEvent) => {
e.preventDefault();
sendMessage({ text: inputValue }, {
body: {
data: {
userId: "user123",
preferences: {
language: "en",
temperature: "celsius"
}
}
}
});
};
return (
<div>
<pre>{JSON.stringify(messages, null, 2)}</pre>
<form onSubmit={handleFormSubmit}>
<input value={inputValue} onChange={e => setInputValue(e.target.value)} placeholder="Name of the city" />
</form>
</div>
);
}
Below are two examples showing how to implement the backend portion.
- Mastra Server
- Next.js
Add a chatRoute() to your Mastra configuration as shown above. Then, add a server-level middleware:
import { Mastra } from "@mastra/core";
export const mastra = new Mastra({
server: {
middleware: [
async (c, next) => {
const requestContext = c.get("requestContext");
if (c.req.method === "POST") {
const clonedReq = c.req.raw.clone();
const body = await clonedReq.json();
if (body?.data) {
for (const [key, value] of Object.entries(body.data)) {
requestContext.set(key, value);
}
}
}
await next();
},
],
},
});
You can access this data in your tools via the requestContext parameter. See the Request Context documentation for more details.
import { handleChatStream } from '@mastra/ai-sdk';
import { RequestContext } from "@mastra/core/request-context";
import { createUIMessageStreamResponse } from 'ai';
import { mastra } from '@/src/mastra';
export async function POST(req: Request) {
const { messages, data } = await req.json();
const requestContext = new RequestContext();
if (data) {
for (const [key, value] of Object.entries(data)) {
requestContext.set(key, value);
}
}
const stream = await handleChatStream({
mastra,
agentId: 'weatherAgent',
params: {
messages,
requestContext,
},
});
return createUIMessageStreamResponse({ stream });
}
Workflow suspend/resume with user approval
Workflows can suspend execution and wait for user input before continuing. This is useful for approval flows, confirmations, or any human-in-the-loop scenario.
The workflow uses:
- suspendSchema/resumeSchema - define the shape of the suspend payload and the resume input
- suspend() - pauses the workflow and sends the suspend payload to the UI
- resumeData - contains the user's response when the workflow resumes
- bail() - exits the workflow early (e.g., when the user rejects)
- Backend
- Frontend
Create a workflow step that suspends for approval. The step checks resumeData to determine whether it's resuming, and calls suspend() on first execution.
import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const requestApproval = createStep({
id: "request-approval",
inputSchema: z.object({ requestId: z.string(), summary: z.string() }),
outputSchema: z.object({
approved: z.boolean(),
requestId: z.string(),
approvedBy: z.string().optional(),
}),
resumeSchema: z.object({
approved: z.boolean(),
approverName: z.string().optional(),
}),
suspendSchema: z.object({
message: z.string(),
requestId: z.string(),
}),
execute: async ({ inputData, resumeData, suspend, bail }) => {
// User rejected - bail out
if (resumeData?.approved === false) {
return bail({ message: "Request rejected" });
}
// User approved - continue
if (resumeData?.approved) {
return {
approved: true,
requestId: inputData.requestId,
approvedBy: resumeData.approverName || "User",
};
}
// First execution - suspend and wait
return await suspend({
message: `Please approve: ${inputData.summary}`,
requestId: inputData.requestId,
});
},
});
export const approvalWorkflow = createWorkflow({
id: "approval-workflow",
inputSchema: z.object({ requestId: z.string(), summary: z.string() }),
outputSchema: z.object({
approved: z.boolean(),
requestId: z.string(),
approvedBy: z.string().optional(),
}),
})
.then(requestApproval);
approvalWorkflow.commit();
Register the workflow. Storage is required for suspend/resume to persist state.
import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
import { LibSQLStore } from "@mastra/libsql";
export const mastra = new Mastra({
workflows: { approvalWorkflow },
storage: new LibSQLStore({
url: "file:../mastra.db",
}),
server: {
apiRoutes: [
workflowRoute({ path: "/workflow/approvalWorkflow", workflow: "approvalWorkflow" }),
],
},
});
Detect when the workflow is suspended and send resume data with runId, step, and resumeData.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { useMemo, useState } from "react";
import type { WorkflowDataPart } from "@mastra/ai-sdk";
type WorkflowData = WorkflowDataPart["data"];
export function ApprovalWorkflow() {
const [requestId, setRequestId] = useState("");
const [summary, setSummary] = useState("");
const { messages, sendMessage, setMessages, status } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/workflow/approvalWorkflow",
prepareSendMessagesRequest: ({ messages }) => {
const lastMessage = messages[messages.length - 1];
const text = lastMessage.parts.find((p) => p.type === "text")?.text;
const metadata = lastMessage.metadata as Record<string, string>;
// Resuming: send runId, step, and resumeData
if (text === "Approve" || text === "Reject") {
return {
body: {
runId: metadata.runId,
step: "request-approval",
resumeData: { approved: text === "Approve" },
},
};
}
// Starting: send inputData
return {
body: { inputData: { requestId: metadata.requestId, summary: metadata.summary } },
};
},
}),
});
// Find suspended workflow
const suspended = useMemo(() => {
for (const m of messages) {
for (const p of m.parts) {
if (p.type === "data-workflow" && (p.data as WorkflowData).status === "suspended") {
return { data: p.data as WorkflowData, runId: p.id };
}
}
}
return null;
}, [messages]);
const handleApprove = () => {
setMessages([]);
sendMessage({ text: "Approve", metadata: { runId: suspended?.runId } });
};
const handleReject = () => {
setMessages([]);
sendMessage({ text: "Reject", metadata: { runId: suspended?.runId } });
};
return (
<div>
{!suspended ? (
<form onSubmit={(e) => {
e.preventDefault();
setMessages([]);
sendMessage({ text: "Start", metadata: { requestId, summary } });
}}>
<input value={requestId} onChange={(e) => setRequestId(e.target.value)} placeholder="Request ID" />
<input value={summary} onChange={(e) => setSummary(e.target.value)} placeholder="Summary" />
<button type="submit" disabled={status !== "ready"}>Submit</button>
</form>
) : (
<div>
<p>{(suspended.data.steps["request-approval"]?.suspendPayload as { message: string })?.message}</p>
<button onClick={handleApprove}>Approve</button>
<button onClick={handleReject}>Reject</button>
</div>
)}
</div>
);
}
Key points:
- The suspend payload is available via step.suspendPayload
- To resume, send runId, step (the step ID), and resumeData in the request body
- Storage must be configured to persist workflow state across suspend/resume
For a complete implementation, see the workflow-suspend-resume example in UI Dojo.
Nested agent streams in tools
Tools can call agents internally and stream the agent's output back to the UI. This creates data-tool-agent parts that can be rendered alongside the tool's final output.
The pattern uses:
- context.mastra.getAgent() - get an agent instance from within the tool
- agent.stream() - stream the agent's response
- stream.fullStream.pipeTo(context.writer) - pipe the agent's stream into the tool's writer
- Backend
- Frontend
Create a tool that calls an agent and pipes its stream to the tool's writer.
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
export const nestedAgentTool = createTool({
id: "nested-agent-stream",
description: "Analyze weather using a nested agent",
inputSchema: z.object({
city: z.string().describe("The city to analyze"),
}),
outputSchema: z.object({
summary: z.string(),
}),
execute: async (inputData, context) => {
const agent = context?.mastra?.getAgent("weatherAgent");
if (!agent) {
return { summary: "Weather agent not available" };
}
const stream = await agent.stream(
`Analyze the weather in ${inputData.city} and provide a summary.`
);
// Pipe the agent's stream to emit data-tool-agent parts
await stream.fullStream.pipeTo(context!.writer!);
return { summary: (await stream.text) ?? "No summary available" };
},
});
Create an agent that uses this tool.
import { Agent } from "@mastra/core/agent";
import { nestedAgentTool } from "../tools/nested-agent-tool";
export const forecastAgent = new Agent({
id: "forecast-agent",
instructions: "Use the nested-agent-stream tool when asked about weather.",
model: "openai/gpt-4o-mini",
tools: { nestedAgentTool },
});
Handle data-tool-agent parts to display the nested agent's streamed output.
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { useState } from "react";
import type { AgentDataPart } from "@mastra/ai-sdk";
export function NestedAgentChat() {
const [input, setInput] = useState("");
const { messages, sendMessage, status } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/chat/forecastAgent",
}),
});
return (
<div>
<form onSubmit={(e) => {
e.preventDefault();
sendMessage({ text: input });
setInput("");
}}>
<input value={input} onChange={(e) => setInput(e.target.value)} placeholder="Enter a city" />
<button type="submit" disabled={status !== "ready"}>Get Forecast</button>
</form>
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
if (part.type === "text") {
return <p key={index}>{part.text}</p>;
}
if (part.type === "data-tool-agent") {
const { id, data } = part as AgentDataPart;
return (
<div key={index} className="nested-agent">
<strong>Nested Agent: {id}</strong>
{data.text && <p>{data.text}</p>}
</div>
);
}
return null;
})}
</div>
))}
</div>
);
}
Key points:
- Piping fullStream to context.writer emits data-tool-agent parts
- AgentDataPart has an id (on the part) and data.text (the agent's streamed text)
- The tool still returns its own output after the stream completes
For a complete implementation, see the tool-nested-streams example in UI Dojo.
Streaming agent text from workflow steps
Workflow steps can stream an agent's text output in real time by piping the agent's stream to the step's writer. This lets users see the agent "thinking" while the workflow executes, rather than waiting for the step to complete.
The pattern uses:
- The writer in the workflow step - pipe the agent's fullStream to the step's writer
- text and data-workflow parts - the frontend receives streamed text alongside step-by-step progress
- Backend
- Frontend
Create a workflow step that streams an agent's response by piping it to the step's writer.
import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
import { weatherAgent } from "../agents/weather-agent";
const analyzeWeather = createStep({
id: "analyze-weather",
inputSchema: z.object({ location: z.string() }),
outputSchema: z.object({ analysis: z.string(), location: z.string() }),
execute: async ({ inputData, writer }) => {
const response = await weatherAgent.stream(
`Analyze the weather in ${inputData.location} and provide insights.`
);
// Pipe agent stream to step writer for real-time text streaming
await response.fullStream.pipeTo(writer);
return {
analysis: await response.text,
location: inputData.location,
};
},
});
const calculateScore = createStep({
id: "calculate-score",
inputSchema: z.object({ analysis: z.string(), location: z.string() }),
outputSchema: z.object({ score: z.number(), summary: z.string() }),
execute: async ({ inputData }) => {
const score = inputData.analysis.includes("sunny") ? 85 : 50;
return { score, summary: `Comfort score for ${inputData.location}: ${score}/100` };
},
});
export const weatherWorkflow = createWorkflow({
id: "weather-workflow",
inputSchema: z.object({ location: z.string() }),
outputSchema: z.object({ score: z.number(), summary: z.string() }),
})
.then(analyzeWeather)
.then(calculateScore);
weatherWorkflow.commit();
Register the workflow with a workflowRoute(). Text streaming is enabled by default.
import { Mastra } from "@mastra/core";
import { workflowRoute } from "@mastra/ai-sdk";
export const mastra = new Mastra({
agents: { weatherAgent },
workflows: { weatherWorkflow },
server: {
apiRoutes: [
workflowRoute({ path: "/workflow/weather", workflow: "weatherWorkflow" }),
],
},
});
Render both text parts (streaming agent output) and data-workflow parts (step progress).
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { useState } from "react";
import type { WorkflowDataPart } from "@mastra/ai-sdk";
type WorkflowData = WorkflowDataPart["data"];
export function WeatherWorkflow() {
const [location, setLocation] = useState("");
const { messages, sendMessage, status } = useChat({
transport: new DefaultChatTransport({
api: "http://localhost:4111/workflow/weather",
prepareSendMessagesRequest: ({ messages }) => ({
body: {
inputData: {
location: messages[messages.length - 1].parts.find((p) => p.type === "text")?.text,
},
},
}),
}),
});
return (
<div>
<form onSubmit={(e) => {
e.preventDefault();
sendMessage({ text: location });
setLocation("");
}}>
<input value={location} onChange={(e) => setLocation(e.target.value)} placeholder="Enter city" />
<button type="submit" disabled={status !== "ready"}>Analyze</button>
</form>
{messages.map((message) => (
<div key={message.id}>
{message.parts.map((part, index) => {
// Streaming agent text
if (part.type === "text" && message.role === "assistant") {
return (
<div key={index}>
{status === "streaming" && <p><em>Agent analyzing...</em></p>}
<p>{part.text}</p>
</div>
);
}
// Workflow step progress
if (part.type === "data-workflow") {
const workflow = part.data as WorkflowData;
return (
<div key={index}>
{Object.entries(workflow.steps).map(([stepId, step]) => (
<div key={stepId}>
<strong>{stepId}</strong>: {step.status}
</div>
))}
</div>
);
}
return null;
})}
</div>
))}
</div>
);
}
Key points:
- The step's writer is available in the execute function (not via context)
- includeTextStreamParts defaults to true on workflowRoute(), so text streams by default
- text parts stream in real time, while data-workflow parts update as step statuses change
For a complete implementation, see the workflow-agent-text-stream example in UI Dojo.
Multi-stage progress with branching workflows
For workflows with conditional branching (e.g., express vs. standard shipping), you can track progress across different branches by including an identifier in your custom events.
The UI Dojo example uses a stage field in the event data to identify which branch is executing (e.g., "validation", "standard-processing", "express-processing"). The frontend groups events by this field to show a pipeline-style progress UI.
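A sketch of the backend side, emitting stage-tagged events from a workflow step (this assumes the step writer exposes the same custom() helper shown in the Custom events section; the stage names and event type are illustrative):
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
const validateOrder = createStep({
  id: "validate-order",
  inputSchema: z.object({ orderId: z.string() }),
  outputSchema: z.object({ valid: z.boolean() }),
  execute: async ({ inputData, writer }) => {
    // Tag each custom event with a "stage" so the frontend can group events per branch
    await writer?.custom({
      type: "data-order-progress",
      data: { stage: "validation", status: "in-progress", message: `Validating ${inputData.orderId}...` },
    });
    // ...validation logic...
    await writer?.custom({
      type: "data-order-progress",
      data: { stage: "validation", status: "done", message: "Order validated" },
    });
    return { valid: true };
  },
});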
See branching-workflow.ts (backend) and workflow-custom-events.tsx (frontend) in the UI Dojo.
Progress indicators in agent networks
When using agent networks, you can emit custom progress events from tools used by sub-agents to show which agent is currently active.
The UI Dojo example includes a stage field in the event data to identify which sub-agent is running (e.g., "report-generation", "report-review"). The frontend groups events by this field and displays the latest status for each.
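A sketch of the frontend side of this pattern: collect the custom data parts, group them by their stage field, and render the latest status per sub-agent (the part type and event shape below are illustrative):
import { useChat } from "@ai-sdk/react";
import { DefaultChatTransport } from "ai";
import { useMemo } from "react";
type StageEvent = { stage: string; status: string; message: string };
export function NetworkProgress() {
  const { messages } = useChat({
    transport: new DefaultChatTransport({ api: "http://localhost:4111/network" }),
  });
  // Keep only the most recent event per stage (e.g. "report-generation", "report-review")
  const latestByStage = useMemo(() => {
    const byStage = new Map<string, StageEvent>();
    messages.forEach((message) =>
      message.parts.forEach((part) => {
        if (part.type === "data-report-progress") {
          byStage.set((part.data as StageEvent).stage, part.data as StageEvent);
        }
      })
    );
    return Array.from(byStage.values());
  }, [messages]);
  return (
    <div>
      {latestByStage.map((event) => (
        <div key={event.stage}>
          <strong>{event.stage}</strong>: {event.status} ({event.message})
        </div>
      ))}
    </div>
  );
}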
See report-generation-tool.ts (backend) and agent-network-custom-events.tsx (frontend) in the UI Dojo.