// doatlas-2/artifacts/api-server/src/lib/__tests__/shadow-sampling.test.ts
// Iostream-Li's picture
// Add files using upload-large-folder tool
// ff78003 verified
/**
* Shadow A/B sampling on real `runNetwork` traffic — Task #182.
*
* Verifies the chat hot-path hook end-to-end against a temporary PG
* schema:
* - Happy path: a forced-rate (1.0) call writes one paired sample
* row per active turn, with the correct active/shadow scores.
* - No shadow candidate: zero sample rows, no error, active result
* still returned.
* - Runner failure on the shadow side: a `shadow_runner_error` event
* is recorded and the active turn's result is unaffected.
* - Budget breach: a slow shadow runner is aborted at 1.5× the
* active wall-clock cap and the row is marked `budget_skipped`.
*
* Skipped when `DATABASE_URL` is unset.
*/
import { test } from "node:test";
import assert from "node:assert/strict";
import { randomBytes } from "node:crypto";
const ORIGINAL_DSN = process.env.DATABASE_URL || "";
const SKIP = !ORIGINAL_DSN;
/**
 * Return `dsn` with an extra `options=-c search_path=<schema>` query
 * parameter appended in percent-encoded form.
 *
 * The parameter is joined with `&` when `dsn` already contains a `?`
 * and with `?` otherwise. `schema` is passed through
 * `encodeURIComponent`; `dsn` itself is not modified or validated.
 */
function dsnWithSearchPath(dsn: string, schema: string): string {
  const separator = dsn.includes("?") ? "&" : "?";
  const encodedSchema = encodeURIComponent(schema);
  return dsn + separator + "options=-c%20search_path%3D" + encodedSchema;
}
test(
"shadow A/B sampling against live PG ephemeral schema",
{ skip: SKIP && "DATABASE_URL not set" },
async (t) => {
// Unique schema name per run so repeated or concurrent runs cannot collide.
const schema = `shadow_sampling_test_${randomBytes(6).toString("hex")}`;
// Override the DSN *before* the DB client module is loaded. The dynamic
// import below is presumably what makes `@workspace/db` pick up the
// search_path override — NOTE(review): confirm the pool reads
// DATABASE_URL at import time.
process.env.DATABASE_URL = dsnWithSearchPath(ORIGINAL_DSN, schema);
const { pool } = await import("@workspace/db");
// Register teardown BEFORE any DDL or further imports run. If table
// creation or a later import throws, the hook still drops whatever part
// of the schema exists and restores the original DSN, instead of leaking
// both into the rest of the test process.
t.after(async () => {
  try {
    await pool.query(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
  } finally {
    process.env.DATABASE_URL = ORIGINAL_DSN;
  }
});
await pool.query(`CREATE SCHEMA "${schema}"`);
// Mirror only the tool-network tables we touch. The schemas are
// copied verbatim from lib/db/src/schema/toolNetwork.ts.
await pool.query(`
CREATE TABLE "${schema}".problem_classes (
id text PRIMARY KEY,
path text NOT NULL UNIQUE,
parent_path text,
label text NOT NULL,
description text NOT NULL DEFAULT '',
capability_tags jsonb NOT NULL DEFAULT '[]'::jsonb,
reviewer_weights jsonb,
status text NOT NULL DEFAULT 'active',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE "${schema}".network_promotions (
id text PRIMARY KEY,
network_id text NOT NULL,
from_variant_id text,
to_variant_id text NOT NULL,
reason text NOT NULL,
metrics_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
decided_by text NOT NULL DEFAULT 'system',
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE "${schema}".tool_networks (
id text PRIMARY KEY,
name text NOT NULL UNIQUE,
problem_class_path text NOT NULL,
description text NOT NULL DEFAULT '',
input_contract jsonb NOT NULL,
output_contract jsonb NOT NULL,
internal_graph jsonb NOT NULL,
active_variant_id text,
builder_model_tier text NOT NULL DEFAULT 'strong',
release_tier_floor text NOT NULL DEFAULT 'strong',
config jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
cost_hint double precision,
latency_hint_ms integer,
capability_tags jsonb NOT NULL DEFAULT '[]'::jsonb,
legacy_alias_node_id text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE "${schema}".network_versions (
id text PRIMARY KEY,
network_id text NOT NULL,
version_label text NOT NULL,
internal_graph jsonb NOT NULL,
config jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'draft',
built_by text NOT NULL DEFAULT 'system',
builder_model_tier text NOT NULL DEFAULT 'strong',
private_namespace text,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE "${schema}".network_shadow_samples (
id text PRIMARY KEY,
network_id text NOT NULL,
active_variant_id text NOT NULL,
shadow_variant_id text NOT NULL,
problem_class_path text NOT NULL,
active_score double precision NOT NULL,
shadow_score double precision NOT NULL,
critical_signal boolean NOT NULL DEFAULT false,
active_cost_ms integer,
shadow_cost_ms integer,
budget_skipped boolean NOT NULL DEFAULT false,
conversation_id text,
message_id text,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE "${schema}".network_evolution_events (
id text PRIMARY KEY,
network_id text NOT NULL,
kind text NOT NULL,
variant_id text,
payload jsonb NOT NULL DEFAULT '{}'::jsonb,
related_event_id text,
promotion_id text,
created_at timestamptz NOT NULL DEFAULT now()
);
`);
// Load the modules under test only after the DSN override and the
// tables are in place.
const tn = await import("../tool-network");
const sh = await import("../evolution/shadow");
/**
 * Poll `probe` until it yields a value or `timeoutMs` elapses.
 *
 * Used to wait for fire-and-forget shadow tasks: `runNetwork` returns the
 * active result immediately and dispatches the shadow run via a floating
 * promise, so the tests poll the sample / event tables instead of
 * sleeping for a fixed time.
 *
 * @param probe     Async check; resolve to `null` (or `undefined`) to mean
 *                  "not ready yet". Any other value — including falsy ones
 *                  such as `0`, `false` or `""` — is treated as the result.
 * @param timeoutMs Upper bound on the total wait. The probe always runs at
 *                  least once, even when this is `0`.
 * @returns The first non-null probe result, or `null` on timeout.
 */
async function waitFor<T>(
  probe: () => Promise<T | null>,
  timeoutMs = 4000,
): Promise<T | null> {
  const pollIntervalMs = 50;
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await probe();
    // Compare against null/undefined explicitly: a truthiness check would
    // discard legitimate falsy results and spin until the timeout.
    if (value !== null && value !== undefined) return value;
    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) return null;
    // Never sleep past the deadline, so the last probe happens right at
    // the deadline rather than being skipped.
    await new Promise<void>((resolve) =>
      setTimeout(resolve, Math.min(pollIntervalMs, remainingMs)),
    );
  }
}
// ---------------------------------------------------------------- fixtures
// Shared by every subtest below: one problem class, one network whose
// initial variant ("v1") is active, and one extra non-promoted variant
// ("v2") that serves as the shadow candidate.
const networkName = `shadow_t_${randomBytes(3).toString("hex")}`;
const problemClassPath = "shadow_sampling/test";
await tn.upsertProblemClass({
  path: problemClassPath,
  label: "shadow sampling test class",
});
const network = await tn.upsertNetwork({
  name: networkName,
  problemClassPath,
  inputContract: { type: "object", properties: {} },
  outputContract: { type: "object", properties: {} },
  internalGraph: { nodes: [], edges: [] },
  initialVariant: { versionLabel: "v1", config: { tag: "active" } },
});
assert.ok(network.activeVariantId, "active variant created");
const activeVariantId = network.activeVariantId!;
// Shadow candidate for the sampling subtests (created without promotion).
const shadowVariant = await tn.createVariant({
  networkId: network.id,
  versionLabel: "v2",
  config: { tag: "shadow" },
  internalGraph: { nodes: [], edges: [] },
  builderModelTier: "strong",
  promote: false,
  reason: "test_seed",
});
/** Result shape the test runners return. */
type RunnerOutcome = {
  output: Record<string, unknown>;
  steps: never[];
  durationMs: number;
  metrics: { reviewerScore: number };
};
/** Signature of the swappable runner implementation. */
type RunnerFn = (
  _input: Record<string, unknown>,
  variant: { id: string },
  ctx: { signal?: AbortSignal },
) => Promise<RunnerOutcome>;
// Default runner: scores the active variant 0.6 and anything else 0.9 so
// assertions can tell the two sides of a sample row apart. Subtests
// reassign `runnerImpl` to inject errors / latency.
let runnerImpl: RunnerFn = async (_input, variant) => {
  const reviewerScore = variant.id === activeVariantId ? 0.6 : 0.9;
  return {
    output: { ok: true, variantId: variant.id },
    steps: [],
    durationMs: 50,
    metrics: { reviewerScore },
  };
};
// Register a thin trampoline rather than `runnerImpl` itself, so that
// later reassignments of `runnerImpl` take effect on the next call.
tn.registerNetworkRunner(networkName, async (input, variant, ctx) => {
  return runnerImpl(input, variant, ctx);
});
// ---------------------------------------------- happy path: one paired row
// With the sample rate forced to 1, a single run must return the active
// variant's output and persist one sample row carrying both scores.
await t.test("forced sample rate writes one paired sample row", async () => {
  type PairedSampleRow = {
    active_variant_id: string;
    shadow_variant_id: string;
    active_score: string;
    shadow_score: string;
    budget_skipped: boolean;
  };
  await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
  const activeResult = await tn.runNetwork({
    networkName,
    input: { q: "happy" },
    enableShadow: true,
    shadowSampleRate: 1,
  });
  const servedOutput = activeResult.output as { variantId: string };
  assert.equal(
    servedOutput.variantId,
    activeVariantId,
    "active variant produced the user-visible result",
  );
  // The shadow side is fire-and-forget, so poll until its row shows up.
  const fetchFirstSample = async (): Promise<PairedSampleRow | null> => {
    const { rows } = await pool.query<PairedSampleRow>(
      `SELECT active_variant_id, shadow_variant_id, active_score::text, shadow_score::text, budget_skipped
FROM "${schema}".network_shadow_samples
WHERE network_id = $1`,
      [network.id],
    );
    return rows[0] ?? null;
  };
  const sample = await waitFor(fetchFirstSample);
  assert.ok(sample, "a sample row was persisted");
  assert.equal(sample!.active_variant_id, activeVariantId);
  assert.equal(sample!.shadow_variant_id, shadowVariant.id);
  // Scores are selected as text; compare them numerically.
  assert.equal(Number(sample!.active_score), 0.6);
  assert.equal(Number(sample!.shadow_score), 0.9);
  assert.equal(sample!.budget_skipped, false);
});
// ---------------------------- no shadow candidate ⇒ no row, no err
// The only shadow variant is temporarily flipped to 'demoted'. The run
// must still return the active result and must not write a sample row.
await t.test(
  "missing shadow candidate produces zero rows and no error",
  async () => {
    await pool.query(
      `DELETE FROM "${schema}".network_shadow_samples`,
    );
    // Hide the shadow variant temporarily.
    await pool.query(
      `UPDATE "${schema}".network_versions SET status = 'demoted' WHERE id = $1`,
      [shadowVariant.id],
    );
    try {
      const result = await tn.runNetwork({
        networkName,
        input: { q: "no-candidate" },
        enableShadow: true,
        shadowSampleRate: 1,
      });
      assert.ok(result, "active result still returned");
      // Give the fire-and-forget path time to run and definitively
      // not write a row.
      await new Promise((r) => setTimeout(r, 200));
      const cnt = await pool.query<{ c: string }>(
        `SELECT COUNT(*)::text AS c FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
        [network.id],
      );
      assert.equal(
        Number(cnt.rows[0]!.c),
        0,
        "no sample row written when no shadow candidate exists",
      );
    } finally {
      // Restore in `finally`: every later subtest depends on the variant
      // being a shadow candidate again, so a failed assertion above must
      // not leave it demoted and cascade into unrelated failures.
      await pool.query(
        `UPDATE "${schema}".network_versions SET status = 'shadow' WHERE id = $1`,
        [shadowVariant.id],
      );
    }
  },
);
// ------------------- runner error on shadow ⇒ event row, no sample
// The runner throws only for the shadow variant. The active result must
// be returned untouched, a `shadow_runner_error` event must appear, and
// no sample row may be written.
await t.test(
  "shadow runner failure records an event and leaves the active result intact",
  async () => {
    await pool.query(
      `DELETE FROM "${schema}".network_shadow_samples`,
    );
    await pool.query(
      `DELETE FROM "${schema}".network_evolution_events`,
    );
    runnerImpl = async (_input, variant) => {
      if (variant.id === shadowVariant.id) {
        throw new Error("synthetic shadow failure");
      }
      return {
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: 40,
        metrics: { reviewerScore: 0.5 },
      };
    };
    try {
      const result = await tn.runNetwork({
        networkName,
        input: { q: "boom" },
        enableShadow: true,
        shadowSampleRate: 1,
      });
      assert.equal(
        (result.output as { variantId: string }).variantId,
        activeVariantId,
        "active path is unaffected by shadow failure",
      );
      // The shadow run is fire-and-forget; poll for its error event.
      const ev = await waitFor(async () => {
        const r = await pool.query<{ kind: string }>(
          `SELECT kind FROM "${schema}".network_evolution_events
WHERE network_id = $1 AND kind = 'shadow_runner_error'`,
          [network.id],
        );
        return r.rows[0] ?? null;
      });
      assert.ok(ev, "shadow_runner_error event was recorded");
      const sampleCnt = await pool.query<{ c: string }>(
        `SELECT COUNT(*)::text AS c FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
        [network.id],
      );
      assert.equal(
        Number(sampleCnt.rows[0]!.c),
        0,
        "no shadow sample row was written when the runner threw",
      );
    } finally {
      // Reset the runner, as the other runner-overriding subtests do, so
      // the throwing implementation cannot leak into later subtests if
      // they are reordered or stop installing their own runner.
      runnerImpl = async (_input, variant) => ({
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: 30,
        metrics: { reviewerScore: 0.5 },
      });
    }
  },
);
// -------- active run reports durationMs<=0 ⇒ one-shot skip event
//
// Regression guard for Task #198. Without a positive active cost the
// shadow path cannot enforce its 1.5× budget, so it skips the run. That
// skip has to be visible as a `shadow_no_active_cost` event, emitted
// once per process per network, so dashboards surface the problem
// rather than showing empty counts indefinitely.
await t.test(
  "active durationMs<=0 emits a one-shot shadow_no_active_cost event",
  async () => {
    /** Run the network once with shadow sampling forced on. */
    const driveNetwork = (q: string) =>
      tn.runNetwork({
        networkName,
        input: { q },
        enableShadow: true,
        shadowSampleRate: 1,
      });

    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    await pool.query(`DELETE FROM "${schema}".network_evolution_events`);
    // Clear the in-memory dedup state so the subtest does not depend on
    // whether an earlier test in this process already hit the path.
    sh.__shadowNoActiveCostEmitted.clear();
    // The active side reports an explicit durationMs of 0. Per the note
    // in the original test, `runNetwork` keeps a runner-supplied 0
    // (it is not nullish), so the sampler sees `activeCostMs = 0` and
    // takes the no_active_cost branch.
    runnerImpl = async (_input, variant) => {
      const isActive = variant.id === activeVariantId;
      return {
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: isActive ? 0 : 30,
        metrics: { reviewerScore: 0.5 },
      };
    };
    await driveNetwork("no-cost-1");

    const skipEvent = await waitFor(async () => {
      const { rows } = await pool.query<{ payload: Record<string, unknown> }>(
        `SELECT payload FROM "${schema}".network_evolution_events
WHERE network_id = $1 AND kind = 'shadow_no_active_cost'`,
        [network.id],
      );
      return rows[0] ?? null;
    });
    assert.ok(skipEvent, "shadow_no_active_cost event was recorded");
    const skipPayload = skipEvent!.payload as { activeVariantId: string };
    assert.equal(
      skipPayload.activeVariantId,
      activeVariantId,
      "payload identifies the active variant that produced 0ms",
    );

    // The path bails out before scoring, so no sample row may exist.
    const sampleTally = await pool.query<{ c: string }>(
      `SELECT COUNT(*)::text AS c
FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
      [network.id],
    );
    assert.equal(
      Number(sampleTally.rows[0]!.c),
      0,
      "no sample row written when active cost is missing",
    );

    // Two more runs: the dedup must hold the event count at exactly one.
    for (const q of ["no-cost-2", "no-cost-3"]) {
      await driveNetwork(q);
    }
    // Leave the fire-and-forget shadow tasks a moment to (not) write.
    await new Promise((resolve) => setTimeout(resolve, 200));
    const eventTally = await pool.query<{ c: string }>(
      `SELECT COUNT(*)::text AS c
FROM "${schema}".network_evolution_events
WHERE network_id = $1 AND kind = 'shadow_no_active_cost'`,
      [network.id],
    );
    assert.equal(
      Number(eventTally.rows[0]!.c),
      1,
      "shadow_no_active_cost is emitted at most once per process per network",
    );

    // Put a neutral runner back so later subtests do not inherit the
    // 0ms active duration.
    runnerImpl = async (_input, variant) => ({
      output: { ok: true, variantId: variant.id },
      steps: [],
      durationMs: 30,
      metrics: { reviewerScore: 0.5 },
    });
  },
);
// -------- shadow over 1.5× active budget ⇒ aborted, budget_skipped
//
// Checks two things: (a) the sampler marks the row `budget_skipped`, and
// (b) the runner really stops when `ctx.signal` fires rather than running
// to completion. (b) separates a genuine abort from mere timeout
// bookkeeping.
await t.test(
  "shadow run exceeding 1.5× active budget aborts the runner via ctx.signal",
  async () => {
    const pause = (ms: number) =>
      new Promise((resolve) => setTimeout(resolve, ms));

    // Start from empty sample and event tables.
    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    await pool.query(`DELETE FROM "${schema}".network_evolution_events`);

    let shadowAborted = false;
    let shadowStartedAt = 0;
    let shadowFinishedAt = 0;
    runnerImpl = async (_input, variant, ctx) => {
      // Active side: answer right away.
      if (variant.id !== shadowVariant.id) {
        return {
          output: { ok: true, variantId: variant.id },
          steps: [],
          durationMs: 30,
          metrics: { reviewerScore: 0.6 },
        };
      }
      // Shadow side: cooperative cancellation. Check the signal every
      // 50ms with a 10s ceiling, so a broken abort would be obvious as a
      // ~10s run.
      shadowStartedAt = Date.now();
      const giveUpAt = Date.now() + 10_000;
      while (Date.now() < giveUpAt) {
        if (ctx.signal?.aborted) {
          shadowAborted = true;
          shadowFinishedAt = Date.now();
          const abortError = new Error("aborted");
          abortError.name = "AbortError";
          throw abortError;
        }
        await pause(50);
      }
      shadowFinishedAt = Date.now();
      return {
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: 10_000,
        metrics: { reviewerScore: 0.95 },
      };
    };

    const activeResult = await tn.runNetwork({
      networkName,
      input: { q: "slow-shadow" },
      enableShadow: true,
      shadowSampleRate: 1,
    });
    assert.ok(
      activeResult,
      "active result returned without waiting for shadow",
    );

    // Per the original notes, the sampler races the runner against a 2s
    // budget timer and persists a `budget_skipped=true` row when the
    // timer wins; the runner then sees the aborted signal on its next
    // poll. Allow up to 5s for the row.
    const latestSample = await waitFor(async () => {
      const { rows } = await pool.query<{ budget_skipped: boolean }>(
        `SELECT budget_skipped
FROM "${schema}".network_shadow_samples
WHERE network_id = $1
ORDER BY created_at DESC LIMIT 1`,
        [network.id],
      );
      return rows[0] ?? null;
    }, 5_000);
    assert.ok(latestSample, "budget_skipped row was persisted after timeout");
    assert.equal(latestSample!.budget_skipped, true);

    // The row lands roughly when the timer fires, but the runner only
    // notices on its next 50ms tick — wait up to 1s for that.
    const cancelDeadline = Date.now() + 1_000;
    while (!shadowAborted && Date.now() < cancelDeadline) {
      await pause(50);
    }
    assert.equal(
      shadowAborted,
      true,
      "shadow runner observed ctx.signal.aborted and stopped",
    );

    // The runner must have stopped far below its own 10s ceiling. The 3s
    // bound leaves slack for CI scheduling noise on top of the 2s budget
    // floor plus one poll tick, while still proving cancellation.
    const elapsed = shadowFinishedAt - shadowStartedAt;
    assert.ok(
      elapsed < 3_000,
      `shadow runner stopped within budget+slack (elapsed=${elapsed}ms, expected <3000ms)`,
    );

    // Leave a neutral runner behind for whatever runs next.
    runnerImpl = async (_input, variant) => ({
      output: { ok: true, variantId: variant.id },
      steps: [],
      durationMs: 30,
      metrics: { reviewerScore: 0.5 },
    });
  },
);
// -- a runner that ignores the abort signal must still be row-marked
await t.test(
  "non-cooperative slow shadow runner is marked budget_skipped",
  async () => {
    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    runnerImpl = async (_input, variant) => {
      const isShadow = variant.id === shadowVariant.id;
      if (!isShadow) {
        return {
          output: { ok: true, variantId: variant.id },
          steps: [],
          durationMs: 30,
          metrics: { reviewerScore: 0.6 },
        };
      }
      // Stand-in for a legacy runner with no cooperative-abort support:
      // it never looks at the signal and simply takes 2.5s.
      await new Promise((resolve) => setTimeout(resolve, 2_500));
      return {
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: 2_500,
        metrics: { reviewerScore: 0.95 },
      };
    };
    await tn.runNetwork({
      networkName,
      input: { q: "slow-shadow-noncoop" },
      enableShadow: true,
      shadowSampleRate: 1,
    });
    // Poll (up to 5s) for the newest sample row of this network.
    const fetchLatestSample = async () => {
      const { rows } = await pool.query<{ budget_skipped: boolean }>(
        `SELECT budget_skipped
FROM "${schema}".network_shadow_samples
WHERE network_id = $1
ORDER BY created_at DESC LIMIT 1`,
        [network.id],
      );
      return rows[0] ?? null;
    };
    const latestSample = await waitFor(fetchLatestSample, 5_000);
    assert.ok(latestSample, "a row was persisted for the legacy runner");
    assert.equal(
      latestSample!.budget_skipped,
      true,
      "row marked budget_skipped even though runner ignored signal",
    );
    // Put a neutral runner back for the subtests that follow.
    runnerImpl = async (_input, variant) => ({
      output: { ok: true, variantId: variant.id },
      steps: [],
      durationMs: 30,
      metrics: { reviewerScore: 0.5 },
    });
  },
);
// ----------------------- `runNetworkForChat` wrapper
// The call below passes no `enableShadow` flag; a sample row appearing
// anyway shows the wrapper turns shadow sampling on by itself.
await t.test(
  "runNetworkForChat opts in to shadow sampling automatically",
  async () => {
    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    runnerImpl = async (_input, variant) => {
      return {
        output: { ok: true, variantId: variant.id },
        steps: [],
        durationMs: 30,
        metrics: { reviewerScore: 0.7 },
      };
    };
    await tn.runNetworkForChat({
      networkName,
      input: { q: "chat" },
      shadowSampleRate: 1,
    });
    const fetchSample = async () => {
      const { rows } = await pool.query<{ shadow_variant_id: string }>(
        `SELECT shadow_variant_id FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
        [network.id],
      );
      return rows[0] ?? null;
    };
    const sample = await waitFor(fetchSample);
    assert.ok(sample, "runNetworkForChat triggered the shadow path");
  },
);
// ------------- chat dispatcher hookup: `run_<network>` tool name
await t.test(
  "dispatchNetworkTool routes run_<network> calls and triggers shadow",
  async () => {
    type DispatchedOutput = {
      ok: boolean;
      dispatched: boolean;
      variantId: string;
    };
    type ChatSampleRow = {
      conversation_id: string | null;
      shadow_variant_id: string;
    };
    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    runnerImpl = async (_input, variant) => {
      const reviewerScore = variant.id === activeVariantId ? 0.4 : 0.8;
      return {
        output: { ok: true, dispatched: true, variantId: variant.id },
        steps: [],
        durationMs: 30,
        metrics: { reviewerScore },
      };
    };
    // The trailing `1` forces sampling so the assertions are
    // deterministic. Per the original note, production callers in
    // `routes/messages.ts` omit it and inherit the env-tunable
    // `SHADOW_SAMPLE_RATE` (default 0.1).
    const dispatch = await tn.dispatchNetworkTool(
      `run_${networkName}`,
      { q: "via-chat" },
      {
        conversationId: "conv_chat_test",
        messageId: null,
        ownerUserId: "user_chat_test",
        meta: { actor: "chat" },
      },
      1,
    );
    assert.equal(dispatch.matched, true, "run_<network> name was matched");
    assert.ok(dispatch.result, "active result returned to caller");
    const served = dispatch.result!.output as DispatchedOutput;
    assert.equal(served.ok, true);
    assert.equal(served.dispatched, true);
    assert.equal(
      served.variantId,
      activeVariantId,
      "the user-visible result is from the ACTIVE variant",
    );
    // Routing chat traffic through the dispatcher is only useful if it
    // also kicks off the shadow A/B sampler — look for the newest row.
    const fetchLatestSample = async (): Promise<ChatSampleRow | null> => {
      const { rows } = await pool.query<ChatSampleRow>(
        `SELECT conversation_id, shadow_variant_id
FROM "${schema}".network_shadow_samples
WHERE network_id = $1
ORDER BY created_at DESC LIMIT 1`,
        [network.id],
      );
      return rows[0] ?? null;
    };
    const sample = await waitFor(fetchLatestSample);
    assert.ok(sample, "chat dispatch wrote a shadow sample row");
    assert.equal(
      sample!.conversation_id,
      "conv_chat_test",
      "the chat ctx (conversationId) was propagated through the runner",
    );
    assert.equal(sample!.shadow_variant_id, shadowVariant.id);
  },
);
// -------- static guard: routes/messages.ts MUST gate on the
// `allowedToolNames` allowlist BEFORE invoking
// `dispatchNetworkTool`. A regression here would re-open
// the capability-bypass vulnerability.
//
// The check is purely textual: it reads the route source and compares
// the offsets of the FIRST occurrence of each marker string.
await t.test(
  "routes/messages.ts gates allowedToolNames before dispatchNetworkTool",
  async () => {
    const fs = await import("node:fs/promises");
    const path = await import("node:path");
    const url = await import("node:url");
    // Resolve the route file relative to this test file's location.
    const here = path.dirname(url.fileURLToPath(import.meta.url));
    const messagesPath = path.resolve(here, "../../routes/messages.ts");
    const src = await fs.readFile(messagesPath, "utf-8");
    const allowIdx = src.indexOf("!allowedToolNames.has(tc.name)");
    const dispatchIdx = src.indexOf("dispatchNetworkTool(tc.name");
    // `indexOf` signals "not found" with -1; offset 0 is a valid match,
    // so test against the sentinel rather than `> 0`.
    assert.notEqual(allowIdx, -1, "allowedToolNames gate present");
    assert.notEqual(dispatchIdx, -1, "dispatchNetworkTool call present");
    assert.ok(
      allowIdx < dispatchIdx,
      "allowedToolNames gate must come BEFORE dispatchNetworkTool " +
        "(otherwise out-of-subgraph run_<network> tools bypass the allowlist)",
    );
  },
);
// -------- allowlist gating: an out-of-subgraph `run_<network>` call is
// rejected with `tool_not_in_subgraph` BEFORE `dispatchNetworkTool` is
// consulted. The gate is re-implemented inline here, mirroring the one
// `routes/messages.ts` runs ahead of the network branch.
await t.test(
  "chat dispatcher gate rejects run_<network> outside allowedToolNames",
  async () => {
    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    // Two halves are asserted: (a) the gate yields the expected
    // `tool_not_in_subgraph` error payload, and (b) no shadow sample row
    // is written, i.e. the network runtime was never invoked.
    const allowed = new Set<string>(["web_search"]);
    const toolName = `run_${networkName}`;
    let result: unknown;
    let dispatched = false;
    const passesGate = !(allowed && !allowed.has(toolName));
    if (passesGate) {
      dispatched = true;
      await tn.dispatchNetworkTool(toolName, { q: "blocked" }, undefined, 1);
    } else {
      result = {
        error: `Tool ${toolName} is not in the resolved capability subgraph for this turn.`,
        error_code: "tool_not_in_subgraph",
      };
    }
    assert.equal(dispatched, false, "dispatcher was NOT consulted");
    assert.deepEqual(result, {
      error: `Tool ${toolName} is not in the resolved capability subgraph for this turn.`,
      error_code: "tool_not_in_subgraph",
    });
    // Wait briefly, then confirm that nothing reached the sample table.
    await new Promise((resolve) => setTimeout(resolve, 200));
    const tally = await pool.query<{ c: string }>(
      `SELECT COUNT(*)::text AS c FROM "${schema}".network_shadow_samples WHERE network_id = $1`,
      [network.id],
    );
    const sampleCount = Number(tally.rows[0]!.c);
    assert.equal(
      sampleCount,
      0,
      "no shadow sample row written for blocked network call",
    );
  },
);
// -------- names the dispatcher does not own must report `matched: false`
await t.test(
  "dispatchNetworkTool falls through for non-network and unknown names",
  async () => {
    // A tool name without the `run_` prefix.
    const plainTool = await tn.dispatchNetworkTool("web_search", { q: "x" });
    assert.equal(plainTool.matched, false, "non run_-prefixed name skipped");
    // A `run_` name with no network registered behind it.
    const unknownNetwork = await tn.dispatchNetworkTool(
      "run_does_not_exist_zzz",
      { q: "x" },
    );
    assert.equal(
      unknownNetwork.matched,
      false,
      "run_-prefixed name with no registered network skipped",
    );
  },
);
// -------- runner without metrics.reviewerScore ⇒ fallback grading, plus
// a `shadow_reviewer_fallback` event for each side so operators
// can count how often the binary fallback is still in use.
await t.test(
  "missing reviewer metric routes through fallback and emits counter event",
  async () => {
    type ScoreRow = { active_score: string; shadow_score: string };
    type FallbackEventRow = { payload: { side: string; source: string } };

    await pool.query(`DELETE FROM "${schema}".network_shadow_samples`);
    await pool.query(`DELETE FROM "${schema}".network_evolution_events`);
    // Loosest runner shape: an output blob with neither `metrics` nor
    // `fitness`. The Reviewer pipeline is expected to grade it via the
    // binary fallback (non-empty output ⇒ 1.0) and log one
    // `shadow_reviewer_fallback` event per side. The double cast is
    // deliberate — the shape intentionally violates the runner type.
    const metriclessRunner = async (
      _input: Record<string, unknown>,
      variant: { id: string },
    ) => ({
      output: { ok: true, variantId: variant.id },
      steps: [],
      durationMs: 30,
    });
    runnerImpl = metriclessRunner as unknown as typeof runnerImpl;
    await tn.runNetwork({
      networkName,
      input: { q: "no-metrics" },
      enableShadow: true,
      shadowSampleRate: 1,
    });

    const sample = await waitFor(async (): Promise<ScoreRow | null> => {
      const { rows } = await pool.query<ScoreRow>(
        `SELECT active_score::text, shadow_score::text
FROM "${schema}".network_shadow_samples
WHERE network_id = $1
ORDER BY created_at DESC LIMIT 1`,
        [network.id],
      );
      return rows[0] ?? null;
    });
    assert.ok(sample, "fallback-graded sample row was persisted");
    // Binary fallback: both sides produced non-empty output ⇒ 1.0 each.
    assert.equal(Number(sample!.active_score), 1);
    assert.equal(Number(sample!.shadow_score), 1);

    // Wait until exactly two counter events exist — one for the active
    // side and one for the shadow side.
    const events = await waitFor(async () => {
      const { rows } = await pool.query<FallbackEventRow>(
        `SELECT payload
FROM "${schema}".network_evolution_events
WHERE network_id = $1 AND kind = 'shadow_reviewer_fallback'
ORDER BY created_at ASC`,
        [network.id],
      );
      if (rows.length !== 2) return null;
      return rows;
    });
    assert.ok(events, "two shadow_reviewer_fallback events written");
    const distinctSides = Array.from(
      new Set(events!.map(({ payload }) => payload.side)),
    );
    assert.deepEqual(
      distinctSides.sort(),
      ["active", "shadow"],
      "fallback counter fires once per side",
    );
    for (const { payload } of events!) {
      assert.equal(
        payload.source,
        "fallback_binary",
        "non-empty output ⇒ binary fallback source",
      );
    }

    // Put a neutral, metrics-reporting runner back for later subtests.
    runnerImpl = async (_input, variant) => ({
      output: { ok: true, variantId: variant.id },
      steps: [],
      durationMs: 30,
      metrics: { reviewerScore: 0.5 },
    });
  },
);
// -------- pickShadowCandidate: status filter + exclusion of the given id
await t.test("pickShadowCandidate returns only matching shadows", async () => {
  // Excluding the active variant leaves the seeded shadow variant.
  const candidate = await sh.pickShadowCandidate(network.id, activeVariantId);
  assert.ok(candidate, "candidate found");
  assert.equal(candidate!.id, shadowVariant.id);
  // Passing the shadow variant's own id as the one to exclude leaves
  // nothing, since it is the only shadow-status variant of this network.
  const excludedSelf = await sh.pickShadowCandidate(
    network.id,
    shadowVariant.id,
  );
  assert.equal(
    excludedSelf,
    null,
    "no other shadow candidate exists once we exclude the only one",
  );
});
},
);