快照
🌐 Snapshots
在 Mastra 中,快照是工作流在特定时间点的完整执行状态的可序列化表示。快照捕获恢复工作流所需的所有信息,使其能从中断的地方精确继续执行,包括:
🌐 In Mastra, a snapshot is a serializable representation of a workflow's complete execution state at a specific point in time. Snapshots capture all the information needed to resume a workflow from exactly where it left off, including:
- 工作流程中每个步骤的当前状态
- 已完成步骤的输出
- 工作流中执行的路径
- 任何挂起的步骤及其元数据
- 每个步骤剩余的重试次数
- 需要额外的上下文数据以恢复执行
每当工作流被暂停时,快照会由 Mastra 自动创建和管理,并保存到配置的存储系统中。
🌐 Snapshots are automatically created and managed by Mastra whenever a workflow is suspended, and are persisted to the configured storage system.
快照在挂起和恢复中的作用Direct link to 快照在挂起和恢复中的作用
🌐 The role of snapshots in suspend and resume
快照是实现 Mastra 挂起和恢复功能的关键机制。当工作流步骤调用 await suspend() 时:
🌐 Snapshots are the key mechanism enabling Mastra's suspend and resume capabilities. When a workflow step calls await suspend():
- 工作流执行在那个精确点暂停
- 工作流程的当前状态被捕获为快照
- 快照已保存到存储中
- 此工作流步骤被标记为“已挂起”,状态为
'suspended' - 稍后,当在挂起的步骤上调用
resume()时,快照会被检索 - 工作流执行从中断的地方准确地恢复
该机制提供了一种强大的方式来实现人机交互的工作流程、处理速率限制、等待外部资源,以及实现可能需要长时间暂停的复杂分支工作流程。
🌐 This mechanism provides a powerful way to implement human-in-the-loop workflows, handle rate limiting, wait for external resources, and implement complex branching workflows that may need to pause for extended periods.
快照剖析Direct link to 快照剖析
🌐 Snapshot anatomy
每个快照包括 runId、输入、步骤状态(success、suspended 等)、任何挂起和恢复的负载,以及最终输出。这确保在恢复执行时可用的上下文是完整的。
🌐 Each snapshot includes the runId, input, step status (success, suspended, etc.), any suspend and resume payloads, and the final output. This ensures full context is available when resuming execution.
{
"runId": "34904c14-e79e-4a12-9804-9655d4616c50",
"status": "success",
"value": {},
"context": {
"input": {
"value": 100,
"user": "Michael",
"requiredApprovers": ["manager", "finance"]
},
"approval-step": {
"payload": {
"value": 100,
"user": "Michael",
"requiredApprovers": ["manager", "finance"]
},
"startedAt": 1758027577955,
"status": "success",
"suspendPayload": {
"message": "Workflow suspended",
"requestedBy": "Michael",
"approvers": ["manager", "finance"]
},
"suspendedAt": 1758027578065,
"resumePayload": { "confirm": true, "approver": "manager" },
"resumedAt": 1758027578517,
"output": { "value": 100, "approved": true },
"endedAt": 1758027578634
}
},
"activePaths": [],
"serializedStepGraph": [
{
"type": "step",
"step": {
"id": "approval-step",
"description": "Accepts a value, waits for confirmation"
}
}
],
"suspendedPaths": {},
"waitingPaths": {},
"result": { "value": 100, "approved": true },
"requestContext": {},
"timestamp": 1758027578740
}
快照的保存和检索方式Direct link to 快照的保存和检索方式
🌐 How snapshots are saved and retrieved
快照会保存到配置的存储系统中。默认情况下,它们使用 libSQL,但你可以配置使用 Upstash 或 PostgreSQL。每个快照都保存在 workflow_snapshots 表中,并由工作流的 runId 标识。
🌐 Snapshots are saved to the configured storage system. By default, they use libSQL, but you can configure Upstash or PostgreSQL instead. Each snapshot is saved in the workflow_snapshots table and identified by the workflow's runId.
阅读更多关于:
🌐 Read more about:
保存快照Direct link to 保存快照
🌐 Saving snapshots
当工作流被暂停时,Mastra 会自动保存包含以下步骤的工作流快照:
🌐 When a workflow is suspended, Mastra automatically persists the workflow snapshot with these steps:
- 在步骤执行中,
suspend()函数会触发快照过程 WorkflowInstance.suspend()方法记录了被挂起的机器persistWorkflowSnapshot()被调用以保存当前状态- 快照被序列化并存储在配置的数据库的
workflow_snapshots表中 - 存储记录包括工作流名称、运行ID和序列化快照
正在检索快照Direct link to 正在检索快照
🌐 Retrieving snapshots
当工作流恢复时,Mastra 会通过以下步骤检索已保存的快照:
🌐 When a workflow is resumed, Mastra retrieves the persisted snapshot with these steps:
resume()方法会使用特定的步骤 ID 被调用- 快照是使用
loadWorkflowSnapshot()从存储中加载的 - 快照已被解析并准备恢复
- 工作流执行已使用快照状态重新创建
- 挂起的步骤已恢复,执行继续
const storage = mastra.getStorage();
const workflowStore = await storage?.getStore('workflows');
const snapshot = await workflowStore?.loadWorkflowSnapshot({
runId: "<run-id>",
workflowName: "<workflow-id>",
});
console.log(snapshot);
快照的存储选项Direct link to 快照的存储选项
🌐 Storage options for snapshots
快照使用配置在 Mastra 类上的 storage 实例进行持久化。此存储层在注册到该实例的所有工作流之间共享。Mastra 支持多种存储选项,以便在不同环境中灵活使用。
🌐 Snapshots are persisted using a storage instance configured on the Mastra class. This storage layer is shared across all workflows registered to that instance. Mastra supports multiple storage options for flexibility in different environments.
import { Mastra } from "@mastra/core";
import { LibSQLStore } from "@mastra/libsql";
import { approvalWorkflow } from "./workflows";
export const mastra = new Mastra({
storage: new LibSQLStore({
id: 'mastra-storage',
url: ":memory:",
}),
workflows: { approvalWorkflow },
});
最佳实践Direct link to 最佳实践
🌐 Best practices
- 确保可序列化:需要包含在快照中的任何数据都必须是可序列化的(可以转换为 JSON)。
- 最小化快照大小:避免将大型数据对象直接存储在工作流上下文中。相反,应存储对它们的引用(如 ID),并在需要时再获取数据。
- 小心处理恢复上下文:在恢复工作流时,仔细考虑提供什么样的上下文。这将与现有的快照数据合并。
- 建立适当的监控:对暂停的工作流,尤其是长时间运行的工作流,实现监控,以确保它们能够正确恢复。
- 考虑存储扩展:对于有许多挂起工作流的应用,请确保你的存储解决方案具有适当的扩展能力。
自定义快照元数据Direct link to 自定义快照元数据
🌐 Custom snapshot metadata
在挂起工作流时,你可以通过定义 suspendSchema 附加自定义元数据。这些元数据会存储在快照中,并在恢复工作流时可用。
🌐 You can attach custom metadata when suspending a workflow by defining a suspendSchema. This metadata is stored in the snapshot and made available when the workflow is resumed.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const approvalStep = createStep({
id: "approval-step",
description: "Accepts a value, waits for confirmation",
inputSchema: z.object({
value: z.number(),
user: z.string(),
requiredApprovers: z.array(z.string()),
}),
suspendSchema: z.object({
message: z.string(),
requestedBy: z.string(),
approvers: z.array(z.string()),
}),
resumeSchema: z.object({
confirm: z.boolean(),
approver: z.string(),
}),
outputSchema: z.object({
value: z.number(),
approved: z.boolean(),
}),
execute: async ({ inputData, resumeData, suspend }) => {
const { value, user, requiredApprovers } = inputData;
const { confirm } = resumeData ?? {};
if (!confirm) {
return await suspend({
message: "Workflow suspended",
requestedBy: user,
approvers: [...requiredApprovers],
});
}
return {
value,
approved: confirm,
};
},
});
提供简历数据Direct link to 提供简历数据
🌐 Providing resume data
在恢复暂停的步骤时,使用 resumeData 传递结构化输入。它必须与步骤的 resumeSchema 匹配。
🌐 Use resumeData to pass structured input when resuming a suspended step. It must match the step’s resumeSchema.
const workflow = mastra.getWorkflow("approvalWorkflow");
const run = await workflow.createRun();
const result = await run.start({
inputData: {
value: 100,
user: "Michael",
requiredApprovers: ["manager", "finance"],
},
});
if (result.status === "suspended") {
const resumedResult = await run.resume({
step: "approval-step",
resumeData: {
confirm: true,
approver: "manager",
},
});
}
相关Direct link to 相关
🌐 Related