/**
 * Wave B — rolling fitness aggregator.
 *
 * Reads `network_version_metrics` (written per-turn by the Reviewer in
 * Wave A) and aggregates them into a single 0..1 fitness score for a
 * given (network, version, problem_class) tuple. The aggregator is the
 * data substrate every Wave B mechanism rides on:
 *
 *   - Triggers compare current fitness vs a floor.
 *   - Shadow A/B compares two fitness windows and computes a
 *     non-overlapping 95% CI gate.
 *   - Auto-rollback compares post-promote fitness vs the snapshot
 *     captured at promote time.
 *
 * Window semantics ("rolling"): MIN(last 7 days, last N=100 rows) — the
 * tighter of wall-clock and sample count wins. This is intentional: a
 * very-low-traffic network must not be scored on a 6-month-old metric,
 * and a very-high-traffic network must not let stale rows dominate.
 *
 * Channel weighting: `problem_classes.reviewer_weights` (jsonb) lets
 * each problem_class decide which Reviewer channel (factuality, cost,
 * latency, …) matters most. When no usable weights exist, the raw
 * reviewer score is used instead. The aggregator never invents weights —
 * if the per-class config and the per-row breakdown disagree on a
 * channel name, the missing side contributes nothing (both the plain
 * `mean` and the `weightedMean` are exposed so callers can audit).
 */
import { and, desc, eq, gte, sql } from "drizzle-orm";
import {
  db,
  networkVersionMetrics,
  problemClasses,
  submissionFeedbackLedger,
} from "@workspace/db";
import {
  REVIEWER_CHANNEL_KEYS,
  NEUTRAL_CHANNEL_SCORE,
} from "../reviewer/channels";

const DEFAULT_WINDOW_DAYS = 7;
const DEFAULT_MAX_ROWS = 100;
const MS_PER_DAY = 24 * 60 * 60 * 1000;

export interface FitnessSampleRow {
  reviewerScore: number;
  channelBreakdown: Record<string, number>;
  costMs: number | null;
  retries: number;
  budgetExceeded: boolean;
  createdAt: Date;
  /**
   * Task #227 — ground-truth status coming from the LEFT JOIN on
   * submission_feedback_ledger.
   * pending / failed / null (no ledger row) → the row is scored with the
   * label-free weight set;
   * received → the row is scored with the with-truth weight set and
   * `channelBreakdown.external_truth` is overwritten with
   * `externalTruthValue / 100` (clamped to 0..1 during aggregation).
   */
  externalTruthStatus?: "pending" | "received" | "failed" | null;
  externalTruthValue?: number | null;
}

export interface FitnessSummary {
  networkId: string;
  versionId: string;
  problemClassPath: string;
  /** Number of metric rows that fell inside the rolling window. */
  sampleCount: number;
  /** Mean reviewer score (0..1) — straight from the row scalar. */
  mean: number;
  /** Sample standard deviation. */
  stddev: number;
  /** Lower bound of the 95% normal-approx CI (max with 0). */
  ciLower: number;
  /** Upper bound of the 95% normal-approx CI (min with 1). */
  ciUpper: number;
  /** Mean reviewer score weighted by problem_class channel weights. */
  weightedMean: number;
  /** Timestamp of the first row passed in (freshest when newest-first). */
  latestAt: Date | null;
  /** Timestamp of the last row passed in (oldest when newest-first). */
  oldestAt: Date | null;
  /**
   * True when the row cap (`maxRows`) was reached, i.e. the window was cut
   * by sample count and may cover less than `windowDays`.
   */
  sampleBounded: boolean;
}

export interface FitnessOptions {
  windowDays?: number;
  maxRows?: number;
}

export interface ChannelWeights {
  /**
   * Default / compatibility field: same map as `labelFree`, kept as a view
   * for legacy callers. New code should read `labelFree` or `withTruth`.
   */
  weights: Record<string, number>;
  /**
   * Task #227 — label-free suite: used when no organiser ground truth is
   * available; the external_truth weight is expected to be 0.
   */
  labelFree: Record<string, number>;
  /**
   * Task #227 — with-truth suite: used when the metric row's
   * ledger.external_truth_status is 'received'; the external_truth channel
   * carries a significant weight (drugclip default 0.50).
   *
   * When a legacy flat reviewer_weights config (a single weight set) is
   * read, `labelFree` and `withTruth` hold the same map, so switching
   * never fails — it simply degenerates to "no switching".
   */
  withTruth: Record<string, number>;
  /** True when at least one usable weight was parsed from `reviewer_weights`. */
  fromConfig: boolean;
}

/** Non-null `typeof === "object"` check (arrays included). */
function isObjectLike(value: unknown): value is object {
  return typeof value === "object" && value !== null;
}

/**
 * Resolve the per-class channel weights. Returns empty maps (and
 * `fromConfig: false`) when the problem_class row is missing or has no
 * `reviewer_weights` jsonb.
 *
 * The resolver is deliberately permissive about channel names — it
 * simply trusts whatever keys are configured and lets `weightedScore`
 * match them by name against the row's `channel_breakdown`.
 *
 * Task #227: two jsonb shapes are supported.
 *   1. New shape (recommended):
 *      `{weightsLabelFree: {...}, weightsWithTruth: {...}}`, switched per
 *      metric row inside `summarise`.
 *   2. Legacy flat shape:
 *      `{contract_validity: 0.25, runner_signal: 0.45, ...}`; `labelFree`
 *      and `withTruth` are both filled with the same map.
 *
 * @param problemClassPath Value matched against `problem_classes.path`.
 * @returns The resolved weight sets.
 */
export async function resolveChannelWeights(
  problemClassPath: string,
): Promise<ChannelWeights> {
  const rows = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, problemClassPath))
    .limit(1);
  const row = rows[0];
  if (!row || !row.reviewerWeights) {
    return { weights: {}, labelFree: {}, withTruth: {}, fromConfig: false };
  }

  const raw = row.reviewerWeights as Record<string, unknown>;

  // New shape takes priority: either sub-key being an object marks the
  // config as dual-shape.
  const dualLabelFree = raw.weightsLabelFree;
  const dualWithTruth = raw.weightsWithTruth;
  if (isObjectLike(dualLabelFree) || isObjectLike(dualWithTruth)) {
    const labelFree = sanitiseWeightMap(dualLabelFree);
    // A missing with-truth side degrades to a copy of labelFree so the
    // switching path always has a map to read.
    // NOTE(review): flattenReviewerWeights also falls back to labelFree when
    // weightsWithTruth is present but sanitises to empty; this resolver
    // keeps the empty map in that case — confirm which one is intended.
    const withTruth = isObjectLike(dualWithTruth)
      ? sanitiseWeightMap(dualWithTruth)
      : { ...labelFree };
    return {
      weights: labelFree,
      labelFree,
      withTruth,
      fromConfig:
        Object.keys(labelFree).length > 0 || Object.keys(withTruth).length > 0,
    };
  }

  // Legacy flat shape: both sets share one map, i.e. no switching.
  const flat = sanitiseWeightMap(raw);
  return {
    weights: flat,
    labelFree: flat,
    withTruth: flat,
    fromConfig: Object.keys(flat).length > 0,
  };
}

/**
 * Parse a single configured weight. Accepts finite, non-negative numbers
 * and non-blank numeric strings; everything else (null, booleans, arrays,
 * nested objects, blank strings) is rejected rather than coerced through
 * `Number()`, which would turn `null`/`""`/`[]` into 0 and `true` into 1.
 */
function parseWeight(value: unknown): number | null {
  let parsed: number;
  if (typeof value === "number") {
    parsed = value;
  } else if (typeof value === "string" && value.trim() !== "") {
    parsed = Number(value);
  } else {
    return null;
  }
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : null;
}

/**
 * Reduce an untrusted jsonb value to a `{channel: weight}` map, dropping
 * every entry whose value is not a usable weight. Non-objects and arrays
 * yield an empty map.
 */
function sanitiseWeightMap(raw: unknown): Record<string, number> {
  const out: Record<string, number> = {};
  if (!raw || typeof raw !== "object" || Array.isArray(raw)) return out;
  for (const [key, value] of Object.entries(raw as Record<string, unknown>)) {
    const weight = parseWeight(value);
    if (weight !== null) out[key] = weight;
  }
  return out;
}

/**
 * Task #227 — pure function: fold the `problem_classes.reviewer_weights`
 * jsonb (new dual shape or legacy flat shape) into a flat
 * `Record<string, number>` a grader can consume.
 *
 * mode:
 *   - "labelFree" (default): used when there is no external ground truth.
 *   - "withTruth": used when the metric row's
 *     ledger.external_truth_status === "received". Falls back to the
 *     label-free set when the with-truth set has no usable weight.
 *
 * The legacy flat shape returns the same map for both modes ("no
 * switching"). Intended for shadow/regression-style paths that do not go
 * through the `rollingFitness` LEFT JOIN, so the grader is never handed a
 * nested object.
 *
 * @param raw  The raw `reviewer_weights` value.
 * @param mode Which weight set to extract.
 * @returns A sanitised flat weight map (possibly empty).
 */
export function flattenReviewerWeights(
  raw: unknown,
  mode: "labelFree" | "withTruth" = "labelFree",
): Record<string, number> {
  if (!raw || typeof raw !== "object") return {};
  const obj = raw as Record<string, unknown>;
  const dualLabelFree = obj.weightsLabelFree;
  const dualWithTruth = obj.weightsWithTruth;
  const isDual = isObjectLike(dualLabelFree) || isObjectLike(dualWithTruth);
  if (!isDual) return sanitiseWeightMap(raw);

  if (mode === "withTruth") {
    const withTruth = sanitiseWeightMap(dualWithTruth);
    if (Object.keys(withTruth).length > 0) return withTruth;
  }
  return sanitiseWeightMap(dualLabelFree);
}

/**
 * Compute the rolling fitness window for one (network, version, class).
 * Empty windows return `sampleCount=0` and zeroed metrics — callers
 * decide how to handle "no signal yet".
 *
 * @param networkId        Network whose metric rows are read.
 * @param versionId        Version whose metric rows are read.
 * @param problemClassPath Problem class filter; also selects the weights.
 * @param opts             Optional overrides for the window size / row cap.
 * @returns The aggregated summary for the window.
 */
export async function rollingFitness(
  networkId: string,
  versionId: string,
  problemClassPath: string,
  opts: FitnessOptions = {},
): Promise<FitnessSummary> {
  const windowDays = opts.windowDays ?? DEFAULT_WINDOW_DAYS;
  const maxRows = opts.maxRows ?? DEFAULT_MAX_ROWS;
  const since = new Date(Date.now() - windowDays * MS_PER_DAY);

  // Task #227 — LEFT JOIN submission_feedback_ledger so the weight set can
  // be switched per metric row on ledger.external_truth_status. Rows with
  // no ledger entry come back with null ledger fields and are therefore
  // scored with the label-free weights.
  const metricsQuery = db
    .select({
      reviewerScore: networkVersionMetrics.reviewerScore,
      channelBreakdown: networkVersionMetrics.channelBreakdown,
      costMs: networkVersionMetrics.costMs,
      retries: networkVersionMetrics.retries,
      budgetExceeded: networkVersionMetrics.budgetExceeded,
      createdAt: networkVersionMetrics.createdAt,
      externalTruthStatus: submissionFeedbackLedger.externalTruthStatus,
      externalTruthValue: submissionFeedbackLedger.externalTruthValue,
    })
    .from(networkVersionMetrics)
    .leftJoin(
      submissionFeedbackLedger,
      eq(submissionFeedbackLedger.metricRowId, networkVersionMetrics.id),
    )
    .where(
      and(
        eq(networkVersionMetrics.networkId, networkId),
        eq(networkVersionMetrics.versionId, versionId),
        eq(networkVersionMetrics.problemClassPath, problemClassPath),
        gte(networkVersionMetrics.createdAt, since),
      ),
    )
    // Newest first: `summarise` reads latestAt/oldestAt by position.
    .orderBy(desc(networkVersionMetrics.createdAt))
    .limit(maxRows);

  // The metric rows and the weight config are independent reads.
  const [raw, weights] = await Promise.all([
    metricsQuery,
    resolveChannelWeights(problemClassPath),
  ]);

  const rows: FitnessSampleRow[] = raw.map((r) => ({
    reviewerScore: r.reviewerScore,
    channelBreakdown: (r.channelBreakdown ?? {}) as Record<string, number>,
    costMs: r.costMs,
    retries: r.retries,
    budgetExceeded: r.budgetExceeded,
    createdAt: r.createdAt,
    externalTruthStatus:
      (r.externalTruthStatus as FitnessSampleRow["externalTruthStatus"]) ??
      null,
    externalTruthValue: r.externalTruthValue ?? null,
  }));

  return summarise(rows, networkId, versionId, problemClassPath, {
    windowDays,
    maxRows,
    weights,
  });
}

interface SummariseOpts {
  windowDays: number;
  maxRows: number;
  weights: ChannelWeights;
}

/**
 * Pure aggregation kernel — separated from `rollingFitness` so the
 * smoke driver can pass synthesised rows without round-tripping the DB.
 *
 * `latestAt` / `oldestAt` are taken from the first / last element, so
 * `rows` is expected newest-first (the order `rollingFitness` queries in).
 *
 * @param rows             Sample rows inside the window, newest first.
 * @param networkId        Echoed into the summary.
 * @param versionId        Echoed into the summary.
 * @param problemClassPath Echoed into the summary.
 * @param opts             Row cap and resolved channel weights.
 * @returns Mean, sample stddev, 95% normal-approx CI and weighted mean.
 */
export function summarise(
  rows: FitnessSampleRow[],
  networkId: string,
  versionId: string,
  problemClassPath: string,
  opts: SummariseOpts,
): FitnessSummary {
  const n = rows.length;
  if (n === 0) {
    return {
      networkId,
      versionId,
      problemClassPath,
      sampleCount: 0,
      mean: 0,
      stddev: 0,
      ciLower: 0,
      ciUpper: 0,
      weightedMean: 0,
      latestAt: null,
      oldestAt: null,
      sampleBounded: false,
    };
  }

  let sum = 0;
  let weightedSum = 0;
  for (const r of rows) {
    sum += clamp01(r.reviewerScore);

    // Task #227 — per-row switching: a row whose ledger status is
    // 'received' is scored with the with-truth weights, every other row
    // with the label-free weights.
    const useWithTruth = r.externalTruthStatus === "received";
    const wmap = useWithTruth
      ? opts.weights.withTruth
      : opts.weights.labelFree;

    // When a finite truth value is present, overwrite the external_truth
    // channel with value/100 so the weighted score reflects the ledger
    // value instead of whatever the row stored for that channel when it
    // was written.
    let rowForScore = r;
    if (
      useWithTruth &&
      typeof r.externalTruthValue === "number" &&
      Number.isFinite(r.externalTruthValue)
    ) {
      rowForScore = {
        ...r,
        channelBreakdown: {
          ...r.channelBreakdown,
          external_truth: clamp01(r.externalTruthValue / 100),
        },
      };
    }
    weightedSum += weightedScore(rowForScore, wmap);
  }
  const mean = sum / n;
  const weightedMean = weightedSum / n;

  let varSum = 0;
  for (const r of rows) {
    const d = clamp01(r.reviewerScore) - mean;
    varSum += d * d;
  }
  // Sample (n - 1) standard deviation; undefined for a single row, so 0.
  const stddev = n > 1 ? Math.sqrt(varSum / (n - 1)) : 0;

  // Normal-approx 95% CI. For small N this is only a rough approximation
  // (a t-interval would be wider); per the original note the promote gate
  // also enforces `sampleCount >= 30`, where the approximation is adequate.
  const half = 1.96 * (stddev / Math.sqrt(n));
  const ciLower = Math.max(0, mean - half);
  const ciUpper = Math.min(1, mean + half);

  const latestAt = rows[0]?.createdAt ?? null;
  const oldestAt = rows[rows.length - 1]?.createdAt ?? null;
  const sampleBounded = n >= opts.maxRows;

  return {
    networkId,
    versionId,
    problemClassPath,
    sampleCount: n,
    mean,
    stddev,
    ciLower,
    ciUpper,
    weightedMean,
    latestAt,
    oldestAt,
    sampleBounded,
  };
}

/**
 * Weighted mean of a row's channel scores, normalised by the sum of the
 * weights that actually matched a channel in the breakdown.
 *
 * Falls back to the clamped raw `reviewerScore` when the breakdown or the
 * weight map is empty (CONT-006, recorded as a quarantine hit) or when no
 * positive weight matches any channel.
 *
 * Exported so unit tests can `exercise` the CONT-006 fallback branch
 * directly without running the full `summarise` pipeline. Production
 * callers should still go through `summarise()`.
 *
 * @param row     Sample row; only `channelBreakdown` and `reviewerScore` are read.
 * @param weights Channel name → weight; non-positive weights are ignored.
 * @returns A score in 0..1.
 */
export function weightedScore(
  row: FitnessSampleRow,
  weights: Record<string, number>,
): number {
  const breakdown = row.channelBreakdown ?? {};
  const channelKeys = Object.keys(breakdown);
  const hasBreakdown = channelKeys.length > 0;
  const hasWeights = Object.keys(weights).length > 0;

  if (!hasBreakdown || !hasWeights) {
    // QUARANTINE-CONT-006 — weighted_score fallback to raw reviewerScore.
    // Real signal: B5 will plug LLM-judge channel weights + per-class config.
    // @deprecated CONT-006. Falls back to raw reviewerScore when channel weights missing. Real fix in B5.
    void import("../quarantine/index.ts")
      .then((q) =>
        q.recordQuarantineHit("CONT-006", {
          gate: "weighted_score_fallback",
          site: "evolution/fitness.ts:weightedScore",
          hasBreakdown,
          hasWeights,
        }),
      )
      .catch(() => {
        // Best-effort telemetry: a failed import or recorder must not
        // surface as an unhandled rejection from a scoring call.
      });
    return clamp01(row.reviewerScore);
  }

  let weightSum = 0;
  let valSum = 0;
  for (const k of channelKeys) {
    const w = weights[k];
    if (typeof w !== "number" || w <= 0) continue;
    // Non-numeric / NaN channel values count as 0.
    const v = clamp01(Number(breakdown[k]) || 0);
    weightSum += w;
    valSum += w * v;
  }
  if (weightSum === 0) return clamp01(row.reviewerScore);
  return valSum / weightSum;
}

/** Clamp to [0, 1]; NaN and ±Infinity map to 0. */
function clamp01(n: number): number {
  if (!Number.isFinite(n)) return 0;
  if (n < 0) return 0;
  if (n > 1) return 1;
  return n;
}

/**
 * Convenience: count the metric rows of a (network, version) created
 * within the last `windowDays`, for cadence-trigger evaluation, without
 * paying the full aggregation cost.
 *
 * NOTE(review): unlike `rollingFitness`, this count is not filtered by
 * problem class and does not apply the `maxRows` cap — confirm callers
 * expect the wider count.
 *
 * @param networkId  Network whose metric rows are counted.
 * @param versionId  Version whose metric rows are counted.
 * @param windowDays Look-back in days (default 7).
 * @returns Number of matching rows (0 when none).
 */
export async function countWindowSamples(
  networkId: string,
  versionId: string,
  windowDays: number = DEFAULT_WINDOW_DAYS,
): Promise<number> {
  const since = new Date(Date.now() - windowDays * MS_PER_DAY);
  const r = await db
    .select({ c: sql<number>`count(*)::int` })
    .from(networkVersionMetrics)
    .where(
      and(
        eq(networkVersionMetrics.networkId, networkId),
        eq(networkVersionMetrics.versionId, versionId),
        gte(networkVersionMetrics.createdAt, since),
      ),
    );
  return Number(r[0]?.c ?? 0);
}