/**
 * Persists each provider's recent per-call duration window to Postgres
 * so the admin sparkline (Task #127) survives a server restart, redeploy
 * or worker rotation. The in-memory ring buffer in `gateway.ts` remains
 * the source of truth at runtime; this layer just mirrors it on a
 * debounced flush tick and rehydrates on boot.
 *
 * Keyed by `(adapter, source)` and capped at the same
 * `RECENT_DURATION_LIMIT = 50` window used in memory.
 */
import { db, providerLatencySamples } from "@workspace/db";
import { sql } from "drizzle-orm";
import { logger } from "../lib/logger";
import {
  _setSampleDirtyHook,
  getRecentSamplesFor,
  restoreRecentSamples,
} from "./gateway";

/** Interval, in milliseconds, between drains of the dirty set. */
const FLUSH_INTERVAL_MS = 5_000;

/**
 * Separator used to pack `(adapter, source)` into a single Set key.
 * Shared by `dirtyKey` (encode) and `flushDirtySamples` (decode) so the
 * two sides cannot drift apart.
 */
const KEY_SEPARATOR = "\u0000";

/** Packed `(adapter, source)` keys awaiting the next flush. */
const dirty = new Set<string>();

/** Handle of the running flush interval, or `null` when persistence is stopped. */
let flushTimer: NodeJS.Timeout | null = null;

/** Pack an `(adapter, source)` pair into one string key for the dirty set. */
function dirtyKey(adapter: string, source: string): string {
  return `${adapter}${KEY_SEPARATOR}${source}`;
}

/**
 * Read every persisted window from Postgres and push it back into the
 * gateway's in-memory state. Safe to call once at boot before serving
 * traffic — failures are logged but do not crash startup.
 */
export async function loadPersistedLatencySamples(): Promise<void> {
  try {
    const rows = await db
      .select({
        adapter: providerLatencySamples.adapter,
        source: providerLatencySamples.source,
        samples: providerLatencySamples.samples,
      })
      .from(providerLatencySamples);

    let restored = 0;
    for (const r of rows) {
      // A non-array `samples` value is restored as an empty window
      // rather than aborting the whole rehydrate.
      const samples = Array.isArray(r.samples) ? r.samples : [];
      restoreRecentSamples(r.adapter, r.source, samples);
      restored += samples.length;
    }

    logger.info(
      { providers: rows.length, samples: restored },
      "restored persisted provider latency samples",
    );
  } catch (err) {
    logger.warn(
      { err },
      "failed to load persisted provider latency samples; sparkline will start empty",
    );
  }
}

/**
 * Begin mirroring the in-memory recent-samples window to Postgres.
 * Registers a dirty hook with the gateway so each new sample marks
 * its provider for the next flush tick (debounced at
 * `FLUSH_INTERVAL_MS`). Idempotent: a second call while the timer is
 * running is a no-op.
 */
export function startLatencySamplePersistence(): void {
  if (flushTimer) return;

  _setSampleDirtyHook((adapter, source) => {
    dirty.add(dirtyKey(adapter, source));
  });

  flushTimer = setInterval(() => {
    // Fire-and-forget: flushDirtySamples catches its own per-row errors.
    void flushDirtySamples();
  }, FLUSH_INTERVAL_MS);

  // Do not let the flush timer alone keep the process alive.
  flushTimer.unref?.();
}

/**
 * Test/admin helper — stop the flush loop and unregister the hook.
 * Pending dirty keys are discarded, not flushed.
 */
export function stopLatencySamplePersistence(): void {
  if (flushTimer) {
    clearInterval(flushTimer);
    flushTimer = null;
  }
  _setSampleDirtyHook(null);
  dirty.clear();
}

/**
 * Drain the dirty set, upserting one row per provider. Exported so
 * tests (and a graceful shutdown handler, if added later) can force an
 * immediate flush; the returned promise resolves once every row in the
 * batch has been attempted.
 *
 * Errors are isolated per row: a failed upsert is logged and its key is
 * put back into the dirty set so the next flush retries it, instead of
 * silently dropping the window until another sample happens to arrive.
 */
export async function flushDirtySamples(): Promise<void> {
  if (dirty.size === 0) return;

  // Snapshot and clear up front so providers marked dirty while we are
  // awaiting the database land in the next batch rather than being lost.
  const batch = Array.from(dirty);
  dirty.clear();

  for (const key of batch) {
    const [adapter, source] = key.split(KEY_SEPARATOR);
    // Skip keys that do not decode to two non-empty parts.
    if (!adapter || !source) continue;

    // Read the window at write time so the row holds whatever the
    // gateway reports for this provider right now.
    const samples = getRecentSamplesFor(adapter, source);
    try {
      await db
        .insert(providerLatencySamples)
        .values({ adapter, source, samples })
        .onConflictDoUpdate({
          target: [
            providerLatencySamples.adapter,
            providerLatencySamples.source,
          ],
          set: { samples, updatedAt: sql`NOW()` },
        });
    } catch (err) {
      // Re-mark for the next tick; otherwise a transient DB error would
      // leave a quiet provider's latest window unpersisted.
      dirty.add(key);
      logger.warn(
        { err, adapter, source },
        "failed to persist provider latency samples",
      );
    }
  }
}