File size: 12,314 Bytes
ff78003
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/**
 * Wave B — background cadence worker for the online-evolution flywheel.
 *
 * The trigger / promote / rollback library functions are all standalone
 * primitives that the admin endpoints already wrap. This file is the
 * "in-process cron" that calls them on a fixed schedule so the loop
 * runs without an operator hitting the admin page:
 *
 *   every TICK_MS:
 *     for each active tool_network:
 *       1. evaluateTriggers(network)
 *       2. if anyFired → runEvolutionBuilder(network) to propose one new
 *          candidate, then attemptAutoPromote(network, candidate) for up
 *          to maxCandidatesPerNetwork shadow candidates (skipping
 *          private-namespace ones)
 *     then, once per pass:
 *       3. tickRollbackWatch()  (sweeps every recent auto-promote across
 *          all networks)
 *
 * Everything is best-effort and per-network try/catch so a single
 * misbehaving network never breaks the loop. A single-flight latch
 * prevents overlapping passes when a tick runs longer than TICK_MS.
 *
 * Disable in env with `EVOLUTION_SCHEDULER_DISABLED=1`. Override the
 * cadence (in ms) with `EVOLUTION_SCHEDULER_INTERVAL_MS=300000`; values
 * below 1000 are ignored. The scheduler stays inert in tests
 * (`NODE_ENV=test`) so suites do not have to tear down a global timer.
 *
 * Every timer-driven tick emits a structured log under
 * `kind=evolution_scheduler_tick`, so the admin Evolution Live page —
 * which already reads `network_evolution_events` — can be paired with
 * the workflow log to see the cadence end-to-end. (A direct
 * `runEvolutionTick()` call only returns its summary.) Skipped, promoted
 * and rolled-back outcomes are still written into
 * `network_evolution_events` by the underlying library functions — no
 * double-write here.
 */
import { and, eq, isNull } from "drizzle-orm";
import { db, networkVersions, toolNetworks } from "@workspace/db";
import { logger } from "../logger";
import { evaluateTriggers, type TriggerEvaluation } from "./triggers";
import {
  attemptAutoPromote,
  type AutoPromoteDecision,
} from "./promote";
import { tickRollbackWatch, type MonitorResult } from "./rollback";
import { runEvolutionBuilder, type BuilderResult } from "./builder";

/** Default scheduler cadence in milliseconds: one tick every five minutes. */
export const DEFAULT_EVOLUTION_TICK_MS = 1000 * 60 * 5;

/** Tunables for a single `runEvolutionTick` pass. */
export interface EvolutionTickOptions {
  /**
   * Upper bound on auto-promote attempts per triggered network within
   * one tick (the tick uses 5 when this is omitted). This stops a
   * misconfigured network from flooding promote attempts; operators can
   * still promote more by hand via POST /admin/evolution/.../promote.
   */
  maxCandidatesPerNetwork?: number;
}

/** Per-network outcome of one `runEvolutionTick` pass. */
export interface EvolutionTickNetworkSummary {
  /** Id of the scanned network (`toolNetworks.id`). */
  networkId: string;
  /** Name of the scanned network (`toolNetworks.name`). */
  networkName: string;
  /**
   * Trigger verdict for this tick; only `anyFired` and `signals` are
   * kept. Absent when `evaluateTriggers` threw.
   */
  evaluation?: Pick<TriggerEvaluation, "anyFired" | "signals">;
  /**
   * Wave B step 2 — result of the builder pass for this network on this
   * tick. Only ever present when a trigger fired: without a signal that
   * demands a new candidate, the builder is not run at all.
   */
  builder?: BuilderResult;
  /** Decisions returned by each `attemptAutoPromote` call, in call order. */
  promoteDecisions: Array<AutoPromoteDecision>;
  /** Per-stage failure strings, e.g. `evaluate_failed: <message>`. */
  errors: Array<string>;
}

/** Aggregate result of one `runEvolutionTick` pass across all networks. */
export interface EvolutionTickSummary {
  /** ISO-8601 timestamp taken when the pass began. */
  startedAt: string;
  /** ISO-8601 timestamp taken when the pass ended. */
  finishedAt: string;
  /** Wall-clock time between `startedAt` and `finishedAt`, in ms. */
  durationMs: number;
  /** Number of active networks visited. */
  networksScanned: number;
  /** Number of networks whose trigger evaluation reported `anyFired`. */
  triggersFired: number;
  /**
   * Wave B step 2 — how many `runEvolutionBuilder` calls came back with
   * `status='proposed'` during this tick. A top-line metric for the
   * admin Live page and the per-tick log line, so operators can tell at
   * a glance whether the flywheel is actually producing candidates.
   */
  candidatesProposed: number;
  /** `attemptAutoPromote` calls made, including ones that threw. */
  promotionsAttempted: number;
  /** Decisions whose outcome was `promoted` or `promoted_private`. */
  promoted: number;
  /** Rollback results whose outcome was `rolled_back`. */
  rolledBack: number;
  /** One entry per scanned network, in scan order. */
  perNetwork: Array<EvolutionTickNetworkSummary>;
  /** Raw results of the single rollback sweep run at the end of the pass. */
  rollback: Array<MonitorResult>;
  /** Tick-level failures (listing networks, the rollback sweep). */
  errors: Array<string>;
}

/** Default cap on auto-promote attempts per triggered network per tick. */
const DEFAULT_MAX_CANDIDATES_PER_NETWORK = 5;

/** Reduce an unknown thrown value to the message recorded in `errors`. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Process one active network for a tick: evaluate its triggers and,
 * only when one fired, run the builder and then try to auto-promote up
 * to `maxCandidates` shared (non-private) shadow variants.
 *
 * Tick-wide counters (`triggersFired`, `candidatesProposed`,
 * `promotionsAttempted`, `promoted`) are incremented on `summary`; the
 * per-network detail is returned for the caller to append. Each stage
 * has its own try/catch, so a failing stage is recorded in the returned
 * `errors` and logged instead of propagating.
 */
async function scanNetwork(
  net: { id: string; name: string },
  maxCandidates: number,
  summary: EvolutionTickSummary,
): Promise<EvolutionTickNetworkSummary> {
  const perNet: EvolutionTickNetworkSummary = {
    networkId: net.id,
    networkName: net.name,
    promoteDecisions: [],
    errors: [],
  };

  let evaluation: TriggerEvaluation | null = null;
  try {
    evaluation = await evaluateTriggers(net.id);
    perNet.evaluation = {
      anyFired: evaluation.anyFired,
      signals: evaluation.signals,
    };
    if (evaluation.anyFired) {
      summary.triggersFired += 1;
    }
  } catch (err) {
    perNet.errors.push(`evaluate_failed: ${describeError(err)}`);
    logger.error(
      { err, networkId: net.id },
      "evolution scheduler: evaluateTriggers failed",
    );
  }

  // No fired signal (or evaluation failed) → nothing to build or promote.
  if (!evaluation?.anyFired) {
    return perNet;
  }

  // -- Builder pass (Wave B step 2) ----------------------------------
  // Ask the network's strategy for one new candidate before looking at
  // existing shadow rows. runEvolutionBuilder is documented as
  // non-throwing; this try/catch only guards against a future regression
  // of that contract, so it cannot block the promote pass below.
  try {
    const builderResult = await runEvolutionBuilder({
      networkId: net.id,
      reason: "scheduler_tick",
    });
    perNet.builder = builderResult;
    if (builderResult.status === "proposed") {
      summary.candidatesProposed += 1;
    }
  } catch (err) {
    perNet.errors.push(`builder_threw: ${describeError(err)}`);
    logger.error(
      { err, networkId: net.id },
      "evolution scheduler: runEvolutionBuilder threw (contract violation)",
    );
  }

  // Eligible shadow variants only. Private-namespaced variants are
  // excluded — they only ever serve their owning user and are promoted
  // via the runtime path, never the shared active slot.
  let candidates: Array<{ id: string }> = [];
  try {
    candidates = await db
      .select({ id: networkVersions.id })
      .from(networkVersions)
      .where(
        and(
          eq(networkVersions.networkId, net.id),
          eq(networkVersions.status, "shadow"),
          isNull(networkVersions.privateNamespace),
        ),
      );
  } catch (err) {
    perNet.errors.push(`list_candidates_failed: ${describeError(err)}`);
    logger.error(
      { err, networkId: net.id },
      "evolution scheduler: failed to list shadow candidates",
    );
  }

  // NOTE(review): the query has no ORDER BY, so which candidates fall
  // inside the cap is decided by the database — confirm that is intended.
  for (const cand of candidates.slice(0, maxCandidates)) {
    summary.promotionsAttempted += 1;
    try {
      const decision = await attemptAutoPromote({
        networkId: net.id,
        candidateVariantId: cand.id,
        actor: "scheduler",
      });
      perNet.promoteDecisions.push(decision);
      if (
        decision.outcome === "promoted" ||
        decision.outcome === "promoted_private"
      ) {
        summary.promoted += 1;
      }
    } catch (err) {
      perNet.errors.push(`promote_failed:${cand.id}:${describeError(err)}`);
      logger.warn(
        { err, networkId: net.id, candidateVariantId: cand.id },
        "evolution scheduler: attemptAutoPromote threw",
      );
    }
  }

  return perNet;
}

/**
 * Run one full pass of the scheduler. Exposed independently of the
 * timer so tests, smoke drivers, and the admin endpoint can drive a
 * one-shot tick deterministically.
 *
 * Active networks are scanned sequentially, then a single rollback
 * sweep runs. DB, trigger, builder, promote and rollback failures are
 * caught per stage, recorded in the returned summary (top-level
 * `errors` and `perNetwork[].errors`), and logged.
 *
 * @param opts - Per-tick tunables. `maxCandidatesPerNetwork` defaults to
 *   5; negative values are clamped to 0 (no promote attempts).
 * @returns Counters and per-network detail for this pass.
 */
export async function runEvolutionTick(
  opts: EvolutionTickOptions = {},
): Promise<EvolutionTickSummary> {
  const startedAtMs = Date.now();
  const startedAt = new Date(startedAtMs).toISOString();
  // Clamp: a negative cap passed to `slice(0, n)` would drop candidates
  // from the END of the list ("all but the last n") instead of capping.
  const maxCandidates = Math.max(
    0,
    opts.maxCandidatesPerNetwork ?? DEFAULT_MAX_CANDIDATES_PER_NETWORK,
  );

  const summary: EvolutionTickSummary = {
    startedAt,
    finishedAt: startedAt,
    durationMs: 0,
    networksScanned: 0,
    triggersFired: 0,
    candidatesProposed: 0,
    promotionsAttempted: 0,
    promoted: 0,
    rolledBack: 0,
    perNetwork: [],
    rollback: [],
    errors: [],
  };

  let networks: Array<{ id: string; name: string }> = [];
  try {
    networks = await db
      .select({ id: toolNetworks.id, name: toolNetworks.name })
      .from(toolNetworks)
      .where(eq(toolNetworks.status, "active"));
  } catch (err) {
    summary.errors.push(`list_networks_failed: ${describeError(err)}`);
    logger.error({ err }, "evolution scheduler: failed to list networks");
  }

  for (const net of networks) {
    summary.networksScanned += 1;
    summary.perNetwork.push(await scanNetwork(net, maxCandidates, summary));
  }

  // One rollback sweep covers every recent auto-promote across all
  // networks (the watcher already pages by horizon = window * 2).
  try {
    const rollback = await tickRollbackWatch();
    summary.rollback = rollback;
    summary.rolledBack = rollback.filter(
      (r) => r.outcome === "rolled_back",
    ).length;
  } catch (err) {
    summary.errors.push(`rollback_tick_failed: ${describeError(err)}`);
    logger.error(
      { err },
      "evolution scheduler: tickRollbackWatch failed",
    );
  }

  const finishedAtMs = Date.now();
  summary.finishedAt = new Date(finishedAtMs).toISOString();
  summary.durationMs = finishedAtMs - startedAtMs;
  return summary;
}

/** Configuration accepted by `startEvolutionScheduler`. */
export interface StartEvolutionSchedulerOptions {
  /**
   * Tick cadence in milliseconds. When omitted, the value of
   * `EVOLUTION_SCHEDULER_INTERVAL_MS` is used if it is a number >= 1000;
   * otherwise the cadence is five minutes.
   */
  intervalMs?: number;
  /**
   * Whether to fire one tick right after start-up, deferred to the next
   * turn of the event loop. Anything other than an explicit `false`
   * enables it.
   */
  runOnStart?: boolean;
  /** Options handed unchanged to every `runEvolutionTick` call. */
  tick?: EvolutionTickOptions;
}

/**
 * Control handle returned by `startEvolutionScheduler`. In test mode or
 * when the scheduler is disabled, `stop` is a no-op.
 */
export interface EvolutionSchedulerHandle {
  /** Prevent further ticks; a tick already running is not interrupted. */
  stop: () => void;
}

/**
 * Largest delay Node.js timers honour (2^31 - 1 ms, ~24.8 days). Larger
 * delays are replaced with 1 ms, which would turn the cadence into a
 * tight loop.
 */
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;

/** Smallest cadence accepted from `EVOLUTION_SCHEDULER_INTERVAL_MS`. */
const MIN_ENV_INTERVAL_MS = 1000;

/**
 * Resolve the tick cadence in ms. Precedence: a finite, positive
 * `explicit` option; then `EVOLUTION_SCHEDULER_INTERVAL_MS` when it
 * parses to a number >= 1000; then `DEFAULT_EVOLUTION_TICK_MS`. The
 * result is capped at `MAX_TIMER_DELAY_MS`. Rejected inputs are logged
 * instead of being silently ignored.
 */
function resolveSchedulerIntervalMs(explicit: number | undefined): number {
  let intervalMs: number;
  if (explicit != null && Number.isFinite(explicit) && explicit > 0) {
    intervalMs = explicit;
  } else {
    if (explicit != null) {
      // 0 / negative / NaN would make Node fire every ~1 ms.
      logger.warn(
        { intervalMs: explicit, kind: "evolution_scheduler_start" },
        "evolution scheduler: ignoring invalid intervalMs option",
      );
    }
    const rawEnv = process.env["EVOLUTION_SCHEDULER_INTERVAL_MS"];
    const envInterval = Number(rawEnv);
    if (Number.isFinite(envInterval) && envInterval >= MIN_ENV_INTERVAL_MS) {
      intervalMs = envInterval;
    } else {
      if (rawEnv !== undefined && rawEnv.trim() !== "") {
        logger.warn(
          { value: rawEnv, kind: "evolution_scheduler_start" },
          "evolution scheduler: ignoring invalid EVOLUTION_SCHEDULER_INTERVAL_MS",
        );
      }
      intervalMs = DEFAULT_EVOLUTION_TICK_MS;
    }
  }
  if (intervalMs > MAX_TIMER_DELAY_MS) {
    logger.warn(
      {
        intervalMs,
        maxIntervalMs: MAX_TIMER_DELAY_MS,
        kind: "evolution_scheduler_start",
      },
      "evolution scheduler: interval exceeds timer limit, clamping",
    );
    intervalMs = MAX_TIMER_DELAY_MS;
  }
  return intervalMs;
}

/**
 * Start the in-process cadence worker.
 *
 * Returns a no-op handle immediately when `NODE_ENV=test`, and (after an
 * info log) when `EVOLUTION_SCHEDULER_DISABLED=1`. Otherwise a repeating
 * timer calls `runEvolutionTick` every `intervalMs`. A single-flight
 * latch skips a tick, with a warning, while the previous one is still
 * running. Timers are `unref()`'d so they cannot keep the Node event
 * loop alive on their own.
 *
 * @param opts - Cadence, start-up behaviour and per-tick options.
 * @returns A handle whose `stop()` cancels the repeating timer and any
 *   pending start-up tick. A tick that is already running is not
 *   interrupted; it finishes, but no further ticks start.
 */
export function startEvolutionScheduler(
  opts: StartEvolutionSchedulerOptions = {},
): EvolutionSchedulerHandle {
  if (process.env["NODE_ENV"] === "test") {
    return { stop: () => {} };
  }
  if (process.env["EVOLUTION_SCHEDULER_DISABLED"] === "1") {
    logger.info(
      "evolution scheduler: disabled via EVOLUTION_SCHEDULER_DISABLED=1",
    );
    return { stop: () => {} };
  }

  const intervalMs = resolveSchedulerIntervalMs(opts.intervalMs);

  let stopped = false;
  let inFlight = false;

  const fire = async (): Promise<void> => {
    if (stopped) return;
    if (inFlight) {
      logger.warn(
        { kind: "evolution_scheduler_tick" },
        "evolution scheduler: previous tick still in flight, skipping",
      );
      return;
    }
    inFlight = true;
    try {
      const summary = await runEvolutionTick(opts.tick ?? {});
      // Per-network failures live in `perNetwork[].errors`; surface a
      // count so they are visible from the tick log line too.
      const networkErrors = summary.perNetwork.reduce(
        (total, net) => total + net.errors.length,
        0,
      );
      logger.info(
        {
          kind: "evolution_scheduler_tick",
          networksScanned: summary.networksScanned,
          triggersFired: summary.triggersFired,
          candidatesProposed: summary.candidatesProposed,
          promotionsAttempted: summary.promotionsAttempted,
          promoted: summary.promoted,
          rolledBack: summary.rolledBack,
          durationMs: summary.durationMs,
          errors: summary.errors,
          networkErrors,
        },
        "evolution scheduler: tick complete",
      );
    } catch (err) {
      logger.error(
        { err, kind: "evolution_scheduler_tick" },
        "evolution scheduler: tick threw",
      );
    } finally {
      inFlight = false;
    }
  };

  const timer = setInterval(() => {
    void fire();
  }, intervalMs);
  if (typeof timer.unref === "function") timer.unref();

  // Defer the first tick so module load doesn't block server boot; the
  // handle is kept so stop() can cancel it if called before it runs.
  const startupTimer =
    opts.runOnStart !== false
      ? setTimeout(() => {
          void fire();
        }, 0)
      : undefined;
  startupTimer?.unref?.();

  logger.info(
    { intervalMs, kind: "evolution_scheduler_start" },
    "evolution scheduler: started",
  );

  return {
    stop: () => {
      stopped = true;
      clearInterval(timer);
      if (startupTimer !== undefined) clearTimeout(startupTimer);
    },
  };
}