Skip to main content

复合存储

🌐 Composite Storage

MastraCompositeStore 可以从不同的提供商组合存储域。当你需要为不同用途使用不同的数据库时,可以使用它。例如,使用 LibSQL 处理内存,使用 PostgreSQL 处理工作流。

安装
Direct link to 安装

🌐 Installation

MastraCompositeStore 包含在 @mastra/core 中:

npm install @mastra/core@latest

你还需要安装你想要组合的存储提供程序:

🌐 You'll also need to install the storage providers you want to compose:

npm install @mastra/pg@latest @mastra/libsql@latest

存储域
Direct link to 存储域

🌐 Storage domains

Mastra 将存储组织为五个专门的字段,每个字段处理特定类型的数据。每个字段可以由不同的存储适配器支持,且字段类从每个存储包中导出。

🌐 Mastra organizes storage into five specialized domains, each handling a specific type of data. Each domain can be backed by a different storage adapter, and domain classes are exported from each storage package.

描述
memory代理的对话持久化。存储对话线程(会话)、消息、资源(用户身份)以及工作内存(跨对话的持久上下文)。
workflows工作流执行状态。当工作流因等待人工输入、外部事件或计划恢复而挂起时,其状态会在此处持久化,以便服务器重启后能够恢复。
scoresMastra 评估系统的评估结果。分数和指标会在此处持久化,以便进行分析和长期比较。
observability遥测数据,包括跟踪和跨度。代理交互、工具调用以及大语言模型请求会生成跨度,这些跨度会被收集成跟踪,用于调试和性能分析。
agents已存储代理的配置。允许在运行时定义和更新代理,而无需进行代码部署。

用法
Direct link to 用法

🌐 Usage

基础组成
Direct link to 基础组成

🌐 Basic composition

直接从每个存储包导入字段类并进行组合:

🌐 Import domain classes directly from each store package and compose them:

src/mastra/index.ts
import { MastraCompositeStore } from "@mastra/core/storage";
import { WorkflowsPG, ScoresPG } from "@mastra/pg";
import { MemoryLibSQL } from "@mastra/libsql";
import { Mastra } from "@mastra/core";

export const mastra = new Mastra({
storage: new MastraCompositeStore({
id: "composite",
domains: {
memory: new MemoryLibSQL({ url: "file:./local.db" }),
workflows: new WorkflowsPG({ connectionString: process.env.DATABASE_URL }),
scores: new ScoresPG({ connectionString: process.env.DATABASE_URL }),
},
}),
});

使用默认存储
Direct link to 使用默认存储

🌐 With a default storage

使用 default 指定一个备用存储,然后覆盖特定的域:

🌐 Use default to specify a fallback storage, then override specific domains:

src/mastra/index.ts
import { MastraCompositeStore } from "@mastra/core/storage";
import { PostgresStore } from "@mastra/pg";
import { MemoryLibSQL } from "@mastra/libsql";
import { Mastra } from "@mastra/core";

const pgStore = new PostgresStore({
id: "pg",
connectionString: process.env.DATABASE_URL,
});

export const mastra = new Mastra({
storage: new MastraCompositeStore({
id: "composite",
default: pgStore,
domains: {
memory: new MemoryLibSQL({ url: "file:./local.db" }),
},
}),
});

选项
Direct link to 选项

🌐 Options

id:

string
Unique identifier for this storage instance.

default?:

MastraCompositeStore
Default storage adapter. Domains not explicitly specified in `domains` will use this storage's domains as fallbacks.

domains?:

object
Individual domain overrides. Each domain can come from a different storage adapter. These take precedence over the default storage.

domains.memory?:

MemoryStorage
Storage for threads, messages, and resources.

domains.workflows?:

WorkflowsStorage
Storage for workflow snapshots.

domains.scores?:

ScoresStorage
Storage for evaluation scores.

domains.observability?:

ObservabilityStorage
Storage for traces and spans.

domains.agents?:

AgentsStorage
Storage for stored agent configurations.

disableInit?:

boolean
When true, automatic initialization is disabled. You must call init() explicitly.

初始化
Direct link to 初始化

🌐 Initialization

MastraCompositeStore 独立初始化每个配置的域。当传递给 Mastra 类时,init() 会自动被调用:

src/mastra/index.ts
import { MastraCompositeStore } from "@mastra/core/storage";
import { MemoryPG, WorkflowsPG, ScoresPG } from "@mastra/pg";
import { Mastra } from "@mastra/core";

const storage = new MastraCompositeStore({
id: "composite",
domains: {
memory: new MemoryPG({ connectionString: process.env.DATABASE_URL }),
workflows: new WorkflowsPG({ connectionString: process.env.DATABASE_URL }),
scores: new ScoresPG({ connectionString: process.env.DATABASE_URL }),
},
});

export const mastra = new Mastra({
storage, // init() called automatically
});

如果直接使用存储,请显式调用 init()

🌐 If using storage directly, call init() explicitly:

import { MastraCompositeStore } from "@mastra/core/storage";
import { MemoryPG } from "@mastra/pg";

const storage = new MastraCompositeStore({
id: "composite",
domains: {
memory: new MemoryPG({ connectionString: process.env.DATABASE_URL }),
},
});

await storage.init();

// Access domain-specific stores via getStore()
const memoryStore = await storage.getStore("memory");
const thread = await memoryStore?.getThreadById({ threadId: "..." });

用例
Direct link to 用例

🌐 Use cases

针对不同工作负载的独立数据库
Direct link to 针对不同工作负载的独立数据库

🌐 Separate databases for different workloads

在开发过程中使用本地数据库,同时将生产数据保存在托管服务中:

🌐 Use a local database for development while keeping production data in a managed service:

import { MastraCompositeStore } from "@mastra/core/storage";
import { MemoryPG, WorkflowsPG, ScoresPG } from "@mastra/pg";
import { MemoryLibSQL } from "@mastra/libsql";

const storage = new MastraCompositeStore({
id: "composite",
domains: {
// Use local SQLite for development, PostgreSQL for production
memory:
process.env.NODE_ENV === "development"
? new MemoryLibSQL({ url: "file:./dev.db" })
: new MemoryPG({ connectionString: process.env.DATABASE_URL }),
workflows: new WorkflowsPG({ connectionString: process.env.DATABASE_URL }),
scores: new ScoresPG({ connectionString: process.env.DATABASE_URL }),
},
});

用于可观测性的专用存储
Direct link to 用于可观测性的专用存储

🌐 Specialized storage for observability

在生产环境中,可观测性数据可能会迅速压垮通用数据库。单个代理交互就可能生成数百个跨度,而高流量应用每天可能产生数千条跟踪。

🌐 Observability data can quickly overwhelm general-purpose databases in production. A single agent interaction can generate hundreds of spans, and high-traffic applications can produce thousands of traces per day.

ClickHouse 被推荐用于生产环境的可观测性,因为它针对高流量、大量写入的分析工作负载进行了优化。使用复合存储将可观测数据路由到 ClickHouse,同时将其他数据保存在主数据库中:

import { MastraCompositeStore } from "@mastra/core/storage";
import { MemoryPG, WorkflowsPG, ScoresPG } from "@mastra/pg";
import { ObservabilityStorageClickhouse } from "@mastra/clickhouse";

const storage = new MastraCompositeStore({
id: "composite",
domains: {
memory: new MemoryPG({ connectionString: process.env.DATABASE_URL }),
workflows: new WorkflowsPG({ connectionString: process.env.DATABASE_URL }),
scores: new ScoresPG({ connectionString: process.env.DATABASE_URL }),
observability: new ObservabilityStorageClickhouse({
url: process.env.CLICKHOUSE_URL,
username: process.env.CLICKHOUSE_USERNAME,
password: process.env.CLICKHOUSE_PASSWORD,
}),
},
});
info

当使用不支持可观测性的存储提供商(如 Convex、DynamoDB 或 Cloudflare)时,也需要采用这种方法。有关支持的提供商完整列表,请参阅 DefaultExporter 文档

🌐 This approach is also required when using storage providers that don't support observability (like Convex, DynamoDB, or Cloudflare). See the DefaultExporter documentation for the full list of supported providers.