Agent.stream()

The .stream() method enables real-time streaming of responses from an agent with enhanced capabilities and format flexibility. This method accepts messages and optional streaming options, providing a next-generation streaming experience with support for both Mastra's native format and AI SDK v5+ compatibility.

Usage example

index.ts
const stream = await agent.stream("message for agent");
info

Model compatibility: This method works with V2 models. V1 models should use the .streamLegacy() method. The framework automatically detects your model version and throws an error if there is a mismatch.

Parameters

messages:

string | string[] | CoreMessage[] | AiMessageType[] | UIMessageWithMetadata[]
The messages to send to the agent. Can be a single string, array of strings, or structured message objects.

options?:

AgentExecutionOptions<Output, Format>
Optional configuration for the streaming process.

Options

maxSteps?:

number
Maximum number of steps to run during execution.

scorers?:

MastraScorers | Record<string, { scorer: MastraScorer['name']; sampling?: ScoringSamplingConfig }>
Evaluation scorers to run on the execution results.

scorer:

string
Name of the scorer to use.

sampling?:

ScoringSamplingConfig
Sampling configuration for the scorer.

tracingContext?:

TracingContext
Tracing context for span hierarchy and metadata.

returnScorerData?:

boolean
Whether to return detailed scoring data in the response.

onChunk?:

(chunk: ChunkType) => Promise<void> | void
Callback function called for each chunk during streaming.

onError?:

({ error }: { error: Error | string }) => Promise<void> | void
Callback function called when an error occurs during streaming.

onAbort?:

(event: any) => Promise<void> | void
Callback function called when the stream is aborted.

abortSignal?:

AbortSignal
Signal object that allows you to abort the agent's execution. When the signal is aborted, all ongoing operations will be terminated.
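
For example, a minimal sketch of cancelling a stream with a standard AbortController (the timeout here is illustrative):

index.ts
const controller = new AbortController();

// Abort the stream if it runs longer than 10 seconds (illustrative cutoff)
const timeout = setTimeout(() => controller.abort(), 10_000);

const stream = await agent.stream("Summarize this document", {
  abortSignal: controller.signal,
});

for await (const chunk of stream.textStream) {
  console.log(chunk);
}
clearTimeout(timeout);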

activeTools?:

Array<keyof ToolSet> | undefined
Array of active tool names that can be used during execution.

prepareStep?:

PrepareStepFunction<any>
Callback function called before each step of multi-step execution.

context?:

ModelMessage[]
Additional context messages to provide to the agent.
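
As a sketch, additional context messages can be passed alongside the user's message (the message content here is illustrative):

index.ts
const stream = await agent.stream("What does this error mean?", {
  context: [
    // Illustrative background message in ModelMessage form
    { role: "user", content: "Stack trace: TypeError: x is undefined" },
  ],
});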

structuredOutput?:

StructuredOutputOptions<S extends ZodTypeAny = ZodTypeAny>
Options to fine tune your structured output generation.

schema:

z.ZodSchema<S>
Zod schema defining the expected output structure.

model?:

MastraLanguageModel
Language model to use for structured output generation. If provided, enables the agent to respond in multiple steps with tool calls, text, and structured output.

errorStrategy?:

'strict' | 'warn' | 'fallback'
Strategy for handling schema validation errors. 'strict' throws errors, 'warn' logs warnings, 'fallback' uses fallback values.

fallbackValue?:

z.infer<S>
Fallback value to use when schema validation fails and errorStrategy is 'fallback'.

instructions?:

string
Additional instructions for the structured output model.

jsonPromptInjection?:

boolean
Injects a system prompt into the main agent instructing it to return structured output; useful when a model does not natively support structured outputs.

providerOptions?:

ProviderOptions
Provider-specific options passed to the internal structuring agent. Use this to control model behavior like reasoning effort for thinking models (e.g., `{ openai: { reasoningEffort: 'low' } }`).
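
Putting these sub-options together, a minimal sketch of structured output with a fallback on validation failure, assuming the structured result is read from the stream's `object` property (schema and values are illustrative):

index.ts
import { z } from "zod";

const stream = await agent.stream("Classify this review", {
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(["positive", "negative", "neutral"]),
    }),
    errorStrategy: "fallback",
    // Used only when validation fails and errorStrategy is 'fallback'
    fallbackValue: { sentiment: "neutral" },
  },
});

const result = await stream.object;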

outputProcessors?:

Processor[]
Overrides the output processors set on the agent. These processors can modify or validate messages from the agent before they are returned to the user. Must implement either (or both) of the `processOutputResult` and `processOutputStream` functions.

includeRawChunks?:

boolean
Whether to include raw chunks in the stream output (not available on all model providers).

inputProcessors?:

Processor[]
Overrides the input processors set on the agent. These processors can modify or validate messages before they are processed by the agent. Must implement the `processInput` function.

instructions?:

string
Custom instructions that override the agent's default instructions for this specific generation. Useful for dynamically modifying agent behavior without creating a new agent instance.

system?:

string | string[] | CoreSystemMessage | SystemModelMessage | CoreSystemMessage[] | SystemModelMessage[]
Custom system message(s) to include in the prompt. Can be a single string, message object, or array of either. System messages provide additional context or behavior instructions that supplement the agent's main instructions.

output?:

Zod schema | JsonSchema7
**Deprecated.** Use structuredOutput without a model to achieve the same thing. Defines the expected structure of the output. Can be a JSON Schema object or a Zod schema.

memory?:

object
Configuration for memory. This is the preferred way to manage memory.

thread:

string | { id: string; metadata?: Record<string, any>, title?: string }
The conversation thread, as a string ID or an object with an `id` and optional `metadata` and `title`.

resource:

string
Identifier for the user or resource associated with the thread.

options?:

MemoryConfig
Configuration for memory behavior including lastMessages, readOnly, semanticRecall, and workingMemory.
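
For example, a minimal sketch that ties the stream to a conversation thread (IDs and the `lastMessages` value are illustrative):

index.ts
const stream = await agent.stream("Continue our conversation", {
  memory: {
    thread: { id: "thread-123", title: "Support chat" },
    resource: "user-456",
    options: {
      lastMessages: 10, // illustrative memory behavior setting
    },
  },
});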

onFinish?:

StreamTextOnFinishCallback<any> | StreamObjectOnFinishCallback<OUTPUT>
Callback function called when streaming completes. Receives the final result.

onStepFinish?:

StreamTextOnStepFinishCallback<any> | never
Callback function called after each execution step. Receives step details as a JSON string. Unavailable for structured output.

telemetry?:

TelemetrySettings
Settings for OTLP telemetry collection during streaming (not Tracing).

isEnabled?:

boolean
Enable or disable telemetry. Disabled by default while experimental.

recordInputs?:

boolean
Enable or disable input recording. Enabled by default. You might want to disable input recording to avoid recording sensitive information.

recordOutputs?:

boolean
Enable or disable output recording. Enabled by default. You might want to disable output recording to avoid recording sensitive information.

functionId?:

string
Identifier for this function. Used to group telemetry data by function.
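
As a sketch, these telemetry settings combine like so (values are illustrative):

index.ts
const stream = await agent.stream("message for agent", {
  telemetry: {
    isEnabled: true,
    recordInputs: false, // avoid recording sensitive input
    functionId: "support-agent-stream",
  },
});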

modelSettings?:

CallSettings
Model-specific settings like temperature, maxOutputTokens, topP, etc. These settings control how the language model generates responses.

temperature?:

number
Controls randomness in generation (0-2). Higher values make output more random.

maxOutputTokens?:

number
Maximum number of tokens to generate in the response. Note: Use maxOutputTokens (not maxTokens) as per AI SDK v5 convention.

maxRetries?:

number
Maximum number of retry attempts for failed requests.

topP?:

number
Nucleus sampling parameter (0-1). Controls diversity of generated text.

topK?:

number
Top-k sampling parameter. Limits vocabulary to k most likely tokens.

presencePenalty?:

number
Penalty for token presence (-2 to 2). Reduces repetition.

frequencyPenalty?:

number
Penalty for token frequency (-2 to 2). Reduces repetition of frequent tokens.

stopSequences?:

string[]
Stop sequences. If set, the model will stop generating text when one of the stop sequences is generated.

toolChoice?:

'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }
= 'auto'
Controls how the agent uses tools during streaming.

'auto':

string
Let the model decide whether to use tools (default).

'none':

string
Do not use any tools.

'required':

string
Require the model to use at least one tool.

{ type: 'tool'; toolName: string }:

object
Require the model to use a specific tool by name.
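
For example, forcing the model to call one specific tool (the tool name is hypothetical):

index.ts
const stream = await agent.stream("What's the weather in Berlin?", {
  toolChoice: { type: "tool", toolName: "weatherTool" }, // hypothetical tool name
});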

toolsets?:

ToolsetsInput
Additional toolsets to make available to the agent during streaming.

clientTools?:

ToolsInput
Tools that are executed on the 'client' side of the request. These tools do not have execute functions in the definition.

savePerStep?:

boolean
Save messages incrementally after each stream step completes (default: false).

requireToolApproval?:

boolean
When true, all tool calls require explicit approval before execution. The stream will emit `tool-call-approval` chunks and pause until `approveToolCall()` or `declineToolCall()` is called.
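
A rough sketch of detecting approval requests in the stream; where `approveToolCall()` and `declineToolCall()` live and what they accept depends on your Mastra version, so this only shows the pause point:

index.ts
const stream = await agent.stream("Delete old records", {
  requireToolApproval: true,
});

for await (const chunk of stream.fullStream) {
  if (chunk.type === "tool-call-approval") {
    // The stream pauses here until approveToolCall() or declineToolCall()
    // is invoked; inspect the chunk payload for the approval details.
    console.log("Tool call awaiting approval:", chunk);
  }
}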

autoResumeSuspendedTools?:

boolean
When true, automatically resumes suspended tools when the user sends a new message on the same thread. The agent extracts `resumeData` from the user's message based on the tool's `resumeSchema`. Requires memory to be configured.

toolCallConcurrency?:

number
Maximum number of tool calls to execute concurrently. Defaults to 1 when approval may be required, otherwise 10.

providerOptions?:

Record<string, Record<string, JSONValue>>
Additional provider-specific options that are passed through to the underlying LLM provider. The structure is `{ providerName: { optionKey: value } }`. For example: `{ openai: { reasoningEffort: 'high' }, anthropic: { maxTokens: 1000 } }`.

openai?:

Record<string, JSONValue>
OpenAI-specific options. Example: `{ reasoningEffort: 'high' }`

anthropic?:

Record<string, JSONValue>
Anthropic-specific options. Example: `{ maxTokens: 1000 }`

google?:

Record<string, JSONValue>
Google-specific options. Example: `{ safetySettings: [...] }`

[providerName]?:

Record<string, JSONValue>
Other provider-specific options. The key is the provider name and the value is a record of provider-specific options.

runId?:

string
Unique ID for this generation run. Useful for tracking and debugging purposes.

requestContext?:

RequestContext
Request Context for dependency injection and contextual information.

tracingContext?:

TracingContext
Tracing context for creating child spans and adding metadata. Automatically injected when using Mastra's tracing system.

currentSpan?:

Span
Current span for creating child spans and adding metadata. Use this to create custom child spans or update span attributes during execution.

tracingOptions?:

TracingOptions
Options for Tracing configuration.

metadata?:

Record<string, any>
Metadata to add to the root trace span. Useful for adding custom attributes like user IDs, session IDs, or feature flags.

requestContextKeys?:

string[]
Additional RequestContext keys to extract as metadata for this trace. Supports dot notation for nested values (e.g., 'user.id').

traceId?:

string
Trace ID to use for this execution (1-32 hexadecimal characters). If provided, this trace will be part of the specified trace.

parentSpanId?:

string
Parent span ID to use for this execution (1-16 hexadecimal characters). If provided, the root span will be created as a child of this span.

tags?:

string[]
Tags to apply to this trace. String labels for categorizing and filtering traces.
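
A minimal sketch combining these tracing options (values are illustrative):

index.ts
const stream = await agent.stream("message for agent", {
  tracingOptions: {
    metadata: { userId: "user-123", featureFlag: "beta" },
    tags: ["support", "streaming"],
  },
});

// traceId is available when Tracing is enabled (see Returns below)
console.log("Trace:", stream.traceId);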

Returns

stream:

MastraModelOutput<Output>
Returns a MastraModelOutput instance that provides access to the streaming output.

traceId?:

string
The trace ID associated with this execution when Tracing is enabled. Use this to correlate logs and debug execution flow.

Extended usage example

Mastra Format (Default)

index.ts
import { stepCountIs } from "ai-v5";

const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// Access text stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}

// or access full stream
for await (const chunk of stream.fullStream) {
  console.log(chunk);
}

// Get full text after streaming
const fullText = await stream.text;

AI SDK v5+ Format

To use the stream with AI SDK v5 (and later), you can convert it using our utility function toAISdkStream.

index.ts
import { stepCountIs, createUIMessageStreamResponse } from "ai";
import { toAISdkStream } from "@mastra/ai-sdk";

const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// In an API route for frontend integration
return createUIMessageStreamResponse({
  stream: toAISdkStream(stream, { from: "agent" }),
});

Using Callbacks

All callback functions are now available as top-level properties for a cleaner API experience.

index.ts
const stream = await agent.stream("Tell me a story", {
  onFinish: (result) => {
    console.log("Streaming finished:", result);
  },
  onStepFinish: (step) => {
    console.log("Step completed:", step);
  },
  onChunk: (chunk) => {
    console.log("Received chunk:", chunk);
  },
  onError: ({ error }) => {
    console.error("Streaming error:", error);
  },
  onAbort: (event) => {
    console.log("Stream aborted:", event);
  },
});

// Process the stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}

Advanced Example with Options

index.ts
import { z } from "zod";
import { stepCountIs } from "ai";
import { ModerationProcessor, BatchPartsProcessor } from "@mastra/core/processors";

await agent.stream("message for agent", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
  memory: {
    thread: "user-123",
    resource: "test-app",
  },
  toolChoice: "auto",
  // Structured output with better DX
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(["positive", "negative", "neutral"]),
      confidence: z.number(),
    }),
    model: "openai/gpt-5.1",
    errorStrategy: "warn",
  },
  // Output processors for streaming response validation
  outputProcessors: [
    new ModerationProcessor({ model: "openrouter/openai/gpt-oss-safeguard-20b" }),
    new BatchPartsProcessor({ maxBatchSize: 3, maxWaitTime: 100 }),
  ],
});
