// Source: doatlas-2 / artifacts / api-server / src / lib / __tests__ / shadow-budget.integration.test.ts
// Uploaded by Iostream-Li via the upload-large-folder tool (commit ff78003, verified).
/**
* Dynamic shadow-budget — DB-path integration tests.
*
* The pure-logic counterpart in `shadow-budget.unit.test.mjs` only
* exercises `decideShadowBudget` / `shouldEmitThresholdEvent`. This
* suite drives `recordShadowSample` + `computeShadowBudget` + the
* `_handlers.listNetworks` admin route against an ephemeral Postgres
* schema (mirrors the pattern in `tool-graph.test.ts`) so we catch
* regressions in:
* - the `network_shadow_samples` row write (incl. `budget_skipped`),
* - the `shadow_budget_skipped` event payload now carrying
* `thresholdMs` / `budgetMode` / `p75Ms`,
* - the `shadow_budget_threshold` event firing on first dynamic
* threshold and on >=10% drift,
* - the `/api/admin/evolution/networks` payload exposing the new
* `shadowBudget` field.
*
* Skipped when DATABASE_URL is unset.
*/
import { test } from "node:test";
import assert from "node:assert/strict";
import { randomBytes } from "node:crypto";
import type { RequestHandler } from "express";
/** DSN as configured before the suite rewrites DATABASE_URL; restored during cleanup. */
const ORIGINAL_DSN = process.env.DATABASE_URL ?? "";
/** The suite needs a live Postgres, so it is skipped when no DSN is configured. */
const SKIP = ORIGINAL_DSN.length === 0;
/**
 * Append a URL-encoded libpq `options=-c search_path=<schema>` query
 * parameter to `dsn`. Uses `&` when the DSN already has a query string,
 * otherwise starts one with `?`.
 *
 * @param dsn    Postgres connection URL to extend.
 * @param schema Schema name; URL-encoded before being embedded.
 * @returns The DSN with the search_path option appended.
 */
function dsnWithSearchPath(dsn: string, schema: string): string {
  const joiner = dsn.includes("?") ? "&" : "?";
  const searchPathOption = "options=-c%20search_path%3D" + encodeURIComponent(schema);
  return dsn + joiner + searchPathOption;
}
/** What an Express handler wrote to the mocked response object. */
interface CapturedResponse {
  /** Status set via `res.status()`; 200 when the handler never set one. */
  statusCode: number;
  /** Last payload passed to `res.json()`, or `null` if none was sent. */
  body: unknown;
}

/**
 * Invoke an Express `RequestHandler` directly, without an HTTP server.
 *
 * The mocked `res` supports chained `status()` / `json()` calls and
 * records what the handler wrote. If the handler forwards an error via
 * `next(err)` (the standard Express error path), that error is re-thrown
 * so the calling test fails with the real cause instead of an empty 200.
 *
 * @param handler The route handler under test.
 * @param req     Plain object standing in for the Express request.
 * @returns The captured status code and JSON body.
 * @throws The error the handler passed to `next`, wrapped in an `Error`
 *         if it was not one already.
 */
async function callHandler(
  handler: RequestHandler,
  req: Record<string, unknown>,
): Promise<CapturedResponse> {
  let statusCode = 200;
  let body: unknown = null;
  let errorForwarded = false;
  let forwardedError: unknown;
  const res = {
    status(code: number) {
      statusCode = code;
      return this;
    },
    json(payload: unknown) {
      body = payload;
      return this;
    },
  };
  const next = (err?: unknown): void => {
    // Mirror Express: falsy values, "route" and "router" are control-flow
    // signals, not errors.
    if (err && err !== "route" && err !== "router") {
      errorForwarded = true;
      forwardedError = err;
    }
  };
  await handler(req as never, res as never, next as never);
  if (errorForwarded) {
    throw forwardedError instanceof Error
      ? forwardedError
      : new Error(`handler forwarded error via next(): ${String(forwardedError)}`);
  }
  return { statusCode, body };
}
test("shadow budget DB integration", {
  skip: SKIP && "DATABASE_URL not set",
}, async (t) => {
  // Each run gets a throwaway schema. DATABASE_URL is rewritten before
  // `@workspace/db` is imported; this assumes that module builds its pool
  // from DATABASE_URL at import time (TODO confirm), so every connection
  // resolves unqualified table names inside the test schema.
  const schema = `shadow_budget_test_${randomBytes(6).toString("hex")}`;
  process.env.DATABASE_URL = dsnWithSearchPath(ORIGINAL_DSN, schema);
  const { pool } = await import("@workspace/db");

  // Cleanup is registered before any DDL or module import that can throw,
  // so a failed setup still drops the schema and restores DATABASE_URL.
  // `IF EXISTS` keeps the drop safe even if CREATE SCHEMA never succeeded.
  t.after(async () => {
    try {
      await pool.query(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
    } finally {
      process.env.DATABASE_URL = ORIGINAL_DSN;
    }
  });

  await pool.query(`CREATE SCHEMA "${schema}"`);

  // Mirror the tables used by recordShadowSample + listNetworks
  // (tool_networks, network_shadow_samples, network_evolution_events,
  // network_promotions). We deliberately skip network_versions /
  // network_version_metrics — the listNetworks handler only invokes
  // rollingFitness when activeVariantId is non-null, so the test network
  // leaves it null and the fitness path is short-circuited.
  await pool.query(`
    CREATE TABLE "${schema}".tool_networks (
      id text PRIMARY KEY,
      name text NOT NULL UNIQUE,
      problem_class_path text NOT NULL,
      description text NOT NULL DEFAULT '',
      input_contract jsonb NOT NULL,
      output_contract jsonb NOT NULL,
      internal_graph jsonb NOT NULL,
      active_variant_id text,
      builder_model_tier text NOT NULL DEFAULT 'strong',
      release_tier_floor text NOT NULL DEFAULT 'strong',
      config jsonb NOT NULL DEFAULT '{}'::jsonb,
      status text NOT NULL DEFAULT 'active',
      cost_hint double precision,
      latency_hint_ms integer,
      capability_tags jsonb NOT NULL DEFAULT '[]'::jsonb,
      legacy_alias_node_id text,
      created_at timestamptz NOT NULL DEFAULT now(),
      updated_at timestamptz NOT NULL DEFAULT now()
    );
    CREATE TABLE "${schema}".network_shadow_samples (
      id text PRIMARY KEY,
      network_id text NOT NULL,
      active_variant_id text NOT NULL,
      shadow_variant_id text NOT NULL,
      problem_class_path text NOT NULL,
      active_score double precision NOT NULL,
      shadow_score double precision NOT NULL,
      critical_signal boolean NOT NULL DEFAULT false,
      active_cost_ms integer,
      shadow_cost_ms integer,
      budget_skipped boolean NOT NULL DEFAULT false,
      conversation_id text,
      message_id text,
      created_at timestamptz NOT NULL DEFAULT now()
    );
    CREATE INDEX "${schema}_nss_net" ON "${schema}".network_shadow_samples(network_id);
    CREATE INDEX "${schema}_nss_created" ON "${schema}".network_shadow_samples(created_at);
    CREATE TABLE "${schema}".network_evolution_events (
      id text PRIMARY KEY,
      network_id text NOT NULL,
      kind text NOT NULL,
      variant_id text,
      payload jsonb NOT NULL DEFAULT '{}'::jsonb,
      related_event_id text,
      promotion_id text,
      created_at timestamptz NOT NULL DEFAULT now()
    );
    CREATE INDEX "${schema}_nee_net" ON "${schema}".network_evolution_events(network_id);
    CREATE INDEX "${schema}_nee_kind" ON "${schema}".network_evolution_events(kind);
    CREATE INDEX "${schema}_nee_created" ON "${schema}".network_evolution_events(created_at);
    CREATE TABLE "${schema}".network_promotions (
      id text PRIMARY KEY,
      network_id text NOT NULL,
      from_variant_id text,
      to_variant_id text NOT NULL,
      reason text NOT NULL,
      metrics_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
      decided_by text NOT NULL DEFAULT 'system',
      created_at timestamptz NOT NULL DEFAULT now()
    );
  `);

  const shadow = await import("../evolution/shadow");
  const events = await import("../evolution/events");
  const flywheel = await import("../../routes/evolutionFlywheel");

  const NETWORK_ID = "tnet_test_shadow_1";
  const ACTIVE_VARIANT = "ver_active_1";
  const SHADOW_VARIANT = "ver_shadow_1";
  const PROBLEM_CLASS = "research/test";

  // Identity fields shared by every recorded shadow sample.
  const sampleBase = {
    networkId: NETWORK_ID,
    activeVariantId: ACTIVE_VARIANT,
    shadowVariantId: SHADOW_VARIANT,
    problemClassPath: PROBLEM_CLASS,
  };

  // All `shadow_budget_threshold` events for the test network (up to 50).
  const listThresholdEvents = () =>
    events.listEvents({
      networkId: NETWORK_ID,
      kind: "shadow_budget_threshold",
      limit: 50,
    });

  // Event payloads are read as loose records; only specific keys are asserted.
  const payloadOf = (ev: { payload: unknown }): Record<string, unknown> =>
    ev.payload as Record<string, unknown>;

  await t.test("seed: create one tool network row", async () => {
    await pool.query(
      `INSERT INTO "${schema}".tool_networks
         (id, name, problem_class_path, input_contract, output_contract, internal_graph)
       VALUES ($1, $2, $3, $4::jsonb, $4::jsonb, $4::jsonb)`,
      [NETWORK_ID, "test_shadow_network", PROBLEM_CLASS, "{}"],
    );
  });

  await t.test(
    "seed N>=10 cost samples → switches from fallback to dynamic mode",
    async () => {
      // First call has zero history → fallback. Each subsequent call
      // grows the history; once we cross MIN_SAMPLES_FOR_DYNAMIC=10,
      // computeShadowBudget should switch to dynamic mode.
      //
      // We deliberately mix a single 3000ms outlier into nine 500ms
      // samples so that:
      //  - the fallback path emits at least two threshold events
      //    (initial 750ms cap, then the 4500ms blip from sample #10),
      //  - when the 11th sample crosses into dynamic mode the new
      //    ceiling (750ms = 1.5 × the 500ms P75 of the mixed history)
      //    drifts >10% from the most recent fallback event and a fresh
      //    `shadow_budget_threshold` event is emitted with
      //    mode="dynamic". This is exactly the regression the suite
      //    guards against.
      for (let i = 0; i < 9; i += 1) {
        await shadow.recordShadowSample({
          ...sampleBase,
          activeScore: 0.7,
          shadowScore: 0.7,
          activeCostMs: 500,
          // Stay under every plausible cap so we don't accidentally
          // trip a budget skip during seeding.
          shadowCostMs: 300,
        });
      }
      // The lone outlier — flips the fallback threshold to 4500.
      await shadow.recordShadowSample({
        ...sampleBase,
        activeScore: 0.7,
        shadowScore: 0.7,
        activeCostMs: 3000,
        shadowCostMs: 300,
      });

      const rows = await pool.query<{ count: string }>(
        `SELECT COUNT(*)::text AS count FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
        [NETWORK_ID],
      );
      assert.equal(rows.rows[0]?.count, "10");

      // 10 historical cost samples are now in place ⇒ next budget
      // computation should be in dynamic mode with p75=500.
      const budget = await shadow.computeShadowBudget(NETWORK_ID, 500);
      assert.equal(budget.mode, "dynamic");
      assert.equal(budget.p75Ms, 500);
      assert.equal(budget.thresholdMs, 500 * 1.5);
      assert.equal(budget.sampleCount, 10);
    },
  );

  await t.test(
    "shadow_budget_threshold event fires when dynamic threshold drifts >10%",
    async () => {
      // Seeding emitted fallback-mode events only (per the seed comments:
      // the initial 750ms cap and the 4500ms blip from sample #10).
      // Everything that follows asserts that a dynamic-mode event is
      // added on top.
      const fallbackOnly = await listThresholdEvents();
      assert.ok(
        fallbackOnly.length >= 1,
        "fallback-mode threshold events must already exist from seed",
      );
      assert.ok(
        fallbackOnly.every((ev) => payloadOf(ev).mode === "fallback"),
        "no dynamic-mode events should exist yet",
      );

      // The eleventh sample is the first one written under dynamic
      // mode — its 750ms ceiling drifts ~83% off the last fallback
      // value of 4500ms, so the threshold-event guard must fire.
      await shadow.recordShadowSample({
        ...sampleBase,
        activeScore: 0.72,
        shadowScore: 0.7,
        activeCostMs: 500,
        shadowCostMs: 300,
      });

      const dynamicEvents = (await listThresholdEvents()).filter(
        (ev) => payloadOf(ev).mode === "dynamic",
      );
      assert.ok(
        dynamicEvents.length >= 1,
        `crossing into dynamic mode with >10% drift must emit at least one dynamic event (got ${dynamicEvents.length})`,
      );
      const firstDynamic = payloadOf(dynamicEvents[0]!);
      assert.equal(firstDynamic.mode, "dynamic");
      assert.equal(typeof firstDynamic.thresholdMs, "number");
      assert.equal(typeof firstDynamic.p75Ms, "number");
      assert.equal(typeof firstDynamic.safetyFactor, "number");
      assert.equal(typeof firstDynamic.previousThresholdMs, "number");

      // Push P75 up dramatically. Adding ≥30 high-cost samples
      // crowds the recent-50 window so P75 — and therefore the
      // dynamic threshold — drifts well past the 10% epsilon. A new
      // dynamic-mode event must follow.
      const before = dynamicEvents.length;
      for (let i = 0; i < 30; i += 1) {
        await shadow.recordShadowSample({
          ...sampleBase,
          activeScore: 0.72,
          shadowScore: 0.7,
          activeCostMs: 4000,
          shadowCostMs: 1200,
        });
      }

      const driftDynamic = (await listThresholdEvents()).filter(
        (ev) => payloadOf(ev).mode === "dynamic",
      );
      assert.ok(
        driftDynamic.length > before,
        `>10% drift in P75 must emit additional dynamic events (before=${before} after=${driftDynamic.length})`,
      );
      // Index 0 is treated as the most recent event (the original suite
      // relies on listEvents returning newest first).
      const latest = payloadOf(driftDynamic[0]!);
      assert.ok(
        (latest.thresholdMs as number) > (firstDynamic.thresholdMs as number),
        "the post-drift threshold should reflect the higher P75",
      );
    },
  );

  await t.test(
    "shadow_budget_skipped row + event payload include thresholdMs/mode/p75Ms",
    async () => {
      // Push a shadow run that comfortably exceeds the current
      // dynamic ceiling. The active-cost spikes from the previous
      // sub-test put P75 around 4000ms ⇒ threshold ~6000ms. A 99000ms
      // shadow run is unmistakable.
      const result = await shadow.recordShadowSample({
        ...sampleBase,
        activeScore: 0.72,
        shadowScore: 0.7,
        activeCostMs: 4000,
        shadowCostMs: 99000,
      });
      assert.equal(result.budgetSkipped, true, "row must be flagged as budget_skipped");
      assert.equal(result.budget.mode, "dynamic");
      assert.equal(typeof result.budget.thresholdMs, "number");

      // Row in the DB carries the budget_skipped=true flag.
      const dbRow = await pool.query<{ budget_skipped: boolean }>(
        `SELECT budget_skipped FROM "${schema}".network_shadow_samples WHERE id = $1`,
        [result.row.id],
      );
      assert.equal(dbRow.rows[0]?.budget_skipped, true);

      const skipEvents = await events.listEvents({
        networkId: NETWORK_ID,
        kind: "shadow_budget_skipped",
        limit: 5,
      });
      assert.ok(skipEvents.length >= 1, "shadow_budget_skipped event must be emitted");
      const payload = payloadOf(skipEvents[0]!);
      // Newly-added payload fields the legacy event did not carry.
      assert.equal(typeof payload.thresholdMs, "number");
      assert.equal(payload.budgetMode, "dynamic");
      assert.equal(typeof payload.p75Ms, "number");
      assert.equal(typeof payload.safetyFactor, "number");
      // Sanity: original cost numbers still round-trip.
      assert.equal(payload.activeCostMs, 4000);
      assert.equal(payload.shadowCostMs, 99000);
      // Legacy `multiplier` field retained for back-compat with old readers.
      assert.equal(payload.multiplier, payload.safetyFactor);
    },
  );

  await t.test(
    "GET /admin/evolution/networks surfaces shadowBudget per network",
    async () => {
      const handler = flywheel._handlers.listNetworks;
      const { statusCode, body } = await callHandler(handler, {});
      assert.equal(statusCode, 200);
      assert.ok(
        body && typeof body === "object" && "networks" in (body as object),
        "response must have a networks array",
      );
      const networks = (body as { networks: unknown[] }).networks;
      assert.ok(
        Array.isArray(networks) && networks.length === 1,
        "expected exactly the one seeded network in the response",
      );
      const net = networks[0] as Record<string, unknown>;
      assert.equal(net.id, NETWORK_ID);
      assert.equal(net.fitness, null, "fitness short-circuits when activeVariantId is null");
      assert.ok(
        net.shadowBudget && typeof net.shadowBudget === "object",
        "shadowBudget field must be present on the response",
      );
      const sb = net.shadowBudget as Record<string, unknown>;
      assert.equal(sb.mode, "dynamic");
      assert.equal(typeof sb.thresholdMs, "number");
      assert.equal(typeof sb.p75Ms, "number");
      assert.equal(typeof sb.safetyFactor, "number");
      assert.equal(typeof sb.recentSkipRatio, "number");
      assert.equal(typeof sb.sampleCount, "number");
      assert.equal(typeof sb.recentSampleCount, "number");
    },
  );
});