import { EventEmitter } from 'events';
import fs from 'fs/promises';
import { PGlite } from '@electric-sql/pglite';
import { Pool } from 'pg';
import {
  createLookupHash,
  decryptBuffer,
  decryptJson,
  encryptBuffer,
  encryptJson,
  packEncryptedBuffer,
  unpackEncryptedBuffer,
} from './cryptoUtils.js';
import {
  hasEmbeddedPostgresDatabaseFiles,
  isPostgresStorageMode,
  POSTGRES_STORAGE_DB_DIR,
  POSTGRES_STORAGE_DIR,
  readPostgresStorageManifest,
  STORAGE_MODE,
} from './dataPaths.js';
import { APP_SCHEMA_NAME, POSTGRES_SCHEMA_SQL } from './postgresSchema.js';

// Module-level singleton state for the shared storage pool.
let poolInstance = null;
let initPromise = null;
let activeBackendType = null;

// Clients that already had the schema / search_path setup applied.
const configuredClients = new WeakSet();
// Pools whose connect()/query() were already wrapped by patchPoolClientSetup().
const patchedPools = new WeakSet();

/**
 * Derive the `ssl` pool option from PGSSL / PGSSLMODE.
 *
 * @returns {false | { rejectUnauthorized: boolean }} `false` when the variable
 *   is unset or one of `false`, `0`, `off`, `disable`; otherwise an SSL config.
 */
function resolveSslConfig() {
  const raw = String(process.env.PGSSL || process.env.PGSSLMODE || '')
    .trim()
    .toLowerCase();
  if (!raw || ['false', '0', 'off', 'disable'].includes(raw)) return false;
  // NOTE(review): every enabled value (including e.g. "verify-full") yields
  // rejectUnauthorized: false, i.e. the server certificate is not verified.
  // Left unchanged to avoid breaking existing deployments — confirm intent.
  return { rejectUnauthorized: false };
}

/**
 * Return APP_SCHEMA_NAME after checking it is a plain identifier, so it can be
 * interpolated into the SQL strings built below.
 *
 * @returns {string} The trimmed schema name.
 * @throws {Error} When the name does not match /^[A-Za-z_][A-Za-z0-9_]*$/.
 */
function getValidatedSchemaName() {
  const schemaName = String(APP_SCHEMA_NAME || '').trim();
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(schemaName)) {
    throw new Error(`Invalid POSTGRES_APP_SCHEMA value: ${schemaName}`);
  }
  return schemaName;
}

/**
 * @returns {string} The validated application schema name.
 * @throws {Error} When the configured schema name is not a valid identifier.
 */
export function getPostgresSchemaName() {
  return getValidatedSchemaName();
}

/**
 * @returns {string} `CREATE SCHEMA IF NOT EXISTS` statement for the app schema.
 * @throws {Error} When the configured schema name is not a valid identifier.
 */
export function getCreateSchemaSql() {
  return `CREATE SCHEMA IF NOT EXISTS "${getValidatedSchemaName()}"`;
}

/**
 * @returns {string} `DROP SCHEMA IF EXISTS ... CASCADE` statement for the app schema.
 * @throws {Error} When the configured schema name is not a valid identifier.
 */
export function getDropSchemaSql() {
  return `DROP SCHEMA IF EXISTS "${getValidatedSchemaName()}" CASCADE`;
}

/**
 * @returns {string} `SET search_path` statement placing the app schema before `public`.
 * @throws {Error} When the configured schema name is not a valid identifier.
 */
export function getSetSearchPathSql() {
  return `SET search_path TO "${getValidatedSchemaName()}", public`;
}

/**
 * Report whether the environment describes an external PostgreSQL server.
 *
 * @returns {boolean} True when DATABASE_URL or POSTGRES_URL is set, or when
 *   PGHOST, PGUSER and PGDATABASE are all set.
 */
export function hasExternalPostgresConfig() {
  const connectionString = process.env.DATABASE_URL || process.env.POSTGRES_URL || '';
  if (connectionString) return true;
  const { PGHOST, PGUSER, PGDATABASE } = process.env;
  return !!(PGHOST && PGUSER && PGDATABASE);
}

/**
 * Read the backend requested through POSTGRES_STORAGE_BACKEND.
 *
 * @returns {'embedded' | 'external' | null} The normalized value, or null when
 *   the variable is unset or holds anything else.
 */
function getRequestedBackendType() {
  const raw = String(process.env.POSTGRES_STORAGE_BACKEND || '')
    .trim()
    .toLowerCase();
  return raw === 'embedded' || raw === 'external' ? raw : null;
}

/**
 * Read the backend recorded in the storage manifest.
 *
 * @returns {'embedded' | 'external' | null} The normalized `backend` field, or
 *   null when the manifest is missing or the field holds anything else.
 */
function getManifestBackendType() {
  const raw = String(readPostgresStorageManifest()?.backend || '')
    .trim()
    .toLowerCase();
  return raw === 'embedded' || raw === 'external' ? raw : null;
}

/**
 * Decide which backend to use. Precedence: explicit argument, environment
 * request, manifest, presence of embedded database files, presence of external
 * connection settings, and finally 'embedded' as the default.
 *
 * @param {string | null} [explicit] Caller-supplied backend; ignored unless it
 *   is exactly 'embedded' or 'external'.
 * @returns {'embedded' | 'external'}
 */
function resolveBackendType(explicit = null) {
  if (explicit === 'embedded' || explicit === 'external') return explicit;

  const requested = getRequestedBackendType();
  if (requested) return requested;

  const manifestBackend = getManifestBackendType();
  if (manifestBackend) return manifestBackend;

  if (hasEmbeddedPostgresDatabaseFiles()) return 'embedded';
  if (hasExternalPostgresConfig()) return 'external';
  return 'embedded';
}

/**
 * Give an embedded query result the fields the rest of this module reads.
 *
 * @param {object} [result] Raw result from the embedded executor.
 * @returns {object} Copy of `result` where `rows` and `fields` are always
 *   arrays and `rowCount` is the number of rows, falling back to
 *   `affectedRows`, then 0.
 */
function normalizeQueryResult(result = {}) {
  const rows = Array.isArray(result.rows) ? result.rows : [];
  const affectedRows = Number(result.affectedRows || 0);
  return {
    ...result,
    rows,
    fields: Array.isArray(result.fields) ? result.fields : [],
    rowCount: rows.length || affectedRows || 0,
  };
}

/**
 * Client facade over an embedded executor (the database itself or a
 * transaction handle). Exposes query/exec/release so the helpers below can
 * treat it like a checked-out pool client.
 */
class EmbeddedPostgresClient {
  /**
   * @param {object} executor Object providing `query(text, params)` and `exec(text)`.
   */
  constructor(executor) {
    this.executor = executor;
    // Marker checked by preparePostgresClient() to skip per-client setup.
    this.embedded = true;
  }

  /**
   * @param {string} text SQL text.
   * @param {Array} [params] Positional parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    return normalizeQueryResult(await this.executor.query(text, params));
  }

  /**
   * @param {string} text SQL text passed to the executor's `exec`.
   * @returns {Promise<object[]>} Normalized results; empty when the executor
   *   does not return an array.
   */
  async exec(text) {
    const results = await this.executor.exec(text);
    return Array.isArray(results) ? results.map(normalizeQueryResult) : [];
  }

  /** No-op: there is nothing to hand back for the embedded backend. */
  release() {}
}

/**
 * Pool facade over a single embedded database instance.
 */
class EmbeddedPostgresPool extends EventEmitter {
  /**
   * @param {object} db Embedded database handle.
   * @param {string} dataDir Directory the database was opened from.
   */
  constructor(db, dataDir) {
    super();
    this.db = db;
    this.dataDir = dataDir;
    // Marker checked by patchPoolClientSetup() and the backend-type helpers.
    this.embedded = true;
  }

  /** @returns {Promise<EmbeddedPostgresClient>} A client bound to the shared db. */
  async connect() {
    return new EmbeddedPostgresClient(this.db);
  }

  /**
   * @param {string} text SQL text.
   * @param {Array} [params] Positional parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    return normalizeQueryResult(await this.db.query(text, params));
  }

  /**
   * @param {string} text SQL text passed to the database's `exec`.
   * @returns {Promise<object[]>} Normalized results; empty when the database
   *   does not return an array.
   */
  async exec(text) {
    const results = await this.db.exec(text);
    return Array.isArray(results) ? results.map(normalizeQueryResult) : [];
  }

  /**
   * Run `fn` inside the database's own transaction helper.
   *
   * @param {(client: EmbeddedPostgresClient) => any} fn Receives a client bound
   *   to the transaction handle.
   * @returns {Promise<any>} Whatever the underlying `transaction` call yields.
   */
  async transaction(fn) {
    return this.db.transaction(async (tx) => fn(new EmbeddedPostgresClient(tx)));
  }

  /** Close the embedded database. */
  async end() {
    await this.db.close();
  }
}

/**
 * @param {string | null} [dataDir] Optional override.
 * @returns {string} `dataDir` when truthy, otherwise POSTGRES_STORAGE_DB_DIR.
 */
function resolveEmbeddedDataDir(dataDir = null) {
  return dataDir || POSTGRES_STORAGE_DB_DIR;
}

/**
 * Build the pool options for an external server from the environment.
 *
 * @returns {object} `{ connectionString, ssl }` when DATABASE_URL / POSTGRES_URL
 *   is set, otherwise discrete host/port/user/password/database/ssl options
 *   (port defaults to 5432, password to '').
 * @throws {Error} When neither a connection string nor PGHOST + PGUSER +
 *   PGDATABASE are available.
 */
function buildExternalPoolConfig() {
  const connectionString = process.env.DATABASE_URL || process.env.POSTGRES_URL || '';
  const ssl = resolveSslConfig();
  if (connectionString) {
    return { connectionString, ssl };
  }

  const { PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE } = process.env;
  if (!PGHOST || !PGUSER || !PGDATABASE) {
    throw new Error(
      `PostgreSQL storage for ${POSTGRES_STORAGE_DIR} requires either an embedded database directory or a database connection. Set POSTGRES_STORAGE_BACKEND=embedded to use the local folder, or configure DATABASE_URL / PGHOST / PGUSER / PGDATABASE for an external server.`
    );
  }

  return {
    host: PGHOST,
    port: PGPORT ? Number(PGPORT) : 5432,
    user: PGUSER,
    password: PGPASSWORD || '',
    database: PGDATABASE,
    ssl,
  };
}

/**
 * Resolve the backend and return the matching pool configuration.
 *
 * @param {{ backend?: string, dataDir?: string }} [options]
 * @returns {object} `{ backend: 'embedded', dataDir }` or
 *   `{ backend: 'external', ...externalPoolOptions }`.
 * @throws {Error} When the external backend is selected but not configured.
 */
export function buildPoolConfig(options = {}) {
  const backend = resolveBackendType(options.backend);
  if (backend === 'embedded') {
    return {
      backend,
      dataDir: resolveEmbeddedDataDir(options.dataDir),
    };
  }
  return {
    backend,
    ...buildExternalPoolConfig(),
  };
}

/**
 * Log pool-level 'error' events to the console.
 *
 * @param {object} pool Pool; skipped when it has no `on` method.
 * @returns {object} The same pool.
 */
function attachPoolErrorHandler(pool) {
  if (typeof pool.on === 'function') {
    pool.on('error', (err) => {
      console.error('PostgreSQL pool error:', err);
    });
  }
  return pool;
}

/**
 * Wrap `pool.connect` and `pool.query` so every client goes through
 * preparePostgresClient() before use. Embedded and already-patched pools are
 * returned untouched.
 *
 * @param {object} pool External pool to patch in place.
 * @returns {object} The same pool.
 */
function patchPoolClientSetup(pool) {
  if (pool?.embedded || patchedPools.has(pool)) return pool;

  const originalConnect = pool.connect.bind(pool);
  const originalQuery = pool.query.bind(pool);

  // pool.query() bypasses our per-client setup, so wrap both entry points.
  pool.connect = async (...args) => {
    const client = await originalConnect(...args);
    await preparePostgresClient(client);
    return client;
  };

  pool.query = async (...args) => {
    // Callback-style calls are delegated unchanged; note that this path does
    // not run the per-client setup.
    if (typeof args[args.length - 1] === 'function') {
      return originalQuery(...args);
    }
    const client = await originalConnect();
    try {
      await preparePostgresClient(client);
      return await client.query(...args);
    } finally {
      client.release();
    }
  };

  patchedPools.add(pool);
  return pool;
}

/**
 * Open (creating the directory if needed) an embedded database, ensure the app
 * schema exists, set the search path, and wrap it in a pool facade.
 *
 * @param {{ dataDir?: string }} [options]
 * @returns {Promise<EmbeddedPostgresPool>}
 */
async function createEmbeddedPostgresPool({ dataDir } = {}) {
  const resolvedDataDir = resolveEmbeddedDataDir(dataDir);
  await fs.mkdir(resolvedDataDir, { recursive: true });
  const db = await PGlite.create(resolvedDataDir);
  await db.query(getCreateSchemaSql());
  await db.query(getSetSearchPathSql());
  return attachPoolErrorHandler(new EmbeddedPostgresPool(db, resolvedDataDir));
}

/**
 * @returns {'embedded' | 'external'} The backend of the most recently created
 *   pool, or the backend that would be resolved if none was created yet.
 */
export function getPostgresBackendType() {
  return activeBackendType || resolveBackendType();
}

/**
 * Create a pool for the resolved backend without applying the schema.
 *
 * @param {{ backend?: string, dataDir?: string }} [options]
 * @returns {Promise<object>} An EmbeddedPostgresPool or a patched `pg` Pool.
 * @throws {Error} When the external backend is selected but not configured.
 */
export async function createRawPostgresPool(options = {}) {
  const config = buildPoolConfig(options);
  if (config.backend === 'embedded') {
    return createEmbeddedPostgresPool({ dataDir: config.dataDir });
  }
  const pool = new Pool(config);
  attachPoolErrorHandler(pool);
  return patchPoolClientSetup(pool);
}

/**
 * Ensure the app schema exists and the search path is set on `client`.
 * Runs once per client; embedded clients are skipped.
 *
 * @param {object} client Client exposing `query(sql)`.
 * @returns {Promise<void>}
 */
export async function preparePostgresClient(client) {
  if (client?.embedded || configuredClients.has(client)) return;
  await client.query(getCreateSchemaSql());
  await client.query(getSetSearchPathSql());
  configuredClients.add(client);
}

/**
 * Execute POSTGRES_SCHEMA_SQL on a client from `pool`, using `exec` when the
 * client provides it and `query` otherwise. The client is always released.
 *
 * @param {object} pool Pool exposing `connect()`.
 * @returns {Promise<void>}
 */
export async function applyPostgresSchema(pool) {
  const client = await pool.connect();
  try {
    await preparePostgresClient(client);
    if (typeof client.exec === 'function') {
      await client.exec(POSTGRES_SCHEMA_SQL);
    } else {
      await client.query(POSTGRES_SCHEMA_SQL);
    }
  } finally {
    client.release();
  }
}

/** Create the pool used for the module-level singleton. */
async function createPool() {
  return createStandalonePostgresPool();
}

/**
 * Create a pool and apply the schema to it. Also records the pool's backend as
 * the module's active backend type.
 *
 * @param {{ backend?: string, dataDir?: string }} [options]
 * @returns {Promise<object>} The ready-to-use pool.
 * @throws {Error} Whatever pool creation or schema application throws; in the
 *   latter case the pool is closed before the error propagates.
 */
export async function createStandalonePostgresPool(options = {}) {
  const pool = await createRawPostgresPool(options);
  try {
    await applyPostgresSchema(pool);
  } catch (err) {
    // Do not leak the freshly created pool / embedded database when the schema
    // cannot be applied; a close failure must not hide the original error.
    try {
      await pool.end();
    } catch (closeErr) {
      console.error('Failed to close PostgreSQL pool after schema error:', closeErr);
    }
    throw err;
  }
  activeBackendType = pool?.embedded ? 'embedded' : 'external';
  return pool;
}

/**
 * Lazily create the shared pool. Concurrent callers share one in-flight
 * initialization.
 *
 * @returns {Promise<boolean>} False when PostgreSQL storage mode is not active,
 *   true once the shared pool is ready.
 * @throws {Error} When pool creation fails; the failure is not cached, so a
 *   later call starts a fresh attempt.
 */
export async function initializePostgresStorage() {
  if (!isPostgresStorageMode()) return false;
  if (!initPromise) {
    initPromise = createPool().then(
      (pool) => {
        poolInstance = pool;
        activeBackendType = pool?.embedded ? 'embedded' : 'external';
        return pool;
      },
      (err) => {
        // Drop the rejected promise so one startup failure does not disable
        // storage for the rest of the process lifetime.
        initPromise = null;
        throw err;
      }
    );
  }
  await initPromise;
  return true;
}

/**
 * @returns {Promise<object>} The shared pool, initializing it on first use.
 * @throws {Error} When PostgreSQL storage mode is not active or initialization fails.
 */
export async function getPostgresPool() {
  if (!isPostgresStorageMode()) {
    throw new Error(`PostgreSQL storage is not active (mode=${STORAGE_MODE})`);
  }
  await initializePostgresStorage();
  return poolInstance;
}

/**
 * Run a single query on the shared pool.
 *
 * @param {string} text SQL text.
 * @param {Array} [params] Positional parameters.
 * @returns {Promise<object>} The pool's query result.
 */
export async function pgQuery(text, params = []) {
  const pool = await getPostgresPool();
  return pool.query(text, params);
}

/**
 * Check out a prepared client, pass it to `fn`, and always release it.
 *
 * @param {(client: object) => any} fn
 * @returns {Promise<any>} The value `fn` resolves to.
 */
export async function withPgClient(fn) {
  const pool = await getPostgresPool();
  const client = await pool.connect();
  try {
    await preparePostgresClient(client);
    return await fn(client);
  } finally {
    client.release();
  }
}

/**
 * Run `fn` inside a transaction. Uses the pool's own `transaction` helper when
 * it has one; otherwise issues BEGIN / COMMIT / ROLLBACK on a checked-out client.
 *
 * @param {(client: object) => any} fn
 * @returns {Promise<any>} The value `fn` resolves to.
 * @throws Whatever `fn` (or BEGIN / COMMIT) throws; the transaction is rolled
 *   back first on the manual path.
 */
export async function withPgTransaction(fn) {
  const pool = await getPostgresPool();
  if (typeof pool.transaction === 'function') {
    return pool.transaction(async (client) => {
      await preparePostgresClient(client);
      return fn(client);
    });
  }

  return withPgClient(async (client) => {
    await client.query('BEGIN');
    try {
      const result = await fn(client);
      await client.query('COMMIT');
      return result;
    } catch (err) {
      // A failing ROLLBACK (e.g. on a broken connection) must not replace the
      // error that caused it; log it and surface the original.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        console.error('PostgreSQL rollback failed:', rollbackErr);
      }
      throw err;
    }
  });
}

/**
 * @param {any} data Value handed to `encryptJson`.
 * @param {string} [aad] Second argument handed to `encryptJson`.
 * @returns {any} Result of `encryptJson(data, aad)`.
 */
export function encryptJsonPayload(data, aad = '') {
  return encryptJson(data, aad);
}

/**
 * @param {any} payload Value handed to `decryptJson`.
 * @param {string} [aad] Second argument handed to `decryptJson`.
 * @returns {any} Result of `decryptJson(payload, aad)`.
 */
export function decryptJsonPayload(payload, aad = '') {
  return decryptJson(payload, aad);
}

/**
 * Encrypt binary data and pack the encrypted result via `packEncryptedBuffer`.
 *
 * @param {any} buffer Input accepted by `Buffer.from`.
 * @param {string} [aad] Second argument handed to `encryptBuffer`.
 * @returns {any} Result of `packEncryptedBuffer`.
 */
export function encryptBinaryPayload(buffer, aad = '') {
  return packEncryptedBuffer(encryptBuffer(Buffer.from(buffer), aad));
}

/**
 * Unpack and decrypt a payload produced by encryptBinaryPayload().
 *
 * @param {any} payload A Buffer, or input accepted by `Buffer.from`.
 * @param {string} [aad] Second argument handed to `decryptBuffer`.
 * @returns {any} Result of `decryptBuffer`.
 */
export function decryptBinaryPayload(payload, aad = '') {
  const packed = Buffer.isBuffer(payload) ? payload : Buffer.from(payload);
  return decryptBuffer(unpackEncryptedBuffer(packed), aad);
}

/**
 * Build the lookup hash for an owner from `"<type>:<id>"` in the 'owner'
 * namespace; a missing type becomes 'unknown' and a missing id becomes ''.
 *
 * @param {{ type?: string, id?: string } | null | undefined} owner
 * @returns {any} Result of `createLookupHash`.
 */
export function makeOwnerLookup(owner) {
  return createLookupHash(`${owner?.type || 'unknown'}:${owner?.id || ''}`, 'owner');
}

/**
 * Build a lookup hash for `value` (stringified; null/undefined become '') in
 * the given namespace.
 *
 * @param {string} namespace
 * @param {any} value
 * @returns {any} Result of `createLookupHash`.
 */
export function makeLookupToken(namespace, value) {
  return createLookupHash(String(value ?? ''), namespace);
}