diff --git a/packages/adapter-postgres/src/index.ts b/packages/adapter-postgres/src/index.ts index bceeae1415..53cb240086 100644 --- a/packages/adapter-postgres/src/index.ts +++ b/packages/adapter-postgres/src/index.ts @@ -1,8 +1,8 @@ import { v4 } from "uuid"; -import postgres from "pg"; -const { Pool } = postgres; -type PoolType = typeof postgres.Pool; +// Import the entire module as default +import pg from "pg"; +type Pool = pg.Pool; import { QueryConfig, @@ -20,9 +20,9 @@ import { type UUID, type IDatabaseCacheAdapter, Participant, - DatabaseAdapter, elizaLogger, getEmbeddingConfig, + DatabaseAdapter, } from "@ai16z/eliza"; import fs from "fs"; import { fileURLToPath } from "url"; @@ -32,10 +32,10 @@ const __filename = fileURLToPath(import.meta.url); // get the resolved path to t const __dirname = path.dirname(__filename); // get the name of the directory export class PostgresDatabaseAdapter - extends DatabaseAdapter + extends DatabaseAdapter implements IDatabaseCacheAdapter { - private pool: InstanceType; + private pool: Pool; private readonly maxRetries: number = 3; private readonly baseDelay: number = 1000; // 1 second private readonly maxDelay: number = 10000; // 10 seconds @@ -43,7 +43,12 @@ export class PostgresDatabaseAdapter private readonly connectionTimeout: number = 5000; // 5 seconds constructor(connectionConfig: any) { - super(); + super({ + //circuitbreaker stuff + failureThreshold: 5, + resetTimeout: 60000, + halfOpenMaxAttempts: 3, + }); const defaultConfig = { max: 20, @@ -51,7 +56,7 @@ export class PostgresDatabaseAdapter connectionTimeoutMillis: this.connectionTimeout, }; - this.pool = new Pool({ + this.pool = new pg.Pool({ ...defaultConfig, ...connectionConfig, // Allow overriding defaults }); @@ -75,6 +80,19 @@ export class PostgresDatabaseAdapter await this.cleanup(); process.exit(0); }); + + process.on("beforeExit", async () => { + await this.cleanup(); + }); + } + + private async withDatabase( + operation: () => Promise, + context: string + ): Promise { + return this.withCircuitBreaker(async () => { + return this.withRetry(operation); + }, context); } private async withRetry(operation: () => Promise): Promise { @@ -137,7 +155,7 @@ export class PostgresDatabaseAdapter await this.pool.end(); // Create new pool - this.pool = new Pool({ + this.pool = new pg.Pool({ ...this.pool.options, connectionTimeoutMillis: this.connectionTimeout, }); @@ -159,16 +177,9 @@ export class PostgresDatabaseAdapter queryTextOrConfig: string | QueryConfig, values?: QueryConfigValues ): Promise> { - const client = await this.pool.connect(); - - try { - return client.query(queryTextOrConfig, values); - } catch (error) { - elizaLogger.error(error); - throw error; - } finally { - client.release(); - } + return this.withDatabase(async () => { + return await this.pool.query(queryTextOrConfig, values); + }, "query"); } async init() { @@ -237,17 +248,17 @@ export class PostgresDatabaseAdapter } async getRoom(roomId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( "SELECT id FROM rooms WHERE id = $1", [roomId] ); return rows.length > 0 ? (rows[0].id as UUID) : null; - }); + }, "getRoom"); } async getParticipantsForAccount(userId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( `SELECT id, "userId", "roomId", "last_message_read" FROM participants @@ -255,20 +266,20 @@ export class PostgresDatabaseAdapter [userId] ); return rows as Participant[]; - }); + }, "getParticipantsForAccount"); } async getParticipantUserState( roomId: UUID, userId: UUID ): Promise<"FOLLOWED" | "MUTED" | null> { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( `SELECT "userState" FROM participants WHERE "roomId" = $1 AND "userId" = $2`, [roomId, userId] ); return rows.length > 0 ? rows[0].userState : null; - }); + }, "getParticipantUserState"); } async getMemoriesByRoomIds(params: { @@ -276,7 +287,7 @@ export class PostgresDatabaseAdapter agentId?: UUID; tableName: string; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { if (params.roomIds.length === 0) return []; const placeholders = params.roomIds .map((_, i) => `$${i + 2}`) @@ -298,7 +309,7 @@ export class PostgresDatabaseAdapter ? JSON.parse(row.content) : row.content, })); - }); + }, "getMemoriesByRoomIds"); } async setParticipantUserState( @@ -306,26 +317,26 @@ export class PostgresDatabaseAdapter userId: UUID, state: "FOLLOWED" | "MUTED" | null ): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query( `UPDATE participants SET "userState" = $1 WHERE "roomId" = $2 AND "userId" = $3`, [state, roomId, userId] ); - }); + }, "setParticipantUserState"); } async getParticipantsForRoom(roomId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( 'SELECT "userId" FROM participants WHERE "roomId" = $1', [roomId] ); return rows.map((row) => row.userId); - }); + }, "getParticipantsForRoom"); } async getAccountById(userId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( "SELECT * FROM accounts WHERE id = $1", [userId] @@ -348,11 +359,11 @@ export class PostgresDatabaseAdapter ? JSON.parse(account.details) : account.details, }; - }); + }, "getAccountById"); } async createAccount(account: Account): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const accountId = account.id ?? v4(); await this.pool.query( @@ -380,11 +391,11 @@ export class PostgresDatabaseAdapter }); return false; // Return false instead of throwing to maintain existing behavior } - }); + }, "createAccount"); } async getActorById(params: { roomId: UUID }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( `SELECT a.id, a.name, a.username, a.details FROM participants p @@ -421,7 +432,7 @@ export class PostgresDatabaseAdapter }; } }); - }).catch((error) => { + }, "getActorById").catch((error) => { elizaLogger.error("Failed to get actors:", { roomId: params.roomId, error: error.message, @@ -431,7 +442,7 @@ export class PostgresDatabaseAdapter } async getMemoryById(id: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( "SELECT * FROM memories WHERE id = $1", [id] @@ -445,11 +456,11 @@ export class PostgresDatabaseAdapter ? JSON.parse(rows[0].content) : rows[0].content, }; - }); + }, "getMemoryById"); } async createMemory(memory: Memory, tableName: string): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { elizaLogger.debug("PostgresAdapter createMemory:", { memoryId: memory.id, embeddingLength: memory.embedding?.length, @@ -486,7 +497,7 @@ export class PostgresDatabaseAdapter Date.now(), ] ); - }); + }, "createMemory"); } async searchMemories(params: { @@ -521,7 +532,7 @@ export class PostgresDatabaseAdapter if (!params.tableName) throw new Error("tableName is required"); if (!params.roomId) throw new Error("roomId is required"); - return this.withRetry(async () => { + return this.withDatabase(async () => { // Build query let sql = `SELECT * FROM memories WHERE type = $1 AND "roomId" = $2`; const values: any[] = [params.tableName, params.roomId]; @@ -587,7 +598,7 @@ export class PostgresDatabaseAdapter ? JSON.parse(row.content) : row.content, })); - }); + }, "getMemories"); } async getGoals(params: { @@ -596,7 +607,7 @@ export class PostgresDatabaseAdapter onlyInProgress?: boolean; count?: number; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { let sql = `SELECT * FROM goals WHERE "roomId" = $1`; const values: any[] = [params.roomId]; let paramCount = 1; @@ -625,11 +636,11 @@ export class PostgresDatabaseAdapter ? JSON.parse(row.objectives) : row.objectives, })); - }); + }, "getGoals"); } async updateGoal(goal: Goal): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { await this.pool.query( `UPDATE goals SET name = $1, status = $2, objectives = $3 WHERE id = $4`, @@ -649,11 +660,11 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "updateGoal"); } async createGoal(goal: Goal): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query( `INSERT INTO goals (id, "roomId", "userId", name, status, objectives) VALUES ($1, $2, $3, $4, $5, $6)`, @@ -666,13 +677,13 @@ export class PostgresDatabaseAdapter JSON.stringify(goal.objectives), ] ); - }); + }, "createGoal"); } async removeGoal(goalId: UUID): Promise { if (!goalId) throw new Error("Goal ID is required"); - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const result = await this.pool.query( "DELETE FROM goals WHERE id = $1 RETURNING id", @@ -691,23 +702,23 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "removeGoal"); } async createRoom(roomId?: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const newRoomId = roomId || v4(); await this.pool.query("INSERT INTO rooms (id) VALUES ($1)", [ newRoomId, ]); return newRoomId as UUID; - }); + }, "createRoom"); } async removeRoom(roomId: UUID): Promise { if (!roomId) throw new Error("Room ID is required"); - return this.withRetry(async () => { + return this.withDatabase(async () => { const client = await this.pool.connect(); try { await client.query("BEGIN"); @@ -759,9 +770,9 @@ export class PostgresDatabaseAdapter }); throw error; } finally { - client.release(); + if (client) client.release(); } - }); + }, "removeRoom"); } async createRelationship(params: { @@ -773,7 +784,7 @@ export class PostgresDatabaseAdapter throw new Error("userA and userB are required"); } - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const relationshipId = v4(); await this.pool.query( @@ -814,7 +825,7 @@ export class PostgresDatabaseAdapter } return false; } - }); + }, "createRelationship"); } async getRelationship(params: { @@ -825,7 +836,7 @@ export class PostgresDatabaseAdapter throw new Error("userA and userB are required"); } - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const { rows } = await this.pool.query( `SELECT * FROM relationships @@ -857,7 +868,7 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "getRelationship"); } async getRelationships(params: { userId: UUID }): Promise { @@ -865,7 +876,7 @@ export class PostgresDatabaseAdapter throw new Error("userId is required"); } - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const { rows } = await this.pool.query( `SELECT * FROM relationships @@ -888,7 +899,7 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "getRelationships"); } async getCachedEmbeddings(opts: { @@ -910,7 +921,7 @@ export class PostgresDatabaseAdapter if (opts.query_match_count <= 0) throw new Error("query_match_count must be positive"); - return this.withRetry(async () => { + return this.withDatabase(async () => { try { elizaLogger.debug("Fetching cached embeddings:", { tableName: opts.query_table_name, @@ -996,7 +1007,7 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "getCachedEmbeddings"); } async log(params: { @@ -1013,7 +1024,7 @@ export class PostgresDatabaseAdapter throw new Error("body must be a valid object"); } - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const logId = v4(); // Generate ID for tracking await this.pool.query( @@ -1052,7 +1063,7 @@ export class PostgresDatabaseAdapter }); throw error; } - }); + }, "log"); } async searchMemoriesByEmbedding( @@ -1066,7 +1077,7 @@ export class PostgresDatabaseAdapter tableName: string; } ): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { elizaLogger.debug("Incoming vector:", { length: embedding.length, sample: embedding.slice(0, 5), @@ -1154,11 +1165,11 @@ export class PostgresDatabaseAdapter : row.content, similarity: row.similarity, })); - }); + }, "searchMemoriesByEmbedding"); } async addParticipant(userId: UUID, roomId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { await this.pool.query( `INSERT INTO participants (id, "userId", "roomId") @@ -1170,11 +1181,11 @@ export class PostgresDatabaseAdapter console.log("Error adding participant", error); return false; } - }); + }, "addParticpant"); } async removeParticipant(userId: UUID, roomId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { await this.pool.query( `DELETE FROM participants WHERE "userId" = $1 AND "roomId" = $2`, @@ -1185,37 +1196,37 @@ export class PostgresDatabaseAdapter console.log("Error removing participant", error); return false; } - }); + }, "removeParticipant"); } async updateGoalStatus(params: { goalId: UUID; status: GoalStatus; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query( "UPDATE goals SET status = $1 WHERE id = $2", [params.status, params.goalId] ); - }); + }, "updateGoalStatus"); } async removeMemory(memoryId: UUID, tableName: string): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query( "DELETE FROM memories WHERE type = $1 AND id = $2", [tableName, memoryId] ); - }); + }, "removeMemory"); } async removeAllMemories(roomId: UUID, tableName: string): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query( `DELETE FROM memories WHERE type = $1 AND "roomId" = $2`, [tableName, roomId] ); - }); + }, "removeAllMemories"); } async countMemories( @@ -1225,7 +1236,7 @@ export class PostgresDatabaseAdapter ): Promise { if (!tableName) throw new Error("tableName is required"); - return this.withRetry(async () => { + return this.withDatabase(async () => { let sql = `SELECT COUNT(*) as count FROM memories WHERE type = $1 AND "roomId" = $2`; if (unique) { sql += ` AND "unique" = true`; @@ -1233,36 +1244,36 @@ export class PostgresDatabaseAdapter const { rows } = await this.pool.query(sql, [tableName, roomId]); return parseInt(rows[0].count); - }); + }, "countMemories"); } async removeAllGoals(roomId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { await this.pool.query(`DELETE FROM goals WHERE "roomId" = $1`, [ roomId, ]); - }); + }, "removeAllGoals"); } async getRoomsForParticipant(userId: UUID): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const { rows } = await this.pool.query( `SELECT "roomId" FROM participants WHERE "userId" = $1`, [userId] ); return rows.map((row) => row.roomId); - }); + }, "getRoomsForParticipant"); } async getRoomsForParticipants(userIds: UUID[]): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { const placeholders = userIds.map((_, i) => `$${i + 1}`).join(", "); const { rows } = await this.pool.query( `SELECT DISTINCT "roomId" FROM participants WHERE "userId" IN (${placeholders})`, userIds ); return rows.map((row) => row.roomId); - }); + }, "getRoomsForParticipants"); } async getActorDetails(params: { roomId: string }): Promise { @@ -1270,7 +1281,7 @@ export class PostgresDatabaseAdapter throw new Error("roomId is required"); } - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const sql = ` SELECT @@ -1327,14 +1338,14 @@ export class PostgresDatabaseAdapter `Failed to fetch actor details: ${error instanceof Error ? error.message : String(error)}` ); } - }); + }, "getActorDetails"); } async getCache(params: { key: string; agentId: UUID; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const sql = `SELECT "value"::TEXT FROM cache WHERE "key" = $1 AND "agentId" = $2`; const { rows } = await this.query<{ value: string }>(sql, [ @@ -1351,7 +1362,7 @@ export class PostgresDatabaseAdapter }); return undefined; } - }); + }, "getCache"); } async setCache(params: { @@ -1359,7 +1370,7 @@ export class PostgresDatabaseAdapter agentId: UUID; value: string; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const client = await this.pool.connect(); try { @@ -1385,7 +1396,7 @@ export class PostgresDatabaseAdapter }); return false; } finally { - client.release(); + if (client) client.release(); } } catch (error) { elizaLogger.error( @@ -1394,14 +1405,14 @@ export class PostgresDatabaseAdapter ); return false; } - }); + }, "setCache"); } async deleteCache(params: { key: string; agentId: UUID; }): Promise { - return this.withRetry(async () => { + return this.withDatabase(async () => { try { const client = await this.pool.connect(); try { @@ -1433,7 +1444,7 @@ export class PostgresDatabaseAdapter ); return false; } - }); + }, "deleteCache"); } } diff --git a/packages/core/src/database.ts b/packages/core/src/database.ts index 01c80b91dc..9e8cbfa1b5 100644 --- a/packages/core/src/database.ts +++ b/packages/core/src/database.ts @@ -9,6 +9,8 @@ import { Participant, IDatabaseAdapter, } from "./types.ts"; +import { CircuitBreaker } from "./database/CircuitBreaker"; +import { elizaLogger } from "./logger"; /** * An abstract class representing a database adapter for managing various entities @@ -20,6 +22,35 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { */ db: DB; + /** + * Circuit breaker instance used to handle fault tolerance and prevent cascading failures. + * Implements the Circuit Breaker pattern to temporarily disable operations when a failure threshold is reached. + * + * The circuit breaker has three states: + * - CLOSED: Normal operation, requests pass through + * - OPEN: Failure threshold exceeded, requests are blocked + * - HALF_OPEN: Testing if service has recovered + * + * @protected + */ + protected circuitBreaker: CircuitBreaker; + + /** + * Creates a new DatabaseAdapter instance with optional circuit breaker configuration. + * + * @param circuitBreakerConfig - Configuration options for the circuit breaker + * @param circuitBreakerConfig.failureThreshold - Number of failures before circuit opens (defaults to 5) + * @param circuitBreakerConfig.resetTimeout - Time in ms before attempting to close circuit (defaults to 60000) + * @param circuitBreakerConfig.halfOpenMaxAttempts - Number of successful attempts needed to close circuit (defaults to 3) + */ + constructor(circuitBreakerConfig?: { + failureThreshold?: number; + resetTimeout?: number; + halfOpenMaxAttempts?: number; + }) { + this.circuitBreaker = new CircuitBreaker(circuitBreakerConfig); + } + /** * Optional initialization method for the database adapter. * @returns A Promise that resolves when initialization is complete. @@ -348,4 +379,27 @@ export abstract class DatabaseAdapter implements IDatabaseAdapter { abstract getRelationships(params: { userId: UUID; }): Promise; + + /** + * Executes an operation with circuit breaker protection. + * @param operation A function that returns a Promise to be executed with circuit breaker protection + * @param context A string describing the context/operation being performed for logging purposes + * @returns A Promise that resolves to the result of the operation + * @throws Will throw an error if the circuit breaker is open or if the operation fails + * @protected + */ + protected async withCircuitBreaker( + operation: () => Promise, + context: string + ): Promise { + try { + return await this.circuitBreaker.execute(operation); + } catch (error) { + elizaLogger.error(`Circuit breaker error in ${context}:`, { + error: error instanceof Error ? error.message : String(error), + state: this.circuitBreaker.getState(), + }); + throw error; + } + } } diff --git a/packages/core/src/database/CircuitBreaker.ts b/packages/core/src/database/CircuitBreaker.ts new file mode 100644 index 0000000000..298ef11774 --- /dev/null +++ b/packages/core/src/database/CircuitBreaker.ts @@ -0,0 +1,70 @@ +export type CircuitBreakerState = "CLOSED" | "OPEN" | "HALF_OPEN"; + +export class CircuitBreaker { + private state: CircuitBreakerState = "CLOSED"; + private failureCount: number = 0; + private lastFailureTime?: number; + private halfOpenSuccesses: number = 0; + + private readonly failureThreshold: number; + private readonly resetTimeout: number; + private readonly halfOpenMaxAttempts: number; + + constructor( + private readonly config: { + failureThreshold?: number; + resetTimeout?: number; + halfOpenMaxAttempts?: number; + } = {} + ) { + this.failureThreshold = config.failureThreshold ?? 5; + this.resetTimeout = config.resetTimeout ?? 60000; + this.halfOpenMaxAttempts = config.halfOpenMaxAttempts ?? 3; + } + + async execute(operation: () => Promise): Promise { + if (this.state === "OPEN") { + if (Date.now() - (this.lastFailureTime || 0) > this.resetTimeout) { + this.state = "HALF_OPEN"; + this.halfOpenSuccesses = 0; + } else { + throw new Error("Circuit breaker is OPEN"); + } + } + + try { + const result = await operation(); + + if (this.state === "HALF_OPEN") { + this.halfOpenSuccesses++; + if (this.halfOpenSuccesses >= this.halfOpenMaxAttempts) { + this.reset(); + } + } + + return result; + } catch (error) { + this.handleFailure(); + throw error; + } + } + + private handleFailure(): void { + this.failureCount++; + this.lastFailureTime = Date.now(); + + if (this.failureCount >= this.failureThreshold) { + this.state = "OPEN"; + } + } + + private reset(): void { + this.state = "CLOSED"; + this.failureCount = 0; + this.lastFailureTime = undefined; + } + + getState(): "CLOSED" | "OPEN" | "HALF_OPEN" { + return this.state; + } +}