Agent.stream()
The .stream() method enables real-time streaming of responses from an agent, with enhanced capabilities and format flexibility. It accepts messages and optional streaming options, providing a next-generation streaming experience that supports both Mastra's native format and AI SDK v5+ compatibility.
Usage example
const stream = await agent.stream("message for agent");
Model compatibility: This method works with V2 models. V1 models should use the .streamLegacy() method instead. The framework automatically detects your model version and throws an error if there is a mismatch.
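If a codebase has to serve both model generations behind one entry point, the version-mismatch error can be caught and routed to the legacy method. This is a minimal sketch under stated assumptions: the exact error thrown on mismatch is not specified here, so the catch block simply retries with `.streamLegacy()`.

```typescript
// Hypothetical fallback: try the V2 streaming path first, and fall back
// to .streamLegacy() if the framework rejects the model version.
// The shape of the thrown error is an assumption for illustration.
async function streamWithFallback(agent: any, message: string) {
  try {
    return await agent.stream(message);
  } catch (err) {
    // V1 models are rejected by .stream(); route them to .streamLegacy()
    return await agent.streamLegacy(message);
  }
}
```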
Parameters
messages:
options?:
Options
maxSteps?:
scorers?:
scorer:
sampling?:
tracingContext?:
returnScorerData?:
onChunk?:
onError?:
onAbort?:
abortSignal?:
activeTools?:
prepareStep?:
context?:
structuredOutput?:
schema:
model?:
errorStrategy?:
fallbackValue?:
instructions?:
jsonPromptInjection?:
providerOptions?:
outputProcessors?:
includeRawChunks?:
inputProcessors?:
instructions?:
system?:
output?:
memory?:
thread:
resource:
options?:
onFinish?:
onStepFinish?:
telemetry?:
isEnabled?:
recordInputs?:
recordOutputs?:
functionId?:
modelSettings?:
temperature?:
maxOutputTokens?:
maxRetries?:
topP?:
topK?:
presencePenalty?:
frequencyPenalty?:
stopSequences?:
toolChoice?:
'auto':
'none':
'required':
{ type: 'tool'; toolName: string }:
toolsets?:
clientTools?:
savePerStep?:
requireToolApproval?:
autoResumeSuspendedTools?:
toolCallConcurrency?:
providerOptions?:
openai?:
anthropic?:
google?:
[providerName]?:
runId?:
requestContext?:
tracingContext?:
currentSpan?:
tracingOptions?:
metadata?:
requestContextKeys?:
traceId?:
parentSpanId?:
tags?:
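As an illustration of the toolChoice values listed above, a call can force the model to invoke one specific tool. This is a sketch, not a definitive implementation: `weatherTool` is a hypothetical tool name, and `agent` is assumed to be configured as in the examples below.

```typescript
// Force the agent to call one specific tool on this request.
// "weatherTool" is a hypothetical tool name for illustration.
const stream = await agent.stream("What's the weather in Paris?", {
  toolChoice: { type: "tool", toolName: "weatherTool" },
});
```

Passing "auto" (the default), "none", or "required" instead adjusts whether the model may, must not, or must call some tool.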
Returns
stream:
traceId?:
Extended usage example
Mastra Format (Default)
import { stepCountIs } from "ai-v5";
const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// Access text stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}

// or access full stream
for await (const chunk of stream.fullStream) {
  console.log(chunk);
}

// Get full text after streaming
const fullText = await stream.text;
AI SDK v5+ Format
To use the stream with AI SDK v5 (and later), you can convert it using our utility function toAISdkStream.
import { stepCountIs, createUIMessageStreamResponse } from "ai";
import { toAISdkStream } from "@mastra/ai-sdk";
const stream = await agent.stream("Tell me a story", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
});

// In an API route for frontend integration
return createUIMessageStreamResponse({
  stream: toAISdkStream(stream, { from: "agent" }),
});
Using Callbacks
All callback functions are now available as top-level properties for a cleaner API experience.
const stream = await agent.stream("Tell me a story", {
  onFinish: (result) => {
    console.log("Streaming finished:", result);
  },
  onStepFinish: (step) => {
    console.log("Step completed:", step);
  },
  onChunk: (chunk) => {
    console.log("Received chunk:", chunk);
  },
  onError: ({ error }) => {
    console.error("Streaming error:", error);
  },
  onAbort: (event) => {
    console.log("Stream aborted:", event);
  },
});

// Process the stream
for await (const chunk of stream.textStream) {
  console.log(chunk);
}
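The onAbort callback pairs naturally with the abortSignal option listed earlier. A hedged sketch of cancelling a stream from the caller side using a standard AbortController (the timeout value and agent setup are illustrative):

```typescript
// Cancel a long-running stream from the caller via a standard AbortController.
const controller = new AbortController();

const stream = await agent.stream("Tell me a very long story", {
  abortSignal: controller.signal,
  onAbort: (event) => {
    console.log("Stream aborted:", event);
  },
});

// Abort after 5 seconds if the stream is still running
const timer = setTimeout(() => controller.abort(), 5000);

for await (const chunk of stream.textStream) {
  console.log(chunk);
}
clearTimeout(timer);
```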
Advanced Example with Options
import { z } from "zod";
import { stepCountIs } from "ai";
// Output processors ship with @mastra/core (the import path may vary by version)
import { ModerationProcessor, BatchPartsProcessor } from "@mastra/core/processors";

await agent.stream("message for agent", {
  stopWhen: stepCountIs(3), // Stop after 3 steps
  modelSettings: {
    temperature: 0.7,
  },
  memory: {
    thread: "user-123",
    resource: "test-app",
  },
  toolChoice: "auto",
  // Structured output with better DX
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(["positive", "negative", "neutral"]),
      confidence: z.number(),
    }),
    model: "openai/gpt-5.1",
    errorStrategy: "warn",
  },
  // Output processors for streaming response validation
  outputProcessors: [
    new ModerationProcessor({ model: "openrouter/openai/gpt-oss-safeguard-20b" }),
    new BatchPartsProcessor({ maxBatchSize: 3, maxWaitTime: 100 }),
  ],
});
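When structuredOutput is set, the validated result can be read after the stream completes. This sketch assumes the returned stream exposes the parsed result as an awaitable `object` property, mirroring Mastra's structured-output examples; verify the exact accessor against your Mastra version.

```typescript
// Read the validated structured result after the stream completes.
// `stream.object` resolving to the schema type is an assumption here.
const stream = await agent.stream("How do you feel today?", {
  structuredOutput: {
    schema: z.object({
      sentiment: z.enum(["positive", "negative", "neutral"]),
      confidence: z.number(),
    }),
  },
});

const result = await stream.object;
console.log(result.sentiment, result.confidence);
```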
Related