网络审批
🌐 Network Approval
代理网络可能需要与单个代理和工作流相同的人类参与(human-in-the-loop)监督。当网络中的工具、子代理或工作流需要批准或暂停执行时,网络会暂停并触发事件,允许你的应用在恢复之前收集用户输入。
🌐 Agent networks can require the same human-in-the-loop oversight used in individual agents and workflows. When a tool, sub-agent, or workflow within a network requires approval or suspends execution, the network pauses and emits events that allow your application to collect user input before resuming.
存储Direct link to 存储
🌐 Storage
网络审批使用快照来捕获执行状态。请确保在你的 Mastra 实例中已启用存储提供程序。如果未启用存储,你将看到与未找到快照相关的错误。
🌐 Network approval uses snapshots to capture execution state. Ensure you've enabled a storage provider in your Mastra instance. If storage isn't enabled you'll see an error relating to snapshot not found.
import { Mastra } from "@mastra/core/mastra";
import { LibSQLStore } from "@mastra/libsql";
export const mastra = new Mastra({
storage: new LibSQLStore({
id: "mastra-storage",
url: ":memory:"
})
});
批准网络工具调用Direct link to 批准网络工具调用
🌐 Approving network tool calls
当网络中的某个工具具有 requireApproval: true 时,网络流会发出一个 agent-execution-approval 数据块并暂停。为了让工具执行,请使用 runId 调用 approveNetworkToolCall。
🌐 When a tool within a network has requireApproval: true, the network stream emits an agent-execution-approval chunk and pauses. To allow the tool to execute, call approveNetworkToolCall with the runId.
const stream = await routingAgent.network("Process this query", {
memory: {
thread: "user-123",
resource: "my-app"
}
});
let runId: string;
for await (const chunk of stream) {
runId = stream.runId;
// if the requirApproval is in a tool inside a subAgent or the subAgent has requireToolApproval set to true
if (chunk.type === "agent-execution-approval") {
console.log("Tool requires approval:", chunk.payload);
}
// if the requirApproval is in a tool directly in the network agent
if (chunk.type === "tool-execution-approval") {
console.log("Tool requires approval:", chunk.payload);
}
}
// Approve and resume execution
const approvedStream = await routingAgent.approveNetworkToolCall({
runId,
memory: {
thread: "user-123",
resource: "my-app"
}
});
for await (const chunk of approvedStream) {
if (chunk.type === "network-execution-event-step-finish") {
console.log(chunk.payload.result);
}
}
拒绝网络工具调用Direct link to 拒绝网络工具调用
🌐 Declining network tool calls
要拒绝待处理的工具调用并防止执行,请调用 declineNetworkToolCall。网络将继续运行而不执行该工具。
🌐 To decline a pending tool call and prevent execution, call declineNetworkToolCall. The network continues without executing the tool.
const declinedStream = await routingAgent.declineNetworkToolCall({
runId,
memory: {
thread: "user-123",
resource: "my-app"
}
});
for await (const chunk of declinedStream) {
if (chunk.type === "network-execution-event-step-finish") {
console.log(chunk.payload.result);
}
}
恢复已暂停的网络Direct link to 恢复已暂停的网络
🌐 Resuming suspended networks
当网络中的原语调用 suspend() 时,流会发出一个带有包含原语上下文的 suspendPayload 的 agent-execution-suspended/tool-execution-suspended/workflow-execution-suspended 块。使用 resumeNetwork 提供原语请求的数据并继续执行。
🌐 When a primitive in the network calls suspend(), the stream emits an agent-execution-suspended/tool-execution-suspended/workflow-execution-suspended chunk with a suspendPayload containing context from the primitive. Use resumeNetwork to provide the data requested by the primitive and continue execution.
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
const confirmationTool = createTool({
id: "confirmation-tool",
description: "Requests user confirmation before proceeding",
inputSchema: z.object({
action: z.string()
}),
outputSchema: z.object({
confirmed: z.boolean(),
action: z.string()
}),
suspendSchema: z.object({
message: z.string(),
action: z.string()
}),
resumeSchema: z.object({
confirmed: z.boolean()
}),
execute: async (inputData, context) => {
const { resumeData, suspend } = context?.agent ?? {};
if (!resumeData?.confirmed) {
return suspend?.({
message: `Please confirm: ${inputData.action}`,
action: inputData.action
});
}
return { confirmed: true, action: inputData.action };
}
});
使用用户提供的数据处理挂起和恢复:
🌐 Handle the suspension and resume with user-provided data:
const stream = await routingAgent.network("Delete the old records", {
memory: {
thread: "user-123",
resource: "my-app"
}
});
for await (const chunk of stream) {
if (chunk.type === "workflow-execution-suspended") {
console.log(chunk.payload.suspendPayload);
// { message: "Please confirm: delete old records", action: "delete old records" }
}
}
// Resume with user confirmation
const resumedStream = await routingAgent.resumeNetwork(
{ confirmed: true },
{
runId: stream.runId,
memory: {
thread: "user-123",
resource: "my-app"
}
}
);
for await (const chunk of resumedStream) {
if (chunk.type === "network-execution-event-step-finish") {
console.log(chunk.payload.result);
}
}
自动原始恢复Direct link to 自动原始恢复
🌐 Automatic primitive resumption
在使用调用 suspend() 的原语时,你可以启用自动恢复功能,让网络根据用户的下一条消息恢复暂停的原语。这会创建一种对话流,使用户自然地提供所需信息。
🌐 When using primitives that call suspend(), you can enable automatic resumption so the network resumes suspended primitives based on the user's next message. This creates a conversational flow where users provide the required information naturally.
启用自动恢复Direct link to 启用自动恢复
🌐 Enabling auto-resume
在代理的 defaultNetworkOptions 中或调用 network() 时,将 autoResumeSuspendedTools 设置为 true:
🌐 Set autoResumeSuspendedTools to true in the agent's defaultNetworkOptions or when calling network():
import { Agent } from "@mastra/core/agent";
import { Memory } from "@mastra/memory";
// Option 1: In agent configuration
const routingAgent = new Agent({
id: "routing-agent",
name: "Routing Agent",
instructions: "You coordinate tasks across multiple agents",
model: "openai/gpt-4o-mini",
tools: { confirmationTool },
memory: new Memory(),
defaultNetworkOptions: {
autoResumeSuspendedTools: true,
},
});
// Option 2: Per-request
const stream = await routingAgent.network("Process this request", {
autoResumeSuspendedTools: true,
memory: {
thread: "user-123",
resource: "my-app"
}
});
它是如何工作的Direct link to 它是如何工作的
🌐 How it works
当启用 autoResumeSuspendedTools 时:
🌐 When autoResumeSuspendedTools is enabled:
- 一个原语通过调用
suspend()并携带有效负载来挂起执行 - 挂起状态会连同对话一起保存到内存中
- 当用户在同一线程中发送下一条消息时,网络:
- 从消息历史中检测挂起的原语
- 根据工具的
resumeSchema从用户的消息中提取resumeData - 使用提取的数据自动恢复原始过程
示例Direct link to 示例
🌐 Example
const stream = await routingAgent.network("Delete the old records", {
autoResumeSuspendedTools: true,
memory: {
thread: "user-123",
resource: "my-app"
}
});
for await (const chunk of stream) {
if (chunk.type === "workflow-execution-suspended") {
console.log(chunk.payload.suspendPayload);
// { message: "Please confirm: delete old records", action: "delete old records" }
}
}
// User provides confirmation in their next message
const resumedStream = await routingAgent.network("Yes, confirmed", {
autoResumeSuspendedTools: true,
memory: {
thread: "user-123",
resource: "my-app"
}
});
for await (const chunk of resumedStream) {
if (chunk.type === "network-execution-event-step-finish") {
console.log(chunk.payload.result);
}
}
对话流程:
User: "Delete the old records"
Agent: "Please confirm: delete old records"
User: "Yes, confirmed"
Agent: "Records deleted successfully"
要求Direct link to 要求
🌐 Requirements
要使自动工具恢复功能正常工作:
🌐 For automatic tool resumption to work:
- 内存已配置:代理需要内存来跟踪跨消息挂起的工具
- 相同线程:后续消息必须使用相同的内存线程和资源标识符
resumeSchema定义:该工具(可以直接在网络代理中或在子代理中)/ 工作流(被挂起的步骤)必须定义一个resumeSchema,这样代理才能知道从用户的消息中提取哪些数据
手动与自动恢复Direct link to 手动与自动恢复
🌐 Manual vs automatic resumption
| 方法 | 使用场景 |
|---|---|
手动 (resumeNetwork()) | 程序化控制、Webhook、按钮点击、外部触发 |
自动 (autoResumeSuspendedTools) | 用户以自然语言提供简历数据的对话流程 |
两种方法都使用相同的工具定义。只有当消息历史中存在已暂停的工具且用户在同一对话线程中发送新消息时,自动恢复才会触发。
🌐 Both approaches work with the same tool definitions. Automatic resumption triggers only when suspended tools exist in the message history and the user sends a new message on the same thread.
相关Direct link to 相关
🌐 Related