// Iostream-Li's picture
// Add files using upload-large-folder tool
// ff78003 verified
/**
* Wave B — background cadence worker for the online-evolution flywheel.
*
* The trigger / promote / rollback library functions are all pure
* primitives that the admin endpoints already wrap. This file is the
* "in-process cron" that calls them on a fixed schedule so the loop
* runs without an operator hitting the admin page:
*
* every tick (DEFAULT_EVOLUTION_TICK_MS unless overridden):
* for each active tool_network:
* 1. evaluateTriggers(network)
* 2. if anyFired → runEvolutionBuilder(network) to propose a new
* candidate, then attemptAutoPromote(network, candidate)
* for each shadow candidate (skipping private-namespace ones,
* capped by maxCandidatesPerNetwork)
* 3. tickRollbackWatch() (once per pass, sweeps every recent
* auto-promote across all networks)
*
* Everything is best-effort and per-network try/catch so a single
* misbehaving network never breaks the loop. A single-flight latch
* prevents overlapping passes when a tick runs longer than the
* scheduler interval.
*
* Disable in env with `EVOLUTION_SCHEDULER_DISABLED=1`. Override the
* cadence (in ms, minimum 1000) with
* `EVOLUTION_SCHEDULER_INTERVAL_MS=300000`. The scheduler stays inert
* in tests (`NODE_ENV=test`) so suites do not have to tear down a
* global timer.
*
* Every tick emits a structured log under `kind=evolution_scheduler_tick`
* so the same admin Evolution Live page that already reads
* `network_evolution_events` can be paired with the workflow log to see
* the cadence end-to-end. Skipped/promoted/rolled-back outcomes are
* still written into `network_evolution_events` by the underlying
* library functions — no double-write here.
*/
import { and, eq, isNull } from "drizzle-orm";
import { db, networkVersions, toolNetworks } from "@workspace/db";
import { logger } from "../logger";
import { evaluateTriggers, type TriggerEvaluation } from "./triggers";
import {
attemptAutoPromote,
type AutoPromoteDecision,
} from "./promote";
import { tickRollbackWatch, type MonitorResult } from "./rollback";
import { runEvolutionBuilder, type BuilderResult } from "./builder";
/**
 * Default scheduler cadence in milliseconds (300 s = five minutes).
 * Used when neither the `intervalMs` option nor the
 * `EVOLUTION_SCHEDULER_INTERVAL_MS` env override supplies a value.
 */
export const DEFAULT_EVOLUTION_TICK_MS = 300 * 1000;
/** Per-tick tuning knobs accepted by `runEvolutionTick`. */
export interface EvolutionTickOptions {
  /**
   * Cap on how many shadow candidates we try to auto-promote per
   * triggered network in one tick. Keeps a misconfigured network from
   * dog-piling promote attempts; the admin page can still drive more
   * manually via POST /admin/evolution/.../promote.
   *
   * When omitted, `runEvolutionTick` uses 5.
   *
   * NOTE(review): the shadow-candidate query has no ORDER BY, so which
   * candidates fall inside this cap is not deterministic.
   */
  maxCandidatesPerNetwork?: number;
}
/**
 * What one scheduler tick did for a single active tool network.
 * One entry per network returned by the active-network query.
 */
export interface EvolutionTickNetworkSummary {
  /** `tool_networks.id` of the scanned network. */
  networkId: string;
  /** `tool_networks.name`, carried along for log/admin readability. */
  networkName: string;
  /**
   * Trimmed trigger evaluation. Absent when `evaluateTriggers` threw
   * (the failure is then recorded in `errors` as `evaluate_failed`).
   */
  evaluation?: Pick<TriggerEvaluation, "anyFired" | "signals">;
  /**
   * Wave B step 2 — builder pass result for this network on this
   * tick. Present only when the trigger fired (we don't run the
   * builder otherwise — no point burning CPU when no signal demands
   * a new candidate). Also absent if `runEvolutionBuilder` threw,
   * which is recorded in `errors` as `builder_threw`.
   */
  builder?: BuilderResult;
  /**
   * One decision per shadow candidate whose `attemptAutoPromote` call
   * returned normally; candidates whose call threw appear in `errors`
   * instead. Empty when the trigger did not fire.
   */
  promoteDecisions: AutoPromoteDecision[];
  /**
   * Per-network failures, prefixed `evaluate_failed`, `builder_threw`,
   * `list_candidates_failed` or `promote_failed:<candidateId>`.
   */
  errors: string[];
}
/** Aggregate result of one `runEvolutionTick` pass. */
export interface EvolutionTickSummary {
  /** ISO-8601 timestamp taken when the tick began. */
  startedAt: string;
  /** ISO-8601 timestamp taken after the rollback sweep finished. */
  finishedAt: string;
  /** Wall-clock milliseconds between `startedAt` and `finishedAt`. */
  durationMs: number;
  /** Number of active networks iterated this tick. */
  networksScanned: number;
  /** Number of networks whose trigger evaluation reported `anyFired`. */
  triggersFired: number;
  /**
   * Wave B step 2 — count of `runEvolutionBuilder` calls that
   * returned `status='proposed'` this tick. Useful as a top-line
   * metric for the admin Live page (and for the per-tick log line
   * below) so operators can see at a glance whether the flywheel is
   * actually producing candidates.
   */
  candidatesProposed: number;
  /** `attemptAutoPromote` calls made, including ones that threw. */
  promotionsAttempted: number;
  /** Decisions whose outcome was `promoted` or `promoted_private`. */
  promoted: number;
  /** Rollback-watch results whose outcome was `rolled_back`. */
  rolledBack: number;
  /** Per-network breakdown, in the order networks were scanned. */
  perNetwork: EvolutionTickNetworkSummary[];
  /** Raw results of the single `tickRollbackWatch` sweep. */
  rollback: MonitorResult[];
  /**
   * Tick-level failures only (`list_networks_failed`,
   * `rollback_tick_failed`); per-network failures live in
   * `perNetwork[i].errors`.
   */
  errors: string[];
}
/** Fallback for `EvolutionTickOptions.maxCandidatesPerNetwork`. */
const DEFAULT_MAX_CANDIDATES_PER_NETWORK = 5;

/**
 * Render a caught value as a short message for the summary `errors`
 * arrays: the `message` of an `Error`, otherwise its string form.
 */
function describeTickError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Turn the caller-supplied cap into a safe `Array#slice` end index.
 *
 * `slice(0, n)` treats a negative `n` as "all but the last |n|", which
 * is the opposite of a cap, so negatives clamp to 0 (attempt nothing).
 * Missing / null / NaN fall back to the default; fractional values are
 * floored; `Infinity` means "no cap".
 */
function resolveMaxCandidates(requested: number | undefined): number {
  if (requested == null || Number.isNaN(requested)) {
    return DEFAULT_MAX_CANDIDATES_PER_NETWORK;
  }
  return Math.max(0, Math.floor(requested));
}

/**
 * Run one full pass of the scheduler. Exposed independently of the
 * timer so tests, smoke drivers, and the admin endpoint can drive a
 * one-shot tick deterministically.
 *
 * Each step has its own try/catch:
 * - Failures from the network listing and the rollback sweep go to
 *   `summary.errors`.
 * - Per-network failures go to that network's `perNetwork[i].errors`.
 *
 * As a result, one bad network or one failing query does not abort the
 * remaining work.
 *
 * @param opts - per-tick knobs (see `EvolutionTickOptions`).
 * @returns counters plus per-network and rollback detail for the pass.
 */
export async function runEvolutionTick(
  opts: EvolutionTickOptions = {},
): Promise<EvolutionTickSummary> {
  const startedAtMs = Date.now();
  const startedAt = new Date(startedAtMs).toISOString();
  const maxCandidates = resolveMaxCandidates(opts.maxCandidatesPerNetwork);
  const summary: EvolutionTickSummary = {
    startedAt,
    finishedAt: startedAt,
    durationMs: 0,
    networksScanned: 0,
    triggersFired: 0,
    candidatesProposed: 0,
    promotionsAttempted: 0,
    promoted: 0,
    rolledBack: 0,
    perNetwork: [],
    rollback: [],
    errors: [],
  };

  let networks: Array<{ id: string; name: string }> = [];
  try {
    networks = await db
      .select({ id: toolNetworks.id, name: toolNetworks.name })
      .from(toolNetworks)
      .where(eq(toolNetworks.status, "active"));
  } catch (err) {
    summary.errors.push(`list_networks_failed: ${describeTickError(err)}`);
    logger.error({ err }, "evolution scheduler: failed to list networks");
  }

  for (const net of networks) {
    summary.networksScanned += 1;
    const perNet: EvolutionTickNetworkSummary = {
      networkId: net.id,
      networkName: net.name,
      promoteDecisions: [],
      errors: [],
    };

    let evaluation: TriggerEvaluation | null = null;
    try {
      evaluation = await evaluateTriggers(net.id);
      perNet.evaluation = {
        anyFired: evaluation.anyFired,
        signals: evaluation.signals,
      };
      if (evaluation.anyFired) {
        summary.triggersFired += 1;
      }
    } catch (err) {
      perNet.errors.push(`evaluate_failed: ${describeTickError(err)}`);
      logger.error(
        { err, networkId: net.id },
        "evolution scheduler: evaluateTriggers failed",
      );
    }

    if (evaluation?.anyFired) {
      // -- Builder pass (Wave B step 2) --------------------------------
      // Trigger fired → ask the network's strategy for one new candidate
      // before we look at existing shadow rows, so a freshly proposed
      // shadow can be picked up by the promote pass below.
      try {
        const builderResult = await runEvolutionBuilder({
          networkId: net.id,
          reason: "scheduler_tick",
        });
        perNet.builder = builderResult;
        if (builderResult.status === "proposed") {
          summary.candidatesProposed += 1;
        }
      } catch (err) {
        // runEvolutionBuilder is documented as non-throwing, but the
        // outer try is here so a future regression to that contract
        // can't take the whole tick down.
        perNet.errors.push(`builder_threw: ${describeTickError(err)}`);
        logger.error(
          { err, networkId: net.id },
          "evolution scheduler: runEvolutionBuilder threw (contract violation)",
        );
      }

      // Pick eligible shadow variants. Private-namespaced variants are
      // excluded — they only ever serve their owning user and are
      // promoted via the runtime path, never the shared active slot.
      let candidates: Array<{ id: string }> = [];
      try {
        candidates = await db
          .select({ id: networkVersions.id })
          .from(networkVersions)
          .where(
            and(
              eq(networkVersions.networkId, net.id),
              eq(networkVersions.status, "shadow"),
              isNull(networkVersions.privateNamespace),
            ),
          );
      } catch (err) {
        perNet.errors.push(`list_candidates_failed: ${describeTickError(err)}`);
        logger.error(
          { err, networkId: net.id },
          "evolution scheduler: failed to list shadow candidates",
        );
      }

      // NOTE(review): the query has no ORDER BY, so which candidates land
      // inside the cap is up to the database.
      for (const cand of candidates.slice(0, maxCandidates)) {
        summary.promotionsAttempted += 1;
        try {
          const decision = await attemptAutoPromote({
            networkId: net.id,
            candidateVariantId: cand.id,
            actor: "scheduler",
          });
          perNet.promoteDecisions.push(decision);
          if (
            decision.outcome === "promoted" ||
            decision.outcome === "promoted_private"
          ) {
            summary.promoted += 1;
          }
        } catch (err) {
          perNet.errors.push(
            `promote_failed:${cand.id}:${describeTickError(err)}`,
          );
          logger.warn(
            { err, networkId: net.id, candidateVariantId: cand.id },
            "evolution scheduler: attemptAutoPromote threw",
          );
        }
      }
    }

    summary.perNetwork.push(perNet);
  }

  // One rollback sweep covers every recent auto-promote across all
  // networks (the watcher already pages by horizon = window * 2).
  try {
    const rollback = await tickRollbackWatch();
    summary.rollback = rollback;
    summary.rolledBack = rollback.filter(
      (r) => r.outcome === "rolled_back",
    ).length;
  } catch (err) {
    summary.errors.push(`rollback_tick_failed: ${describeTickError(err)}`);
    logger.error({ err }, "evolution scheduler: tickRollbackWatch failed");
  }

  const finishedAtMs = Date.now();
  summary.finishedAt = new Date(finishedAtMs).toISOString();
  summary.durationMs = finishedAtMs - startedAtMs;
  return summary;
}
/** Options for `startEvolutionScheduler`. */
export interface StartEvolutionSchedulerOptions {
  /**
   * Cadence in ms. When omitted, the `EVOLUTION_SCHEDULER_INTERVAL_MS`
   * env override is used if it holds a valid value, otherwise the
   * 5-minute `DEFAULT_EVOLUTION_TICK_MS`.
   */
  intervalMs?: number;
  /**
   * Run a tick immediately on start (default true). The first tick is
   * deferred with a zero-delay timeout rather than run synchronously.
   */
  runOnStart?: boolean;
  /** Per-tick options forwarded to `runEvolutionTick`. */
  tick?: EvolutionTickOptions;
}
/** Handle returned by `startEvolutionScheduler`. */
export interface EvolutionSchedulerHandle {
  /**
   * Stop scheduling further ticks. A tick that is already running is
   * not interrupted; it finishes on its own.
   */
  stop: () => void;
}
/**
 * Largest delay Node's timers honour (2^31 - 1 ms, about 24.8 days).
 * Node coerces larger delays, and delays below 1 ms, to 1 ms, which
 * would turn the scheduler into a busy loop.
 */
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

/** True when `ms` is a delay `setInterval` will honour as written. */
function isUsableTimerDelay(ms: number): boolean {
  return Number.isFinite(ms) && ms >= 1 && ms <= MAX_TIMER_DELAY_MS;
}

/**
 * Resolve the scheduler cadence, in priority order:
 * 1. the explicit `intervalMs` option, when it is a usable timer delay;
 * 2. `EVOLUTION_SCHEDULER_INTERVAL_MS`, when it is >= 1000 ms and usable;
 * 3. `DEFAULT_EVOLUTION_TICK_MS`.
 *
 * Invalid explicit or env values are logged and skipped instead of
 * being handed to `setInterval`.
 */
function resolveSchedulerIntervalMs(explicit: number | undefined): number {
  if (explicit != null) {
    if (isUsableTimerDelay(explicit)) return explicit;
    logger.warn(
      { intervalMs: explicit, kind: "evolution_scheduler_start" },
      "evolution scheduler: ignoring invalid intervalMs option",
    );
  }
  const rawEnv = process.env["EVOLUTION_SCHEDULER_INTERVAL_MS"];
  const envInterval = Number(rawEnv);
  if (envInterval >= 1000 && isUsableTimerDelay(envInterval)) {
    return envInterval;
  }
  if (rawEnv !== undefined && rawEnv !== "") {
    logger.warn(
      { rawValue: rawEnv, kind: "evolution_scheduler_start" },
      "evolution scheduler: ignoring invalid EVOLUTION_SCHEDULER_INTERVAL_MS",
    );
  }
  return DEFAULT_EVOLUTION_TICK_MS;
}

/**
 * Start the in-process cadence worker. Returns a handle whose
 * `stop()` cancels the timers and lets a tick already in flight finish.
 *
 * Behaviour by environment:
 * - Silently no-ops in `NODE_ENV=test`.
 * - Honours `EVOLUTION_SCHEDULER_DISABLED=1`: logs and returns a no-op
 *   handle.
 * - Otherwise, both timers are `unref()`'d so they cannot keep the Node
 *   event loop alive on their own.
 *
 * Ticks are single-flight: if the previous tick is still running when
 * the interval fires, that firing is skipped with a warning.
 *
 * @param opts - cadence, start behaviour and per-tick options.
 * @returns a handle whose `stop()` halts further ticks.
 */
export function startEvolutionScheduler(
  opts: StartEvolutionSchedulerOptions = {},
): EvolutionSchedulerHandle {
  if (process.env["NODE_ENV"] === "test") {
    return { stop: () => {} };
  }
  if (process.env["EVOLUTION_SCHEDULER_DISABLED"] === "1") {
    logger.info(
      "evolution scheduler: disabled via EVOLUTION_SCHEDULER_DISABLED=1",
    );
    return { stop: () => {} };
  }

  const intervalMs = resolveSchedulerIntervalMs(opts.intervalMs);
  let stopped = false;
  let inFlight = false;

  const fire = async () => {
    if (stopped) return;
    if (inFlight) {
      logger.warn(
        { kind: "evolution_scheduler_tick" },
        "evolution scheduler: previous tick still in flight, skipping",
      );
      return;
    }
    inFlight = true;
    try {
      const summary = await runEvolutionTick(opts.tick ?? {});
      // `summary.errors` only holds tick-level failures; surface the
      // per-network failure count too so it shows up in the tick line.
      const networkErrors = summary.perNetwork.reduce(
        (total, net) => total + net.errors.length,
        0,
      );
      logger.info(
        {
          kind: "evolution_scheduler_tick",
          networksScanned: summary.networksScanned,
          triggersFired: summary.triggersFired,
          candidatesProposed: summary.candidatesProposed,
          promotionsAttempted: summary.promotionsAttempted,
          promoted: summary.promoted,
          rolledBack: summary.rolledBack,
          durationMs: summary.durationMs,
          errors: summary.errors,
          networkErrors,
        },
        "evolution scheduler: tick complete",
      );
    } catch (err) {
      logger.error(
        { err, kind: "evolution_scheduler_tick" },
        "evolution scheduler: tick threw",
      );
    } finally {
      inFlight = false;
    }
  };

  const timer = setInterval(() => {
    void fire();
  }, intervalMs);
  if (typeof timer.unref === "function") timer.unref();

  let startTimer: ReturnType<typeof setTimeout> | undefined;
  if (opts.runOnStart !== false) {
    // Defer the first tick so module-load doesn't block server boot
    // and so stop() called immediately can still cancel cleanly.
    startTimer = setTimeout(() => {
      void fire();
    }, 0);
    startTimer.unref?.();
  }

  logger.info(
    { intervalMs, kind: "evolution_scheduler_start" },
    "evolution scheduler: started",
  );

  return {
    stop: () => {
      stopped = true;
      clearInterval(timer);
      if (startTimer !== undefined) clearTimeout(startTimer);
    },
  };
}