chat-dev / server /postgres.js
incognitolm
step 5
b609e05
import { EventEmitter } from 'events';
import fs from 'fs/promises';
import { PGlite } from '@electric-sql/pglite';
import { Pool } from 'pg';
import {
createLookupHash,
decryptBuffer,
decryptJson,
encryptBuffer,
encryptJson,
packEncryptedBuffer,
unpackEncryptedBuffer,
} from './cryptoUtils.js';
import {
hasEmbeddedPostgresDatabaseFiles,
isPostgresStorageMode,
POSTGRES_STORAGE_DB_DIR,
POSTGRES_STORAGE_DIR,
readPostgresStorageManifest,
STORAGE_MODE,
} from './dataPaths.js';
import { APP_SCHEMA_NAME, POSTGRES_SCHEMA_SQL } from './postgresSchema.js';
// Shared pool created by initializePostgresStorage(); stays null until that succeeds.
let poolInstance = null;
// Promise for the shared pool's creation; reused so concurrent callers share one initialization.
let initPromise = null;
// 'embedded' or 'external' once a pool has been created; null before that.
let activeBackendType = null;
// Clients that already went through preparePostgresClient() (schema + search_path applied).
const configuredClients = new WeakSet();
// Pools whose connect()/query() were already wrapped by patchPoolClientSetup().
const patchedPools = new WeakSet();
/**
 * Derives the `ssl` option for the external pool from PGSSL / PGSSLMODE.
 *
 * - unset, or one of `false` / `0` / `off` / `disable`: SSL is disabled (`false`).
 * - `verify-ca` / `verify-full`: SSL with certificate verification enabled.
 * - anything else (e.g. `require`, `true`): SSL without certificate verification.
 *
 * PGSSL takes precedence over PGSSLMODE when both are set.
 *
 * @returns {false | { rejectUnauthorized: boolean }}
 */
function resolveSslConfig() {
  const mode = String(process.env.PGSSL || process.env.PGSSLMODE || '').trim().toLowerCase();
  if (!mode || ['false', '0', 'off', 'disable'].includes(mode)) return false;
  // The verify-* modes explicitly ask for the server certificate to be checked;
  // downgrading them to an unverified connection would silently defeat that request.
  const verifyCertificate = mode === 'verify-ca' || mode === 'verify-full';
  return { rejectUnauthorized: verifyCertificate };
}
/**
 * Returns the configured application schema name after checking that it is a
 * plain identifier: letters, digits and underscores, not starting with a digit.
 *
 * @returns {string} The trimmed schema name.
 * @throws {Error} If the name is empty or contains any other character.
 */
function getValidatedSchemaName() {
  const candidate = String(APP_SCHEMA_NAME || '').trim();
  const isPlainIdentifier = /^[A-Za-z_][A-Za-z0-9_]*$/.test(candidate);
  if (isPlainIdentifier) {
    return candidate;
  }
  throw new Error(`Invalid POSTGRES_APP_SCHEMA value: ${candidate}`);
}
/**
 * Public accessor for the validated application schema name.
 *
 * @returns {string} The schema name as returned by getValidatedSchemaName().
 * @throws {Error} When the configured name fails validation.
 */
export function getPostgresSchemaName() {
  const schemaName = getValidatedSchemaName();
  return schemaName;
}
/**
 * Builds the statement that creates the application schema if it is missing.
 * The name is validated first, so it is safe to embed as a quoted identifier.
 *
 * @returns {string} A `CREATE SCHEMA IF NOT EXISTS` statement.
 * @throws {Error} When the configured schema name fails validation.
 */
export function getCreateSchemaSql() {
  const schemaName = getValidatedSchemaName();
  return `CREATE SCHEMA IF NOT EXISTS "${schemaName}"`;
}
/**
 * Builds the statement that drops the application schema and everything in it.
 *
 * @returns {string} A `DROP SCHEMA IF EXISTS ... CASCADE` statement.
 * @throws {Error} When the configured schema name fails validation.
 */
export function getDropSchemaSql() {
  const schemaName = getValidatedSchemaName();
  return `DROP SCHEMA IF EXISTS "${schemaName}" CASCADE`;
}
/**
 * Builds the statement that puts the application schema first on the
 * search path, followed by `public`.
 *
 * @returns {string} A `SET search_path` statement.
 * @throws {Error} When the configured schema name fails validation.
 */
export function getSetSearchPathSql() {
  const schemaName = getValidatedSchemaName();
  return `SET search_path TO "${schemaName}", public`;
}
/**
 * Reports whether the environment describes an external PostgreSQL server:
 * either a connection URL (DATABASE_URL / POSTGRES_URL) or the full
 * PGHOST + PGUSER + PGDATABASE triple.
 *
 * @returns {boolean}
 */
export function hasExternalPostgresConfig() {
  const env = process.env;
  if (env.DATABASE_URL || env.POSTGRES_URL) {
    return true;
  }
  // Without a URL, host, user and database must all be present.
  return ['PGHOST', 'PGUSER', 'PGDATABASE'].every((key) => Boolean(env[key]));
}
/**
 * Reads the backend explicitly requested through POSTGRES_STORAGE_BACKEND.
 *
 * @returns {'embedded' | 'external' | null} The normalized (trimmed,
 *   lower-cased) value, or null when it is unset or not a recognized backend.
 */
function getRequestedBackendType() {
  const requested = String(process.env.POSTGRES_STORAGE_BACKEND || '').trim().toLowerCase();
  switch (requested) {
    case 'embedded':
    case 'external':
      return requested;
    default:
      return null;
  }
}
/**
 * Reads the backend recorded in the storage manifest's `backend` field.
 *
 * @returns {'embedded' | 'external' | null} The normalized value, or null
 *   when there is no manifest, no `backend` field, or an unrecognized value.
 */
function getManifestBackendType() {
  const manifest = readPostgresStorageManifest();
  const recorded = String(manifest?.backend || '').trim().toLowerCase();
  return ['embedded', 'external'].includes(recorded) ? recorded : null;
}
/**
 * Decides which backend to use. Sources are consulted in this order and the
 * first one that yields a backend wins:
 *   1. the `explicit` argument,
 *   2. POSTGRES_STORAGE_BACKEND,
 *   3. the storage manifest,
 *   4. existing embedded database files (=> 'embedded'),
 *   5. external connection settings in the environment (=> 'external'),
 *   6. fallback: 'embedded'.
 *
 * @param {string | null} [explicit=null] Caller-supplied backend; ignored
 *   unless it is exactly 'embedded' or 'external'.
 * @returns {'embedded' | 'external'}
 */
function resolveBackendType(explicit = null) {
  const isBackend = (value) => value === 'embedded' || value === 'external';
  if (isBackend(explicit)) {
    return explicit;
  }

  // The manifest is only read when the environment does not name a backend.
  const configured = getRequestedBackendType() || getManifestBackendType();
  if (configured) {
    return configured;
  }

  if (hasEmbeddedPostgresDatabaseFiles()) {
    return 'embedded';
  }
  return hasExternalPostgresConfig() ? 'external' : 'embedded';
}
/**
 * Gives a raw query result the fields callers rely on: `rows` and `fields`
 * are always arrays and `rowCount` is always a number.
 *
 * `rowCount` is the number of returned rows when there are any, otherwise the
 * numeric value of `affectedRows`, otherwise 0. All other properties of the
 * input are copied through unchanged.
 *
 * @param {object} [result={}] Raw result object.
 * @returns {object} A new object; the input is not mutated.
 */
function normalizeQueryResult(result = {}) {
  const { rows: rawRows, fields: rawFields, affectedRows: rawAffected } = result;
  const rows = Array.isArray(rawRows) ? rawRows : [];
  const fields = Array.isArray(rawFields) ? rawFields : [];

  let rowCount = rows.length;
  if (!rowCount) {
    // No rows came back, so report how many rows the statement touched.
    rowCount = Number(rawAffected || 0) || 0;
  }

  return { ...result, rows, fields, rowCount };
}
/**
 * Client-shaped wrapper around an embedded executor (anything exposing
 * `query` and `exec`), so callers can treat it like a pooled client.
 * Results are passed through normalizeQueryResult().
 */
class EmbeddedPostgresClient {
  /**
   * @param {object} executor Object whose `query(text, params)` and
   *   `exec(text)` methods actually run the SQL.
   */
  constructor(executor) {
    this.executor = executor;
    // Marker other code checks to skip per-client setup for embedded clients.
    this.embedded = true;
  }

  /**
   * Runs a single statement.
   * @param {string} text SQL text.
   * @param {Array} [params=[]] Bound parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    const raw = await this.executor.query(text, params);
    return normalizeQueryResult(raw);
  }

  /**
   * Runs a script via the executor's `exec`.
   * @param {string} text SQL script.
   * @returns {Promise<object[]>} One normalized result per entry the executor
   *   returned, or an empty array when it did not return an array.
   */
  async exec(text) {
    const batches = await this.executor.exec(text);
    if (!Array.isArray(batches)) {
      return [];
    }
    return batches.map((entry) => normalizeQueryResult(entry));
  }

  /** No-op: there is nothing to hand back to a pool. */
  release() {}
}
/**
 * Pool-shaped wrapper around a single embedded database handle (`db`).
 * It extends EventEmitter so code that calls `pool.on('error', ...)` works,
 * and exposes the same connect/query/end surface callers use on a pool.
 */
class EmbeddedPostgresPool extends EventEmitter {
  /**
   * @param {object} db Embedded database handle providing `query`, `exec`,
   *   `transaction` and `close`.
   * @param {string} dataDir Directory the database was opened from.
   */
  constructor(db, dataDir) {
    super();
    Object.assign(this, { db, dataDir, embedded: true });
  }

  /**
   * @returns {Promise<EmbeddedPostgresClient>} A new client wrapper over the
   *   same underlying database handle.
   */
  async connect() {
    const { db } = this;
    return new EmbeddedPostgresClient(db);
  }

  /**
   * Runs a single statement directly on the database handle.
   * @param {string} text SQL text.
   * @param {Array} [params=[]] Bound parameters.
   * @returns {Promise<object>} Normalized result.
   */
  async query(text, params = []) {
    const raw = await this.db.query(text, params);
    return normalizeQueryResult(raw);
  }

  /**
   * Runs a script via the handle's `exec`.
   * @param {string} text SQL script.
   * @returns {Promise<object[]>} Normalized results, or an empty array when
   *   the handle did not return an array.
   */
  async exec(text) {
    const batches = await this.db.exec(text);
    if (!Array.isArray(batches)) {
      return [];
    }
    return batches.map((entry) => normalizeQueryResult(entry));
  }

  /**
   * Runs `fn` inside the handle's own transaction helper, handing it a client
   * wrapper bound to the transaction object.
   * @param {(client: EmbeddedPostgresClient) => any} fn Transaction body.
   * @returns {Promise<any>} Whatever the handle's `transaction` resolves to.
   */
  async transaction(fn) {
    const runWithClient = async (tx) => fn(new EmbeddedPostgresClient(tx));
    return this.db.transaction(runWithClient);
  }

  /** Closes the underlying database handle. */
  async end() {
    await this.db.close();
  }
}
/**
 * Picks the directory for the embedded database.
 *
 * @param {string | null} [dataDir=null] Caller-supplied directory.
 * @returns {string} `dataDir` when truthy, otherwise POSTGRES_STORAGE_DB_DIR.
 */
function resolveEmbeddedDataDir(dataDir = null) {
  if (dataDir) {
    return dataDir;
  }
  return POSTGRES_STORAGE_DB_DIR;
}
/**
 * Builds the connection settings for an external PostgreSQL server from the
 * environment.
 *
 * A connection URL (DATABASE_URL, then POSTGRES_URL) wins when present.
 * Otherwise PGHOST, PGUSER and PGDATABASE are required; PGPORT defaults to
 * 5432 and PGPASSWORD to an empty string.
 *
 * @returns {object} Either `{ connectionString, ssl }` or
 *   `{ host, port, user, password, database, ssl }`.
 * @throws {Error} When neither a URL nor the host/user/database triple is set.
 */
function buildExternalPoolConfig() {
  const env = process.env;
  const connectionString = env.DATABASE_URL || env.POSTGRES_URL || '';
  const ssl = resolveSslConfig();

  if (connectionString) {
    return { connectionString, ssl };
  }

  const hasDiscreteSettings = Boolean(env.PGHOST && env.PGUSER && env.PGDATABASE);
  if (!hasDiscreteSettings) {
    throw new Error(
      `PostgreSQL storage for ${POSTGRES_STORAGE_DIR} requires either an embedded database directory or a database connection. Set POSTGRES_STORAGE_BACKEND=embedded to use the local folder, or configure DATABASE_URL / PGHOST / PGUSER / PGDATABASE for an external server.`
    );
  }

  const port = env.PGPORT ? Number(env.PGPORT) : 5432;
  return {
    host: env.PGHOST,
    port,
    user: env.PGUSER,
    password: env.PGPASSWORD || '',
    database: env.PGDATABASE,
    ssl,
  };
}
/**
 * Builds the configuration for whichever backend is selected.
 *
 * @param {object} [options={}]
 * @param {string} [options.backend] Explicit backend override.
 * @param {string} [options.dataDir] Embedded data directory override.
 * @returns {object} `{ backend: 'embedded', dataDir }` for the embedded
 *   backend, otherwise `{ backend, ...externalConnectionSettings }`.
 * @throws {Error} When the external backend is selected but not configured.
 */
export function buildPoolConfig(options = {}) {
  const backend = resolveBackendType(options.backend);
  if (backend !== 'embedded') {
    return Object.assign({ backend }, buildExternalPoolConfig());
  }
  const dataDir = resolveEmbeddedDataDir(options.dataDir);
  return { backend, dataDir };
}
/**
 * Registers an `error` listener on the pool that logs the error to
 * console.error. Pools without an `on` method are left untouched.
 *
 * @param {object} pool Pool-like object.
 * @returns {object} The same pool, to allow chaining.
 */
function attachPoolErrorHandler(pool) {
  const canListen = typeof pool.on === 'function';
  if (!canListen) {
    return pool;
  }
  pool.on('error', (err) => {
    console.error('PostgreSQL pool error:', err);
  });
  return pool;
}
/**
 * Wraps an external pool's `connect` and `query` so every client is run
 * through preparePostgresClient() before it is used. Embedded pools and pools
 * that were already patched are returned unchanged.
 *
 * @param {object} pool Pool whose entry points should be wrapped.
 * @returns {object} The same pool instance.
 */
function patchPoolClientSetup(pool) {
  if (pool?.embedded) return pool;
  if (patchedPools.has(pool)) return pool;

  const rawConnect = pool.connect.bind(pool);
  const rawQuery = pool.query.bind(pool);

  // pool.query() would otherwise bypass the per-client setup, so both entry
  // points are wrapped.
  pool.connect = async (...args) => {
    const client = await rawConnect(...args);
    await preparePostgresClient(client);
    return client;
  };

  pool.query = async (...args) => {
    // Callback-style calls are handed to the original implementation as-is.
    const lastArg = args[args.length - 1];
    if (typeof lastArg === 'function') {
      return rawQuery(...args);
    }

    const client = await rawConnect();
    try {
      await preparePostgresClient(client);
      const result = await client.query(...args);
      return result;
    } finally {
      // Always give the client back, whether the query succeeded or not.
      client.release();
    }
  };

  patchedPools.add(pool);
  return pool;
}
/**
 * Opens (creating if needed) the embedded database in the resolved data
 * directory, ensures the application schema exists and is on the search
 * path, and wraps the handle in an EmbeddedPostgresPool.
 *
 * @param {object} [options={}]
 * @param {string} [options.dataDir] Directory override; defaults via
 *   resolveEmbeddedDataDir().
 * @returns {Promise<EmbeddedPostgresPool>} Pool with an error handler attached.
 * @throws Whatever directory creation, database startup, or the bootstrap
 *   queries throw. If the bootstrap queries fail, the database handle is
 *   closed before the error is rethrown.
 */
async function createEmbeddedPostgresPool({ dataDir } = {}) {
  const resolvedDataDir = resolveEmbeddedDataDir(dataDir);
  await fs.mkdir(resolvedDataDir, { recursive: true });
  const db = await PGlite.create(resolvedDataDir);
  try {
    await db.query(getCreateSchemaSql());
    await db.query(getSetSearchPathSql());
  } catch (err) {
    // Nobody else holds a reference to `db` yet, so close it here instead of
    // leaving an open database instance behind.
    try {
      await db.close();
    } catch (closeErr) {
      // Report the close failure but keep the original error as the one thrown.
      console.error('Failed to close embedded PostgreSQL after setup error:', closeErr);
    }
    throw err;
  }
  return attachPoolErrorHandler(new EmbeddedPostgresPool(db, resolvedDataDir));
}
/**
 * Reports the backend in use.
 *
 * @returns {'embedded' | 'external'} The backend of the pool created so far,
 *   or, when none has been created yet, the backend that would be selected.
 */
export function getPostgresBackendType() {
  if (activeBackendType) {
    return activeBackendType;
  }
  return resolveBackendType();
}
/**
 * Creates a pool for the selected backend without applying the application
 * table schema.
 *
 * @param {object} [options={}] Forwarded to buildPoolConfig().
 * @returns {Promise<object>} An EmbeddedPostgresPool for the embedded
 *   backend; otherwise a `pg` Pool with an error handler attached and its
 *   connect/query entry points wrapped for per-client setup.
 */
export async function createRawPostgresPool(options = {}) {
  const config = buildPoolConfig(options);
  const { backend, dataDir } = config;

  if (backend === 'embedded') {
    return createEmbeddedPostgresPool({ dataDir });
  }

  const externalPool = new Pool(config);
  attachPoolErrorHandler(externalPool);
  return patchPoolClientSetup(externalPool);
}
/**
 * Performs one-time setup on a client: creates the application schema if
 * needed and puts it on the search path. Embedded clients and clients that
 * were already set up are skipped.
 *
 * @param {object} client Client exposing `query(sql)`.
 * @returns {Promise<void>}
 */
export async function preparePostgresClient(client) {
  const alreadyReady = client?.embedded || configuredClients.has(client);
  if (alreadyReady) return;

  const setupStatements = [getCreateSchemaSql, getSetSearchPathSql];
  for (const buildSql of setupStatements) {
    await client.query(buildSql());
  }

  // Remember the client only after both statements succeeded.
  configuredClients.add(client);
}
/**
 * Applies the application table schema (POSTGRES_SCHEMA_SQL) using a client
 * taken from the pool. The client is always released afterwards.
 *
 * @param {object} pool Pool exposing `connect()`.
 * @returns {Promise<void>}
 */
export async function applyPostgresSchema(pool) {
  const client = await pool.connect();
  try {
    await preparePostgresClient(client);
    // Prefer `exec` when the client offers it; fall back to `query`.
    const runner = typeof client.exec === 'function' ? 'exec' : 'query';
    await client[runner](POSTGRES_SCHEMA_SQL);
  } finally {
    client.release();
  }
}
/**
 * Creates the pool used for the shared module-level instance, with default
 * options.
 *
 * @returns {Promise<object>} A fully initialized pool.
 */
async function createPool() {
  const pool = await createStandalonePostgresPool();
  return pool;
}
/**
 * Creates a pool for the selected backend and applies the application table
 * schema to it. On success the module-level active backend type is updated
 * to match the new pool.
 *
 * @param {object} [options={}] Forwarded to createRawPostgresPool().
 * @returns {Promise<object>} The ready-to-use pool.
 * @throws Whatever pool creation or schema application throws. If schema
 *   application fails, the pool is ended before the error is rethrown.
 */
export async function createStandalonePostgresPool(options = {}) {
  const pool = await createRawPostgresPool(options);
  try {
    await applyPostgresSchema(pool);
  } catch (err) {
    // The caller never receives this pool, so nothing else could close it.
    try {
      await pool.end();
    } catch (closeErr) {
      // Report the close failure but keep the original error as the one thrown.
      console.error('Failed to close PostgreSQL pool after schema error:', closeErr);
    }
    throw err;
  }
  activeBackendType = pool?.embedded ? 'embedded' : 'external';
  return pool;
}
/**
 * Lazily creates the shared pool. Concurrent callers share one in-flight
 * initialization; once it succeeds, later calls resolve immediately.
 *
 * If initialization fails, the cached promise is cleared so the next call
 * tries again instead of replaying the same rejection forever.
 *
 * @returns {Promise<boolean>} `false` when PostgreSQL storage mode is not
 *   active, `true` once the shared pool is ready.
 * @throws Whatever pool creation throws.
 */
export async function initializePostgresStorage() {
  if (!isPostgresStorageMode()) return false;
  if (!initPromise) {
    const pending = createPool().then(
      (pool) => {
        poolInstance = pool;
        activeBackendType = pool?.embedded ? 'embedded' : 'external';
        return pool;
      },
      (err) => {
        // Only clear the cache if it still refers to this failed attempt.
        if (initPromise === pending) {
          initPromise = null;
        }
        throw err;
      }
    );
    initPromise = pending;
  }
  await initPromise;
  return true;
}
/**
 * Returns the shared pool, initializing it on first use.
 *
 * @returns {Promise<object>} The shared pool instance.
 * @throws {Error} When PostgreSQL storage mode is not active, or when
 *   initialization fails.
 */
export async function getPostgresPool() {
  const storageActive = isPostgresStorageMode();
  if (storageActive) {
    await initializePostgresStorage();
    return poolInstance;
  }
  throw new Error(`PostgreSQL storage is not active (mode=${STORAGE_MODE})`);
}
/**
 * Runs a single query on the shared pool.
 *
 * @param {string} text SQL text.
 * @param {Array} [params=[]] Bound parameters.
 * @returns {Promise<object>} Whatever the pool's `query` resolves to.
 */
export async function pgQuery(text, params = []) {
  const sharedPool = await getPostgresPool();
  const result = sharedPool.query(text, params);
  return result;
}
/**
 * Checks a client out of the shared pool, makes sure it has been set up, runs
 * `fn` with it, and always releases the client afterwards.
 *
 * @param {(client: object) => any} fn Work to perform with the client.
 * @returns {Promise<any>} The value `fn` resolves to.
 * @throws Whatever `fn`, client setup, or acquiring the client throws.
 */
export async function withPgClient(fn) {
  const sharedPool = await getPostgresPool();
  const client = await sharedPool.connect();
  try {
    await preparePostgresClient(client);
    const outcome = await fn(client);
    return outcome;
  } finally {
    client.release();
  }
}
/**
 * Runs `fn` inside a database transaction on the shared pool.
 *
 * Pools that provide their own `transaction` helper are used directly.
 * Otherwise the transaction is driven manually with BEGIN / COMMIT, and
 * ROLLBACK when `fn` (or the commit) throws.
 *
 * @param {(client: object) => any} fn Transaction body.
 * @returns {Promise<any>} The value `fn` resolves to.
 * @throws The error raised by `fn` or by COMMIT. A failure of the ROLLBACK
 *   itself is logged and does not replace that original error.
 */
export async function withPgTransaction(fn) {
  const pool = await getPostgresPool();
  if (typeof pool.transaction === 'function') {
    return pool.transaction(async (client) => {
      await preparePostgresClient(client);
      return fn(client);
    });
  }
  return withPgClient(async (client) => {
    await client.query('BEGIN');
    try {
      const result = await fn(client);
      await client.query('COMMIT');
      return result;
    } catch (err) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        // The caller needs the error that caused the rollback, so a failed
        // ROLLBACK is only logged.
        console.error('PostgreSQL rollback failed:', rollbackErr);
      }
      throw err;
    }
  });
}
/**
 * Encrypts a JSON-serializable value by delegating to `encryptJson`.
 *
 * @param {*} data Value to encrypt.
 * @param {string} [aad=''] Passed through as the second argument
 *   (presumably additional authenticated data; confirm in cryptoUtils.js).
 * @returns {*} Whatever `encryptJson` returns.
 */
export function encryptJsonPayload(data, aad = '') {
  const encrypted = encryptJson(data, aad);
  return encrypted;
}
/**
 * Decrypts a payload produced for JSON data by delegating to `decryptJson`.
 *
 * @param {*} payload Encrypted payload.
 * @param {string} [aad=''] Passed through as the second argument
 *   (presumably must match the value used when encrypting; confirm in
 *   cryptoUtils.js).
 * @returns {*} Whatever `decryptJson` returns.
 */
export function decryptJsonPayload(payload, aad = '') {
  const decrypted = decryptJson(payload, aad);
  return decrypted;
}
/**
 * Encrypts binary data and packs the result into a single value.
 *
 * The input is first converted with `Buffer.from`, then handed to
 * `encryptBuffer`, and the result is passed to `packEncryptedBuffer`.
 *
 * @param {*} buffer Any value accepted by `Buffer.from`.
 * @param {string} [aad=''] Passed through to `encryptBuffer`.
 * @returns {*} Whatever `packEncryptedBuffer` returns.
 */
export function encryptBinaryPayload(buffer, aad = '') {
  const plainBytes = Buffer.from(buffer);
  const encrypted = encryptBuffer(plainBytes, aad);
  return packEncryptedBuffer(encrypted);
}
/**
 * Reverses encryptBinaryPayload(): unpacks the payload with
 * `unpackEncryptedBuffer` and decrypts it with `decryptBuffer`.
 *
 * @param {*} payload Packed payload; converted with `Buffer.from` unless it
 *   already is a Buffer.
 * @param {string} [aad=''] Passed through to `decryptBuffer`.
 * @returns {*} Whatever `decryptBuffer` returns.
 */
export function decryptBinaryPayload(payload, aad = '') {
  let packed = payload;
  if (!Buffer.isBuffer(packed)) {
    packed = Buffer.from(packed);
  }
  const encryptedParts = unpackEncryptedBuffer(packed);
  return decryptBuffer(encryptedParts, aad);
}
/**
 * Builds the lookup hash for an owner from the string `"<type>:<id>"`,
 * hashed in the `owner` namespace.
 *
 * A missing or falsy `type` becomes `unknown`; a missing or falsy `id`
 * becomes an empty string.
 *
 * @param {{ type?: string, id?: string | number }} [owner] Owner descriptor.
 * @returns {*} Whatever `createLookupHash` returns.
 */
export function makeOwnerLookup(owner) {
  const ownerType = owner?.type || 'unknown';
  const ownerId = owner?.id || '';
  const ownerKey = `${ownerType}:${ownerId}`;
  return createLookupHash(ownerKey, 'owner');
}
/**
 * Builds a lookup hash for an arbitrary value within a namespace.
 *
 * @param {string} namespace Passed to `createLookupHash` as its second argument.
 * @param {*} value Value to hash; null/undefined are treated as an empty
 *   string, everything else is converted with `String()`.
 * @returns {*} Whatever `createLookupHash` returns.
 */
export function makeLookupToken(namespace, value) {
  const text = String(value ?? '');
  return createLookupHash(text, namespace);
}