Skip to main content

处理器

🌐 Processors

处理器在消息通过代理时对其进行转换、验证或控制。它们在代理执行管道的特定点运行,允许你在输入到达语言模型之前修改输入,或在输出返回给用户之前修改输出。

🌐 Processors transform, validate, or control messages as they pass through an agent. They run at specific points in the agent's execution pipeline, allowing you to modify inputs before they reach the language model or outputs before they're returned to users.

处理器配置如下:

🌐 Processors are configured as:

  • inputProcessors:在消息到达语言模型之前运行。
  • outputProcessors:在语言模型生成回应之后,但在返回给用户之前运行。

你可以使用单个 Processor 对象,或使用 Mastra 的工作流原语将它们组合成工作流。工作流可以让你对处理器的执行顺序、并行处理和条件逻辑进行高级控制。

🌐 You can use individual Processor objects or compose them into workflows using Mastra's workflow primitives. Workflows give you advanced control over processor execution order, parallel processing, and conditional logic.

一些处理器同时实现了输入和输出逻辑,可以根据转换应发生的位置在任意数组中使用。

🌐 Some processors implement both input and output logic and can be used in either array depending on where the transformation should occur.

何时使用处理器
Direct link to 何时使用处理器

🌐 When to use processors

使用处理器来:

🌐 Use processors to:

  • 规范或验证用户输入
  • 为你的代理添加防护措施
  • 检测并防止提示注入或越狱尝试
  • 出于安全或合规的适度内容
  • 转换消息(例如,翻译语言、过滤工具调用)
  • 限制令牌使用或消息历史长度
  • 编辑敏感信息(个人身份信息)
  • 将自定义业务逻辑应用到消息

Mastra 包含了几个用于常见用例的处理器。你也可以为特定应用需求创建自定义处理器。

🌐 Mastra includes several processors for common use cases. You can also create custom processors for application-specific requirements.

向代理添加处理器
Direct link to 向代理添加处理器

🌐 Adding processors to an agent

导入并实例化处理器,然后将其传递给代理的 inputProcessorsoutputProcessors 数组:

🌐 Import and instantiate the processor, then pass it to the agent's inputProcessors or outputProcessors array:

src/mastra/agents/moderated-agent.ts
import { Agent } from "@mastra/core/agent";
import { ModerationProcessor } from "@mastra/core/processors";

export const moderatedAgent = new Agent({
name: "moderated-agent",
instructions: "You are a helpful assistant",
model: "openai/gpt-4o-mini",
inputProcessors: [
new ModerationProcessor({
model: "openai/gpt-4.1-nano",
categories: ["hate", "harassment", "violence"],
threshold: 0.7,
strategy: "block",
}),
],
});

执行顺序
Direct link to 执行顺序

🌐 Execution order

处理器按它们在数组中出现的顺序运行:

🌐 Processors run in the order they appear in the array:

inputProcessors: [
new UnicodeNormalizer(),
new PromptInjectionDetector(),
new ModerationProcessor(),
];

对于输出处理器,顺序决定了应用于模型响应的转换序列。

🌐 For output processors, the order determines the sequence of transformations applied to the model's response.

启用内存
Direct link to 启用内存

🌐 With memory enabled

当在代理上启用内存功能时,内存处理器会自动添加到流程中:

🌐 When memory is enabled on an agent, memory processors are automatically added to the pipeline:

输入处理器:

[Memory Processors] → [Your inputProcessors]

Memory loads message history first, then your processors run.

输出处理器:

[Your outputProcessors] → [Memory Processors]

Your processors run first, then memory persists messages.

此排序确保如果你的输出保护调用 abort(),内存处理器将被跳过,并且不会保存任何消息。有关详细信息,请参见内存处理器

🌐 This ordering ensures that if your output guardrail calls abort(), memory processors are skipped and no messages are saved. See Memory Processors for details.

创建自定义处理器
Direct link to 创建自定义处理器

🌐 Creating custom processors

自定义处理器实现了 Processor 接口:

🌐 Custom processors implement the Processor interface:

自定义输入处理器
Direct link to 自定义输入处理器

🌐 Custom input processor

src/mastra/processors/custom-input.ts
import type {
Processor,
MastraDBMessage,
RequestContext,
} from "@mastra/core";

export class CustomInputProcessor implements Processor {
id = "custom-input";

async processInput({
messages,
systemMessages,
context,
}: {
messages: MastraDBMessage[];
systemMessages: CoreMessage[];
context: RequestContext;
}): Promise<MastraDBMessage[]> {
// Transform messages before they reach the LLM
return messages.map((msg) => ({
...msg,
content: {
...msg.content,
content: msg.content.content.toLowerCase(),
},
}));
}
}

processInput 方法接收:

🌐 The processInput method receives:

  • messages:用户和助手消息(非系统消息)
  • systemMessages:所有系统消息(代理指令、内存上下文、用户提供的系统提示)
  • messageList:用于高级用例的完整 MessageList 实例
  • abort:停止处理并提前返回的函数
  • requestContext:执行元数据,如 threadIdresourceId

该方法可以返回:

🌐 The method can return:

  • MastraDBMessage[] — 已转换的消息数组(向后兼容)
  • { messages: MastraDBMessage[]; systemMessages: CoreMessage[] } — 包含消息和修改后的系统消息

该框架处理两种返回格式,因此修改系统消息是可选的,现有处理器仍然可以正常工作。

🌐 The framework handles both return formats, so modifying system messages is optional and existing processors continue to work.

修改系统消息
Direct link to 修改系统消息

🌐 Modifying system messages

要修改系统消息(例如,为较小的模型精简冗长提示),请返回同时包含 messagessystemMessages 的对象:

🌐 To modify system messages (e.g., trim verbose prompts for smaller models), return an object with both messages and systemMessages:

src/mastra/processors/system-trimmer.ts
import type { Processor, CoreMessage, MastraDBMessage } from "@mastra/core";

export class SystemTrimmer implements Processor {
id = "system-trimmer";

async processInput({
messages,
systemMessages,
}): Promise<{ messages: MastraDBMessage[]; systemMessages: CoreMessage[] }> {
// Trim system messages for smaller models
const trimmedSystemMessages = systemMessages.map((msg) => ({
...msg,
content:
typeof msg.content === "string"
? msg.content.substring(0, 500)
: msg.content,
}));

return { messages, systemMessages: trimmedSystemMessages };
}
}

这对于以下情况很有用:

🌐 This is useful for:

  • 为上下文窗口较小的模型修剪冗长的系统提示
  • 过滤或修改语义回忆内容以防止“提示过长”错误
  • 根据对话动态调整系统指令

使用 processInputStep 进行逐步处理
Direct link to 使用 processInputStep 进行逐步处理

🌐 Per-step processing with processInputStep

processInput 在代理执行开始时只运行一次时,processInputStep 会在代理循环的每一步运行(包括工具调用的后续执行)。这使得可以进行逐步配置变更,例如动态模型切换或工具选择修改。

🌐 While processInput runs once at the start of agent execution, processInputStep runs at each step of the agentic loop (including tool call continuations). This enables per-step configuration changes like dynamic model switching or tool choice modifications.

src/mastra/processors/step-processor.ts
import type { Processor, ProcessInputStepArgs, ProcessInputStepResult } from "@mastra/core";

export class DynamicModelProcessor implements Processor {
id = "dynamic-model";

async processInputStep({
stepNumber,
model,
toolChoice,
messageList,
}: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
// Use a fast model for initial response
if (stepNumber === 0) {
return { model: "openai/gpt-4o-mini" };
}

// Disable tools after 5 steps to force completion
if (stepNumber > 5) {
return { toolChoice: "none" };
}

// No changes for other steps
return {};
}
}

processInputStep 方法接收:

🌐 The processInputStep method receives:

  • stepNumber:代理循环中的当前步骤(从0开始计数)
  • steps:前几步的结果
  • messages:当前消息快照(只读)
  • systemMessages:当前系统消息(只读)
  • messageList:用于变更的完整 MessageList 实例
  • model:当前使用的模型
  • tools:此步骤可用的当前工具
  • toolChoice:当前工具选择设置
  • activeTools:当前活动工具
  • providerOptions:特定于提供商的选项
  • modelSettings:模型设置,如温度
  • structuredOutput:结构化输出配置

该方法可以返回以下任意组合:

🌐 The method can return any combination of:

  • model:更改此步骤的模型
  • tools:替换或添加工具(使用扩展运算符合并:{ tools: { ...tools, newTool } }
  • toolChoice:更改工具选择行为
  • activeTools:筛选可用的工具
  • messages:替换消息(应用于消息列表)
  • systemMessages:替换所有系统消息
  • providerOptions:修改提供商选项
  • modelSettings:修改模型设置
  • structuredOutput:修改结构化输出配置

使用 prepareStep 回调
Direct link to 使用 prepareStep 回调

🌐 Using prepareStep callback

为了简化每一步的逻辑,你可以在 generate()stream() 上使用 prepareStep 回调,而不是创建完整的处理器:

🌐 For simpler per-step logic, you can use the prepareStep callback on generate() or stream() instead of creating a full processor:

await agent.generate("Complex task", {
prepareStep: async ({ stepNumber, model }) => {
if (stepNumber === 0) {
return { model: "openai/gpt-4o-mini" };
}
if (stepNumber > 5) {
return { toolChoice: "none" };
}
},
});

自定义输出处理器
Direct link to 自定义输出处理器

🌐 Custom output processor

src/mastra/processors/custom-output.ts
import type {
Processor,
MastraDBMessage,
RequestContext,
} from "@mastra/core";

export class CustomOutputProcessor implements Processor {
id = "custom-output";

async processOutputResult({
messages,
context,
}: {
messages: MastraDBMessage[];
context: RequestContext;
}): Promise<MastraDBMessage[]> {
// Transform messages after the LLM generates them
return messages.filter((msg) => msg.role !== "system");
}

async processOutputStream({
stream,
context,
}: {
stream: ReadableStream;
context: RequestContext;
}): Promise<ReadableStream> {
// Transform streaming responses
return stream;
}
}

在输出处理器中添加元数据
Direct link to 在输出处理器中添加元数据

🌐 Adding metadata in output processors

你可以在 processOutputResult 中向消息添加自定义元数据。这些元数据可以通过响应对象访问:

🌐 You can add custom metadata to messages in processOutputResult. This metadata is accessible via the response object:

src/mastra/processors/metadata-processor.ts
import type { Processor, MastraDBMessage } from "@mastra/core";

export class MetadataProcessor implements Processor {
id = "metadata-processor";

async processOutputResult({
messages,
}: {
messages: MastraDBMessage[];
}): Promise<MastraDBMessage[]> {
return messages.map((msg) => {
if (msg.role === "assistant") {
return {
...msg,
content: {
...msg.content,
metadata: {
...msg.content.metadata,
processedAt: new Date().toISOString(),
customData: "your data here",
},
},
};
}
return msg;
});
}
}

使用 generate() 访问元数据:

🌐 Access the metadata with generate():

const result = await agent.generate("Hello");

// The response includes uiMessages with processor-added metadata
const assistantMessage = result.response?.uiMessages?.find((m) => m.role === "assistant");
console.log(assistantMessage?.metadata?.customData);

在流播放时访问元数据:

🌐 Access the metadata when streaming:

const stream = await agent.stream("Hello");

for await (const chunk of stream.fullStream) {
if (chunk.type === "finish") {
// Access response with processor-added metadata from the finish chunk
const uiMessages = chunk.payload.response?.uiMessages;
const assistantMessage = uiMessages?.find((m) => m.role === "assistant");
console.log(assistantMessage?.metadata?.customData);
}
}

// Or via the response promise after consuming the stream
const response = await stream.response;
console.log(response.uiMessages);

内置实用处理器
Direct link to 内置实用处理器

🌐 Built-in Utility Processors

Mastra 为常见任务提供实用处理器:

🌐 Mastra provides utility processors for common tasks:

有关安全性和验证处理器,请参阅 Guardrails 页面,了解输入/输出防护和审核处理器。 有关特定于内存的处理器,请参阅 Memory Processors 页面,了解处理消息历史、语义回忆和工作内存的处理器。

TokenLimiter
Direct link to TokenLimiter

当总令牌数超过指定限制时,通过删除较旧的消息来防止上下文窗口溢出。

🌐 Prevents context window overflow by removing older messages when the total token count exceeds a specified limit.

import { Agent } from "@mastra/core/agent";
import { TokenLimiter } from "@mastra/core/processors";

const agent = new Agent({
name: "my-agent",
model: "openai/gpt-4o",
inputProcessors: [
// Ensure the total tokens don't exceed ~127k
new TokenLimiter(127000),
],
});

TokenLimiter 默认使用 o200k_base 编码(适用于 GPT-4o)。你可以为不同的模型指定其他编码:

🌐 The TokenLimiter uses the o200k_base encoding by default (suitable for GPT-4o). You can specify other encodings for different models:

import cl100k_base from "js-tiktoken/ranks/cl100k_base";

const agent = new Agent({
name: "my-agent",
inputProcessors: [
new TokenLimiter({
limit: 16000, // Example limit for a 16k context model
encoding: cl100k_base,
}),
],
});

ToolCallFilter
Direct link to ToolCallFilter

从发送给大语言模型的消息中移除工具调用,通过排除可能冗长的工具交互来节省令牌。

🌐 Removes tool calls from messages sent to the LLM, saving tokens by excluding potentially verbose tool interactions.

import { Agent } from "@mastra/core/agent";
import { ToolCallFilter, TokenLimiter } from "@mastra/core/processors";

const agent = new Agent({
name: "my-agent",
model: "openai/gpt-4o",
inputProcessors: [
// Example 1: Remove all tool calls/results
new ToolCallFilter(),

// Example 2: Remove only specific tool calls
new ToolCallFilter({ exclude: ["generateImageTool"] }),

// Always place TokenLimiter last
new TokenLimiter(127000),
],
});
note

上面的示例过滤了工具调用并限制了大语言模型的令牌,但这些被过滤的消息仍然会被保存到内存中。如果希望在消息保存到内存之前也进行过滤,可以在实用处理器之前手动添加内存处理器。详情请参阅 Memory Processors

🌐 The example above filters tool calls and limits tokens for the LLM, but these filtered messages will still be saved to memory. To also filter messages before they're saved to memory, manually add memory processors before utility processors. See Memory Processors for details.

ToolSearchProcessor
Direct link to ToolSearchProcessor

能够为拥有大量工具库的代理启用动态工具发现和加载。代理无需一次性提供所有工具,而是通过关键字搜索工具并按需加载,从而减少上下文令牌的使用。

🌐 Enables dynamic tool discovery and loading for agents with large tool libraries. Instead of providing all tools upfront, the agent searches for tools by keyword and loads them on demand, reducing context token usage.

import { Agent } from "@mastra/core/agent";
import { ToolSearchProcessor } from "@mastra/core/processors";

const agent = new Agent({
name: "my-agent",
model: "openai/gpt-4o",
inputProcessors: [
new ToolSearchProcessor({
tools: {
createIssue: githubTools.createIssue,
sendEmail: emailTools.send,
// ... hundreds of tools
},
search: { topK: 5, minScore: 0.1 },
}),
],
});

处理器为代理提供了两种元工具:search_tools 用于按关键字查找工具,load_tool 用于向对话中添加工具。已加载的工具会在会话线程中保持。完整的配置选项请参阅 ToolSearchProcessor 参考

🌐 The processor gives the agent two meta-tools: search_tools to find tools by keyword and load_tool to add a tool to the conversation. Loaded tools persist within the thread. See the ToolSearchProcessor reference for full configuration options.

将工作流用作处理器
Direct link to 将工作流用作处理器

🌐 Using workflows as processors

你可以将 Mastra 工作流用作处理器,以创建具有并行执行、条件分支和错误处理的复杂处理管道:

🌐 You can use Mastra workflows as processors to create complex processing pipelines with parallel execution, conditional branching, and error handling:

src/mastra/processors/moderation-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { ProcessorStepSchema } from "@mastra/core/processors";
import { Agent } from "@mastra/core/agent";

// Create a workflow that runs multiple checks in parallel
const moderationWorkflow = createWorkflow({
id: "moderation-pipeline",
inputSchema: ProcessorStepSchema,
outputSchema: ProcessorStepSchema,
})
.then(createStep(new LengthValidator({ maxLength: 10000 })))
.parallel([
createStep(new PIIDetector({ strategy: "redact" })),
createStep(new ToxicityChecker({ threshold: 0.8 })),
])
.commit();

// Use the workflow as an input processor
const agent = new Agent({
id: "moderated-agent",
name: "Moderated Agent",
model: "openai/gpt-4o",
inputProcessors: [moderationWorkflow],
});

当代理在 Mastra 注册时,处理器工作流会自动注册为工作流,使你能够在 Studio 中查看和调试它们。

🌐 When an agent is registered with Mastra, processor workflows are automatically registered as workflows, allowing you to view and debug them in the Studio.

重试机制
Direct link to 重试机制

🌐 Retry mechanism

处理器可以请求大语言模型根据反馈重试其回应。这对于实现质量检查、输出验证或迭代优化非常有用:

🌐 Processors can request that the LLM retry its response with feedback. This is useful for implementing quality checks, output validation, or iterative refinement:

src/mastra/processors/quality-checker.ts
import type { Processor } from "@mastra/core";

export class QualityChecker implements Processor {
id = "quality-checker";

async processOutputStep({ text, abort, retryCount }) {
const qualityScore = await evaluateQuality(text);

if (qualityScore < 0.7 && retryCount < 3) {
// Request a retry with feedback for the LLM
abort("Response quality score too low. Please provide a more detailed answer.", {
retry: true,
metadata: { score: qualityScore },
});
}

return [];
}
}

const agent = new Agent({
id: "quality-agent",
name: "Quality Agent",
model: "openai/gpt-4o",
outputProcessors: [new QualityChecker()],
maxProcessorRetries: 3, // Maximum retry attempts (default: 3)
});

重试机制:

🌐 The retry mechanism:

  • 仅适用于 processOutputStepprocessInputStep 方法
  • 重新执行该步骤,并将中止原因作为上下文提供给大型语言模型
  • 通过 retryCount 参数跟踪重试次数
  • 遵守代理的 maxProcessorRetries 限制

🌐 Related documentation