/**
 * Dynamic shadow-budget — DB-path integration tests.
 *
 * The pure-logic counterpart in `shadow-budget.unit.test.mjs` only
 * exercises `decideShadowBudget` / `shouldEmitThresholdEvent`. This
 * suite drives `recordShadowSample` + `computeShadowBudget` + the
 * `_handlers.listNetworks` admin route against an ephemeral Postgres
 * schema (mirrors the pattern in `tool-graph.test.ts`) so we catch
 * regressions in:
 *   - the `network_shadow_samples` row write (incl. `budget_skipped`),
 *   - the `shadow_budget_skipped` event payload now carrying
 *     `thresholdMs` / `budgetMode` / `p75Ms`,
 *   - the `shadow_budget_threshold` event firing on first dynamic
 *     threshold and on >=10% drift,
 *   - the `/api/admin/evolution/networks` payload exposing the new
 *     `shadowBudget` field.
 *
 * Skipped when DATABASE_URL is unset.
 */
import { test } from "node:test";
import assert from "node:assert/strict";
import { randomBytes } from "node:crypto";
import type { RequestHandler } from "express";

const ORIGINAL_DSN = process.env.DATABASE_URL || "";
const SKIP = !ORIGINAL_DSN;

/**
 * Appends an `options=-c search_path=<schema>` query parameter to a
 * Postgres DSN so that connections opened from it resolve unqualified
 * table names inside `schema`.
 *
 * @param dsn    Base connection string; may or may not already carry a
 *               query string.
 * @param schema Schema name to place on the search path (URL-encoded
 *               before being appended).
 * @returns The DSN with the extra parameter joined by `?` or `&`.
 */
function dsnWithSearchPath(dsn: string, schema: string): string {
  const opt = `options=-c%20search_path%3D${encodeURIComponent(schema)}`;
  const separator = dsn.includes("?") ? "&" : "?";
  return `${dsn}${separator}${opt}`;
}

/** What a handler wrote to the stubbed response object. */
interface CapturedResponse {
  statusCode: number;
  body: unknown;
}

/**
 * Invokes an Express handler with a minimal stub response and reports
 * the status code / JSON body it produced.
 *
 * Only `res.status()` and `res.json()` are stubbed; the status defaults
 * to 200 and the body to `null` when the handler never sets them.
 *
 * @param handler The route handler under test.
 * @param req     Plain object passed through as the request.
 * @returns The captured status code and JSON payload.
 */
async function callHandler(
  handler: RequestHandler,
  req: Record<string, unknown>,
): Promise<CapturedResponse> {
  let statusCode = 200;
  let body: unknown = null;
  const res = {
    status(code: number) {
      statusCode = code;
      return this;
    },
    json(payload: unknown) {
      body = payload;
      return this;
    },
  };
  await handler(req as never, res as never, (() => {}) as never);
  return { statusCode, body };
}

test(
  "shadow budget DB integration",
  {
    skip: SKIP && "DATABASE_URL not set",
  },
  async (t) => {
    const schema = `shadow_budget_test_${randomBytes(6).toString("hex")}`;
    process.env.DATABASE_URL = dsnWithSearchPath(ORIGINAL_DSN, schema);
    // Register the env restore immediately so that a failure in any of
    // the setup steps below cannot leave DATABASE_URL rewritten.
    t.after(() => {
      process.env.DATABASE_URL = ORIGINAL_DSN;
    });

    const { pool } = await import("@workspace/db");
    // Likewise register the schema drop before creating anything, so a
    // partially-created schema is still cleaned up if the DDL fails.
    t.after(async () => {
      await pool.query(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
    });

    await pool.query(`CREATE SCHEMA "${schema}"`);

    // Create the four tables this suite relies on. We deliberately
    // skip network_versions / network_version_metrics — the
    // listNetworks handler only invokes rollingFitness when
    // activeVariantId is non-null, so the test network leaves it null
    // and the fitness path is short-circuited.
    await pool.query(`
      CREATE TABLE "${schema}".tool_networks (
        id text PRIMARY KEY,
        name text NOT NULL UNIQUE,
        problem_class_path text NOT NULL,
        description text NOT NULL DEFAULT '',
        input_contract jsonb NOT NULL,
        output_contract jsonb NOT NULL,
        internal_graph jsonb NOT NULL,
        active_variant_id text,
        builder_model_tier text NOT NULL DEFAULT 'strong',
        release_tier_floor text NOT NULL DEFAULT 'strong',
        config jsonb NOT NULL DEFAULT '{}'::jsonb,
        status text NOT NULL DEFAULT 'active',
        cost_hint double precision,
        latency_hint_ms integer,
        capability_tags jsonb NOT NULL DEFAULT '[]'::jsonb,
        legacy_alias_node_id text,
        created_at timestamptz NOT NULL DEFAULT now(),
        updated_at timestamptz NOT NULL DEFAULT now()
      );
      CREATE TABLE "${schema}".network_shadow_samples (
        id text PRIMARY KEY,
        network_id text NOT NULL,
        active_variant_id text NOT NULL,
        shadow_variant_id text NOT NULL,
        problem_class_path text NOT NULL,
        active_score double precision NOT NULL,
        shadow_score double precision NOT NULL,
        critical_signal boolean NOT NULL DEFAULT false,
        active_cost_ms integer,
        shadow_cost_ms integer,
        budget_skipped boolean NOT NULL DEFAULT false,
        conversation_id text,
        message_id text,
        created_at timestamptz NOT NULL DEFAULT now()
      );
      CREATE INDEX "${schema}_nss_net" ON "${schema}".network_shadow_samples(network_id);
      CREATE INDEX "${schema}_nss_created" ON "${schema}".network_shadow_samples(created_at);
      CREATE TABLE "${schema}".network_evolution_events (
        id text PRIMARY KEY,
        network_id text NOT NULL,
        kind text NOT NULL,
        variant_id text,
        payload jsonb NOT NULL DEFAULT '{}'::jsonb,
        related_event_id text,
        promotion_id text,
        created_at timestamptz NOT NULL DEFAULT now()
      );
      CREATE INDEX "${schema}_nee_net" ON "${schema}".network_evolution_events(network_id);
      CREATE INDEX "${schema}_nee_kind" ON "${schema}".network_evolution_events(kind);
      CREATE INDEX "${schema}_nee_created" ON "${schema}".network_evolution_events(created_at);
      CREATE TABLE "${schema}".network_promotions (
        id text PRIMARY KEY,
        network_id text NOT NULL,
        from_variant_id text,
        to_variant_id text NOT NULL,
        reason text NOT NULL,
        metrics_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
        decided_by text NOT NULL DEFAULT 'system',
        created_at timestamptz NOT NULL DEFAULT now()
      );
    `);

    // Imported only after DATABASE_URL has been rewritten and the
    // schema exists.
    const shadow = await import("../evolution/shadow");
    const events = await import("../evolution/events");
    const flywheel = await import("../../routes/evolutionFlywheel");

    const NETWORK_ID = "tnet_test_shadow_1";
    const ACTIVE_VARIANT = "ver_active_1";
    const SHADOW_VARIANT = "ver_shadow_1";
    const PROBLEM_CLASS = "research/test";

    // Identifying fields shared by every sample this suite records.
    const sampleBase = {
      networkId: NETWORK_ID,
      activeVariantId: ACTIVE_VARIANT,
      shadowVariantId: SHADOW_VARIANT,
      problemClassPath: PROBLEM_CLASS,
    };

    // Views an event payload as a plain string-keyed record.
    const payloadOf = (ev: { payload: unknown }): Record<string, unknown> =>
      ev.payload as Record<string, unknown>;

    // Fetches up to 50 `shadow_budget_threshold` events for the test network.
    const listThresholdEvents = () =>
      events.listEvents({
        networkId: NETWORK_ID,
        kind: "shadow_budget_threshold",
        limit: 50,
      });

    await t.test("seed: create one tool network row", async () => {
      await pool.query(
        `INSERT INTO "${schema}".tool_networks
           (id, name, problem_class_path, input_contract, output_contract, internal_graph)
         VALUES ($1, $2, $3, $4::jsonb, $4::jsonb, $4::jsonb)`,
        [NETWORK_ID, "test_shadow_network", PROBLEM_CLASS, "{}"],
      );
    });

    await t.test(
      "seed N>=10 cost samples → switches from fallback to dynamic mode",
      async () => {
        // First call has zero history → fallback. Each subsequent call
        // grows the history; once we cross MIN_SAMPLES_FOR_DYNAMIC=10,
        // computeShadowBudget should switch to dynamic mode.
        //
        // We deliberately mix a single 3000ms outlier into nine 500ms
        // samples so that:
        //   - the fallback path emits at least two threshold events
        //     (initial 750ms cap, then the 4500ms blip from sample #10),
        //   - when the 11th sample crosses into dynamic mode the new
        //     ceiling (≈750ms — P75 of the mixed history) drifts >10%
        //     from the most recent fallback event and a fresh
        //     `shadow_budget_threshold` event is emitted with
        //     mode="dynamic". This is exactly the regression the suite
        //     guards against.
        for (let i = 0; i < 9; i += 1) {
          await shadow.recordShadowSample({
            ...sampleBase,
            activeScore: 0.7,
            shadowScore: 0.7,
            activeCostMs: 500,
            // Stay under every plausible cap so we don't accidentally
            // trip a budget skip during seeding.
            shadowCostMs: 300,
          });
        }

        // The lone outlier — flips the fallback threshold to 4500.
        await shadow.recordShadowSample({
          ...sampleBase,
          activeScore: 0.7,
          shadowScore: 0.7,
          activeCostMs: 3000,
          shadowCostMs: 300,
        });

        const rows = await pool.query<{ count: string }>(
          `SELECT COUNT(*)::text AS count
             FROM "${schema}".network_shadow_samples
            WHERE network_id = $1`,
          [NETWORK_ID],
        );
        assert.equal(rows.rows[0]!.count, "10");

        // 10 historical cost samples are now in place ⇒ next budget
        // computation should be in dynamic mode with p75=500.
        const budget = await shadow.computeShadowBudget(NETWORK_ID, 500);
        assert.equal(budget.mode, "dynamic");
        assert.equal(budget.p75Ms, 500);
        assert.equal(budget.thresholdMs, 500 * 1.5);
        assert.equal(budget.sampleCount, 10);
      },
    );

    await t.test(
      "shadow_budget_threshold event fires when dynamic threshold drifts >10%",
      async () => {
        // Seed events: at minimum a fallback initial 750 and the 4500
        // emitted by sample #10. Everything that follows in this test
        // is the assertion that the dynamic-mode event is added on top.
        const fallbackOnly = await listThresholdEvents();
        assert.ok(
          fallbackOnly.length >= 1,
          "fallback-mode threshold events must already exist from seed",
        );
        assert.ok(
          fallbackOnly.every((ev) => payloadOf(ev).mode === "fallback"),
          "no dynamic-mode events should exist yet",
        );

        // The eleventh sample is the first one written under dynamic
        // mode — its 750ms ceiling drifts ~83% off the last fallback
        // value of 4500ms, so the threshold-event guard must fire.
        await shadow.recordShadowSample({
          ...sampleBase,
          activeScore: 0.72,
          shadowScore: 0.7,
          activeCostMs: 500,
          shadowCostMs: 300,
        });

        const afterDynamic = await listThresholdEvents();
        const dynamicEvents = afterDynamic.filter(
          (ev) => payloadOf(ev).mode === "dynamic",
        );
        assert.ok(
          dynamicEvents.length >= 1,
          `crossing into dynamic mode with >10% drift must emit at least one dynamic event (got ${dynamicEvents.length})`,
        );
        const firstDynamic = payloadOf(dynamicEvents[0]!);
        assert.equal(firstDynamic.mode, "dynamic");
        assert.equal(typeof firstDynamic.thresholdMs, "number");
        assert.equal(typeof firstDynamic.p75Ms, "number");
        assert.equal(typeof firstDynamic.safetyFactor, "number");
        assert.equal(typeof firstDynamic.previousThresholdMs, "number");

        // Push P75 up dramatically. Adding ≥30 high-cost samples
        // crowds the recent-50 window so P75 — and therefore the
        // dynamic threshold — drifts well past the 10% epsilon. A new
        // dynamic-mode event must follow.
        const before = dynamicEvents.length;
        for (let i = 0; i < 30; i += 1) {
          await shadow.recordShadowSample({
            ...sampleBase,
            activeScore: 0.72,
            shadowScore: 0.7,
            activeCostMs: 4000,
            shadowCostMs: 1200,
          });
        }

        const afterDrift = await listThresholdEvents();
        const driftDynamic = afterDrift.filter(
          (ev) => payloadOf(ev).mode === "dynamic",
        );
        assert.ok(
          driftDynamic.length > before,
          `>10% drift in P75 must emit additional dynamic events (before=${before} after=${driftDynamic.length})`,
        );
        // NOTE(review): indexing [0] for "latest" presumes listEvents
        // returns newest-first — confirm against its implementation.
        const latest = payloadOf(driftDynamic[0]!);
        assert.ok(
          (latest.thresholdMs as number) > (firstDynamic.thresholdMs as number),
          "the post-drift threshold should reflect the higher P75",
        );
      },
    );

    await t.test(
      "shadow_budget_skipped row + event payload include thresholdMs/mode/p75Ms",
      async () => {
        // Push a shadow run that comfortably exceeds the current
        // dynamic ceiling. The active-cost spikes from the previous
        // sub-test put P75 around 4000ms ⇒ threshold ~6000ms. A 99000ms
        // shadow run is unmistakable.
        const result = await shadow.recordShadowSample({
          ...sampleBase,
          activeScore: 0.72,
          shadowScore: 0.7,
          activeCostMs: 4000,
          shadowCostMs: 99000,
        });
        assert.equal(result.budgetSkipped, true, "row must be flagged as budget_skipped");
        assert.equal(result.budget.mode, "dynamic");
        assert.equal(typeof result.budget.thresholdMs, "number");

        // Row in the DB carries the budget_skipped=true flag.
        const dbRow = await pool.query<{ budget_skipped: boolean }>(
          `SELECT budget_skipped
             FROM "${schema}".network_shadow_samples
            WHERE id = $1`,
          [result.row.id],
        );
        assert.equal(dbRow.rows[0]!.budget_skipped, true);

        const skipEvents = await events.listEvents({
          networkId: NETWORK_ID,
          kind: "shadow_budget_skipped",
          limit: 5,
        });
        assert.ok(skipEvents.length >= 1, "shadow_budget_skipped event must be emitted");
        const payload = payloadOf(skipEvents[0]!);
        // Newly-added payload fields the legacy event did not carry.
        assert.equal(typeof payload.thresholdMs, "number");
        assert.equal(payload.budgetMode, "dynamic");
        assert.equal(typeof payload.p75Ms, "number");
        assert.equal(typeof payload.safetyFactor, "number");
        // Sanity: original cost numbers still round-trip.
        assert.equal(payload.activeCostMs, 4000);
        assert.equal(payload.shadowCostMs, 99000);
        // Legacy `multiplier` field retained for back-compat with old readers.
        assert.equal(payload.multiplier, payload.safetyFactor);
      },
    );

    await t.test(
      "GET /admin/evolution/networks surfaces shadowBudget per network",
      async () => {
        const handler = flywheel._handlers.listNetworks;
        const { statusCode, body } = await callHandler(handler, {});
        assert.equal(statusCode, 200);
        assert.ok(
          body && typeof body === "object" && "networks" in (body as object),
          "response must have a networks array",
        );
        const networks = (body as { networks: unknown[] }).networks;
        assert.ok(Array.isArray(networks) && networks.length === 1);
        const net = networks[0] as Record<string, unknown>;
        assert.equal(net.id, NETWORK_ID);
        assert.equal(net.fitness, null, "fitness short-circuits when activeVariantId is null");
        assert.ok(
          net.shadowBudget && typeof net.shadowBudget === "object",
          "shadowBudget field must be present on the response",
        );
        const sb = net.shadowBudget as Record<string, unknown>;
        assert.equal(sb.mode, "dynamic");
        assert.equal(typeof sb.thresholdMs, "number");
        assert.equal(typeof sb.p75Ms, "number");
        assert.equal(typeof sb.safetyFactor, "number");
        assert.equal(typeof sb.recentSkipRatio, "number");
        assert.equal(typeof sb.sampleCount, "number");
        assert.equal(typeof sb.recentSampleCount, "number");
      },
    );
  },
);