File size: 3,694 Bytes
6d1fe92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
 * Persists each provider's recent per-call duration window to Postgres
 * so the admin sparkline (Task #127) survives a server restart, redeploy
 * or worker rotation. The in-memory ring buffer in `gateway.ts` remains
 * the source of truth at runtime; this layer just mirrors it on a
 * periodic flush tick (writes are coalesced per provider between ticks)
 * and rehydrates it on boot.
 *
 * Keyed by `(adapter, source)` and capped at the same
 * `RECENT_DURATION_LIMIT = 50` window used in memory.
 */
import { db, providerLatencySamples } from "@workspace/db";
import { sql } from "drizzle-orm";
import { logger } from "../lib/logger";
import {
  _setSampleDirtyHook,
  getRecentSamplesFor,
  restoreRecentSamples,
} from "./gateway";

const FLUSH_INTERVAL_MS = 5_000;

const dirty = new Set<string>();
let flushTimer: NodeJS.Timeout | null = null;

function dirtyKey(adapter: string, source: string): string {
  return `${adapter}\u0000${source}`;
}

/**
 * Read every persisted window from Postgres and push it back into the
 * gateway's in-memory state. Safe to call once at boot before serving
 * traffic — failures are logged but do not crash startup.
 */
export async function loadPersistedLatencySamples(): Promise<void> {
  try {
    const rows = await db
      .select({
        adapter: providerLatencySamples.adapter,
        source: providerLatencySamples.source,
        samples: providerLatencySamples.samples,
      })
      .from(providerLatencySamples);
    let restored = 0;
    for (const r of rows) {
      const samples = Array.isArray(r.samples) ? r.samples : [];
      restoreRecentSamples(r.adapter, r.source, samples);
      restored += samples.length;
    }
    logger.info(
      { providers: rows.length, samples: restored },
      "restored persisted provider latency samples",
    );
  } catch (err) {
    logger.warn(
      { err },
      "failed to load persisted provider latency samples; sparkline will start empty",
    );
  }
}

/**
 * Begin mirroring the in-memory recent-samples window to Postgres.
 * Registers a dirty hook with the gateway so each new sample marks
 * its provider for the next flush tick (debounced at
 * `FLUSH_INTERVAL_MS`). Idempotent.
 */
export function startLatencySamplePersistence(): void {
  if (flushTimer) return;
  _setSampleDirtyHook((adapter, source) => {
    dirty.add(dirtyKey(adapter, source));
  });
  flushTimer = setInterval(() => {
    void flushDirtySamples();
  }, FLUSH_INTERVAL_MS);
  flushTimer.unref?.();
}

/** Test/admin helper — stop the flush loop and unregister the hook. */
export function stopLatencySamplePersistence(): void {
  if (flushTimer) {
    clearInterval(flushTimer);
    flushTimer = null;
  }
  _setSampleDirtyHook(null);
  dirty.clear();
}

/**
 * Drain the dirty set, upserting one row per provider. Exported so
 * tests (and a graceful shutdown handler, if added later) can force a
 * synchronous flush. Errors are isolated per row.
 */
export async function flushDirtySamples(): Promise<void> {
  if (dirty.size === 0) return;
  const batch = Array.from(dirty);
  dirty.clear();
  for (const key of batch) {
    const [adapter, source] = key.split("\u0000");
    if (!adapter || !source) continue;
    const samples = getRecentSamplesFor(adapter, source);
    try {
      await db
        .insert(providerLatencySamples)
        .values({ adapter, source, samples })
        .onConflictDoUpdate({
          target: [
            providerLatencySamples.adapter,
            providerLatencySamples.source,
          ],
          set: { samples, updatedAt: sql`NOW()` },
        });
    } catch (err) {
      logger.warn(
        { err, adapter, source },
        "failed to persist provider latency samples",
      );
    }
  }
}