Processor Interface

The Processor interface defines the contract for all processors in Mastra. Processors can implement one or more methods to handle different stages of the agent execution pipeline.

When processor methods run

The five processor methods run at different points in the agent execution lifecycle:

┌─────────────────────────────────────────────────────────────────┐
│ Agent Execution Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ User Input │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ processInput │ ← Runs ONCE at start │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Agentic Loop │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ processInputStep │ ← Runs at EACH step │ │
│ │ └──────────┬──────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ LLM Execution │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ processOutputStream │ ← Runs on EACH stream chunk │ │
│ │ └──────────┬───────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ processOutputStep │ ← Runs after EACH LLM step │ │
│ │ └──────────┬───────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Tool Execution (if needed) │ │
│ │ │ │ │
│ │ └──────── Loop back if tools called ────────│ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ processOutputResult │ ← Runs ONCE after completion │
│ └─────────────────────┘ │
│ │ │
│ ▼ │
│ Final Response │
│ │
└─────────────────────────────────────────────────────────────────┘
Method              | When it runs                                           | Use cases
processInput        | Once at the start, before the agentic loop             | Validate/transform initial user input, add context
processInputStep    | At each step of the agentic loop, before each LLM call | Transform messages between steps, process tool results
processOutputStream | On each stream chunk of the LLM response               | Filter/modify streaming content, detect patterns in real time
processOutputStep   | After each LLM response, before tool execution         | Validate output quality, implement guardrails with retries
processOutputResult | Once after generation completes                        | Post-process the final response, log results
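The lifecycle above can be sketched with a tracing processor that implements all five methods and logs when each one fires. This is an illustrative sketch only: the argument shapes are simplified, the method bodies pass data through unchanged, and a real implementation would use the typed interfaces documented below.

```typescript
// Illustrative tracer: logs each lifecycle hook and passes data through.
// Argument shapes are simplified stand-ins for the real interfaces below.
export class LifecycleTracer {
  id = "lifecycle-tracer";

  async processInput({ messages }: { messages: unknown[] }) {
    console.log("processInput: once, before the agentic loop");
    return messages; // pass through unchanged
  }

  async processInputStep({ stepNumber }: { stepNumber: number }) {
    console.log(`processInputStep: step ${stepNumber}, before the LLM call`);
    return {}; // no per-step changes
  }

  async processOutputStream({ part }: { part: { type: string } }) {
    console.log(`processOutputStream: chunk ${part.type}`);
    return part; // emit the chunk unchanged
  }

  async processOutputStep({ stepNumber }: { stepNumber: number }) {
    console.log(`processOutputStep: step ${stepNumber}, after the LLM response`);
    return []; // no message changes
  }

  async processOutputResult({ messages }: { messages: unknown[] }) {
    console.log("processOutputResult: once, after completion");
    return messages; // pass through unchanged
  }
}
```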

Interface definition

interface Processor<TId extends string = string> {
  readonly id: TId;
  readonly name?: string;

  processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;
  processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;
  processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;
  processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;
  processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;
}

Properties

id:

string
Unique identifier for the processor. Used for tracing and debugging.

name?:

string
Optional display name for the processor. Falls back to id if not provided.

Methods

processInput

Processes input messages before they are sent to the LLM. Runs once at the start of agent execution.

processInput?(args: ProcessInputArgs): Promise<ProcessInputResult> | ProcessInputResult;

ProcessInputArgs

messages:

MastraDBMessage[]
User and assistant messages to process (excludes system messages).

systemMessages:

CoreMessage[]
All system messages (agent instructions, memory context, user-provided). Can be modified and returned.

messageList:

MessageList
Full MessageList instance for advanced message management.

abort:

(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never
Function to abort processing. Throws a TripWire error that stops execution. Pass `retry: true` to request the LLM retry the step with feedback.

retryCount?:

number
Number of times processors have triggered retry for this generation. Use this to limit retry attempts.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata like threadId and resourceId.

ProcessInputResult

The method can return one of three types:

MastraDBMessage[]:

array
Transformed messages array. System messages remain unchanged.

MessageList:

MessageList
The same messageList instance passed in. Indicates you've mutated it directly.

{ messages, systemMessages }:

object
Object with both transformed messages and modified system messages.
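For instance, the object form lets a processor transform messages and modify system messages in a single pass. A minimal sketch, with message objects simplified as stand-ins for MastraDBMessage and CoreMessage:

```typescript
// Sketch: processInput returning the { messages, systemMessages } object form.
// Message objects are simplified stand-ins for MastraDBMessage / CoreMessage.
const addContextProcessor = {
  id: "add-context",

  processInput({ messages, systemMessages }: {
    messages: Array<Record<string, unknown>>;
    systemMessages: Array<{ role: string; content: string }>;
  }) {
    return {
      messages, // user/assistant messages, unchanged here
      systemMessages: [
        ...systemMessages,
        { role: "system", content: "Answer concisely." }, // appended instruction
      ],
    };
  },
};
```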

processInputStep

Processes input messages at each step of the agentic loop, before they are sent to the LLM. Unlike processInput, which runs once at the start, this method runs at every step, including tool-call continuation steps.

processInputStep?(args: ProcessInputStepArgs): ProcessorMessageResult;

Execution order in the agentic loop

  1. processInput (once at the start)
  2. processInputStep from inputProcessors (at each step, before calling the LLM)
  3. prepareStep callbacks (run as part of the processInputStep flow, after inputProcessors)
  4. LLM execution
  5. Tool execution (if needed)
  6. If tools were called, repeat from step 2

ProcessInputStepArgs

messages:

MastraDBMessage[]
All messages including tool calls and results from previous steps (read-only snapshot).

messageList:

MessageList
MessageList instance for managing messages. Can mutate directly or return in result.

stepNumber:

number
Current step number (0-indexed). Step 0 is the initial LLM call.

steps:

StepResult[]
Results from previous steps, including text, toolCalls, and toolResults.

systemMessages:

CoreMessage[]
All system messages (read-only snapshot). Return in result to replace.

model:

MastraLanguageModelV2
Current model being used. Return a different model in result to switch.

toolChoice?:

ToolChoice
Current tool choice setting ('auto', 'none', 'required', or specific tool).

activeTools?:

string[]
Currently active tool names. Return filtered array to limit tools.

tools?:

ToolSet
Current tools available for this step. Return in result to add/replace tools.

providerOptions?:

SharedV2ProviderOptions
Provider-specific options (e.g., Anthropic cacheControl, OpenAI reasoningEffort).

modelSettings?:

CallSettings
Model settings like temperature, maxTokens, topP.

structuredOutput?:

StructuredOutputOptions
Structured output configuration (schema, output mode). Return in result to modify.

abort:

(reason?: string) => never
Function to abort processing.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

ProcessInputStepResult

The method can return any combination of these properties:

model?:

LanguageModelV2 | string
Change the model for this step. Can be a model instance or router ID like 'openai/gpt-4o'.

toolChoice?:

ToolChoice
Change tool selection behavior for this step.

activeTools?:

string[]
Filter which tools are available for this step.

tools?:

ToolSet
Replace or modify tools for this step. Use spread to merge: { tools: { ...tools, newTool } }.

messages?:

MastraDBMessage[]
Replace all messages. Cannot be used with messageList.

messageList?:

MessageList
Return the same messageList instance (indicates you mutated it). Cannot be used with messages.

systemMessages?:

CoreMessage[]
Replace all system messages for this step only.

providerOptions?:

SharedV2ProviderOptions
Change provider-specific options for this step.

modelSettings?:

CallSettings
Change model settings for this step.

structuredOutput?:

StructuredOutputOptions
Change structured output configuration for this step.
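As noted for tools, spreading the current tool set lets a processor add a tool for a single step without dropping the existing ones. A sketch with a stubbed tool object; the `escalate` entry is hypothetical, and real entries would be proper Mastra tool definitions:

```typescript
// Sketch: merging a new tool into the current set for one step.
// The `escalate` stub is hypothetical; real entries are Mastra tool definitions.
const addEscalationTool = {
  id: "add-escalation-tool",

  processInputStep({ stepNumber, tools }: {
    stepNumber: number;
    tools?: Record<string, unknown>;
  }) {
    if (stepNumber >= 2) {
      // Spread to merge: keeps existing tools and adds one more
      return {
        tools: {
          ...tools,
          escalate: { description: "Escalate to a human operator" },
        },
      };
    }
    return {}; // no changes on earlier steps
  },
};
```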

Processor chaining

When multiple processors implement processInputStep, they run in order and changes chain through:

Processor 1: receives { model: 'gpt-4o' } → returns { model: 'gpt-4o-mini' }
Processor 2: receives { model: 'gpt-4o-mini' } → returns { toolChoice: 'none' }
Final: model = 'gpt-4o-mini', toolChoice = 'none'
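The merge behavior above can be modeled as a reduce over the processor results, where each processor sees the configuration produced so far. This is an illustrative model of the chaining, not Mastra's internal implementation:

```typescript
// Illustrative model of processInputStep chaining: each processor's returned
// fields are merged over the current step config before the next one runs.
type StepConfig = { model: string; toolChoice?: string };

function chainStepResults(
  initial: StepConfig,
  processors: Array<(cfg: StepConfig) => Partial<StepConfig>>,
): StepConfig {
  return processors.reduce((cfg, p) => ({ ...cfg, ...p(cfg) }), initial);
}

const finalConfig = chainStepResults({ model: "gpt-4o" }, [
  () => ({ model: "gpt-4o-mini" }), // Processor 1
  () => ({ toolChoice: "none" }),   // Processor 2
]);
// finalConfig is { model: "gpt-4o-mini", toolChoice: "none" }
```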

System message isolation

System messages are reset to their original values at the start of each step. Modifications made in processInputStep affect only the current step, not subsequent steps.
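Because of this per-step reset, a processor can inject a step-specific instruction without it accumulating across steps. A sketch, with the system message shape simplified from CoreMessage:

```typescript
// Sketch: append a wrap-up instruction only on later steps. The per-step reset
// means it never stacks up. Message shape is a simplified CoreMessage.
const wrapUpProcessor = {
  id: "wrap-up",

  processInputStep({ stepNumber, systemMessages }: {
    stepNumber: number;
    systemMessages: Array<{ role: string; content: string }>;
  }) {
    if (stepNumber >= 3) {
      return {
        systemMessages: [
          ...systemMessages,
          { role: "system", content: "Wrap up: summarize and avoid further tool calls." },
        ],
      };
    }
    return {}; // earlier steps keep the original system messages
  },
};
```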

Use cases

  • Dynamic model switching based on step number or context
  • Disabling tools after a certain number of steps
  • Dynamically adding or replacing tools based on conversation context
  • Converting message part types between providers (e.g., converting reasoning parts to Anthropic thinking parts)
  • Modifying messages based on step number or accumulated context
  • Adding step-specific system instructions
  • Adjusting provider options per step (e.g., cache control)
  • Modifying structured output schemas based on step context

processOutputStream

Processes streaming output chunks with built-in state management. Allows processors to accumulate chunks and make decisions based on larger context.

processOutputStream?(args: ProcessOutputStreamArgs): Promise<ChunkType | null | undefined>;

ProcessOutputStreamArgs

part:

ChunkType
The current stream chunk being processed.

streamParts:

ChunkType[]
All chunks seen so far in the stream.

state:

Record<string, unknown>
Mutable state object that persists across chunks within a single stream.

abort:

(reason?: string) => never
Function to abort the stream.

messageList?:

MessageList
MessageList instance for accessing conversation history.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

Return value

  • Return a ChunkType to emit the chunk (possibly modified)
  • Return null or undefined to skip emitting the chunk

processOutputResult

Processes the complete output result after streaming or generation is finished.

processOutputResult?(args: ProcessOutputResultArgs): ProcessorMessageResult;

ProcessOutputResultArgs

messages:

MastraDBMessage[]
The generated response messages.

messageList:

MessageList
MessageList instance for managing messages.

abort:

(reason?: string) => never
Function to abort processing.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.
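A typical use is post-processing the final messages once, for example appending a footer to the assistant response. A sketch, with the message shape simplified from MastraDBMessage:

```typescript
// Sketch: append a footer text part to assistant messages in the final result.
// Message shape is a simplified stand-in for MastraDBMessage.
type SimpleMessage = {
  role: string;
  content: { parts?: Array<{ type: string; text?: string }> };
};

const footerProcessor = {
  id: "footer",

  processOutputResult({ messages }: { messages: SimpleMessage[] }) {
    return messages.map((msg) =>
      msg.role === "assistant"
        ? {
            ...msg,
            content: {
              ...msg.content,
              parts: [
                ...(msg.content.parts ?? []),
                { type: "text", text: "\n(Generated by AI)" },
              ],
            },
          }
        : msg,
    );
  },
};
```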

processOutputStep

Processes output after each LLM response in the agentic loop, before tool execution. Unlike processOutputResult, which runs once at the end, this method runs at every step. It is the ideal place to implement guardrails that can trigger retries.

processOutputStep?(args: ProcessOutputStepArgs): ProcessorMessageResult;

ProcessOutputStepArgs

messages:

MastraDBMessage[]
All messages including the latest LLM response.

messageList:

MessageList
MessageList instance for managing messages.

stepNumber:

number
Current step number (0-indexed).

finishReason?:

string
The finish reason from the LLM (stop, tool-use, length, etc.).

toolCalls?:

ToolCallInfo[]
Tool calls made in this step (if any).

text?:

string
Generated text from this step.

systemMessages?:

CoreMessage[]
All system messages for read/modify access.

abort:

(reason?: string, options?: { retry?: boolean; metadata?: unknown }) => never
Function to abort processing. Pass `retry: true` to request the LLM retry the step.

retryCount?:

number
Number of times processors have triggered retry. Use this to limit retry attempts.

tracingContext?:

TracingContext
Tracing context for observability.

requestContext?:

RequestContext
Request-scoped context with execution metadata.

Use cases

  • Implementing quality guardrails that can request retries
  • Validating LLM output before tool execution
  • Adding per-step logging or metrics
  • Implementing output moderation with retry

Example: Quality guardrail with retry

src/mastra/processors/quality-guardrail.ts
import type { Processor } from "@mastra/core";

export class QualityGuardrail implements Processor {
  id = "quality-guardrail";

  async processOutputStep({ text, abort, retryCount }) {
    // evaluateResponseQuality is an application-specific scoring function (not shown)
    const score = await evaluateResponseQuality(text);

    if (score < 0.7) {
      // retryCount is optional, so default it before comparing
      if ((retryCount ?? 0) < 3) {
        // Request retry with feedback for the LLM
        abort("Response quality too low. Please provide more detail.", {
          retry: true,
          metadata: { qualityScore: score },
        });
      } else {
        // Max retries reached, block the response
        abort("Response quality too low after multiple attempts.");
      }
    }

    return [];
  }
}

Processor types

Mastra provides type aliases to ensure processors implement the required methods:

// Must implement processInput OR processInputStep (or both)
type InputProcessor = Processor & (
  | { processInput: required }
  | { processInputStep: required }
);

// Must implement processOutputStream, processOutputStep, OR processOutputResult (or any combination)
type OutputProcessor = Processor & (
  | { processOutputStream: required }
  | { processOutputStep: required }
  | { processOutputResult: required }
);

Usage examples

Basic input processor

src/mastra/processors/lowercase.ts
import type { Processor, MastraDBMessage } from "@mastra/core";

export class LowercaseProcessor implements Processor {
  id = "lowercase";

  async processInput({ messages }): Promise<MastraDBMessage[]> {
    return messages.map((msg) => ({
      ...msg,
      content: {
        ...msg.content,
        parts: msg.content.parts?.map((part) =>
          part.type === "text"
            ? { ...part, text: part.text.toLowerCase() }
            : part
        ),
      },
    }));
  }
}

Per-step processor with processInputStep

src/mastra/processors/dynamic-model.ts
import type { Processor, ProcessInputStepArgs, ProcessInputStepResult } from "@mastra/core";

export class DynamicModelProcessor implements Processor {
  id = "dynamic-model";

  async processInputStep({
    stepNumber,
    steps,
    toolChoice,
  }: ProcessInputStepArgs): Promise<ProcessInputStepResult> {
    // Use a fast model for initial response
    if (stepNumber === 0) {
      return { model: "openai/gpt-4o-mini" };
    }

    // Switch to powerful model after tool calls
    if (steps.length > 0 && steps[steps.length - 1].toolCalls?.length) {
      return { model: "openai/gpt-4o" };
    }

    // Disable tools after 5 steps to force completion
    if (stepNumber > 5) {
      return { toolChoice: "none" };
    }

    return {};
  }
}

Message transformer with processInputStep

src/mastra/processors/reasoning-transformer.ts
import type { Processor, MastraDBMessage } from "@mastra/core";

export class ReasoningTransformer implements Processor {
  id = "reasoning-transformer";

  async processInputStep({ messages, messageList }) {
    // Transform reasoning parts to thinking parts at each step
    // This is useful when switching between model providers
    for (const msg of messages) {
      if (msg.role === "assistant" && msg.content.parts) {
        for (const part of msg.content.parts) {
          if (part.type === "reasoning") {
            (part as any).type = "thinking";
          }
        }
      }
    }
    return messageList;
  }
}

Hybrid processor (input and output)

src/mastra/processors/content-filter.ts
import type { Processor, MastraDBMessage, ChunkType } from "@mastra/core";

export class ContentFilter implements Processor {
  id = "content-filter";
  private blockedWords: string[];

  constructor(blockedWords: string[]) {
    this.blockedWords = blockedWords;
  }

  async processInput({ messages, abort }): Promise<MastraDBMessage[]> {
    for (const msg of messages) {
      const text = msg.content.parts
        ?.filter((p) => p.type === "text")
        .map((p) => p.text)
        .join(" ");

      if (this.blockedWords.some((word) => text?.includes(word))) {
        abort("Blocked content detected in input");
      }
    }
    return messages;
  }

  async processOutputStream({ part, abort }): Promise<ChunkType | null> {
    if (part.type === "text-delta") {
      if (this.blockedWords.some((word) => part.textDelta.includes(word))) {
        abort("Blocked content detected in output");
      }
    }
    return part;
  }
}

Stream accumulator with state

src/mastra/processors/word-counter.ts
import type { Processor, ChunkType } from "@mastra/core";

export class WordCounter implements Processor {
  id = "word-counter";

  async processOutputStream({ part, state }): Promise<ChunkType> {
    // Initialize state on first chunk
    if (!state.wordCount) {
      state.wordCount = 0;
    }

    // Count words in text chunks
    if (part.type === "text-delta") {
      const words = part.textDelta.split(/\s+/).filter(Boolean);
      state.wordCount += words.length;
    }

    // Log word count on finish
    if (part.type === "finish") {
      console.log(`Total words: ${state.wordCount}`);
    }

    return part;
  }
}
