
Run.stream()

The .stream() method enables real-time streaming of responses from a workflow. It returns the event stream directly, as an object you can iterate with for await...of.

Usage example

const run = await workflow.createRun();

const stream = await run.stream({
  inputData: {
    value: "initial data",
  },
});

for await (const chunk of stream) {
  console.log(chunk);
}

Parameters

inputData?:

z.infer<TInput>
Input data that matches the workflow's input schema

requestContext?:

RequestContext
Request Context data to use during workflow execution

tracingContext?:

TracingContext
Tracing context for creating child spans and adding metadata.

currentSpan?:

Span
Current span for creating child spans and adding metadata.

tracingOptions?:

TracingOptions
Options for Tracing configuration.

metadata?:

Record<string, any>
Metadata to add to the root trace span.

requestContextKeys?:

string[]
Additional RequestContext keys to extract as metadata for this trace. Supports dot notation for nested values (e.g., 'user.id').

traceId?:

string
Trace ID to use for this execution (1-32 hexadecimal characters). If provided, this trace will be part of the specified trace.

parentSpanId?:

string
Parent span ID to use for this execution (1-16 hexadecimal characters). If provided, the root span will be created as a child of this span.

tags?:

string[]
Tags to apply to this trace. String labels for categorizing and filtering traces.

closeOnSuspend?:

boolean
Whether to close the stream when the workflow is suspended, or to keep the stream open until the workflow is finished (by success or error). Default value is true.
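
The format rules above for traceId, parentSpanId, and requestContextKeys can be checked before a run starts. The following is a minimal sketch of those rules only. The helper names (isValidTraceId, isValidParentSpanId, getByPath) are hypothetical and not part of the workflow API, and the library's own validation and key extraction may behave differently.

```typescript
// Hypothetical helpers for illustration; not part of the workflow API.

// Trace IDs: 1-32 hexadecimal characters.
const TRACE_ID_PATTERN = /^[0-9a-f]{1,32}$/i;
// Parent span IDs: 1-16 hexadecimal characters.
const SPAN_ID_PATTERN = /^[0-9a-f]{1,16}$/i;

function isValidTraceId(id: string): boolean {
  return TRACE_ID_PATTERN.test(id);
}

function isValidParentSpanId(id: string): boolean {
  return SPAN_ID_PATTERN.test(id);
}

// Resolve a dot-notation key such as "user.id" against a nested object.
// Returns undefined when any segment of the path is missing.
function getByPath(source: Record<string, unknown>, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}
```

Checking IDs up front keeps a malformed value from reaching the tracing layer, where the resulting behavior is not described here.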

Returns

Returns a WorkflowRunOutput object that implements the async iterable interface (so it can be used directly in for await...of loops) and provides access to the stream and the workflow execution results.

fullStream:

ReadableStream<WorkflowStreamEvent>
A ReadableStream of workflow events that you can iterate over to track progress in real-time. You can also iterate over the WorkflowRunOutput object directly.

result:

Promise<WorkflowResult<TState, TInput, TOutput, TSteps>>
A promise that resolves to the final workflow result

status:

WorkflowRunStatus
The current workflow run status ('running', 'suspended', 'success', 'failed', 'canceled', or 'tripwire')

usage:

Promise<{ inputTokens: number; outputTokens: number; totalTokens: number; reasoningTokens?: number; cachedInputTokens?: number }>
A promise that resolves to token usage statistics
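
Because status can take several values, callers often want one check for whether a run can still make progress. The sketch below uses a local copy of the status values listed above. Treating 'running' and 'suspended' as the only unsettled states is an assumption made for illustration, and RunStatus and isSettled are hypothetical names rather than library exports.

```typescript
// Local copy of the status values listed above (hypothetical type name).
type RunStatus =
  | "running"
  | "suspended"
  | "success"
  | "failed"
  | "canceled"
  | "tripwire";

// Assumption: a run that is "running" or "suspended" may still continue;
// every other status is treated as settled.
function isSettled(status: RunStatus): boolean {
  return status !== "running" && status !== "suspended";
}
```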

Extended usage example

const run = await workflow.createRun();

const stream = run.stream({
  inputData: {
    value: "initial data",
  },
});

// Iterate over stream events (you can iterate over stream directly or use stream.fullStream)
for await (const chunk of stream) {
  console.log(chunk);
}

// Access the final result
const result = await stream.result;
console.log("Final result:", result);

// Access token usage
const usage = await stream.usage;
console.log("Token usage:", usage);

// Check current status
console.log("Status:", stream.status);

Stream Events

The stream emits various event types during workflow execution. Each event has a type field and a payload containing relevant data:

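  • workflow-start: Workflow execution begins
  • workflow-step-start: A step begins execution
  • workflow-step-output: Custom output from a step
  • workflow-step-result: A step completes and produces a result
  • workflow-finish: Workflow execution completes, with usage statistics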
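
One way to consume these events is to branch on the type field. The sketch below runs against a stand-in async generator rather than a real run, so it is self-contained. The WorkflowEvent shape, the mockStream and summarize names, and the payload fields id and status are assumptions for illustration; the real payload contents are not specified here.

```typescript
// Assumed event shape, based on the description above: a type and a payload.
type WorkflowEvent = { type: string; payload: Record<string, unknown> };

// Stand-in for a real run's stream so the sketch runs on its own.
// The payload fields used here (id, status) are hypothetical.
async function* mockStream(): AsyncGenerator<WorkflowEvent> {
  yield { type: "workflow-start", payload: {} };
  yield { type: "workflow-step-start", payload: { id: "step-1" } };
  yield { type: "workflow-step-result", payload: { id: "step-1", status: "success" } };
  yield { type: "workflow-finish", payload: {} };
}

// Collect one human-readable line per event, branching on event.type.
async function summarize(stream: AsyncIterable<WorkflowEvent>): Promise<string[]> {
  const log: string[] = [];
  for await (const event of stream) {
    switch (event.type) {
      case "workflow-start":
        log.push("workflow started");
        break;
      case "workflow-step-start":
        log.push(`step ${String(event.payload.id)} started`);
        break;
      case "workflow-step-output":
        log.push("step emitted custom output");
        break;
      case "workflow-step-result":
        log.push(`step ${String(event.payload.id)} finished`);
        break;
      case "workflow-finish":
        log.push("workflow finished");
        break;
      default:
        // Keep unknown event types visible instead of dropping them.
        log.push(`unhandled event: ${event.type}`);
    }
  }
  return log;
}
```

Since summarize accepts any AsyncIterable, the same function could in principle be handed the object returned by run.stream(), provided its events match the assumed shape.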

Related