File size: 3,694 Bytes
6d1fe92 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | /**
* Persists each provider's recent per-call duration window to Postgres
* so the admin sparkline (Task #127) survives a server restart, redeploy
* or worker rotation. The in-memory ring buffer in `gateway.ts` remains
* the source of truth at runtime; this layer just mirrors it on a
* debounced flush tick and rehydrates on boot.
*
* Keyed by `(adapter, source)` and capped at the same
* `RECENT_DURATION_LIMIT = 50` window used in memory.
*/
import { db, providerLatencySamples } from "@workspace/db";
import { sql } from "drizzle-orm";
import { logger } from "../lib/logger";
import {
_setSampleDirtyHook,
getRecentSamplesFor,
restoreRecentSamples,
} from "./gateway";
const FLUSH_INTERVAL_MS = 5_000;
const dirty = new Set<string>();
let flushTimer: NodeJS.Timeout | null = null;
function dirtyKey(adapter: string, source: string): string {
return `${adapter}\u0000${source}`;
}
/**
* Read every persisted window from Postgres and push it back into the
* gateway's in-memory state. Safe to call once at boot before serving
* traffic — failures are logged but do not crash startup.
*/
export async function loadPersistedLatencySamples(): Promise<void> {
try {
const rows = await db
.select({
adapter: providerLatencySamples.adapter,
source: providerLatencySamples.source,
samples: providerLatencySamples.samples,
})
.from(providerLatencySamples);
let restored = 0;
for (const r of rows) {
const samples = Array.isArray(r.samples) ? r.samples : [];
restoreRecentSamples(r.adapter, r.source, samples);
restored += samples.length;
}
logger.info(
{ providers: rows.length, samples: restored },
"restored persisted provider latency samples",
);
} catch (err) {
logger.warn(
{ err },
"failed to load persisted provider latency samples; sparkline will start empty",
);
}
}
/**
* Begin mirroring the in-memory recent-samples window to Postgres.
* Registers a dirty hook with the gateway so each new sample marks
* its provider for the next flush tick (debounced at
* `FLUSH_INTERVAL_MS`). Idempotent.
*/
export function startLatencySamplePersistence(): void {
if (flushTimer) return;
_setSampleDirtyHook((adapter, source) => {
dirty.add(dirtyKey(adapter, source));
});
flushTimer = setInterval(() => {
void flushDirtySamples();
}, FLUSH_INTERVAL_MS);
flushTimer.unref?.();
}
/** Test/admin helper — stop the flush loop and unregister the hook. */
export function stopLatencySamplePersistence(): void {
if (flushTimer) {
clearInterval(flushTimer);
flushTimer = null;
}
_setSampleDirtyHook(null);
dirty.clear();
}
/**
* Drain the dirty set, upserting one row per provider. Exported so
* tests (and a graceful shutdown handler, if added later) can force a
* synchronous flush. Errors are isolated per row.
*/
export async function flushDirtySamples(): Promise<void> {
if (dirty.size === 0) return;
const batch = Array.from(dirty);
dirty.clear();
for (const key of batch) {
const [adapter, source] = key.split("\u0000");
if (!adapter || !source) continue;
const samples = getRecentSamplesFor(adapter, source);
try {
await db
.insert(providerLatencySamples)
.values({ adapter, source, samples })
.onConflictDoUpdate({
target: [
providerLatencySamples.adapter,
providerLatencySamples.source,
],
set: { samples, updatedAt: sql`NOW()` },
});
} catch (err) {
logger.warn(
{ err, adapter, source },
"failed to persist provider latency samples",
);
}
}
}
|