Skip to main content

快照

🌐 Snapshots

在 Mastra 中,快照是工作流在特定时间点的完整执行状态的可序列化表示。快照捕获恢复工作流所需的所有信息,使其能从中断的地方精确继续执行,包括:

🌐 In Mastra, a snapshot is a serializable representation of a workflow's complete execution state at a specific point in time. Snapshots capture all the information needed to resume a workflow from exactly where it left off, including:

  • 工作流程中每个步骤的当前状态
  • 已完成步骤的输出
  • 工作流中执行的路径
  • 任何挂起的步骤及其元数据
  • 每个步骤剩余的重试次数
  • 需要额外的上下文数据以恢复执行

每当工作流被暂停时,快照会由 Mastra 自动创建和管理,并保存到配置的存储系统中。

🌐 Snapshots are automatically created and managed by Mastra whenever a workflow is suspended, and are persisted to the configured storage system.

快照在挂起和恢复中的作用
Direct link to 快照在挂起和恢复中的作用

🌐 The role of snapshots in suspend and resume

快照是实现 Mastra 挂起和恢复功能的关键机制。当工作流步骤调用 await suspend() 时:

🌐 Snapshots are the key mechanism enabling Mastra's suspend and resume capabilities. When a workflow step calls await suspend():

  1. 工作流执行在那个精确点暂停
  2. 工作流程的当前状态被捕获为快照
  3. 快照已保存到存储中
  4. 此工作流步骤被标记为“已挂起”,状态为 'suspended'
  5. 稍后,当在挂起的步骤上调用 resume() 时,快照会被检索
  6. 工作流执行从中断的地方准确地恢复

该机制提供了一种强大的方式来实现人机交互的工作流程、处理速率限制、等待外部资源,以及实现可能需要长时间暂停的复杂分支工作流程。

🌐 This mechanism provides a powerful way to implement human-in-the-loop workflows, handle rate limiting, wait for external resources, and implement complex branching workflows that may need to pause for extended periods.

快照剖析
Direct link to 快照剖析

🌐 Snapshot anatomy

每个快照包括 runId、输入、步骤状态(successsuspended 等)、任何挂起和恢复的负载,以及最终输出。这确保在恢复执行时可用的上下文是完整的。

🌐 Each snapshot includes the runId, input, step status (success, suspended, etc.), any suspend and resume payloads, and the final output. This ensures full context is available when resuming execution.

{
"runId": "34904c14-e79e-4a12-9804-9655d4616c50",
"status": "success",
"value": {},
"context": {
"input": {
"value": 100,
"user": "Michael",
"requiredApprovers": ["manager", "finance"]
},
"approval-step": {
"payload": {
"value": 100,
"user": "Michael",
"requiredApprovers": ["manager", "finance"]
},
"startedAt": 1758027577955,
"status": "success",
"suspendPayload": {
"message": "Workflow suspended",
"requestedBy": "Michael",
"approvers": ["manager", "finance"]
},
"suspendedAt": 1758027578065,
"resumePayload": { "confirm": true, "approver": "manager" },
"resumedAt": 1758027578517,
"output": { "value": 100, "approved": true },
"endedAt": 1758027578634
}
},
"activePaths": [],
"serializedStepGraph": [
{
"type": "step",
"step": {
"id": "approval-step",
"description": "Accepts a value, waits for confirmation"
}
}
],
"suspendedPaths": {},
"waitingPaths": {},
"result": { "value": 100, "approved": true },
"requestContext": {},
"timestamp": 1758027578740
}

快照的保存和检索方式
Direct link to 快照的保存和检索方式

🌐 How snapshots are saved and retrieved

快照会保存到配置的存储系统中。默认情况下,它们使用 libSQL,但你可以配置使用 Upstash 或 PostgreSQL。每个快照都保存在 workflow_snapshots 表中,并由工作流的 runId 标识。

🌐 Snapshots are saved to the configured storage system. By default, they use libSQL, but you can configure Upstash or PostgreSQL instead. Each snapshot is saved in the workflow_snapshots table and identified by the workflow's runId.

阅读更多关于:

🌐 Read more about:

保存快照
Direct link to 保存快照

🌐 Saving snapshots

当工作流被暂停时,Mastra 会自动保存包含以下步骤的工作流快照:

🌐 When a workflow is suspended, Mastra automatically persists the workflow snapshot with these steps:

  1. 在步骤执行中,suspend() 函数会触发快照过程
  2. WorkflowInstance.suspend() 方法记录了被挂起的机器
  3. persistWorkflowSnapshot() 被调用以保存当前状态
  4. 快照被序列化并存储在配置的数据库的 workflow_snapshots 表中
  5. 存储记录包括工作流名称、运行ID和序列化快照

正在检索快照
Direct link to 正在检索快照

🌐 Retrieving snapshots

当工作流恢复时,Mastra 会通过以下步骤检索已保存的快照:

🌐 When a workflow is resumed, Mastra retrieves the persisted snapshot with these steps:

  1. resume() 方法会使用特定的步骤 ID 被调用
  2. 快照是使用 loadWorkflowSnapshot() 从存储中加载的
  3. 快照已被解析并准备恢复
  4. 工作流执行已使用快照状态重新创建
  5. 挂起的步骤已恢复,执行继续
const storage = mastra.getStorage();
const workflowStore = await storage?.getStore('workflows');

const snapshot = await workflowStore?.loadWorkflowSnapshot({
runId: "<run-id>",
workflowName: "<workflow-id>",
});

console.log(snapshot);

快照的存储选项
Direct link to 快照的存储选项

🌐 Storage options for snapshots

快照使用配置在 Mastra 类上的 storage 实例进行持久化。此存储层在注册到该实例的所有工作流之间共享。Mastra 支持多种存储选项,以便在不同环境中灵活使用。

🌐 Snapshots are persisted using a storage instance configured on the Mastra class. This storage layer is shared across all workflows registered to that instance. Mastra supports multiple storage options for flexibility in different environments.

src/mastra/index.ts
import { Mastra } from "@mastra/core";
import { LibSQLStore } from "@mastra/libsql";
import { approvalWorkflow } from "./workflows";

export const mastra = new Mastra({
storage: new LibSQLStore({
id: 'mastra-storage',
url: ":memory:",
}),
workflows: { approvalWorkflow },
});

最佳实践
Direct link to 最佳实践

🌐 Best practices

  1. 确保可序列化:需要包含在快照中的任何数据都必须是可序列化的(可以转换为 JSON)。
  2. 最小化快照大小:避免将大型数据对象直接存储在工作流上下文中。相反,应存储对它们的引用(如 ID),并在需要时再获取数据。
  3. 小心处理恢复上下文:在恢复工作流时,仔细考虑提供什么样的上下文。这将与现有的快照数据合并。
  4. 建立适当的监控:对暂停的工作流,尤其是长时间运行的工作流,实现监控,以确保它们能够正确恢复。
  5. 考虑存储扩展:对于有许多挂起工作流的应用,请确保你的存储解决方案具有适当的扩展能力。

自定义快照元数据
Direct link to 自定义快照元数据

🌐 Custom snapshot metadata

在挂起工作流时,你可以通过定义 suspendSchema 附加自定义元数据。这些元数据会存储在快照中,并在恢复工作流时可用。

🌐 You can attach custom metadata when suspending a workflow by defining a suspendSchema. This metadata is stored in the snapshot and made available when the workflow is resumed.

src/mastra/workflows/test-workflow.ts
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";

const approvalStep = createStep({
id: "approval-step",
description: "Accepts a value, waits for confirmation",
inputSchema: z.object({
value: z.number(),
user: z.string(),
requiredApprovers: z.array(z.string()),
}),
suspendSchema: z.object({
message: z.string(),
requestedBy: z.string(),
approvers: z.array(z.string()),
}),
resumeSchema: z.object({
confirm: z.boolean(),
approver: z.string(),
}),
outputSchema: z.object({
value: z.number(),
approved: z.boolean(),
}),
execute: async ({ inputData, resumeData, suspend }) => {
const { value, user, requiredApprovers } = inputData;
const { confirm } = resumeData ?? {};

if (!confirm) {
return await suspend({
message: "Workflow suspended",
requestedBy: user,
approvers: [...requiredApprovers],
});
}

return {
value,
approved: confirm,
};
},
});

提供简历数据
Direct link to 提供简历数据

🌐 Providing resume data

在恢复暂停的步骤时,使用 resumeData 传递结构化输入。它必须与步骤的 resumeSchema 匹配。

🌐 Use resumeData to pass structured input when resuming a suspended step. It must match the step’s resumeSchema.

const workflow = mastra.getWorkflow("approvalWorkflow");

const run = await workflow.createRun();

const result = await run.start({
inputData: {
value: 100,
user: "Michael",
requiredApprovers: ["manager", "finance"],
},
});

if (result.status === "suspended") {
const resumedResult = await run.resume({
step: "approval-step",
resumeData: {
confirm: true,
approver: "manager",
},
});
}

🌐 Related