Skip to main content

工作流状态

🌐 Workflow State

工作流状态允许你在各个步骤之间共享数值,而无需通过每个步骤的 inputSchema 和 outputSchema 传递。这对于跟踪进度、累积结果或在整个工作流中共享配置非常有用。

🌐 Workflow state lets you share values across steps without passing them through every step's inputSchema and outputSchema. This is useful for tracking progress, accumulating results, or sharing configuration across the entire workflow.

状态与阶跃输入/输出
Direct link to 状态与阶跃输入/输出

🌐 State vs step input/output

理解 状态步进输入/输出 之间的区别很重要:

🌐 It's important to understand the difference between state and step input/output:

  • 步骤输入/输出:数据在步骤之间按顺序流动。每个步骤接收前一步的输出作为其 inputData,并返回供下一步使用的输出。
  • 状态:一个共享存储,所有步骤都可以通过 statesetState 读取和更新。状态在整个工作流执行期间保持,包括挂起/恢复周期。
src/mastra/workflows/test-workflow.ts
const step1 = createStep({
id: "step-1",
inputSchema: z.object({ workflowInput: z.string() }),
outputSchema: z.object({ step1Output: z.string() }),
stateSchema: z.object({ sharedCounter: z.number() }),
execute: async ({ inputData, state, setState }) => {
// inputData comes from workflow input or previous step's output
console.log(inputData.workflowInput);

// state is the shared workflow state
console.log(state.sharedCounter);

// Update state for subsequent steps
await setState({ sharedCounter: state.sharedCounter + 1 });

// Return output that flows to next step's inputData
return { step1Output: "processed" };
},
});

定义状态模式
Direct link to 定义状态模式

🌐 Defining state schemas

在工作流和各个步骤上定义一个 stateSchema。工作流的 stateSchema 是包含所有可能状态值的主模式,而每个步骤仅声明它所需要的子集:

🌐 Define a stateSchema on both the workflow and individual steps. The workflow's stateSchema is the master schema containing all possible state values, while each step declares only the subset it needs:

src/mastra/workflows/test-workflow.ts
const step1 = createStep({
stateSchema: z.object({
processedItems: z.array(z.string()),
}),
execute: async ({ inputData, state, setState }) => {
const { message } = inputData;
const { processedItems } = state;

await setState({
processedItems: [...processedItems, "item-1", "item-2"],
});

return {
formatted: message.toUpperCase(),
};
},
});

const step2 = createStep({
stateSchema: z.object({
metadata: z.object({
processedBy: z.string(),
}),
}),
execute: async ({ inputData, state }) => {
const { formatted } = inputData;
const { metadata } = state;

return {
emphasized: `${formatted}!! ${metadata.processedBy}`,
};
},
});

export const testWorkflow = createWorkflow({
stateSchema: z.object({
processedItems: z.array(z.string()),
metadata: z.object({
processedBy: z.string(),
}),
}),
})
.then(step1)
.then(step2)
.commit();

设置初始状态
Direct link to 设置初始状态

🌐 Setting initial state

在启动工作流运行时传递 initialState 以设置初始值:

🌐 Pass initialState when starting a workflow run to set the starting values:

const run = await workflow.createRun();

const result = await run.start({
inputData: { message: "Hello" },
initialState: {
processedItems: [],
metadata: { processedBy: "system" },
},
});

initialState 对象应与工作流中的 stateSchema 定义的结构匹配。

🌐 The initialState object should match the structure defined in the workflow's stateSchema.

在挂起/恢复期间保持状态
Direct link to 在挂起/恢复期间保持状态

🌐 State persistence across suspend/resume

状态会在挂起和恢复周期中自动保持。当工作流挂起后再次恢复时,挂起前所做的所有状态更新都会被保留:

🌐 State automatically persists across suspend and resume cycles. When a workflow suspends and later resumes, all state updates made before the suspension are preserved:

src/mastra/workflows/test-workflow.ts
const step1 = createStep({
id: "step-1",
inputSchema: z.object({}),
outputSchema: z.object({}),
stateSchema: z.object({ count: z.number(), items: z.array(z.string()) }),
resumeSchema: z.object({ proceed: z.boolean() }),
execute: async ({ state, setState, suspend, resumeData }) => {
if (!resumeData) {
// First run: update state and suspend
await setState({ count: state.count + 1, items: [...state.items, "item-1"] });
await suspend({});
return {};
}
// After resume: state changes are preserved (count: 1, items: ["item-1"])
return {};
},
});

嵌套工作流中的状态
Direct link to 嵌套工作流中的状态

🌐 State in nested workflows

在使用嵌套工作流时,状态会从父工作流传播到子工作流。父工作流在调用嵌套工作流之前所做的更改,对于嵌套工作流中的步骤是可见的:

🌐 When using nested workflows, state propagates from parent to child. Changes made by the parent workflow before calling a nested workflow are visible to steps inside the nested workflow:

src/mastra/workflows/test-workflow.ts
const nestedStep = createStep({
id: "nested-step",
inputSchema: z.object({}),
outputSchema: z.object({ result: z.string() }),
stateSchema: z.object({ sharedValue: z.string() }),
execute: async ({ state }) => {
// Receives state modified by parent workflow
return { result: `Received: ${state.sharedValue}` };
},
});

const nestedWorkflow = createWorkflow({
id: "nested-workflow",
inputSchema: z.object({}),
outputSchema: z.object({ result: z.string() }),
stateSchema: z.object({ sharedValue: z.string() }),
})
.then(nestedStep)
.commit();

const parentStep = createStep({
id: "parent-step",
inputSchema: z.object({}),
outputSchema: z.object({}),
stateSchema: z.object({ sharedValue: z.string() }),
execute: async ({ state, setState }) => {
// Modify state before nested workflow runs
await setState({ sharedValue: "modified-by-parent" });
return {};
},
});

const parentWorkflow = createWorkflow({
id: "parent-workflow",
inputSchema: z.object({}),
outputSchema: z.object({ result: z.string() }),
stateSchema: z.object({ sharedValue: z.string() }),
})
.then(parentStep)
.then(nestedWorkflow)
.commit();

🌐 Related