PostgreSQL 存储
🌐 PostgreSQL Storage
PostgreSQL 存储实现提供了一个可用于生产环境的存储解决方案,使用 PostgreSQL 数据库。
🌐 The PostgreSQL storage implementation provides a production-ready storage solution using PostgreSQL databases.
安装Direct link to 安装
🌐 Installation
- npm
- pnpm
- Yarn
- Bun
npm install @mastra/pg@latest
pnpm add @mastra/pg@latest
yarn add @mastra/pg@latest
bun add @mastra/pg@latest
用法Direct link to 用法
🌐 Usage
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
参数Direct link to 参数
🌐 Parameters
id:
connectionString?:
host?:
port?:
database?:
user?:
password?:
pool?:
schemaName?:
ssl?:
max?:
idleTimeoutMillis?:
disableInit?:
skipDefaultIndexes?:
indexes?:
构造函数示例Direct link to 构造函数示例
🌐 Constructor Examples
你可以通过以下方式实例化 PostgresStore:
🌐 You can instantiate PostgresStore in the following ways:
import { PostgresStore } from "@mastra/pg";
import { Pool } from "pg";
// Using a connection string
const store1 = new PostgresStore({
id: 'pg-storage-1',
connectionString: "postgresql://user:password@localhost:5432/mydb",
});
// Using a connection string with pool options
const store2 = new PostgresStore({
id: 'pg-storage-2',
connectionString: "postgresql://user:password@localhost:5432/mydb",
schemaName: "custom_schema",
max: 30, // Max pool connections
idleTimeoutMillis: 60000, // Idle timeout
ssl: { rejectUnauthorized: false },
});
// Using individual connection parameters
const store3 = new PostgresStore({
id: 'pg-storage-3',
host: "localhost",
port: 5432,
database: "mydb",
user: "user",
password: "password",
});
// Using a pre-configured pg.Pool (recommended for pool reuse)
const existingPool = new Pool({
connectionString: "postgresql://user:password@localhost:5432/mydb",
max: 20,
// ... your custom pool configuration
});
const store4 = new PostgresStore({
id: 'pg-storage-4',
pool: existingPool,
schemaName: "custom_schema", // optional
});
附加说明Direct link to 附加说明
🌐 Additional Notes
模式管理Direct link to 模式管理
🌐 Schema Management
存储实现会自动处理模式的创建和更新。它会创建以下表格:
🌐 The storage implementation handles schema creation and updates automatically. It creates the following tables:
mastra_workflow_snapshot:存储工作流状态和执行数据mastra_evals:存储评估结果和元数据mastra_threads:存储对话线程mastra_messages:存储单条消息mastra_traces:存储遥测和跟踪数据mastra_scorers:存储评分和评估数据mastra_resources:存储资源工作内存数据
可观测性Direct link to 可观测性
🌐 Observability
PostgreSQL 支持可观测性,并且可以处理低量的跟踪数据。吞吐量能力取决于部署因素,如硬件、模式设计、索引和保留策略,并应针对你的特定环境进行验证。对于高流量的生产环境,请考虑:
🌐 PostgreSQL supports observability and can handle low trace volumes. Throughput capacity depends on deployment factors such as hardware, schema design, indexing, and retention policies, and should be validated for your specific environment. For high-volume production environments, consider:
- 使用
insert-only跟踪策略 来减少数据库写操作 - 设置表分区以实现高效的数据保留
- 如果需要进一步扩展,可以将可观测性迁移到 ClickHouse 通过复合存储
初始化Direct link to 初始化
🌐 Initialization
当你将存储传递给 Mastra 类时,init() 会在任何存储操作之前自动调用:
🌐 When you pass storage to the Mastra class, init() is called automatically before any storage operation:
import { Mastra } from "@mastra/core";
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
const mastra = new Mastra({
storage, // init() is called automatically
});
如果你直接使用存储而不使用 Mastra,必须显式调用 init() 来创建表格:
🌐 If you're using storage directly without Mastra, you must call init() explicitly to create the tables:
import { PostgresStore } from "@mastra/pg";
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
});
// Required when using storage directly
await storage.init();
// Access domain-specific stores via getStore()
const memoryStore = await storage.getStore('memory');
const thread = await memoryStore?.getThreadById({ threadId: "..." });
如果未调用 init(),表将不会被创建,存储操作可能会静默失败或抛出错误。
使用现有泳池Direct link to 使用现有泳池
🌐 Using an Existing Pool
如果你的应用中已经有一个 pg.Pool(例如,与 ORM 共享或用于行级安全),你可以将它直接传递给 PostgresStore:
🌐 If you already have a pg.Pool in your application (e.g., shared with an ORM or for Row Level Security), you can pass it directly to PostgresStore:
import { Pool } from "pg";
import { PostgresStore } from "@mastra/pg";
// Your existing pool (shared across your application)
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20,
});
const storage = new PostgresStore({
id: "shared-storage",
pool: pool,
});
线程池生命周期行为:
- 当你提供一个连接池时:Mastra 会使用你的连接池,但在调用
store.close()时不会关闭它。你需要管理连接池的生命周期。 - 当 Mastra 创建一个资金池时:Mastra 拥有该资金池,并将在调用
store.close()时关闭它。
直接数据库和连接池访问Direct link to 直接数据库和连接池访问
🌐 Direct Database and Pool Access
PostgresStore 公开了底层数据库客户端和连接池,以用于高级用例:
store.db; // DbClient - query interface with helpers (any, one, tx, etc.)
store.pool; // pg.Pool - the underlying connection pool
使用 store.db 进行查询:
// Execute queries with helper methods
const users = await store.db.any("SELECT * FROM users WHERE active = $1", [true]);
const user = await store.db.one("SELECT * FROM users WHERE id = $1", [userId]);
const maybeUser = await store.db.oneOrNone("SELECT * FROM users WHERE email = $1", [email]);
// Use transactions
const result = await store.db.tx(async (t) => {
await t.none("INSERT INTO logs (message) VALUES ($1)", ["Started"]);
const data = await t.any("SELECT * FROM items");
return data;
});
直接使用 store.pool:
// Get a client for manual connection management
const client = await store.pool.connect();
try {
await client.query("SET LOCAL app.user_id = $1", [userId]);
const result = await client.query("SELECT * FROM protected_table");
return result.rows;
} finally {
client.release();
}
使用这些字段时:
🌐 When using these fields:
- 你负责正确的连接和事务处理。
- 关闭存储(
store.close())只有在 Mastra 创建该池的情况下才会销毁它。 - 直接访问会绕过 PostgresStore 方法提供的任何额外逻辑或验证。
这种方法适用于需要低级访问的高级场景。
🌐 This approach is intended for advanced scenarios where low-level access is required.
在 Next.js 中使用Direct link to 在 Next.js 中使用
🌐 Using with Next.js
在 Next.js 应用中使用 PostgresStore 时,开发过程中 热模块替换 (HMR) 可能会导致创建多个存储实例,从而出现以下警告:
🌐 When using PostgresStore in Next.js applications, Hot Module Replacement (HMR) during development can cause multiple storage instances to be created, resulting in this warning:
WARNING: Creating a duplicate database object for the same connection.
为防止这种情况,将 PostgresStore 实例存储在全局对象上,以便在 HMR 重载时保持存在:
🌐 To prevent this, store the PostgresStore instance on the global object so it persists across HMR reloads:
import { PostgresStore } from "@mastra/pg";
import { Memory } from "@mastra/memory";
// Extend the global type to include our instances
declare global {
var pgStore: PostgresStore | undefined;
var memory: Memory | undefined;
}
// Get or create the PostgresStore instance
function getPgStore(): PostgresStore {
if (!global.pgStore) {
if (!process.env.DATABASE_URL) {
throw new Error("DATABASE_URL is not defined in environment variables");
}
global.pgStore = new PostgresStore({
id: "pg-storage",
connectionString: process.env.DATABASE_URL,
ssl:
process.env.DATABASE_SSL === "true"
? { rejectUnauthorized: false }
: false,
});
}
return global.pgStore;
}
// Get or create the Memory instance
function getMemory(): Memory {
if (!global.memory) {
global.memory = new Memory({
storage: getPgStore(),
});
}
return global.memory;
}
export const storage = getPgStore();
export const memory = getMemory();
然后在你的 Mastra 配置中使用导出的实例:
🌐 Then use the exported instances in your Mastra configuration:
import { Mastra } from "@mastra/core/mastra";
import { storage } from "./storage";
export const mastra = new Mastra({
storage,
// ...other config
});
这个模式确保无论模块在开发过程中被重新加载多少次,只会创建一个 PostgresStore 实例。相同的模式也可以应用于其他存储提供商,如 LibSQLStore。
🌐 This pattern ensures only one PostgresStore instance is created regardless of how many times the module is reloaded during development. The same pattern can be applied to other storage providers like LibSQLStore.
这个单例模式仅在使用 HMR 的本地开发中才需要。在生产构建中,模块只会被加载一次。
使用示例Direct link to 使用示例
🌐 Usage Example
为代理添加内存Direct link to 为代理添加内存
🌐 Adding memory to an agent
要向代理添加 PostgreSQL 内存,请使用 Memory 类,并使用 PostgresStore 创建一个新的 storage 键。connectionString 可以是远程位置,也可以是本地数据库连接。
🌐 To add PostgreSQL memory to an agent use the Memory class and create a new storage key using PostgresStore. The connectionString can either be a remote location, or a local database connection.
import { Memory } from "@mastra/memory";
import { Agent } from "@mastra/core/agent";
import { PostgresStore } from "@mastra/pg";
export const pgAgent = new Agent({
id: "pg-agent",
name: "PG Agent",
instructions:
"You are an AI agent with the ability to automatically recall memories from previous interactions.",
model: "openai/gpt-5.1",
memory: new Memory({
storage: new PostgresStore({
id: 'pg-agent-storage',
connectionString: process.env.DATABASE_URL!,
}),
options: {
generateTitle: true, // Explicitly enable automatic title generation
},
}),
});
使用代理Direct link to 使用代理
🌐 Using the agent
使用 memoryOptions 来限定此请求的回忆范围。设置 lastMessages: 5 以限制基于最近性的回忆,使用 semanticRecall 获取最相关的 topK: 3 条消息,包括每个匹配消息周围的 messageRange: 2 条邻近消息以提供上下文。
🌐 Use memoryOptions to scope recall for this request. Set lastMessages: 5 to limit recency-based recall, and use semanticRecall to fetch the topK: 3 most relevant messages, including messageRange: 2 neighboring messages for context around each match.
import "dotenv/config";
import { mastra } from "./mastra";
const threadId = "123";
const resourceId = "user-456";
const agent = mastra.getAgent("pg-agent");
const message = await agent.stream("My name is Mastra", {
memory: {
thread: threadId,
resource: resourceId,
},
});
await message.textStream.pipeTo(new WritableStream());
const stream = await agent.stream("What's my name?", {
memory: {
thread: threadId,
resource: resourceId,
},
memoryOptions: {
lastMessages: 5,
semanticRecall: {
topK: 3,
messageRange: 2,
},
},
});
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
索引管理Direct link to 索引管理
🌐 Index Management
PostgreSQL 存储提供索引管理以优化查询性能。
🌐 PostgreSQL storage provides index management to optimize query performance.
默认索引Direct link to 默认索引
🌐 Default Indexes
PostgreSQL 存储在初始化期间为常见查询模式创建复合索引:
🌐 PostgreSQL storage creates composite indexes during initialization for common query patterns:
mastra_threads_resourceid_createdat_idx:(resourceId, 创建时间 DESC)mastra_messages_thread_id_createdat_idx:(线程ID, 创建时间 降序)mastra_ai_spans_traceid_startedat_idx:(traceId, startedAt 降序)mastra_ai_spans_parentspanid_startedat_idx:(parentSpanId, startedAt 降序)mastra_ai_spans_name_startedat_idx:(名称, 开始时间 降序)mastra_ai_spans_scope_startedat_idx:(范围, startedAt 降序)mastra_scores_trace_id_span_id_created_at_idx:(traceId, spanId, createdAt 降序)
这些索引可以提高带排序的筛选查询的性能,包括消息查询上的 dateRange 筛选。
🌐 These indexes improve performance for filtered queries with sorting, including dateRange filters on message queries.
配置索引Direct link to 配置索引
🌐 Configuring Indexes
你可以通过构造函数选项来控制索引创建:
🌐 You can control index creation via constructor options:
import { PostgresStore } from "@mastra/pg";
// Skip default indexes (manage indexes separately)
const store = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
skipDefaultIndexes: true,
});
// Add custom indexes during initialization
const storeWithCustomIndexes = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
indexes: [
{
name: "idx_threads_metadata_type",
table: "mastra_threads",
columns: ["metadata->>'type'"],
},
{
name: "idx_messages_status",
table: "mastra_messages",
columns: ["metadata->>'status'"],
},
],
});
对于高级索引类型,你可以指定其他选项:
🌐 For advanced index types, you can specify additional options:
unique: true用于唯一约束where: 'condition'用于部分索引method: 'brin'用于时间序列数据storage: { fillfactor: 90 }适用于高更新频率的表concurrent: true用于非阻塞创建(默认)
指数选项Direct link to 指数选项
🌐 Index Options
name:
table:
columns:
unique?:
concurrent?:
where?:
method?:
opclass?:
storage?:
tablespace?:
特定模式索引Direct link to 特定模式索引
🌐 Schema-Specific Indexes
使用自定义模式时,索引名称会加上模式名称作为前缀:
🌐 When using custom schemas, index names are prefixed with the schema name:
const storage = new PostgresStore({
id: 'pg-storage',
connectionString: process.env.DATABASE_URL,
schemaName: "custom_schema",
indexes: [
{
name: "idx_threads_status",
table: "mastra_threads",
columns: ["status"],
},
],
});
// Creates index as: custom_schema_idx_threads_status
通过 SQL 管理索引Direct link to 通过 SQL 管理索引
🌐 Managing Indexes via SQL
对于高级索引管理(列出、删除、分析),请通过 db 访问器使用直接的 SQL 查询:
🌐 For advanced index management (listing, dropping, analyzing), use direct SQL queries via the db accessor:
// List indexes for a table
const indexes = await storage.db.any(`
SELECT indexname, indexdef
FROM pg_indexes
WHERE tablename = 'mastra_messages'
`);
// Drop an index
await storage.db.none('DROP INDEX IF EXISTS idx_my_custom_index');
// Analyze index usage
const stats = await storage.db.one(`
SELECT idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE indexrelname = 'mastra_messages_thread_id_createdat_idx'
`);
索引类型及使用案例Direct link to 索引类型及使用案例
🌐 Index Types and Use Cases
PostgreSQL 提供针对特定场景优化的不同索引类型:
🌐 PostgreSQL offers different index types optimized for specific scenarios:
| 索引类型 | 最适合 | 存储 | 速度 |
|---|---|---|---|
| btree (默认) | 范围查询、排序、通用 | 中等 | 快 |
| hash | 仅用于等值比较 | 小 | 对 = 非常快 |
| gin | JSONB、数组、全文搜索 | 大 | 对包含查询快速 |
| gist | 几何数据、全文搜索 | 中等 | 对最近邻查询快速 |
| spgist | 非平衡数据、文本模式 | 小 | 对特定模式快速 |
| brin | 具有自然顺序的大表 | 很小 | 对范围查询快速 |