Spaces:
Paused
Paused
| import { createWriteStream, existsSync, mkdirSync, readdirSync, statSync, unlinkSync } from "node:fs"; | |
| import { readFile } from "node:fs/promises"; | |
| import { basename, resolve } from "node:path"; | |
| import postgres from "postgres"; | |
| export type RunDatabaseBackupOptions = { | |
| connectionString: string; | |
| backupDir: string; | |
| retentionDays: number; | |
| filenamePrefix?: string; | |
| connectTimeoutSeconds?: number; | |
| includeMigrationJournal?: boolean; | |
| excludeTables?: string[]; | |
| nullifyColumns?: Record<string, string[]>; | |
| }; | |
| export type RunDatabaseBackupResult = { | |
| backupFile: string; | |
| sizeBytes: number; | |
| prunedCount: number; | |
| }; | |
| export type RunDatabaseRestoreOptions = { | |
| connectionString: string; | |
| backupFile: string; | |
| connectTimeoutSeconds?: number; | |
| }; | |
| type SequenceDefinition = { | |
| sequence_schema: string; | |
| sequence_name: string; | |
| data_type: string; | |
| start_value: string; | |
| minimum_value: string; | |
| maximum_value: string; | |
| increment: string; | |
| cycle_option: "YES" | "NO"; | |
| owner_schema: string | null; | |
| owner_table: string | null; | |
| owner_column: string | null; | |
| }; | |
| type TableDefinition = { | |
| schema_name: string; | |
| tablename: string; | |
| }; | |
| const DRIZZLE_SCHEMA = "drizzle"; | |
| const DRIZZLE_MIGRATIONS_TABLE = "__drizzle_migrations"; | |
| const DEFAULT_BACKUP_WRITE_BUFFER_BYTES = 1024 * 1024; | |
| const STATEMENT_BREAKPOINT = "-- paperclip statement breakpoint 69f6f3f1-42fd-46a6-bf17-d1d85f8f3900"; | |
| function sanitizeRestoreErrorMessage(error: unknown): string { | |
| if (error && typeof error === "object") { | |
| const record = error as Record<string, unknown>; | |
| const firstLine = typeof record.message === "string" | |
| ? record.message.split(/\r?\n/, 1)[0]?.trim() | |
| : ""; | |
| const detail = typeof record.detail === "string" ? record.detail.trim() : ""; | |
| const severity = typeof record.severity === "string" ? record.severity.trim() : ""; | |
| const message = firstLine || detail || (error instanceof Error ? error.message : String(error)); | |
| return severity ? `${severity}: ${message}` : message; | |
| } | |
| return error instanceof Error ? error.message : String(error); | |
| } | |
| function timestamp(date: Date = new Date()): string { | |
| const pad = (n: number) => String(n).padStart(2, "0"); | |
| return `${date.getFullYear()}${pad(date.getMonth() + 1)}${pad(date.getDate())}-${pad(date.getHours())}${pad(date.getMinutes())}${pad(date.getSeconds())}`; | |
| } | |
| function pruneOldBackups(backupDir: string, retentionDays: number, filenamePrefix: string): number { | |
| if (!existsSync(backupDir)) return 0; | |
| const safeRetention = Math.max(1, Math.trunc(retentionDays)); | |
| const cutoff = Date.now() - safeRetention * 24 * 60 * 60 * 1000; | |
| let pruned = 0; | |
| for (const name of readdirSync(backupDir)) { | |
| if (!name.startsWith(`${filenamePrefix}-`) || !name.endsWith(".sql")) continue; | |
| const fullPath = resolve(backupDir, name); | |
| const stat = statSync(fullPath); | |
| if (stat.mtimeMs < cutoff) { | |
| unlinkSync(fullPath); | |
| pruned++; | |
| } | |
| } | |
| return pruned; | |
| } | |
| function formatBackupSize(sizeBytes: number): string { | |
| if (sizeBytes < 1024) return `${sizeBytes}B`; | |
| if (sizeBytes < 1024 * 1024) return `${(sizeBytes / 1024).toFixed(1)}K`; | |
| return `${(sizeBytes / (1024 * 1024)).toFixed(1)}M`; | |
| } | |
| function formatSqlLiteral(value: string): string { | |
| const sanitized = value.replace(/\u0000/g, ""); | |
| let tag = "$paperclip$"; | |
| while (sanitized.includes(tag)) { | |
| tag = `$paperclip_${Math.random().toString(36).slice(2, 8)}$`; | |
| } | |
| return `${tag}${sanitized}${tag}`; | |
| } | |
| function normalizeTableNameSet(values: string[] | undefined): Set<string> { | |
| return new Set( | |
| (values ?? []) | |
| .map((value) => value.trim()) | |
| .filter((value) => value.length > 0), | |
| ); | |
| } | |
| function normalizeNullifyColumnMap(values: Record<string, string[]> | undefined): Map<string, Set<string>> { | |
| const out = new Map<string, Set<string>>(); | |
| if (!values) return out; | |
| for (const [tableName, columns] of Object.entries(values)) { | |
| const normalizedTable = tableName.trim(); | |
| if (normalizedTable.length === 0) continue; | |
| const normalizedColumns = new Set( | |
| columns | |
| .map((column) => column.trim()) | |
| .filter((column) => column.length > 0), | |
| ); | |
| if (normalizedColumns.size > 0) { | |
| out.set(normalizedTable, normalizedColumns); | |
| } | |
| } | |
| return out; | |
| } | |
| function quoteIdentifier(value: string): string { | |
| return `"${value.replaceAll("\"", "\"\"")}"`; | |
| } | |
| function quoteQualifiedName(schemaName: string, objectName: string): string { | |
| return `${quoteIdentifier(schemaName)}.${quoteIdentifier(objectName)}`; | |
| } | |
| function tableKey(schemaName: string, tableName: string): string { | |
| return `${schemaName}.${tableName}`; | |
| } | |
| export function createBufferedTextFileWriter(filePath: string, maxBufferedBytes = DEFAULT_BACKUP_WRITE_BUFFER_BYTES) { | |
| const stream = createWriteStream(filePath, { encoding: "utf8" }); | |
| const flushThreshold = Math.max(1, Math.trunc(maxBufferedBytes)); | |
| let bufferedLines: string[] = []; | |
| let bufferedBytes = 0; | |
| let firstChunk = true; | |
| let closed = false; | |
| let streamError: Error | null = null; | |
| let pendingWrite = Promise.resolve(); | |
| stream.on("error", (error) => { | |
| streamError = error; | |
| }); | |
| const writeChunk = async (chunk: string): Promise<void> => { | |
| if (streamError) throw streamError; | |
| const canContinue = stream.write(chunk); | |
| if (!canContinue) { | |
| await new Promise<void>((resolve, reject) => { | |
| const handleDrain = () => { | |
| cleanup(); | |
| resolve(); | |
| }; | |
| const handleError = (error: Error) => { | |
| cleanup(); | |
| reject(error); | |
| }; | |
| const cleanup = () => { | |
| stream.off("drain", handleDrain); | |
| stream.off("error", handleError); | |
| }; | |
| stream.once("drain", handleDrain); | |
| stream.once("error", handleError); | |
| }); | |
| } | |
| if (streamError) throw streamError; | |
| }; | |
| const flushBufferedLines = () => { | |
| if (bufferedLines.length === 0) return; | |
| const linesToWrite = bufferedLines; | |
| bufferedLines = []; | |
| bufferedBytes = 0; | |
| const chunkBody = linesToWrite.join("\n"); | |
| const chunk = firstChunk ? chunkBody : `\n${chunkBody}`; | |
| firstChunk = false; | |
| pendingWrite = pendingWrite.then(() => writeChunk(chunk)); | |
| }; | |
| return { | |
| emit(line: string) { | |
| if (closed) { | |
| throw new Error(`Cannot write to closed backup file: ${filePath}`); | |
| } | |
| if (streamError) throw streamError; | |
| bufferedLines.push(line); | |
| bufferedBytes += Buffer.byteLength(line, "utf8") + 1; | |
| if (bufferedBytes >= flushThreshold) { | |
| flushBufferedLines(); | |
| } | |
| }, | |
| async close() { | |
| if (closed) return; | |
| closed = true; | |
| flushBufferedLines(); | |
| await pendingWrite; | |
| await new Promise<void>((resolve, reject) => { | |
| if (streamError) { | |
| reject(streamError); | |
| return; | |
| } | |
| stream.end((error?: Error | null) => { | |
| if (error) reject(error); | |
| else resolve(); | |
| }); | |
| }); | |
| if (streamError) throw streamError; | |
| }, | |
| async abort() { | |
| if (closed) return; | |
| closed = true; | |
| bufferedLines = []; | |
| bufferedBytes = 0; | |
| stream.destroy(); | |
| await pendingWrite.catch(() => {}); | |
| if (existsSync(filePath)) { | |
| try { | |
| unlinkSync(filePath); | |
| } catch { | |
| // Preserve the original backup failure if temporary file cleanup also fails. | |
| } | |
| } | |
| }, | |
| }; | |
| } | |
| export async function runDatabaseBackup(opts: RunDatabaseBackupOptions): Promise<RunDatabaseBackupResult> { | |
| const filenamePrefix = opts.filenamePrefix ?? "paperclip"; | |
| const retentionDays = Math.max(1, Math.trunc(opts.retentionDays)); | |
| const connectTimeout = Math.max(1, Math.trunc(opts.connectTimeoutSeconds ?? 5)); | |
| const includeMigrationJournal = opts.includeMigrationJournal === true; | |
| const excludedTableNames = normalizeTableNameSet(opts.excludeTables); | |
| const nullifiedColumnsByTable = normalizeNullifyColumnMap(opts.nullifyColumns); | |
| const sql = postgres(opts.connectionString, { max: 1, connect_timeout: connectTimeout }); | |
| mkdirSync(opts.backupDir, { recursive: true }); | |
| const backupFile = resolve(opts.backupDir, `${filenamePrefix}-${timestamp()}.sql`); | |
| const writer = createBufferedTextFileWriter(backupFile); | |
| try { | |
| await sql`SELECT 1`; | |
| const emit = (line: string) => writer.emit(line); | |
| const emitStatement = (statement: string) => { | |
| emit(statement); | |
| emit(STATEMENT_BREAKPOINT); | |
| }; | |
| const emitStatementBoundary = () => { | |
| emit(STATEMENT_BREAKPOINT); | |
| }; | |
| emit("-- Paperclip database backup"); | |
| emit(`-- Created: ${new Date().toISOString()}`); | |
| emit(""); | |
| emitStatement("BEGIN;"); | |
| emitStatement("SET LOCAL session_replication_role = replica;"); | |
| emitStatement("SET LOCAL client_min_messages = warning;"); | |
| emit(""); | |
| const allTables = await sql<TableDefinition[]>` | |
| SELECT table_schema AS schema_name, table_name AS tablename | |
| FROM information_schema.tables | |
| WHERE table_type = 'BASE TABLE' | |
| AND ( | |
| table_schema = 'public' | |
| OR (${includeMigrationJournal}::boolean AND table_schema = ${DRIZZLE_SCHEMA} AND table_name = ${DRIZZLE_MIGRATIONS_TABLE}) | |
| ) | |
| ORDER BY table_schema, table_name | |
| `; | |
| const tables = allTables; | |
| const includedTableNames = new Set(tables.map(({ schema_name, tablename }) => tableKey(schema_name, tablename))); | |
| // Get all enums | |
| const enums = await sql<{ typname: string; labels: string[] }[]>` | |
| SELECT t.typname, array_agg(e.enumlabel ORDER BY e.enumsortorder) AS labels | |
| FROM pg_type t | |
| JOIN pg_enum e ON t.oid = e.enumtypid | |
| JOIN pg_namespace n ON t.typnamespace = n.oid | |
| WHERE n.nspname = 'public' | |
| GROUP BY t.typname | |
| ORDER BY t.typname | |
| `; | |
| for (const e of enums) { | |
| const labels = e.labels.map((l) => `'${l.replace(/'/g, "''")}'`).join(", "); | |
| emitStatement(`CREATE TYPE "public"."${e.typname}" AS ENUM (${labels});`); | |
| } | |
| if (enums.length > 0) emit(""); | |
| const allSequences = await sql<SequenceDefinition[]>` | |
| SELECT | |
| s.sequence_schema, | |
| s.sequence_name, | |
| s.data_type, | |
| s.start_value, | |
| s.minimum_value, | |
| s.maximum_value, | |
| s.increment, | |
| s.cycle_option, | |
| tblns.nspname AS owner_schema, | |
| tbl.relname AS owner_table, | |
| attr.attname AS owner_column | |
| FROM information_schema.sequences s | |
| JOIN pg_class seq ON seq.relname = s.sequence_name | |
| JOIN pg_namespace n ON n.oid = seq.relnamespace AND n.nspname = s.sequence_schema | |
| LEFT JOIN pg_depend dep ON dep.objid = seq.oid AND dep.deptype = 'a' | |
| LEFT JOIN pg_class tbl ON tbl.oid = dep.refobjid | |
| LEFT JOIN pg_namespace tblns ON tblns.oid = tbl.relnamespace | |
| LEFT JOIN pg_attribute attr ON attr.attrelid = tbl.oid AND attr.attnum = dep.refobjsubid | |
| WHERE s.sequence_schema = 'public' | |
| OR (${includeMigrationJournal}::boolean AND s.sequence_schema = ${DRIZZLE_SCHEMA}) | |
| ORDER BY s.sequence_schema, s.sequence_name | |
| `; | |
| const sequences = allSequences.filter( | |
| (seq) => !seq.owner_table || includedTableNames.has(tableKey(seq.owner_schema ?? "public", seq.owner_table)), | |
| ); | |
| const schemas = new Set<string>(); | |
| for (const table of tables) schemas.add(table.schema_name); | |
| for (const seq of sequences) schemas.add(seq.sequence_schema); | |
| const extraSchemas = [...schemas].filter((schemaName) => schemaName !== "public"); | |
| if (extraSchemas.length > 0) { | |
| emit("-- Schemas"); | |
| for (const schemaName of extraSchemas) { | |
| emitStatement(`CREATE SCHEMA IF NOT EXISTS ${quoteIdentifier(schemaName)};`); | |
| } | |
| emit(""); | |
| } | |
| if (sequences.length > 0) { | |
| emit("-- Sequences"); | |
| for (const seq of sequences) { | |
| const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); | |
| emitStatement(`DROP SEQUENCE IF EXISTS ${qualifiedSequenceName} CASCADE;`); | |
| emitStatement( | |
| `CREATE SEQUENCE ${qualifiedSequenceName} AS ${seq.data_type} INCREMENT BY ${seq.increment} MINVALUE ${seq.minimum_value} MAXVALUE ${seq.maximum_value} START WITH ${seq.start_value}${seq.cycle_option === "YES" ? " CYCLE" : " NO CYCLE"};`, | |
| ); | |
| } | |
| emit(""); | |
| } | |
| // Get full CREATE TABLE DDL via column info | |
| for (const { schema_name, tablename } of tables) { | |
| const qualifiedTableName = quoteQualifiedName(schema_name, tablename); | |
| const columns = await sql<{ | |
| column_name: string; | |
| data_type: string; | |
| udt_name: string; | |
| is_nullable: string; | |
| column_default: string | null; | |
| character_maximum_length: number | null; | |
| numeric_precision: number | null; | |
| numeric_scale: number | null; | |
| }[]>` | |
| SELECT column_name, data_type, udt_name, is_nullable, column_default, | |
| character_maximum_length, numeric_precision, numeric_scale | |
| FROM information_schema.columns | |
| WHERE table_schema = ${schema_name} AND table_name = ${tablename} | |
| ORDER BY ordinal_position | |
| `; | |
| emit(`-- Table: ${schema_name}.${tablename}`); | |
| emitStatement(`DROP TABLE IF EXISTS ${qualifiedTableName} CASCADE;`); | |
| const colDefs: string[] = []; | |
| for (const col of columns) { | |
| let typeStr: string; | |
| if (col.data_type === "USER-DEFINED") { | |
| typeStr = `"${col.udt_name}"`; | |
| } else if (col.data_type === "ARRAY") { | |
| typeStr = `${col.udt_name.replace(/^_/, "")}[]`; | |
| } else if (col.data_type === "character varying") { | |
| typeStr = col.character_maximum_length | |
| ? `varchar(${col.character_maximum_length})` | |
| : "varchar"; | |
| } else if (col.data_type === "numeric" && col.numeric_precision != null) { | |
| typeStr = | |
| col.numeric_scale != null | |
| ? `numeric(${col.numeric_precision}, ${col.numeric_scale})` | |
| : `numeric(${col.numeric_precision})`; | |
| } else { | |
| typeStr = col.data_type; | |
| } | |
| let def = ` "${col.column_name}" ${typeStr}`; | |
| if (col.column_default != null) def += ` DEFAULT ${col.column_default}`; | |
| if (col.is_nullable === "NO") def += " NOT NULL"; | |
| colDefs.push(def); | |
| } | |
| // Primary key | |
| const pk = await sql<{ constraint_name: string; column_names: string[] }[]>` | |
| SELECT c.conname AS constraint_name, | |
| array_agg(a.attname ORDER BY array_position(c.conkey, a.attnum)) AS column_names | |
| FROM pg_constraint c | |
| JOIN pg_class t ON t.oid = c.conrelid | |
| JOIN pg_namespace n ON n.oid = t.relnamespace | |
| JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(c.conkey) | |
| WHERE n.nspname = ${schema_name} AND t.relname = ${tablename} AND c.contype = 'p' | |
| GROUP BY c.conname | |
| `; | |
| for (const p of pk) { | |
| const cols = p.column_names.map((c) => `"${c}"`).join(", "); | |
| colDefs.push(` CONSTRAINT "${p.constraint_name}" PRIMARY KEY (${cols})`); | |
| } | |
| emit(`CREATE TABLE ${qualifiedTableName} (`); | |
| emit(colDefs.join(",\n")); | |
| emit(");"); | |
| emitStatementBoundary(); | |
| emit(""); | |
| } | |
| const ownedSequences = sequences.filter((seq) => seq.owner_table && seq.owner_column); | |
| if (ownedSequences.length > 0) { | |
| emit("-- Sequence ownership"); | |
| for (const seq of ownedSequences) { | |
| emitStatement( | |
| `ALTER SEQUENCE ${quoteQualifiedName(seq.sequence_schema, seq.sequence_name)} OWNED BY ${quoteQualifiedName(seq.owner_schema ?? "public", seq.owner_table!)}.${quoteIdentifier(seq.owner_column!)};`, | |
| ); | |
| } | |
| emit(""); | |
| } | |
| // Foreign keys (after all tables created) | |
| const allForeignKeys = await sql<{ | |
| constraint_name: string; | |
| source_schema: string; | |
| source_table: string; | |
| source_columns: string[]; | |
| target_schema: string; | |
| target_table: string; | |
| target_columns: string[]; | |
| update_rule: string; | |
| delete_rule: string; | |
| }[]>` | |
| SELECT | |
| c.conname AS constraint_name, | |
| srcn.nspname AS source_schema, | |
| src.relname AS source_table, | |
| array_agg(sa.attname ORDER BY array_position(c.conkey, sa.attnum)) AS source_columns, | |
| tgtn.nspname AS target_schema, | |
| tgt.relname AS target_table, | |
| array_agg(ta.attname ORDER BY array_position(c.confkey, ta.attnum)) AS target_columns, | |
| CASE c.confupdtype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS update_rule, | |
| CASE c.confdeltype WHEN 'a' THEN 'NO ACTION' WHEN 'r' THEN 'RESTRICT' WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' END AS delete_rule | |
| FROM pg_constraint c | |
| JOIN pg_class src ON src.oid = c.conrelid | |
| JOIN pg_namespace srcn ON srcn.oid = src.relnamespace | |
| JOIN pg_class tgt ON tgt.oid = c.confrelid | |
| JOIN pg_namespace tgtn ON tgtn.oid = tgt.relnamespace | |
| JOIN pg_attribute sa ON sa.attrelid = src.oid AND sa.attnum = ANY(c.conkey) | |
| JOIN pg_attribute ta ON ta.attrelid = tgt.oid AND ta.attnum = ANY(c.confkey) | |
| WHERE c.contype = 'f' AND ( | |
| srcn.nspname = 'public' | |
| OR (${includeMigrationJournal}::boolean AND srcn.nspname = ${DRIZZLE_SCHEMA}) | |
| ) | |
| GROUP BY c.conname, srcn.nspname, src.relname, tgtn.nspname, tgt.relname, c.confupdtype, c.confdeltype | |
| ORDER BY srcn.nspname, src.relname, c.conname | |
| `; | |
| const fks = allForeignKeys.filter( | |
| (fk) => includedTableNames.has(tableKey(fk.source_schema, fk.source_table)) | |
| && includedTableNames.has(tableKey(fk.target_schema, fk.target_table)), | |
| ); | |
| if (fks.length > 0) { | |
| emit("-- Foreign keys"); | |
| for (const fk of fks) { | |
| const srcCols = fk.source_columns.map((c) => `"${c}"`).join(", "); | |
| const tgtCols = fk.target_columns.map((c) => `"${c}"`).join(", "); | |
| emitStatement( | |
| `ALTER TABLE ${quoteQualifiedName(fk.source_schema, fk.source_table)} ADD CONSTRAINT "${fk.constraint_name}" FOREIGN KEY (${srcCols}) REFERENCES ${quoteQualifiedName(fk.target_schema, fk.target_table)} (${tgtCols}) ON UPDATE ${fk.update_rule} ON DELETE ${fk.delete_rule};`, | |
| ); | |
| } | |
| emit(""); | |
| } | |
| // Unique constraints | |
| const allUniqueConstraints = await sql<{ | |
| constraint_name: string; | |
| schema_name: string; | |
| tablename: string; | |
| column_names: string[]; | |
| }[]>` | |
| SELECT c.conname AS constraint_name, | |
| n.nspname AS schema_name, | |
| t.relname AS tablename, | |
| array_agg(a.attname ORDER BY array_position(c.conkey, a.attnum)) AS column_names | |
| FROM pg_constraint c | |
| JOIN pg_class t ON t.oid = c.conrelid | |
| JOIN pg_namespace n ON n.oid = t.relnamespace | |
| JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(c.conkey) | |
| WHERE c.contype = 'u' AND ( | |
| n.nspname = 'public' | |
| OR (${includeMigrationJournal}::boolean AND n.nspname = ${DRIZZLE_SCHEMA}) | |
| ) | |
| GROUP BY c.conname, n.nspname, t.relname | |
| ORDER BY n.nspname, t.relname, c.conname | |
| `; | |
| const uniques = allUniqueConstraints.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); | |
| if (uniques.length > 0) { | |
| emit("-- Unique constraints"); | |
| for (const u of uniques) { | |
| const cols = u.column_names.map((c) => `"${c}"`).join(", "); | |
| emitStatement(`ALTER TABLE ${quoteQualifiedName(u.schema_name, u.tablename)} ADD CONSTRAINT "${u.constraint_name}" UNIQUE (${cols});`); | |
| } | |
| emit(""); | |
| } | |
| // Indexes (non-primary, non-unique-constraint) | |
| const allIndexes = await sql<{ schema_name: string; tablename: string; indexdef: string }[]>` | |
| SELECT schemaname AS schema_name, tablename, indexdef | |
| FROM pg_indexes | |
| WHERE ( | |
| schemaname = 'public' | |
| OR (${includeMigrationJournal}::boolean AND schemaname = ${DRIZZLE_SCHEMA}) | |
| ) | |
| AND indexname NOT IN ( | |
| SELECT conname FROM pg_constraint c | |
| JOIN pg_namespace n ON n.oid = c.connamespace | |
| WHERE n.nspname = pg_indexes.schemaname | |
| ) | |
| ORDER BY schemaname, tablename, indexname | |
| `; | |
| const indexes = allIndexes.filter((entry) => includedTableNames.has(tableKey(entry.schema_name, entry.tablename))); | |
| if (indexes.length > 0) { | |
| emit("-- Indexes"); | |
| for (const idx of indexes) { | |
| emitStatement(`${idx.indexdef};`); | |
| } | |
| emit(""); | |
| } | |
| // Dump data for each table | |
| for (const { schema_name, tablename } of tables) { | |
| const qualifiedTableName = quoteQualifiedName(schema_name, tablename); | |
| const count = await sql.unsafe<{ n: number }[]>(`SELECT count(*)::int AS n FROM ${qualifiedTableName}`); | |
| if (excludedTableNames.has(tablename) || (count[0]?.n ?? 0) === 0) continue; | |
| // Get column info for this table | |
| const cols = await sql<{ column_name: string; data_type: string }[]>` | |
| SELECT column_name, data_type | |
| FROM information_schema.columns | |
| WHERE table_schema = ${schema_name} AND table_name = ${tablename} | |
| ORDER BY ordinal_position | |
| `; | |
| const colNames = cols.map((c) => `"${c.column_name}"`).join(", "); | |
| emit(`-- Data for: ${schema_name}.${tablename} (${count[0]!.n} rows)`); | |
| const rows = await sql.unsafe(`SELECT * FROM ${qualifiedTableName}`).values(); | |
| const nullifiedColumns = nullifiedColumnsByTable.get(tablename) ?? new Set<string>(); | |
| for (const row of rows) { | |
| const values = row.map((rawValue: unknown, index) => { | |
| const columnName = cols[index]?.column_name; | |
| const val = columnName && nullifiedColumns.has(columnName) ? null : rawValue; | |
| if (val === null || val === undefined) return "NULL"; | |
| if (typeof val === "boolean") return val ? "true" : "false"; | |
| if (typeof val === "number") return String(val); | |
| if (val instanceof Date) return formatSqlLiteral(val.toISOString()); | |
| if (typeof val === "object") return formatSqlLiteral(JSON.stringify(val)); | |
| return formatSqlLiteral(String(val)); | |
| }); | |
| emitStatement(`INSERT INTO ${qualifiedTableName} (${colNames}) VALUES (${values.join(", ")});`); | |
| } | |
| emit(""); | |
| } | |
| // Sequence values | |
| if (sequences.length > 0) { | |
| emit("-- Sequence values"); | |
| for (const seq of sequences) { | |
| const qualifiedSequenceName = quoteQualifiedName(seq.sequence_schema, seq.sequence_name); | |
| const val = await sql.unsafe<{ last_value: string; is_called: boolean }[]>( | |
| `SELECT last_value::text, is_called FROM ${qualifiedSequenceName}`, | |
| ); | |
| const skipSequenceValue = | |
| seq.owner_table !== null | |
| && excludedTableNames.has(seq.owner_table); | |
| if (val[0] && !skipSequenceValue) { | |
| emitStatement(`SELECT setval('${qualifiedSequenceName.replaceAll("'", "''")}', ${val[0].last_value}, ${val[0].is_called ? "true" : "false"});`); | |
| } | |
| } | |
| emit(""); | |
| } | |
| emitStatement("COMMIT;"); | |
| emit(""); | |
| await writer.close(); | |
| const sizeBytes = statSync(backupFile).size; | |
| const prunedCount = pruneOldBackups(opts.backupDir, retentionDays, filenamePrefix); | |
| return { | |
| backupFile, | |
| sizeBytes, | |
| prunedCount, | |
| }; | |
| } catch (error) { | |
| await writer.abort(); | |
| throw error; | |
| } finally { | |
| await sql.end(); | |
| } | |
| } | |
| export async function runDatabaseRestore(opts: RunDatabaseRestoreOptions): Promise<void> { | |
| const connectTimeout = Math.max(1, Math.trunc(opts.connectTimeoutSeconds ?? 5)); | |
| const sql = postgres(opts.connectionString, { max: 1, connect_timeout: connectTimeout }); | |
| try { | |
| await sql`SELECT 1`; | |
| const contents = await readFile(opts.backupFile, "utf8"); | |
| const statements = contents | |
| .split(STATEMENT_BREAKPOINT) | |
| .map((statement) => statement.trim()) | |
| .filter((statement) => statement.length > 0); | |
| for (const statement of statements) { | |
| await sql.unsafe(statement).execute(); | |
| } | |
| } catch (error) { | |
| const statementPreview = typeof error === "object" && error !== null && typeof (error as Record<string, unknown>).query === "string" | |
| ? String((error as Record<string, unknown>).query) | |
| .split(/\r?\n/) | |
| .map((line) => line.trim()) | |
| .find((line) => line.length > 0 && !line.startsWith("--")) | |
| : null; | |
| throw new Error( | |
| `Failed to restore ${basename(opts.backupFile)}: ${sanitizeRestoreErrorMessage(error)}${statementPreview ? ` [statement: ${statementPreview.slice(0, 120)}]` : ""}`, | |
| ); | |
| } finally { | |
| await sql.end(); | |
| } | |
| } | |
| export function formatDatabaseBackupResult(result: RunDatabaseBackupResult): string { | |
| const size = formatBackupSize(result.sizeBytes); | |
| const pruned = result.prunedCount > 0 ? `; pruned ${result.prunedCount} old backup(s)` : ""; | |
| return `${result.backupFile} (${size}${pruned})`; | |
| } | |