Iostream-Li's picture
Add files using upload-large-folder tool
ff78003 verified
/**
* Wave B — rolling fitness aggregator.
*
* Reads `network_version_metrics` (written per-turn by the Reviewer in
* Wave A) and aggregates them into a single 0..1 fitness score for a
* given (network, version, problem_class) tuple. The aggregator is the
* data substrate every Wave B mechanism rides on:
*
* - Triggers compare current fitness vs a floor.
* - Shadow A/B compares two fitness windows and computes a
* non-overlapping 95% CI gate.
* - Auto-rollback compares post-promote fitness vs the snapshot
* captured at promote time.
*
* Window semantics ("rolling"): MIN(last 7 days, last N=100 rows) — the
* tighter of wall-clock and sample count wins. This is intentional: a
* very-low-traffic network must not be scored on a 6-month-old metric,
* and a very-high-traffic network must not let stale rows dominate.
*
* Channel weighting: `problem_classes.reviewer_weights` (jsonb) lets
* each problem_class decide which Reviewer channel (factuality, cost,
* latency, …) matters most. When no weights are configured, or a row
* has no channel breakdown, the row's raw `reviewerScore` is used instead
* of a weighted mean. The aggregator never invents weights — a channel
* that appears on only one side (per-class config or per-row breakdown)
* is excluded from the weighted mean entirely, and if no channel
* survives, the raw `reviewerScore` is used. Both views are exposed as
* `mean` (unweighted) and `weightedMean` so callers can audit.
*/
import { and, desc, eq, gte, sql } from "drizzle-orm";
import {
db,
networkVersionMetrics,
problemClasses,
submissionFeedbackLedger,
} from "@workspace/db";
import {
REVIEWER_CHANNEL_KEYS,
NEUTRAL_CHANNEL_SCORE,
} from "../reviewer/channels";
/** Default wall-clock span of the rolling window, in days. */
const DEFAULT_WINDOW_DAYS = 7;
/** Default cap on the number of newest metric rows aggregated per window. */
const DEFAULT_MAX_ROWS = 100;
/**
 * One metric row as consumed by the aggregator: the Reviewer's scalar and
 * per-channel scores, plus the ledger's external-truth fields when joined.
 */
export interface FitnessSampleRow {
  /** Reviewer scalar score; clamped to 0..1 wherever the aggregator reads it. */
  reviewerScore: number;
  /** Per-channel Reviewer scores keyed by channel name. */
  channelBreakdown: Record<string, number>;
  /** Carried through from the metric row; not read by `summarise` / `weightedScore`. */
  costMs: number | null;
  /** Carried through from the metric row; not read by `summarise` / `weightedScore`. */
  retries: number;
  /** Carried through from the metric row; not read by `summarise` / `weightedScore`. */
  budgetExceeded: boolean;
  /** Row creation time; drives the window cutoff and `latestAt` / `oldestAt`. */
  createdAt: Date;
  /**
   * Task #227 — external-truth status from the LEFT JOIN on
   * submission_feedback_ledger.
   * pending / failed / null (no ledger row) → the row is scored with the
   * label-free weight set; received → the row is scored with the
   * with-truth weight set, and channelBreakdown.external_truth is
   * overwritten with ledger.external_truth_value / 100 (clamped to 0..1
   * during aggregation).
   */
  externalTruthStatus?: "pending" | "received" | "failed" | null;
  /**
   * Ledger external_truth_value. It is divided by 100 when overriding the
   * external_truth channel — presumably a 0..100 scale; confirm with the
   * ledger writer.
   */
  externalTruthValue?: number | null;
}
/**
 * Aggregated fitness for one (network, version, problem_class) window.
 * An empty window yields zeros for every numeric field and null timestamps.
 */
export interface FitnessSummary {
  networkId: string;
  versionId: string;
  problemClassPath: string;
  /** Number of metric rows that fell inside the rolling window. */
  sampleCount: number;
  /** Mean reviewer score (0..1) — the row scalar, clamped to 0..1. */
  mean: number;
  /** Sample standard deviation (n - 1 denominator) of the clamped reviewer scores; 0 when n <= 1. */
  stddev: number;
  /** Lower bound of the 95% normal-approx CI for `mean` (clamped at 0). */
  ciLower: number;
  /** Upper bound of the 95% normal-approx CI for `mean` (clamped at 1). */
  ciUpper: number;
  /** Mean of the per-row weighted scores, using the problem_class channel weights. */
  weightedMean: number;
  /** Wall-clock timestamp of the freshest row in the window. */
  latestAt: Date | null;
  /** Wall-clock timestamp of the oldest row in the window. */
  oldestAt: Date | null;
  /**
   * True when the row count reached `maxRows`, i.e. the window may have
   * been cut by sample count before spanning the full `windowDays`.
   */
  sampleBounded: boolean;
}
/** Optional overrides for `rollingFitness`. */
export interface FitnessOptions {
  /** Wall-clock window length in days (defaults to DEFAULT_WINDOW_DAYS = 7). */
  windowDays?: number;
  /** Maximum number of newest rows aggregated (defaults to DEFAULT_MAX_ROWS = 100). */
  maxRows?: number;
}
/** Resolved channel weight sets for one problem class. */
export interface ChannelWeights {
  /**
   * Default/compatibility field: equivalent to `labelFree`, kept as a view
   * for legacy callers. New code should read `labelFree` or `withTruth`
   * directly.
   */
  weights: Record<string, number>;
  /**
   * Task #227 — label-free set: used for rows with no organiser-provided
   * ground truth. By convention external_truth carries weight 0 here; that
   * is a configuration convention and is not enforced in this module.
   */
  labelFree: Record<string, number>;
  /**
   * Task #227 — with-truth set: used for metric rows whose
   * ledger.external_truth_status is 'received'. The external_truth channel
   * is expected to carry significant weight here (per the task notes, 0.50
   * by default for drugclip).
   *
   * When the legacy flat reviewer_weights config (a single weight set) is
   * read, labelFree and withTruth are both filled with the same map, so the
   * per-row switch never errors — it simply degenerates to "no switching".
   */
  withTruth: Record<string, number>;
  /**
   * True when the problem_class row has `reviewer_weights` that yield at
   * least one usable (finite, non-negative) weight in either set.
   */
  fromConfig: boolean;
}
/**
 * Resolve the per-class channel weights for `problemClassPath`.
 *
 * Returns empty maps with `fromConfig: false` when the problem_class row
 * is missing or its `reviewer_weights` jsonb is null/falsy. Rows scored
 * with an empty weight map hit the raw-`reviewerScore` fallback in
 * `weightedScore`.
 *
 * Task #227: two jsonb shapes are supported.
 * 1. Dual shape (preferred): `{weightsLabelFree: {...}, weightsWithTruth: {...}}`;
 *    `summarise` switches between the two sets per metric row.
 * 2. Legacy flat shape: `{contract_validity: 0.25, runner_signal: 0.45, ...}`;
 *    labelFree and withTruth receive the same weights.
 *
 * Shape decoding is delegated to `flattenReviewerWeights`, so this resolver
 * and the shadow/regression paths that call that helper directly always
 * agree. In particular, a with-truth set that is absent *or* empty after
 * sanitising falls back to the label-free set. Previously an empty
 * `weightsWithTruth: {}` here produced `withTruth = {}`, while the helper
 * fell back to labelFree.
 *
 * Channel names are not validated: weights are matched by name against
 * whatever keys the Reviewer wrote into `channel_breakdown`.
 *
 * @param problemClassPath - `problem_classes.path` to look up.
 * @returns the label-free / with-truth weight sets plus the legacy `weights` view.
 */
export async function resolveChannelWeights(
  problemClassPath: string,
): Promise<ChannelWeights> {
  const rows = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, problemClassPath))
    .limit(1);
  const row = rows[0];
  if (!row || !row.reviewerWeights) {
    return { weights: {}, labelFree: {}, withTruth: {}, fromConfig: false };
  }
  const labelFree = flattenReviewerWeights(row.reviewerWeights, "labelFree");
  const withTruth = flattenReviewerWeights(row.reviewerWeights, "withTruth");
  return {
    // Legacy view: identical to the label-free set.
    weights: labelFree,
    labelFree,
    withTruth,
    fromConfig:
      Object.keys(labelFree).length > 0 || Object.keys(withTruth).length > 0,
  };
}
/**
 * Keep only entries whose value coerces (via `Number`) to a finite,
 * non-negative number. Any input that is not a non-null object yields an
 * empty map.
 */
function sanitiseWeightMap(raw: unknown): Record<string, number> {
  const cleaned: Record<string, number> = {};
  // null is the only falsy value whose typeof is "object"; every other
  // non-object input (undefined, numbers, strings, booleans) fails typeof.
  if (raw === null || typeof raw !== "object") return cleaned;
  Object.entries(raw as Record<string, unknown>).forEach(([key, value]) => {
    const weight = typeof value === "number" ? value : Number(value);
    const usable = Number.isFinite(weight) && weight >= 0;
    if (usable) cleaned[key] = weight;
  });
  return cleaned;
}
/**
 * Task #227 — pure helper: collapse a `problem_classes.reviewer_weights`
 * jsonb value (dual shape or legacy flat shape) into the flat
 * `Record<string, number>` that graders consume.
 *
 * @param raw - the jsonb value; anything that is not a non-null object yields `{}`.
 * @param mode - "labelFree" (default) for rows without external truth;
 *   "withTruth" for metric rows whose ledger.external_truth_status is
 *   "received". In withTruth mode, an absent or empty with-truth set falls
 *   back to the label-free set.
 * @returns sanitised weights (finite, non-negative values only).
 *
 * The legacy flat shape yields the same map for both modes ("no switching").
 * Used by shadow/regression paths that don't go through rollingFitness's
 * LEFT JOIN, so graders never receive the nested object (on which
 * combineChannels would silently fall back to domain_grader).
 */
export function flattenReviewerWeights(
  raw: unknown,
  mode: "labelFree" | "withTruth" = "labelFree",
): Record<string, number> {
  if (raw === null || typeof raw !== "object") return {};
  const { weightsLabelFree, weightsWithTruth } = raw as {
    weightsLabelFree?: unknown;
    weightsWithTruth?: unknown;
  };
  const isNestedMap = (candidate: unknown): boolean =>
    Boolean(candidate) && typeof candidate === "object";

  // Legacy flat shape: the whole object is the weight map.
  if (!isNestedMap(weightsLabelFree) && !isNestedMap(weightsWithTruth)) {
    return sanitiseWeightMap(raw);
  }

  if (mode === "withTruth") {
    const truthWeights = sanitiseWeightMap(weightsWithTruth);
    if (Object.keys(truthWeights).length > 0) return truthWeights;
  }
  return sanitiseWeightMap(weightsLabelFree);
}
/**
 * Compute the rolling fitness window for one (network, version, class).
 *
 * The window is the newest `maxRows` metric rows (default 100) whose
 * `created_at` falls within the last `windowDays` days (default 7). Each
 * metric row is LEFT JOINed to `submission_feedback_ledger` so the
 * aggregation can switch weight sets per row based on external-truth
 * status. Empty windows return `sampleCount=0` and zeroed metrics —
 * callers decide how to handle "no signal yet".
 *
 * @param networkId - network whose metrics are aggregated.
 * @param versionId - network version whose metrics are aggregated.
 * @param problemClassPath - problem-class filter; also selects the channel weights.
 * @param opts - optional overrides for the window length and row cap.
 * @returns the aggregated {@link FitnessSummary}.
 */
export async function rollingFitness(
  networkId: string,
  versionId: string,
  problemClassPath: string,
  opts: FitnessOptions = {},
): Promise<FitnessSummary> {
  const windowDays = opts.windowDays ?? DEFAULT_WINDOW_DAYS;
  const maxRows = opts.maxRows ?? DEFAULT_MAX_ROWS;
  const since = new Date(Date.now() - windowDays * 24 * 60 * 60 * 1000);
  // Task #227 — LEFT JOIN submission_feedback_ledger so weightedScore can
  // switch weight sets based on each metric row's
  // ledger.external_truth_status. Legacy rows without a ledger entry, and
  // metric rows written by non-competition paths, have null ledger fields,
  // so they fall back to the label-free weights.
  // NOTE(review): if one metric row can have several ledger rows, this
  // join fans out and the duplicates count toward `maxRows`. Confirm that
  // metric_row_id is unique in submission_feedback_ledger.
  // The metric query and the weight lookup are independent, so run them
  // concurrently instead of back to back.
  const [raw, weights] = await Promise.all([
    db
      .select({
        reviewerScore: networkVersionMetrics.reviewerScore,
        channelBreakdown: networkVersionMetrics.channelBreakdown,
        costMs: networkVersionMetrics.costMs,
        retries: networkVersionMetrics.retries,
        budgetExceeded: networkVersionMetrics.budgetExceeded,
        createdAt: networkVersionMetrics.createdAt,
        externalTruthStatus: submissionFeedbackLedger.externalTruthStatus,
        externalTruthValue: submissionFeedbackLedger.externalTruthValue,
      })
      .from(networkVersionMetrics)
      .leftJoin(
        submissionFeedbackLedger,
        eq(submissionFeedbackLedger.metricRowId, networkVersionMetrics.id),
      )
      .where(
        and(
          eq(networkVersionMetrics.networkId, networkId),
          eq(networkVersionMetrics.versionId, versionId),
          eq(networkVersionMetrics.problemClassPath, problemClassPath),
          gte(networkVersionMetrics.createdAt, since),
        ),
      )
      .orderBy(desc(networkVersionMetrics.createdAt))
      .limit(maxRows),
    resolveChannelWeights(problemClassPath),
  ]);
  const rows: FitnessSampleRow[] = raw.map((r) => ({
    reviewerScore: r.reviewerScore,
    channelBreakdown: (r.channelBreakdown ?? {}) as Record<string, number>,
    costMs: r.costMs,
    retries: r.retries,
    budgetExceeded: r.budgetExceeded,
    createdAt: r.createdAt,
    externalTruthStatus:
      (r.externalTruthStatus as FitnessSampleRow["externalTruthStatus"]) ?? null,
    externalTruthValue: r.externalTruthValue ?? null,
  }));
  return summarise(rows, networkId, versionId, problemClassPath, {
    windowDays,
    maxRows,
    weights,
  });
}
/** Inputs to `summarise` beyond the rows themselves. */
interface SummariseOpts {
  /** Wall-clock window length in days (carried for context; `summarise` does not read it). */
  windowDays: number;
  /** Row cap used for the window; reaching it marks the summary `sampleBounded`. */
  maxRows: number;
  /** Channel weight sets used to compute `weightedMean`. */
  weights: ChannelWeights;
}
/**
 * Pure aggregation kernel — separated from `rollingFitness` so the
 * smoke driver can pass synthesised rows without round-tripping the DB.
 *
 * - `mean` / `stddev` / CI are computed over `clamp01(reviewerScore)`.
 * - `weightedMean` averages `weightedScore` per row, using the with-truth
 *   weight set for rows whose ledger status is "received" and the
 *   label-free set otherwise.
 * - `latestAt` / `oldestAt` are the max / min `createdAt`, so the input
 *   does not have to be pre-sorted. (The DB path passes rows newest-first,
 *   but synthesised rows may arrive in any order.)
 * - `sampleBounded` is true when the row count reached `opts.maxRows`.
 *
 * Empty input yields a zeroed summary with null timestamps.
 */
export function summarise(
  rows: FitnessSampleRow[],
  networkId: string,
  versionId: string,
  problemClassPath: string,
  opts: SummariseOpts,
): FitnessSummary {
  const n = rows.length;
  if (n === 0) {
    return {
      networkId,
      versionId,
      problemClassPath,
      sampleCount: 0,
      mean: 0,
      stddev: 0,
      ciLower: 0,
      ciUpper: 0,
      weightedMean: 0,
      latestAt: null,
      oldestAt: null,
      sampleBounded: false,
    };
  }
  let sum = 0;
  let weightedSum = 0;
  let latestAt: Date | null = null;
  let oldestAt: Date | null = null;
  for (const r of rows) {
    sum += clamp01(r.reviewerScore);
    // Task #227 — per-row weight switching: rows whose
    // ledger.external_truth_status is 'received' use the with-truth set
    // (which carries the external_truth channel, e.g. 0.50), all others
    // use the label-free set. When a truth value is present,
    // channelBreakdown.external_truth is also overwritten with value/100.
    // At metric-write time that channel holds NEUTRAL_CHANNEL_SCORE (the
    // driver doesn't know the truth yet), so NEUTRAL × 0.50 would pin the
    // back-filled score near 0.5 instead of reflecting the truth.
    const useWithTruth = r.externalTruthStatus === "received";
    const wmap = useWithTruth ? opts.weights.withTruth : opts.weights.labelFree;
    let rowForScore = r;
    if (
      useWithTruth &&
      typeof r.externalTruthValue === "number" &&
      Number.isFinite(r.externalTruthValue)
    ) {
      rowForScore = {
        ...r,
        channelBreakdown: {
          ...r.channelBreakdown,
          external_truth: clamp01(r.externalTruthValue / 100),
        },
      };
    }
    weightedSum += weightedScore(rowForScore, wmap);

    // Track the time range explicitly instead of trusting input order.
    // `>` keeps the first maximum and `<=` keeps the last minimum, so
    // newest-first input yields rows[0] / rows[n - 1] exactly as before.
    const t = r.createdAt.getTime();
    if (latestAt === null || t > latestAt.getTime()) latestAt = r.createdAt;
    if (oldestAt === null || t <= oldestAt.getTime()) oldestAt = r.createdAt;
  }
  const mean = sum / n;
  const weightedMean = weightedSum / n;
  let varSum = 0;
  for (const r of rows) {
    const d = clamp01(r.reviewerScore) - mean;
    varSum += d * d;
  }
  // Sample (n - 1) standard deviation; a single row has no spread estimate.
  const stddev = n > 1 ? Math.sqrt(varSum / (n - 1)) : 0;
  // Normal-approx 95% CI (z = 1.96). For small N this is slightly
  // *narrower* than a Student-t interval (anti-conservative). The promote
  // gate is documented as requiring `sampleCount >= 30` (enforced outside
  // this module), where the difference is small.
  const half = 1.96 * (stddev / Math.sqrt(n));
  const ciLower = Math.max(0, mean - half);
  const ciUpper = Math.min(1, mean + half);
  const sampleBounded = n >= opts.maxRows;
  return {
    networkId,
    versionId,
    problemClassPath,
    sampleCount: n,
    mean,
    stddev,
    ciLower,
    ciUpper,
    weightedMean,
    latestAt,
    oldestAt,
    sampleBounded,
  };
}
/**
 * Weighted mean of a row's channel scores under `weights`.
 *
 * Only channels present in `row.channelBreakdown` that have a finite,
 * positive weight contribute. Each channel value is coerced with `Number`
 * and clamped to 0..1; non-numeric values count as 0. Channels that appear
 * only in `weights` are ignored, so the result is normalised by the weights
 * actually used and stays within 0..1.
 *
 * Falls back to `clamp01(row.reviewerScore)` when:
 * - the breakdown or the weight map is empty (also recording a CONT-006
 *   quarantine hit, best-effort), or
 * - no channel ends up with a usable weight.
 *
 * Exported so unit tests can exercise the CONT-006 fallback branch
 * directly without running the full `summarise` pipeline. Production
 * callers should still go through `summarise()`.
 */
export function weightedScore(
  row: FitnessSampleRow,
  weights: Record<string, number>,
): number {
  const breakdown = row.channelBreakdown || {};
  const channelKeys = Object.keys(breakdown);
  const hasBreakdown = channelKeys.length > 0;
  const hasWeights = Object.keys(weights).length > 0;
  if (!hasBreakdown || !hasWeights) {
    // QUARANTINE-CONT-006 — weighted_score fallback to raw reviewerScore.
    // Real signal: B5 will plug LLM-judge channel weights + per-class config.
    // @deprecated CONT-006. Falls back to raw reviewerScore when channel weights missing. Real fix in B5.
    // Telemetry is fire-and-forget. A failed import or recorder call must
    // not become an unhandled rejection (which recent Node versions treat
    // as fatal by default) or affect the score, so it is logged and dropped.
    void import("../quarantine/index.ts")
      .then((q) =>
        q.recordQuarantineHit("CONT-006", {
          gate: "weighted_score_fallback",
          site: "evolution/fitness.ts:weightedScore",
          hasBreakdown,
          hasWeights,
        }),
      )
      .catch((err: unknown) => {
        console.warn("[fitness] failed to record CONT-006 quarantine hit", err);
      });
    return clamp01(row.reviewerScore);
  }
  let weightSum = 0;
  let valSum = 0;
  for (const k of channelKeys) {
    const w = weights[k];
    // Skip missing, non-numeric, NaN/±Infinity and non-positive weights;
    // `w <= 0` alone lets NaN through and would poison the whole mean.
    if (typeof w !== "number" || !Number.isFinite(w) || w <= 0) continue;
    const v = clamp01(Number(breakdown[k]) || 0);
    weightSum += w;
    valSum += w * v;
  }
  if (weightSum === 0) return clamp01(row.reviewerScore);
  return valSum / weightSum;
}
/**
 * Clamp `value` into [0, 1]. Non-finite inputs (NaN, ±Infinity) map to 0
 * rather than to the nearest bound.
 */
function clamp01(value: number): number {
  if (!Number.isFinite(value)) return 0;
  return value < 0 ? 0 : value > 1 ? 1 : value;
}
/**
 * Cheap cardinality probe for cadence triggers: the number of metric rows
 * for (network, version) whose `created_at` is within the last
 * `windowDays` days. No aggregation is performed.
 *
 * Unlike `rollingFitness`, this does not filter by problem class and does
 * not apply the `maxRows` cap. The count can therefore exceed the number of
 * rows `rollingFitness` would aggregate for any single class.
 *
 * @param networkId - network to count rows for.
 * @param versionId - network version to count rows for.
 * @param windowDays - wall-clock window length in days (default 7).
 * @returns the row count (0 when nothing matches).
 */
export async function countWindowSamples(
  networkId: string,
  versionId: string,
  windowDays: number = DEFAULT_WINDOW_DAYS,
): Promise<number> {
  const windowMs = windowDays * 24 * 60 * 60 * 1000;
  const cutoff = new Date(Date.now() - windowMs);
  const inWindow = and(
    eq(networkVersionMetrics.networkId, networkId),
    eq(networkVersionMetrics.versionId, versionId),
    gte(networkVersionMetrics.createdAt, cutoff),
  );
  const [countRow] = await db
    .select({ c: sql<number>`count(*)::int` })
    .from(networkVersionMetrics)
    .where(inWindow);
  return Number(countRow?.c ?? 0);
}