Spaces:
Runtime error
Runtime error
| import { EventEmitter } from 'events'; | |
| import fs from 'fs/promises'; | |
| import { PGlite } from '@electric-sql/pglite'; | |
| import { Pool } from 'pg'; | |
| import { | |
| createLookupHash, | |
| decryptBuffer, | |
| decryptJson, | |
| encryptBuffer, | |
| encryptJson, | |
| packEncryptedBuffer, | |
| unpackEncryptedBuffer, | |
| } from './cryptoUtils.js'; | |
| import { | |
| hasEmbeddedPostgresDatabaseFiles, | |
| isPostgresStorageMode, | |
| POSTGRES_STORAGE_DB_DIR, | |
| POSTGRES_STORAGE_DIR, | |
| readPostgresStorageManifest, | |
| STORAGE_MODE, | |
| } from './dataPaths.js'; | |
| import { APP_SCHEMA_NAME, POSTGRES_SCHEMA_SQL } from './postgresSchema.js'; | |
// Shared pool for this process; stays null until initializePostgresStorage() resolves.
let poolInstance = null;
// Promise for the pool creation started by initializePostgresStorage(); reused so creation runs once.
let initPromise = null;
// Backend of the most recently created pool ('embedded' or 'external'); null before any pool exists.
let activeBackendType = null;
// Clients that have already been through preparePostgresClient().
const configuredClients = new WeakSet();
// Pools whose connect/query were already wrapped by patchPoolClientSetup().
const patchedPools = new WeakSet();
/**
 * Derives the `ssl` pool option from the PGSSL / PGSSLMODE environment variables.
 *
 * PGSSL wins over PGSSLMODE when it is non-empty. An empty value or one of
 * `false`, `0`, `off`, `disable` turns TLS off.
 *
 * @returns {false | { rejectUnauthorized: boolean }} `false` when TLS is
 *   disabled; otherwise a TLS options object. Certificate verification is
 *   enabled only for the explicitly verifying modes `verify-ca` / `verify-full`.
 */
function resolveSslConfig() {
  const mode = String(process.env.PGSSL || process.env.PGSSLMODE || '').trim().toLowerCase();
  if (!mode || ['false', '0', 'off', 'disable'].includes(mode)) return false;
  // A caller asking for a verifying mode must not silently get an unverified
  // connection; every other truthy value keeps the previous lenient behavior.
  const verifyingModes = ['verify-ca', 'verify-full'];
  return { rejectUnauthorized: verifyingModes.includes(mode) };
}
/**
 * Returns the trimmed APP_SCHEMA_NAME after checking that it is a plain
 * identifier (letter or underscore first, then letters, digits, underscores).
 *
 * @returns {string} The validated schema name.
 * @throws {Error} When the name does not match the identifier pattern.
 */
function getValidatedSchemaName() {
  const candidate = String(APP_SCHEMA_NAME || '').trim();
  const isPlainIdentifier = /^[A-Za-z_][A-Za-z0-9_]*$/.test(candidate);
  if (isPlainIdentifier) {
    return candidate;
  }
  throw new Error(`Invalid POSTGRES_APP_SCHEMA value: ${candidate}`);
}
/**
 * Public accessor for the validated application schema name.
 *
 * @returns {string} Result of getValidatedSchemaName().
 */
export function getPostgresSchemaName() {
  const schemaName = getValidatedSchemaName();
  return schemaName;
}
/**
 * Builds the statement that creates the application schema if it is missing.
 *
 * @returns {string} A `CREATE SCHEMA IF NOT EXISTS` statement with the quoted schema name.
 */
export function getCreateSchemaSql() {
  const schema = getValidatedSchemaName();
  return `CREATE SCHEMA IF NOT EXISTS "${schema}"`;
}
/**
 * Builds the statement that drops the application schema and everything in it.
 *
 * @returns {string} A `DROP SCHEMA IF EXISTS … CASCADE` statement with the quoted schema name.
 */
export function getDropSchemaSql() {
  const schema = getValidatedSchemaName();
  return `DROP SCHEMA IF EXISTS "${schema}" CASCADE`;
}
/**
 * Builds the statement that puts the application schema first on the search path,
 * followed by `public`.
 *
 * @returns {string} A `SET search_path` statement.
 */
export function getSetSearchPathSql() {
  const schema = getValidatedSchemaName();
  return `SET search_path TO "${schema}", public`;
}
/**
 * Reports whether the environment describes an external PostgreSQL server.
 *
 * @returns {boolean} True when DATABASE_URL or POSTGRES_URL is set, or when
 *   PGHOST, PGUSER and PGDATABASE are all set.
 */
export function hasExternalPostgresConfig() {
  const env = process.env;
  if (env.DATABASE_URL || env.POSTGRES_URL) {
    return true;
  }
  return Boolean(env.PGHOST && env.PGUSER && env.PGDATABASE);
}
/**
 * Reads the backend requested through POSTGRES_STORAGE_BACKEND.
 *
 * @returns {'embedded' | 'external' | null} The trimmed, lower-cased value
 *   when it is one of the two known backends, otherwise null.
 */
function getRequestedBackendType() {
  const requested = String(process.env.POSTGRES_STORAGE_BACKEND || '').trim().toLowerCase();
  return ['embedded', 'external'].includes(requested) ? requested : null;
}
/**
 * Reads the backend recorded in the storage manifest's `backend` field.
 *
 * @returns {'embedded' | 'external' | null} The trimmed, lower-cased value
 *   when it is one of the two known backends, otherwise null (including when
 *   the manifest or the field is missing).
 */
function getManifestBackendType() {
  const manifest = readPostgresStorageManifest();
  const backend = String(manifest?.backend || '').trim().toLowerCase();
  switch (backend) {
    case 'embedded':
    case 'external':
      return backend;
    default:
      return null;
  }
}
/**
 * Chooses the storage backend, checking sources in priority order:
 * explicit argument, POSTGRES_STORAGE_BACKEND, storage manifest, presence of
 * embedded database files, presence of external connection settings. Falls
 * back to 'embedded' when nothing matches.
 *
 * @param {string | null} [explicit=null] Caller-provided backend; ignored unless
 *   it is exactly 'embedded' or 'external'.
 * @returns {'embedded' | 'external'} The selected backend.
 */
function resolveBackendType(explicit = null) {
  const isKnownBackend = (value) => value === 'embedded' || value === 'external';
  if (isKnownBackend(explicit)) {
    return explicit;
  }
  // `||` keeps the manifest lookup lazy: it only runs when the env var gave nothing.
  const configured = getRequestedBackendType() || getManifestBackendType();
  if (configured) {
    return configured;
  }
  if (hasEmbeddedPostgresDatabaseFiles()) {
    return 'embedded';
  }
  return hasExternalPostgresConfig() ? 'external' : 'embedded';
}
/**
 * Copies a query result and guarantees the `rows`, `fields` and `rowCount` keys.
 *
 * @param {object} [result={}] Raw result object.
 * @returns {object} Shallow copy where `rows` and `fields` are always arrays and
 *   `rowCount` is the number of rows, or else the numeric `affectedRows`, or else 0.
 */
function normalizeQueryResult(result = {}) {
  const normalized = { ...result };
  normalized.rows = Array.isArray(result.rows) ? result.rows : [];
  normalized.fields = Array.isArray(result.fields) ? result.fields : [];
  const affected = Number(result.affectedRows || 0);
  // Row count prefers returned rows; a NaN `affected` falls through to 0.
  normalized.rowCount = normalized.rows.length || affected || 0;
  return normalized;
}
/**
 * Client-shaped wrapper around an executor that exposes `query` and `exec`.
 * Marked with `embedded = true` so per-client setup can recognize and skip it.
 */
class EmbeddedPostgresClient {
  /**
   * @param {object} executor Object providing async `query(text, params)` and `exec(text)`.
   */
  constructor(executor) {
    Object.assign(this, { executor, embedded: true });
  }

  /**
   * Runs one statement and normalizes its result.
   *
   * @param {string} text SQL text.
   * @param {Array} [params=[]] Bound parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    const raw = await this.executor.query(text, params);
    return normalizeQueryResult(raw);
  }

  /**
   * Runs SQL through the executor's `exec` and normalizes each returned result.
   *
   * @param {string} text SQL text.
   * @returns {Promise<object[]>} Normalized results; empty when the executor
   *   does not return an array.
   */
  async exec(text) {
    const raw = await this.executor.exec(text);
    if (!Array.isArray(raw)) {
      return [];
    }
    return raw.map(normalizeQueryResult);
  }

  /** No-op, present so callers can treat this like a pooled client. */
  release() {}
}
/**
 * Pool-shaped wrapper around a single embedded database handle. Extends
 * EventEmitter so `pool.on(...)` works, and carries `embedded = true` so
 * external-only setup can skip it.
 */
class EmbeddedPostgresPool extends EventEmitter {
  /**
   * @param {object} db Database handle providing `query`, `exec`, `transaction`, `close`.
   * @param {string} dataDir Directory the database was opened from.
   */
  constructor(db, dataDir) {
    super();
    Object.assign(this, { db, dataDir, embedded: true });
  }

  /**
   * @returns {Promise<EmbeddedPostgresClient>} A new client over the shared handle.
   */
  async connect() {
    const client = new EmbeddedPostgresClient(this.db);
    return client;
  }

  /**
   * Runs one statement on the handle and normalizes its result.
   *
   * @param {string} text SQL text.
   * @param {Array} [params=[]] Bound parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    const raw = await this.db.query(text, params);
    return normalizeQueryResult(raw);
  }

  /**
   * Runs SQL through the handle's `exec` and normalizes each returned result.
   *
   * @param {string} text SQL text.
   * @returns {Promise<object[]>} Normalized results; empty when `exec` does not return an array.
   */
  async exec(text) {
    const raw = await this.db.exec(text);
    if (!Array.isArray(raw)) {
      return [];
    }
    return raw.map(normalizeQueryResult);
  }

  /**
   * Delegates to the handle's `transaction`, handing `fn` a client over the transaction object.
   *
   * @param {(client: EmbeddedPostgresClient) => any} fn Work to run inside the transaction.
   * @returns {Promise<any>} Whatever the handle's `transaction` resolves to.
   */
  async transaction(fn) {
    const runWithClient = async (tx) => fn(new EmbeddedPostgresClient(tx));
    return this.db.transaction(runWithClient);
  }

  /** Closes the underlying database handle. */
  async end() {
    await this.db.close();
  }
}
/**
 * Picks the directory for the embedded database.
 *
 * @param {string | null} [dataDir=null] Caller override.
 * @returns {string} `dataDir` when truthy, otherwise POSTGRES_STORAGE_DB_DIR.
 */
function resolveEmbeddedDataDir(dataDir = null) {
  if (dataDir) {
    return dataDir;
  }
  return POSTGRES_STORAGE_DB_DIR;
}
/**
 * Builds the connection settings for an external PostgreSQL server from the
 * environment. A connection string (DATABASE_URL, then POSTGRES_URL) takes
 * priority over the discrete PGHOST / PGPORT / PGUSER / PGPASSWORD / PGDATABASE values.
 *
 * @returns {object} Either `{ connectionString, ssl }` or
 *   `{ host, port, user, password, database, ssl }` (port defaults to 5432,
 *   password to an empty string).
 * @throws {Error} When no connection string is set and PGHOST, PGUSER or
 *   PGDATABASE is missing.
 */
function buildExternalPoolConfig() {
  const env = process.env;
  const url = env.DATABASE_URL || env.POSTGRES_URL || '';
  const ssl = resolveSslConfig();
  if (url) {
    return { connectionString: url, ssl };
  }

  const hasDiscreteSettings = env.PGHOST && env.PGUSER && env.PGDATABASE;
  if (!hasDiscreteSettings) {
    throw new Error(
      `PostgreSQL storage for ${POSTGRES_STORAGE_DIR} requires either an embedded database directory or a database connection. Set POSTGRES_STORAGE_BACKEND=embedded to use the local folder, or configure DATABASE_URL / PGHOST / PGUSER / PGDATABASE for an external server.`
    );
  }

  const port = env.PGPORT ? Number(env.PGPORT) : 5432;
  return {
    host: env.PGHOST,
    port,
    user: env.PGUSER,
    password: env.PGPASSWORD || '',
    database: env.PGDATABASE,
    ssl,
  };
}
/**
 * Builds the pool configuration for the resolved backend.
 *
 * @param {object} [options={}] Optional overrides.
 * @param {string} [options.backend] Explicit backend, passed to resolveBackendType().
 * @param {string} [options.dataDir] Data directory override for the embedded backend.
 * @returns {object} `{ backend: 'embedded', dataDir }` or
 *   `{ backend: 'external', ...connection settings }`.
 */
export function buildPoolConfig(options = {}) {
  const backend = resolveBackendType(options.backend);
  const details = backend === 'embedded'
    ? { dataDir: resolveEmbeddedDataDir(options.dataDir) }
    : buildExternalPoolConfig();
  return { backend, ...details };
}
/**
 * Registers an 'error' listener that logs pool errors to console.error.
 * Pools without an `on` method are returned untouched.
 *
 * @param {object} pool Pool to instrument.
 * @returns {object} The same pool, for chaining.
 */
function attachPoolErrorHandler(pool) {
  if (typeof pool.on !== 'function') {
    return pool;
  }
  const logPoolError = (err) => {
    console.error('PostgreSQL pool error:', err);
  };
  pool.on('error', logPoolError);
  return pool;
}
/**
 * Wraps `pool.connect` and `pool.query` so that every client handed out has
 * been through preparePostgresClient() first. Embedded pools and pools that
 * were already patched are returned unchanged.
 *
 * A client whose setup fails is released with the error instead of being
 * leaked (from `connect`) or returned to the pool half-configured (from `query`).
 *
 * @param {object} pool Pool exposing `connect` and `query`.
 * @returns {object} The same pool instance.
 */
function patchPoolClientSetup(pool) {
  if (pool?.embedded || patchedPools.has(pool)) return pool;

  const originalConnect = pool.connect.bind(pool);
  const originalQuery = pool.query.bind(pool);

  // Checks out a client and guarantees it is either fully prepared or released.
  const checkoutPreparedClient = async (...args) => {
    const client = await originalConnect(...args);
    try {
      await preparePostgresClient(client);
    } catch (err) {
      // Passing the error to release() marks the client as unusable so the
      // pool can discard it rather than recycle it.
      client?.release?.(err);
      throw err;
    }
    return client;
  };

  pool.connect = checkoutPreparedClient;

  // pool.query() bypasses our per-client setup, so wrap both entry points.
  pool.query = async (...args) => {
    // Callback-style calls keep the pool's native behavior.
    if (typeof args[args.length - 1] === 'function') {
      return originalQuery(...args);
    }
    const client = await checkoutPreparedClient();
    try {
      return await client.query(...args);
    } finally {
      client.release();
    }
  };

  patchedPools.add(pool);
  return pool;
}
/**
 * Opens (creating if needed) the embedded database directory, ensures the
 * application schema exists and is on the search path, and wraps the handle
 * in an EmbeddedPostgresPool.
 *
 * If the schema bootstrap fails, the database handle is closed before the
 * error is rethrown so the handle is not left open.
 *
 * @param {object} [options={}]
 * @param {string} [options.dataDir] Directory override; defaults via resolveEmbeddedDataDir().
 * @returns {Promise<EmbeddedPostgresPool>} Pool with an error listener attached.
 */
async function createEmbeddedPostgresPool({ dataDir } = {}) {
  const resolvedDataDir = resolveEmbeddedDataDir(dataDir);
  await fs.mkdir(resolvedDataDir, { recursive: true });
  const db = await PGlite.create(resolvedDataDir);
  try {
    await db.query(getCreateSchemaSql());
    await db.query(getSetSearchPathSql());
  } catch (err) {
    try {
      await db.close();
    } catch (closeErr) {
      // Keep the bootstrap failure as the reported error; only log the close failure.
      console.error('Failed to close embedded PostgreSQL database after setup error:', closeErr);
    }
    throw err;
  }
  return attachPoolErrorHandler(new EmbeddedPostgresPool(db, resolvedDataDir));
}
/**
 * Reports which backend is in use.
 *
 * @returns {string} The backend recorded for the most recently created pool,
 *   or, when none has been recorded, whatever resolveBackendType() selects now.
 */
export function getPostgresBackendType() {
  if (activeBackendType) {
    return activeBackendType;
  }
  return resolveBackendType();
}
/**
 * Creates a pool for the resolved backend without applying the application
 * schema SQL. External pools get an error listener and the per-client setup patch.
 *
 * @param {object} [options={}] Forwarded to buildPoolConfig().
 * @returns {Promise<object>} An embedded pool or a patched `pg` Pool.
 */
export async function createRawPostgresPool(options = {}) {
  const poolConfig = buildPoolConfig(options);
  if (poolConfig.backend !== 'embedded') {
    const externalPool = new Pool(poolConfig);
    attachPoolErrorHandler(externalPool);
    return patchPoolClientSetup(externalPool);
  }
  return createEmbeddedPostgresPool({ dataDir: poolConfig.dataDir });
}
/**
 * Ensures the application schema exists and is on the client's search path.
 * Embedded clients and clients that were already prepared are skipped; a
 * client is remembered as prepared only after both statements succeed.
 *
 * @param {object} client Client exposing an async `query(sql)`.
 * @returns {Promise<void>}
 */
export async function preparePostgresClient(client) {
  const alreadyConfigured = client?.embedded || configuredClients.has(client);
  if (alreadyConfigured) {
    return;
  }
  // Each SQL string is built right before its statement runs, in this order.
  for (const buildSql of [getCreateSchemaSql, getSetSearchPathSql]) {
    await client.query(buildSql());
  }
  configuredClients.add(client);
}
/**
 * Runs POSTGRES_SCHEMA_SQL on a client checked out from `pool`. Uses the
 * client's `exec` when it has one, otherwise `query`. The client is always released.
 *
 * @param {object} pool Pool exposing `connect()`.
 * @returns {Promise<void>}
 */
export async function applyPostgresSchema(pool) {
  const client = await pool.connect();
  try {
    await preparePostgresClient(client);
    const method = typeof client.exec === 'function' ? 'exec' : 'query';
    await client[method](POSTGRES_SCHEMA_SQL);
  } finally {
    client.release();
  }
}
/**
 * Creates the pool used for shared storage, using default options.
 *
 * @returns {Promise<object>} Pool from createStandalonePostgresPool().
 */
async function createPool() {
  const pool = await createStandalonePostgresPool();
  return pool;
}
/**
 * Creates a pool, applies the application schema to it, and records its
 * backend type in `activeBackendType`.
 *
 * If applying the schema fails, the freshly created pool is ended before the
 * error is rethrown so its connections (or embedded database handle) are not leaked.
 *
 * @param {object} [options={}] Forwarded to createRawPostgresPool().
 * @returns {Promise<object>} The ready-to-use pool.
 */
export async function createStandalonePostgresPool(options = {}) {
  const pool = await createRawPostgresPool(options);
  try {
    await applyPostgresSchema(pool);
  } catch (err) {
    try {
      await pool.end();
    } catch (closeErr) {
      // Keep the schema failure as the reported error; only log the close failure.
      console.error('Failed to close PostgreSQL pool after schema error:', closeErr);
    }
    throw err;
  }
  activeBackendType = pool?.embedded ? 'embedded' : 'external';
  return pool;
}
/**
 * Creates the shared pool on first use. Concurrent and later callers reuse
 * the same initialization promise.
 *
 * A failed attempt clears the cached promise, so the next call retries instead
 * of replaying the same rejection forever.
 *
 * @returns {Promise<boolean>} `false` when PostgreSQL storage mode is not
 *   active, `true` once the shared pool is ready.
 * @throws Whatever pool creation rejects with.
 */
export async function initializePostgresStorage() {
  if (!isPostgresStorageMode()) return false;
  if (!initPromise) {
    initPromise = createPool().then(
      (pool) => {
        poolInstance = pool;
        activeBackendType = pool?.embedded ? 'embedded' : 'external';
        return pool;
      },
      (err) => {
        // Drop the rejected promise so a transient failure can be retried.
        initPromise = null;
        throw err;
      },
    );
  }
  await initPromise;
  return true;
}
/**
 * Returns the shared pool, initializing it first if needed.
 *
 * @returns {Promise<object>} The shared pool instance.
 * @throws {Error} When PostgreSQL storage mode is not active.
 */
export async function getPostgresPool() {
  const storageActive = isPostgresStorageMode();
  if (storageActive) {
    await initializePostgresStorage();
    return poolInstance;
  }
  throw new Error(`PostgreSQL storage is not active (mode=${STORAGE_MODE})`);
}
/**
 * Runs a single statement through the shared pool.
 *
 * @param {string} text SQL text.
 * @param {Array} [params=[]] Bound parameters.
 * @returns {Promise<object>} Whatever the pool's `query` resolves to.
 */
export async function pgQuery(text, params = []) {
  const sharedPool = await getPostgresPool();
  const pending = sharedPool.query(text, params);
  return pending;
}
/**
 * Checks out a client from the shared pool, prepares it, runs `fn` with it,
 * and releases the client whether or not `fn` succeeds.
 *
 * @param {(client: object) => any} fn Work to run with the client.
 * @returns {Promise<any>} The value `fn` resolves to.
 */
export async function withPgClient(fn) {
  const sharedPool = await getPostgresPool();
  const client = await sharedPool.connect();
  try {
    await preparePostgresClient(client);
    const outcome = await fn(client);
    return outcome;
  } finally {
    client.release();
  }
}
/**
 * Runs `fn` inside a database transaction on the shared pool.
 *
 * Pools that provide their own `transaction` method are delegated to.
 * Otherwise the work is wrapped in BEGIN / COMMIT on a checked-out client,
 * with ROLLBACK on failure.
 *
 * If ROLLBACK itself fails, that failure is logged and the ORIGINAL error is
 * still the one thrown, so callers see why the transaction failed.
 *
 * @param {(client: object) => any} fn Work to run inside the transaction.
 * @returns {Promise<any>} The value `fn` resolves to.
 */
export async function withPgTransaction(fn) {
  const pool = await getPostgresPool();
  if (typeof pool.transaction === 'function') {
    return pool.transaction(async (client) => {
      await preparePostgresClient(client);
      return fn(client);
    });
  }
  return withPgClient(async (client) => {
    await client.query('BEGIN');
    try {
      const result = await fn(client);
      await client.query('COMMIT');
      return result;
    } catch (err) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        // Do not let a broken connection hide the error that caused the rollback.
        console.error('PostgreSQL rollback failed:', rollbackErr);
      }
      throw err;
    }
  });
}
/**
 * Encrypts a JSON-serializable value by delegating to encryptJson().
 *
 * @param {any} data Value to encrypt.
 * @param {string} [aad=''] Second argument forwarded to encryptJson().
 * @returns {any} Whatever encryptJson() returns.
 */
export function encryptJsonPayload(data, aad = '') {
  const encrypted = encryptJson(data, aad);
  return encrypted;
}
/**
 * Decrypts a payload by delegating to decryptJson().
 *
 * @param {any} payload Encrypted payload.
 * @param {string} [aad=''] Second argument forwarded to decryptJson().
 * @returns {any} Whatever decryptJson() returns.
 */
export function decryptJsonPayload(payload, aad = '') {
  const decrypted = decryptJson(payload, aad);
  return decrypted;
}
/**
 * Encrypts binary data and packs the result into a single value.
 *
 * @param {Buffer | Uint8Array | string | Array} buffer Input accepted by Buffer.from().
 * @param {string} [aad=''] Second argument forwarded to encryptBuffer().
 * @returns {any} Whatever packEncryptedBuffer() returns.
 */
export function encryptBinaryPayload(buffer, aad = '') {
  const plaintext = Buffer.from(buffer);
  const encrypted = encryptBuffer(plaintext, aad);
  return packEncryptedBuffer(encrypted);
}
/**
 * Unpacks and decrypts a binary payload.
 *
 * @param {Buffer | Uint8Array | Array} payload Packed payload; non-Buffer input
 *   is converted with Buffer.from().
 * @param {string} [aad=''] Second argument forwarded to decryptBuffer().
 * @returns {any} Whatever decryptBuffer() returns.
 */
export function decryptBinaryPayload(payload, aad = '') {
  let packed = payload;
  if (!Buffer.isBuffer(packed)) {
    packed = Buffer.from(payload);
  }
  const parts = unpackEncryptedBuffer(packed);
  return decryptBuffer(parts, aad);
}
/**
 * Builds the lookup hash for an owner from its `type` and `id`.
 *
 * @param {{ type?: string, id?: string } | null | undefined} owner Owner descriptor;
 *   a missing type becomes 'unknown' and a missing id becomes an empty string.
 * @returns {any} Result of createLookupHash() for `"<type>:<id>"` in the 'owner' namespace.
 */
export function makeOwnerLookup(owner) {
  const ownerType = owner?.type || 'unknown';
  const ownerId = owner?.id || '';
  return createLookupHash(`${ownerType}:${ownerId}`, 'owner');
}
/**
 * Builds a lookup hash for an arbitrary value within a namespace.
 *
 * @param {string} namespace Namespace passed as the second argument to createLookupHash().
 * @param {any} value Value to hash; null/undefined become an empty string,
 *   everything else is converted with String().
 * @returns {any} Result of createLookupHash().
 */
export function makeLookupToken(namespace, value) {
  const text = String(value ?? '');
  return createLookupHash(text, namespace);
}