Iostream-Li's picture
Add files using upload-large-folder tool
ff78003 verified
/**
* Wave B step 2 — generic evolution builder entry point.
*
* Bridges the strategy registry (`./strategies`) to the live database:
* loads the network + active variant + every existing version, asks
* the configured strategy for one new candidate, then atomically
* writes a `network_versions` row in `status='shadow'` plus a
* `builder_proposed` event so the existing 5-gate consumer
* (`promote.ts`) and the admin Evolution Live timeline both see it.
*
* Hard contracts (do not break — they are the dedup / audit / safety
* surface other Wave B mechanisms rely on):
*
* 1. Builder is *single-flight per scheduler tick*. The scheduler's
* `inFlight` latch already guarantees no two ticks overlap; this
* function holds no lock of its own. **Multi-instance deploys
* MUST add a Postgres advisory lock on (network_id) before
* calling here** — single-instance is the only safe mode today.
* TODO #232 tracks the leader-election story.
* 2. Dedup is by stable-stringify of the proposed config against
* every existing version's config (regardless of status). The
* strategy itself does the comparison, so swapping strategies
* preserves the contract automatically.
* 3. Every public outcome — proposed, no_candidate, error — writes
* exactly one event to `network_evolution_events` (best-effort:
* `safeWriteEvent` logs and suppresses write failures). The
* `skipped_*` and `no_candidate_debounced` outcomes write none. The
* 1-hour debounce on `builder_no_candidate` prevents the admin
* timeline from drowning when a network is genuinely exhausted
* and a trigger keeps firing every 5 min.
* 4. Errors NEVER throw. Strategy faults, DB write faults, and
* unique-label collisions all fold into `BuilderResult.error`
* so the scheduler can keep moving on to `attemptAutoPromote`
* and `tickRollbackWatch` without losing a tick.
* 5. Builder model tier is whatever the strategy declared; we just
* pass it through to `network_versions.builder_model_tier`.
* `promote.ts` Gate 5 then routes weak candidates to the
* private namespace and strong candidates to the shared active
* slot.
*/
import { and, eq, gte } from "drizzle-orm";
import {
db,
networkEvolutionEvents,
networkVersions,
toolNetworks,
type NetworkVersionRow,
} from "@workspace/db";
import { newId } from "../ids";
import { logger } from "../logger";
import { recordEvent } from "./events";
import {
DEFAULT_EVOLUTION_STRATEGY,
pickStrategy,
type CandidateProposal,
} from "./strategies";
import { stableStringify } from "./strategies/hyperparameter-grid";
/**
* Minimum gap (1 hour, in milliseconds) between two `builder_no_candidate`
* events for the same network. A repeat "no candidate" pass inside the
* window returns `no_candidate_debounced` and writes no event.
*/
const NO_CANDIDATE_DEBOUNCE_MS = 60 * 60 * 1000;
/** Outcome of one builder pass, reported as `BuilderResult.status`. */
export type BuilderStatus =
// A new `status='shadow'` version row was inserted and a
// `builder_proposed` event written.
| "proposed"
// The strategy produced no candidate; a `builder_no_candidate` event
// was written.
| "no_candidate"
// The strategy produced no candidate, but a `builder_no_candidate`
// event already exists inside the debounce window; nothing written.
| "no_candidate_debounced"
// NOTE(review): not returned by the builder code in this file — the
// strategy is resolved via `pickStrategy`, which falls back to the
// default instead of failing. Presumably kept for compatibility.
| "skipped_no_strategy"
// The network has no `activeVariantId`, or that version row is
// missing. No event is written.
| "skipped_no_active"
// The default strategy was selected but `config.tunableParams` is not
// a plain object. No event is written.
| "skipped_no_tunable_params"
// Some phase failed; a `builder_error` event is written (best-effort)
// and `BuilderResult.error` describes the failure.
| "error";
/** Structured result of `runEvolutionBuilder`; it never throws. */
export interface BuilderResult {
// Echo of the input network id.
networkId: string;
status: BuilderStatus;
// Name of the resolved strategy. Absent when the pass failed before
// the strategy was resolved (network load failures, uncaught throws).
strategyName?: string;
// The fields below are populated only when `status === "proposed"`.
// Id of the newly inserted shadow version row.
candidateVariantId?: string;
// Label written to that row (strategy-supplied or `nextVersionLabel`).
versionLabel?: string;
// Proposed config written to that row.
config?: Record<string, unknown>;
// Strategy's rationale, passed through unchanged.
rationale?: string;
// Per-key change lines from `computeConfigDiff` (active → candidate).
diff?: string[];
// Set when `status === "error"`. Some messages carry a phase prefix
// ("load_network: …", "load_variants: …", "uncaught: …"); propose and
// insert failures carry the raw error message.
error?: string;
}
/** Arguments for one `runEvolutionBuilder` pass. */
export interface RunEvolutionBuilderInput {
// Id of the network to evolve.
networkId: string;
/** Free-form audit string ("scheduler_tick", "manual", etc). */
// Copied into the payload of every event this pass writes.
reason: string;
}
/**
 * Run one builder pass for a single network and describe the outcome.
 *
 * This is the throw-free entry point: everything happens in
 * `runEvolutionBuilderInner`, and any exception that still escapes it
 * is converted into a `status: "error"` result. The conversion also
 * logs the failure and records a `builder_error` event with
 * `phase: "uncaught"`.
 *
 * Safe to call from any cadence — the dedup + debounce keep repeat
 * calls cheap and bounded.
 */
export async function runEvolutionBuilder(
  input: RunEvolutionBuilderInput,
): Promise<BuilderResult> {
  try {
    // `return await` (not plain `return`) so async rejections from the
    // inner pass land in this catch.
    return await runEvolutionBuilderInner(input);
  } catch (caught) {
    return reportUncaughtBuilderFailure(input, caught);
  }
}

/**
 * Outer safety net (architect-flagged high): a bug in the inner pass
 * (e.g. `nextVersionLabel` / `computeConfigDiff`) must never reach the
 * scheduler tick. Log loudly, persist a `builder_error` event so the
 * admin timeline shows the regression, and return a structured error.
 */
async function reportUncaughtBuilderFailure(
  input: RunEvolutionBuilderInput,
  caught: unknown,
): Promise<BuilderResult> {
  const { networkId, reason } = input;
  const message = caught instanceof Error ? caught.message : String(caught);
  logger.error(
    { err: caught, networkId, reason },
    "evolution builder: unexpected throw escaped inner handlers",
  );
  await safeWriteEvent({
    networkId,
    kind: "builder_error",
    payload: { phase: "uncaught", error: message, reason },
  });
  return { networkId, status: "error", error: `uncaught: ${message}` };
}
/**
* One builder pass for `input.networkId`. Wrapped by `runEvolutionBuilder`,
* which catches anything thrown here that the per-phase handlers below
* do not.
*
* Phases, in order (each failure or skip returns early):
* 1. `load_network` — fetch the `toolNetworks` row. A DB fault or a
* missing row writes a `builder_error` event and returns `error`.
* 2. Resolve the strategy from `config.evolutionStrategy` via
* `pickStrategy`.
* 3. Return `skipped_no_active` when the network has no
* `activeVariantId`, or `skipped_no_tunable_params` when the default
* strategy is selected but `config.tunableParams` is not a plain
* object. Skip outcomes write no event.
* 4. `load_variants` — fetch the active version row (missing →
* `skipped_no_active`) and every version of the network.
* 5. `propose` — ask the strategy for one candidate.
* 6. No candidate → `builder_no_candidate` event, debounced to one per
* `NO_CANDIDATE_DEBOUNCE_MS` window (`no_candidate_debounced` otherwise).
* 7. `insert_variant` — insert a `status='shadow'` version row, then
* write a `builder_proposed` event carrying the config diff.
*
* DB / strategy faults in phases 1, 4, 5 and 7 write a `builder_error`
* event with the matching `phase` and return `status: "error"`.
*/
async function runEvolutionBuilderInner(
input: RunEvolutionBuilderInput,
): Promise<BuilderResult> {
const { networkId, reason } = input;
let network: Awaited<ReturnType<typeof loadNetwork>> | null = null;
try {
network = await loadNetwork(networkId);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.error({ err, networkId }, "evolution builder: load network failed");
await safeWriteEvent({
networkId,
kind: "builder_error",
payload: { phase: "load_network", error: msg, reason },
});
return { networkId, status: "error", error: `load_network: ${msg}` };
}
if (!network) {
await safeWriteEvent({
networkId,
kind: "builder_error",
payload: { phase: "load_network", error: "network_not_found", reason },
});
return { networkId, status: "error", error: "network_not_found" };
}
// Network-level config: source of `evolutionStrategy` and
// `tunableParams` below.
const cfg = (network.config ?? {}) as Record<string, unknown>;
const requestedStrategy =
typeof cfg["evolutionStrategy"] === "string"
? (cfg["evolutionStrategy"] as string)
: DEFAULT_EVOLUTION_STRATEGY;
// Task #228 — use pickStrategy rather than getEvolutionStrategy:
// - a registered name → returns the matching strategy;
// - unregistered / empty / undefined → safely falls back to
// DEFAULT_EVOLUTION_STRATEGY. It never throws and no longer takes the
// "skipped_no_strategy" dead path.
// This matches the validation of the admin PATCH /strategy entry
// point, which uses getEvolutionStrategy to reject unknown names
// before writing. So on the production path cfg should never hold an
// unknown name. If stale historical data or a manual SQL write does
// leave one, the builder degrades to grid and keeps running instead
// of skipping the whole evolution tick.
const strategy = pickStrategy(requestedStrategy);
const strategyName = strategy.name;
// Without an active variant there is no parent to derive a candidate
// from. This skip writes no event.
if (!network.activeVariantId) {
return {
networkId,
status: "skipped_no_active",
strategyName,
};
}
// Cheap fast-fail: hyperparameter_grid (and any strategy with the
// same convention) needs `tunableParams` to do anything; skip
// before loading versions when it's missing so opt-out networks
// pay zero cost on every trigger fire.
if (strategyName === DEFAULT_EVOLUTION_STRATEGY) {
const tp = cfg["tunableParams"];
if (!tp || typeof tp !== "object" || Array.isArray(tp)) {
return {
networkId,
status: "skipped_no_tunable_params",
strategyName,
};
}
}
// The active version is the parent of the proposal. `existingVersions`
// holds every version of this network regardless of status; it is the
// set the strategy dedups the proposed config against.
let activeVariant: NetworkVersionRow | null = null;
let existingVersions: NetworkVersionRow[] = [];
try {
const activeRows = await db
.select()
.from(networkVersions)
.where(eq(networkVersions.id, network.activeVariantId))
.limit(1);
activeVariant = activeRows[0] ?? null;
// The network points at a version row that no longer exists.
if (!activeVariant) {
return {
networkId,
status: "skipped_no_active",
strategyName,
};
}
existingVersions = await db
.select()
.from(networkVersions)
.where(eq(networkVersions.networkId, networkId));
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.error(
{ err, networkId },
"evolution builder: load variants failed",
);
await safeWriteEvent({
networkId,
kind: "builder_error",
payload: {
strategy: strategyName,
phase: "load_variants",
error: msg,
reason,
},
});
return { networkId, status: "error", error: `load_variants: ${msg}`, strategyName };
}
let proposal: CandidateProposal | null = null;
try {
proposal = await strategy.propose({
network,
activeVariant,
existingVersions,
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.warn(
{ err, networkId, strategyName },
"evolution builder: strategy.propose threw",
);
await safeWriteEvent({
networkId,
kind: "builder_error",
payload: {
strategy: strategyName,
phase: "propose",
error: msg,
reason,
},
});
// NOTE(review): unlike the load_* paths, `error` here (and in the
// insert_variant path below) is the raw message without a phase
// prefix; the phase is only recorded in the event payload.
return { networkId, status: "error", error: msg, strategyName };
}
if (!proposal) {
// Debounce: only emit one `builder_no_candidate` per network per
// hour so a chronically-exhausted grid doesn't flood the admin
// event stream every 5-min tick.
const debounceSince = new Date(Date.now() - NO_CANDIDATE_DEBOUNCE_MS);
let debounced = false;
try {
const recent = await db
.select({ id: networkEvolutionEvents.id })
.from(networkEvolutionEvents)
.where(
and(
eq(networkEvolutionEvents.networkId, networkId),
eq(networkEvolutionEvents.kind, "builder_no_candidate"),
gte(networkEvolutionEvents.createdAt, debounceSince),
),
)
.limit(1);
debounced = recent.length > 0;
} catch (err) {
// Fail open: `debounced` stays false, so the event is emitted
// rather than silently suppressed when the lookup itself fails.
logger.warn(
{ err, networkId },
"evolution builder: debounce lookup failed; emitting event",
);
}
if (!debounced) {
await safeWriteEvent({
networkId,
kind: "builder_no_candidate",
payload: {
strategy: strategyName,
reason,
activeVersionId: activeVariant.id,
existingVersionCount: existingVersions.length,
exhausted: true,
},
});
return {
networkId,
status: "no_candidate",
strategyName,
};
}
return {
networkId,
status: "no_candidate_debounced",
strategyName,
};
}
// A strategy-supplied label wins; otherwise derive the next `v<N>`.
// A label colliding with an existing version surfaces as an insert
// failure below (NOTE(review): presumably via a unique constraint on
// the label — confirm in the schema).
const versionLabel =
proposal.versionLabel ?? nextVersionLabel(existingVersions);
const candidateId = newId("nver");
// Strategies that don't declare a tier default to "strong".
const builderTier = proposal.builderModelTier ?? "strong";
try {
await db.insert(networkVersions).values({
id: candidateId,
networkId,
versionLabel,
// Builder writes structural-equivalent to the active variant
// (same nodes, same edges) — only `config` knobs differ. When a
// future strategy mutates topology it will produce its own
// `internalGraph` in the proposal; we already pass the active's
// graph through so deterministic strategies don't have to
// duplicate it.
internalGraph: activeVariant.internalGraph,
config: proposal.config,
status: "shadow",
builtBy: `auto:${strategyName}`,
builderModelTier: builderTier,
});
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.warn(
{ err, networkId, candidateId, versionLabel },
"evolution builder: insert variant failed",
);
await safeWriteEvent({
networkId,
kind: "builder_error",
payload: {
strategy: strategyName,
phase: "insert_variant",
error: msg,
reason,
attemptedVersionLabel: versionLabel,
},
});
return { networkId, status: "error", error: msg, strategyName };
}
// Diff against the parent (active) config; the same list goes into
// the event payload and the returned result.
const diff = computeConfigDiff(
(activeVariant.config ?? {}) as Record<string, unknown>,
proposal.config,
);
await safeWriteEvent({
networkId,
kind: "builder_proposed",
variantId: candidateId,
payload: {
strategy: strategyName,
reason,
parentActiveId: activeVariant.id,
parentVersionLabel: activeVariant.versionLabel,
parentConfig: activeVariant.config,
versionLabel,
config: proposal.config,
rationale: proposal.rationale,
builderModelTier: builderTier,
diff,
},
});
return {
networkId,
status: "proposed",
strategyName,
candidateVariantId: candidateId,
versionLabel,
config: proposal.config,
rationale: proposal.rationale,
diff,
};
}
/**
 * Fetch the single `toolNetworks` row whose id is `networkId`.
 * Resolves to `null` when no such row exists; DB errors propagate to
 * the caller.
 */
async function loadNetwork(networkId: string) {
  const [firstRow] = await db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.id, networkId))
    .limit(1);
  return firstRow ?? null;
}
/**
 * Pick the next `v<N>` version label that does not collide with any
 * existing version of the network.
 *
 * Every existing label matching `v<digits>` (case-insensitive, so `V7`
 * counts as 7) contributes its number; the result is `v<max + 1>`, or
 * `v1` when no label follows the convention. Non-conforming labels are
 * ignored for numbering.
 *
 * Numbers above `Number.MAX_SAFE_INTEGER` are skipped: `max + 1` on them
 * silently rounds and can reproduce an existing label. If the computed
 * label is still not usable (not a safe integer, or already taken), fall
 * back to a timestamp-derived `v_<base36>` label that is itself checked
 * against existing labels. Falling back is safer than blocking the
 * builder on a parsing edge case.
 *
 * @param versions every existing version row of the network (any status)
 * @returns a label not equal to any existing `versionLabel`
 */
function nextVersionLabel(versions: NetworkVersionRow[]): string {
  const taken = new Set(versions.map((v) => v.versionLabel));
  let max = 0;
  for (const v of versions) {
    const digits = /^v(\d+)$/i.exec(v.versionLabel ?? "")?.[1];
    if (digits === undefined) continue;
    const n = Number(digits);
    // Beyond 2^53 - 1 integers lose precision, so `n + 1` may equal `n`.
    if (Number.isSafeInteger(n) && n > max) max = n;
  }
  const next = max + 1;
  const label = `v${next}`;
  if (Number.isSafeInteger(next) && !taken.has(label)) return label;
  const stamp = Date.now().toString(36);
  let fallback = `v_${stamp}`;
  for (let i = 1; taken.has(fallback); i++) fallback = `v_${stamp}_${i}`;
  return fallback;
}
/**
 * Per-key diff string list ("shingleK: 4 → 5") between two configs.
 * Surfaced into both the `builder_proposed` event payload and the
 * BuilderResult so admin UIs and audit logs read identically.
 *
 * Keys from either side are compared by `stableStringify` value and
 * emitted in sorted order; unchanged keys are omitted. A side whose
 * stringified value is nullish (presumably a key absent on that side —
 * confirm `stableStringify(undefined)` returns undefined) renders as `∅`.
 *
 * @param before config of the parent (active) version
 * @param after proposed config
 * @returns one `"<key>: <before> → <after>"` line per changed key
 */
function computeConfigDiff(
  before: Record<string, unknown>,
  after: Record<string, unknown>,
): string[] {
  const keys = [
    ...new Set([...Object.keys(before), ...Object.keys(after)]),
  ].sort();
  const out: string[] = [];
  for (const k of keys) {
    const a = stableStringify(before[k]);
    const b = stableStringify(after[k]);
    if (a !== b) {
      // The separator was missing, producing e.g. "shingleK: 45".
      out.push(`${k}: ${a ?? "∅"} → ${b ?? "∅"}`);
    }
  }
  return out;
}
/** Event kinds written by the builder through `safeWriteEvent`. */
type BuilderEventKind =
  | "builder_proposed"
  | "builder_no_candidate"
  | "builder_error";

interface SafeWriteEventInput {
  networkId: string;
  kind: BuilderEventKind;
  variantId?: string;
  payload: Record<string, unknown>;
}

/**
 * Best-effort event write. It forwards to `recordEvent` and, if that
 * fails, logs at error level instead of rethrowing. The builder must
 * never throw out of `runEvolutionBuilder`. Even with a degraded events
 * table, the scheduler tick still reaches
 * `attemptAutoPromote`/`tickRollbackWatch`.
 *
 * An omitted `variantId` is recorded as `null`.
 */
async function safeWriteEvent({
  networkId,
  kind,
  variantId,
  payload,
}: SafeWriteEventInput): Promise<void> {
  try {
    await recordEvent({
      networkId,
      kind,
      variantId: variantId ?? null,
      payload,
    });
  } catch (err) {
    logger.error(
      { err, networkId, kind },
      "evolution builder: failed to record event (suppressed)",
    );
  }
}