// doatlas-2/artifacts/api-server/src/lib/tool-graph-search.ts
// Uploaded by Iostream-Li via the upload-large-folder tool (commit 5871090, verified).
/**
* tool-graph-search — Mode B (#169) self-evolving tool graph.
*
* Domain-agnostic primitives + search loop that propose candidate
* subgraphs against a Goal (dataset_node + evaluator_node + budget).
* Wins are persisted as provisional nodes/edges; the auto-promotion
* gate flips a sufficiently-supported provisional → verified, and any
* promotion is reversible via `rollbackPromotion`.
*
* NO domain-specific (chemistry / DrugCLIP / EF1%) logic lives here:
* the four primitives only manipulate the structural graph
* (node ids, capability tags, edges, contracts), and the evaluator
* is plugged in via an in-process registry. Seed nodes/datasets/
* evaluators are created via `seedSyntheticToyData()` (gated on
* `TOOL_GRAPH_SYNTHETIC_SEED=1` at boot, or called directly from
* tests).
*
* Public surface:
* - bootstrapToolGoalsSchema() CREATE TABLE IF NOT EXISTS for
* the four #169 tables.
* - createGoal / listGoals / getGoal / archiveGoal
* - runGoalSearch(goalId, opts) the search loop.
* - listRuns / getRun
* - diffSubgraphAgainstVerified(candidateId)
* - autoPromoteIfReady(nodeId) gate that flips provisional→verified.
* - rollbackPromotion(nodeId) undo the last auto-promotion.
* - registerEvaluator(name, fn) install an in-process evaluator.
* - seedSyntheticToyData() toy dataset+evaluator+nodes for
* the smoke test.
* - recordHighConfidenceGap(...) Mode A boost: a single high-
* confidence signal can spawn a
* provisional immediately.
*/
import { and, asc, desc, eq, inArray } from "drizzle-orm";
import {
db,
pool,
toolNodes,
toolEdges,
toolNodeEvidence,
toolGoals,
toolGoalRuns,
toolGoalCandidates,
toolPromotionAudit,
type ToolNodeRow,
type ToolGoalRow,
type ToolGoalRunRow,
type ToolGoalCandidateRow,
type InsertToolGoalRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import {
upsertNode,
upsertEdge,
validateContract,
getNode,
listNodes,
recordPlannerGap,
autoExtendIfNeeded,
approveNode,
type ContractSpec,
type IOSchema,
type ToolNodeStatus,
} from "./tool-graph";
// ----------------------------- env knobs ---------------------------------
const ENV = process.env;

/**
 * Read a numeric env knob.
 *
 * These values fall back to `fallback`:
 * - unset, empty or whitespace-only values;
 * - values that don't parse to a finite number.
 *
 * The empty check matters because `Number("")` is `0`. Without it, a variable
 * that is declared but left blank (e.g. `AUTO_PROMOTE_MIN_EVIDENCE=` in a
 * compose file) would silently set the corresponding gate to zero.
 */
function envNumber(name: string, fallback: number): number {
  const raw = ENV[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const v = Number(raw);
  return Number.isFinite(v) ? v : fallback;
}

/** When true, `seedToolGraphOnce` will also seed the toy synthetic
 * dataset+evaluator+nodes used by Mode B smoke tests. Off by default
 * so production graphs aren't polluted. */
export function syntheticSeedEnabled(): boolean {
  return ENV.TOOL_GRAPH_SYNTHETIC_SEED === "1";
}

/** Mode A boost: a planner gap whose confidence (0..1) exceeds this
 * threshold spawns a provisional node *immediately* without needing
 * the usual `GAP_AUTO_EXTEND_THRESHOLD` invocations.
 * Env: `GAP_HIGH_CONFIDENCE_THRESHOLD`, default 0.85. */
export function highConfidenceGapThreshold(): number {
  return envNumber("GAP_HIGH_CONFIDENCE_THRESHOLD", 0.85);
}

/** Auto-promotion gate: minimum EMA edge-health score across the
 * provisional node's incident edges.
 * Env: `AUTO_PROMOTE_MIN_EDGE_HEALTH`, default 0.85. */
export function autoPromoteMinEdgeHealth(): number {
  return envNumber("AUTO_PROMOTE_MIN_EDGE_HEALTH", 0.85);
}

/** Auto-promotion gate: minimum success ratio across `tool_node_evidence`
 * for the provisional node.
 * Env: `AUTO_PROMOTE_MIN_SUCCESS_RATIO`, default 0.7. */
export function autoPromoteMinSuccessRatio(): number {
  return envNumber("AUTO_PROMOTE_MIN_SUCCESS_RATIO", 0.7);
}

/** Auto-promotion gate: minimum number of evidence rows considered.
 * Env: `AUTO_PROMOTE_MIN_EVIDENCE`, default 3. */
export function autoPromoteMinEvidence(): number {
  return envNumber("AUTO_PROMOTE_MIN_EVIDENCE", 3);
}
// ----------------------------- schema bootstrap --------------------------
let _bootstrapped = false;
/** In-flight bootstrap shared by concurrent callers (null when idle). */
let _bootstrapping: Promise<void> | null = null;
/**
 * Idempotent CREATE TABLE IF NOT EXISTS for the four #169 tables.
 * Mirrors the drizzle definitions in lib/db/src/schema/toolGraph.ts.
 * Called at boot from `seedToolGraphOnce` so the new schema is
 * available without a separate migration step in environments where
 * `drizzle-kit push` hasn't been run yet.
 *
 * Concurrent first calls share one in-flight promise instead of each
 * issuing the DDL. Every public entry point in this module calls the
 * bootstrap, and concurrent `CREATE TABLE IF NOT EXISTS` for the same table
 * can still fail in PostgreSQL with a unique violation on the system
 * catalogs. A failed attempt is not cached, so the next call retries.
 */
export async function bootstrapToolGoalsSchema(): Promise<void> {
  if (_bootstrapped) return;
  if (!_bootstrapping) {
    _bootstrapping = pool
      .query(`
CREATE TABLE IF NOT EXISTS tool_goals (
id text PRIMARY KEY,
name text NOT NULL,
description text NOT NULL DEFAULT '',
dataset_node_id text NOT NULL,
evaluator_node_id text NOT NULL,
budget jsonb NOT NULL DEFAULT '{}'::jsonb,
constraints jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
created_by text NOT NULL DEFAULT 'system',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS tool_goals_status_idx ON tool_goals(status);
CREATE TABLE IF NOT EXISTS tool_goal_runs (
id text PRIMARY KEY,
goal_id text NOT NULL REFERENCES tool_goals(id) ON DELETE CASCADE,
status text NOT NULL DEFAULT 'running',
goal_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
best_metric double precision,
best_candidate_id text,
iterations integer NOT NULL DEFAULT 0,
candidates_evaluated integer NOT NULL DEFAULT 0,
error text,
started_at timestamptz NOT NULL DEFAULT now(),
finished_at timestamptz
);
CREATE INDEX IF NOT EXISTS tool_goal_runs_goal_idx ON tool_goal_runs(goal_id);
CREATE INDEX IF NOT EXISTS tool_goal_runs_status_idx ON tool_goal_runs(status);
CREATE TABLE IF NOT EXISTS tool_goal_candidates (
id text PRIMARY KEY,
run_id text NOT NULL REFERENCES tool_goal_runs(id) ON DELETE CASCADE,
generation integer NOT NULL DEFAULT 0,
primitive text NOT NULL,
parent_candidate_id text,
subgraph jsonb NOT NULL,
contract_ok integer NOT NULL DEFAULT 1,
contract_issues jsonb NOT NULL DEFAULT '[]'::jsonb,
metric double precision,
evaluator_payload jsonb,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS tool_goal_candidates_run_idx ON tool_goal_candidates(run_id);
CREATE INDEX IF NOT EXISTS tool_goal_candidates_metric_idx ON tool_goal_candidates(metric);
CREATE TABLE IF NOT EXISTS tool_promotion_audit (
id text PRIMARY KEY,
node_id text NOT NULL,
action text NOT NULL,
from_status text NOT NULL,
to_status text NOT NULL,
actor text NOT NULL DEFAULT 'system',
evidence_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
run_id text,
candidate_id text,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS tool_promotion_audit_node_idx ON tool_promotion_audit(node_id);
CREATE INDEX IF NOT EXISTS tool_promotion_audit_action_idx ON tool_promotion_audit(action);
`)
      .then(() => {
        _bootstrapped = true;
      })
      .finally(() => {
        // Clear on success and failure alike; a failure must be retryable.
        _bootstrapping = null;
      });
  }
  return _bootstrapping;
}
// ----------------------------- types -------------------------------------
/**
 * A candidate subgraph as stored in `tool_goal_candidates.subgraph`.
 * Nodes are referenced by id; edges reference their endpoints by node *name*
 * and are resolved back to rows via `tool_nodes.name`.
 */
export interface SubgraphRef {
/** Live `tool_nodes.id` values that participate in this candidate. */
nodeIds: string[];
/** Edge tuples (use names rather than ids so cross-run inspection is easier). */
edges: Array<{ from: string; to: string; relation: string }>;
}
/** In-memory projection of a `tool_goals` row (built by `goalRowToSpec`). */
export interface GoalSpec {
id: string;
name: string;
description: string;
/** `tool_nodes.id` of the dataset node; the search seeds its first candidate with it. */
datasetNodeId: string;
/** `tool_nodes.id` of the evaluator node; its `name` selects the registered evaluator. */
evaluatorNodeId: string;
/** Search limits. `runGoalSearch` uses 30_000 ms, 8 generations and 32 candidates for missing fields. */
budget: { wallClockMs?: number; maxIterations?: number; maxCandidates?: number };
/** Free-form constraints. `targetTags` (string[]) is read as the fallback list of expand targets. */
constraints: Record<string, unknown>;
}
/** Which primitive produced a candidate; "seed" marks the initial dataset-only candidate. */
export type Primitive = "expand" | "compose" | "replace" | "tune" | "seed";
/** Everything an evaluator receives for one candidate. */
export interface EvaluatorContext {
goal: GoalSpec;
/** The dataset node's `specJson` (or `{}` when it is unset). */
datasetSpec: Record<string, unknown>;
candidate: SubgraphRef;
/** Materialised nodes, keyed by id, for evaluator inspection. */
nodes: Map<string, ToolNodeRow>;
}
/** Evaluator output for one candidate. */
export interface EvaluatorResult {
/** Candidate score; must be finite. The search ranks higher metrics first. */
metric: number;
/** Extra evaluator detail. NOTE(review): not persisted today — candidates are stored with `evaluatorPayload: null`. */
payload?: Record<string, unknown>;
}
/** An in-process evaluator; it may answer synchronously or with a promise. */
export type EvaluatorFn = (ctx: EvaluatorContext) => Promise<EvaluatorResult> | EvaluatorResult;

/** Registry of in-process evaluators, keyed by evaluator node name. */
const _evaluators: Map<string, EvaluatorFn> = new Map<string, EvaluatorFn>();

/**
 * Install (or replace) the evaluator known as `name`.
 *
 * Goals point at their evaluator by the evaluator node's `name` column,
 * so `name` has to match what `seedSyntheticToyData` (or whichever caller
 * created the node) wrote into `tool_nodes`.
 */
export function registerEvaluator(name: string, fn: EvaluatorFn): void {
  const registry = _evaluators;
  registry.set(name, fn);
}

/** Look up the evaluator registered as `name`; `undefined` when none is. */
export function getEvaluator(name: string): EvaluatorFn | undefined {
  return _evaluators.has(name) ? _evaluators.get(name) : undefined;
}
// ----------------------------- evaluator harness -------------------------
/**
 * Assemble the context handed to an evaluator.
 *
 * The context contains:
 * - the goal;
 * - the dataset node's `specJson` (or `{}`);
 * - the candidate itself;
 * - the candidate's node rows keyed by id.
 *
 * Candidate ids with no matching row are simply absent from `nodes`.
 *
 * @throws Error when the goal's dataset node no longer exists.
 */
async function loadEvaluatorContext(
  goal: GoalSpec,
  candidate: SubgraphRef,
): Promise<EvaluatorContext> {
  const datasetRow = await getNodeRow(goal.datasetNodeId);
  if (!datasetRow) throw new Error(`dataset node ${goal.datasetNodeId} missing`);

  let memberRows: ToolNodeRow[] = [];
  if (candidate.nodeIds.length > 0) {
    memberRows = await db.select().from(toolNodes).where(inArray(toolNodes.id, candidate.nodeIds));
  }
  const nodes = new Map<string, ToolNodeRow>(memberRows.map((row) => [row.id, row]));

  const datasetSpec = (datasetRow.specJson || {}) as Record<string, unknown>;
  return { goal, datasetSpec, candidate, nodes };
}
/**
 * Score `candidate` with the goal's registered evaluator.
 *
 * The evaluator is looked up by the *name* of the goal's evaluator node.
 *
 * @throws Error in any of these cases:
 * - the evaluator node is missing;
 * - no evaluator is registered under that name;
 * - the evaluator returns a non-finite metric;
 * - building the evaluator context fails.
 */
async function evaluate(goal: GoalSpec, candidate: SubgraphRef): Promise<EvaluatorResult> {
  const evalRow = await getNodeRow(goal.evaluatorNodeId);
  if (!evalRow) throw new Error(`evaluator node ${goal.evaluatorNodeId} missing`);

  const evaluatorName = evalRow.name;
  const evaluatorFn = _evaluators.get(evaluatorName);
  if (!evaluatorFn) {
    throw new Error(
      `evaluator '${evaluatorName}' is not registered in-process; call registerEvaluator() at boot`,
    );
  }

  const result = await evaluatorFn(await loadEvaluatorContext(goal, candidate));
  if (Number.isFinite(result.metric)) return result;
  throw new Error(`evaluator '${evaluatorName}' returned non-finite metric`);
}
/** Fetch a single `tool_nodes` row by primary key; `null` when absent. */
async function getNodeRow(id: string): Promise<ToolNodeRow | null> {
  const [row] = await db.select().from(toolNodes).where(eq(toolNodes.id, id)).limit(1);
  return row ?? null;
}
// ----------------------------- contract validator ------------------------
/**
 * Validate the contract on every edge of a candidate.
 *
 * Each edge is looked up in `tool_edges` by (from name, to name, relation).
 * The edge counts as a failure when any of these hold (matches the #156
 * invariant):
 * - either endpoint name is unknown;
 * - the edge isn't persisted;
 * - the edge has no contract;
 * - the contract fails `validateContract`.
 *
 * A candidate with no edges is trivially ok.
 */
async function validateCandidateContracts(
  candidate: SubgraphRef,
): Promise<{ ok: boolean; issues: string[] }> {
  const issues: string[] = [];
  if (candidate.edges.length === 0) return { ok: true, issues };

  const nodeByName = async (name: string): Promise<ToolNodeRow | undefined> =>
    (await db.select().from(toolNodes).where(eq(toolNodes.name, name)).limit(1))[0];

  for (const { from, to, relation } of candidate.edges) {
    const fromRow = await nodeByName(from);
    const toRow = await nodeByName(to);
    if (!fromRow || !toRow) {
      issues.push(`unknown endpoints ${from} -> ${to}`);
      continue;
    }

    const [edgeRow] = await db
      .select()
      .from(toolEdges)
      .where(
        and(
          eq(toolEdges.fromNode, fromRow.id),
          eq(toolEdges.toNode, toRow.id),
          eq(toolEdges.relation, relation),
        ),
      )
      .limit(1);

    const label = `${from} -[${relation}]-> ${to}`;
    if (!edgeRow) {
      issues.push(`edge ${label} not persisted`);
    } else if (!edgeRow.contract) {
      issues.push(`edge ${label} has no contract`);
    } else {
      const verdict = validateContract(edgeRow.contract as ContractSpec);
      if (!verdict.ok) {
        for (const issue of verdict.issues) issues.push(`${from}->${to}: ${issue}`);
      }
    }
  }

  return { ok: issues.length === 0, issues };
}
// ----------------------------- the four primitives -----------------------
/** Inputs shared by every search primitive (expand / compose / replace / tune). */
interface PrimitiveCtx {
/** The goal being searched. */
goal: GoalSpec;
/** The candidate the primitive derives a new candidate from. */
parent: SubgraphRef;
/** Pool of all currently-known node rows (verified + provisional)
* the search may draw from. Excludes deprecated/rejected. */
pool: ToolNodeRow[];
/** Current run id; its last 6 characters are appended to the names of provisional nodes created by expand/compose. */
runId: string;
}
/** A primitive's proposal: which primitive produced it, plus the derived subgraph. */
interface PrimitiveOut {
primitive: Primitive;
subgraph: SubgraphRef;
}
/**
 * `expand` — append a new provisional node tagged with `targetTag`, a
 * capability not yet covered by the parent.
 *
 * The node is named `expand_<sanitised tag>_<last 6 chars of runId>`. A
 * node that already has that name is reused instead of created.
 *
 * When at least one parent node resolves, an `alternative_to` edge is added
 * from the parent's last node to the new node, so the candidate stays
 * connected. "Last" means last in `parent.nodeIds` order that still exists.
 *
 * @returns null when some parent node already carries `targetTag`.
 */
async function applyExpand(ctx: PrimitiveCtx, targetTag: string): Promise<PrimitiveOut | null> {
  const nodes = await rowsForNames(ctx.parent.nodeIds);
  const coveredTags = new Set<string>();
  for (const n of nodes) for (const t of (n.capabilityTags as string[]) || []) coveredTags.add(t);
  if (coveredTags.has(targetTag)) return null;

  const provName = `expand_${targetTag.replace(/[^a-z0-9_]/gi, "_")}_${ctx.runId.slice(-6)}`;
  const exists = (await db.select().from(toolNodes).where(eq(toolNodes.name, provName)).limit(1))[0];
  const node = exists
    ? exists
    : await upsertNode({
        id: newId("tnode"),
        name: provName,
        description: `Mode B expand primitive: covers tag '${targetTag}'.`,
        capabilityTags: [targetTag],
        inputKind: "json",
        outputKind: "json",
        status: "provisional",
        ownerProcess: "node",
        specJson: {
          inputSchema: { type: "object", properties: { input: { type: "string" } }, required: ["input"] },
          outputSchema: { type: "object", properties: { output: { type: "string" } }, required: ["output"] },
        } as Record<string, unknown>,
        createdBy: "mode_b_search",
      }).then(toRow);

  const newSubgraph: SubgraphRef = {
    nodeIds: [...ctx.parent.nodeIds, node.id],
    edges: [...ctx.parent.edges],
  };

  // Resolve the anchor from `parent.nodeIds` rather than relying on the
  // order of `nodes`: an SQL `IN (...)` lookup guarantees no row order.
  const rowById = new Map(nodes.map((n) => [n.id, n] as const));
  let last: ToolNodeRow | undefined;
  for (let i = ctx.parent.nodeIds.length - 1; i >= 0 && last === undefined; i--) {
    last = rowById.get(ctx.parent.nodeIds[i]!);
  }
  if (last) {
    await ensureProvisionalEdge(last, node, "alternative_to");
    newSubgraph.edges.push({ from: last.name, to: node.name, relation: "alternative_to" });
  }
  return { primitive: "expand", subgraph: newSubgraph };
}
/** Short deterministic FNV-1a (32-bit) hash of `s`, base-36 encoded (at most 7 chars). */
function shortNameHash(s: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(36);
}

/**
 * `compose` — take the first two parent nodes A and B (in `parent.nodeIds`
 * order) and create a provisional composite node carrying the union of
 * their capability tags.
 *
 * A and B remain in the subgraph; `composes_into` edges link each of them
 * to the composite.
 *
 * The composite is named `compose_<A>_<B>_<last 6 chars of runId>`, capped
 * at 96 characters. When the full name is longer:
 * - the `compose_<A>_<B>` part is truncated;
 * - a hash of the untruncated part is inserted.
 * This keeps the run suffix and stops distinct pairs that share a long
 * prefix from colliding on one name.
 *
 * @returns null when fewer than two parent nodes resolve, or when this pair's
 *   composite is already part of the parent. Re-adding it would only
 *   duplicate the node id and its edges.
 */
async function applyCompose(ctx: PrimitiveCtx): Promise<PrimitiveOut | null> {
  if (ctx.parent.nodeIds.length < 2) return null;
  const rows = await rowsForNames(ctx.parent.nodeIds);

  // Pick A and B by parent order, not by the (unordered) row order.
  const rowById = new Map(rows.map((n) => [n.id, n] as const));
  const ordered: ToolNodeRow[] = [];
  for (const id of new Set(ctx.parent.nodeIds)) {
    const row = rowById.get(id);
    if (row) ordered.push(row);
    if (ordered.length === 2) break;
  }
  if (ordered.length < 2) return null;
  const a = ordered[0]!;
  const b = ordered[1]!;

  const tags = Array.from(
    new Set([
      ...((a.capabilityTags as string[]) || []),
      ...((b.capabilityTags as string[]) || []),
    ]),
  );

  const maxNameLength = 96;
  const suffix = `_${ctx.runId.slice(-6)}`;
  const base = `compose_${a.name}_${b.name}`;
  let provName = `${base}${suffix}`;
  if (provName.length > maxNameLength) {
    const hash = `_${shortNameHash(base)}`;
    provName = `${base.slice(0, maxNameLength - suffix.length - hash.length)}${hash}${suffix}`;
  }

  const exists = (await db.select().from(toolNodes).where(eq(toolNodes.name, provName)).limit(1))[0];
  if (exists && ctx.parent.nodeIds.includes(exists.id)) return null;
  const node = exists
    ? exists
    : await upsertNode({
        id: newId("tnode"),
        name: provName,
        description: `Mode B compose primitive: ${a.name} -> ${b.name}`,
        capabilityTags: tags,
        inputKind: "json",
        outputKind: "json",
        status: "provisional",
        ownerProcess: "node",
        specJson: {
          inputSchema: { type: "object", properties: { input: { type: "string" } }, required: ["input"] },
          outputSchema: { type: "object", properties: { output: { type: "string" } }, required: ["output"] },
        } as Record<string, unknown>,
        createdBy: "mode_b_search",
      }).then(toRow);

  await ensureProvisionalEdge(a, node, "composes_into");
  await ensureProvisionalEdge(b, node, "composes_into");
  const newSubgraph: SubgraphRef = {
    nodeIds: [...ctx.parent.nodeIds, node.id],
    edges: [
      ...ctx.parent.edges,
      { from: a.name, to: node.name, relation: "composes_into" },
      { from: b.name, to: node.name, relation: "composes_into" },
    ],
  };
  return { primitive: "compose", subgraph: newSubgraph };
}
/**
 * `replace` — swap one parent node for a pool node that shares at least one
 * capability tag with it, and rewire the candidate's edges onto the substitute.
 *
 * Parent nodes are tried in the order `rowsForNames` returns them; the first
 * one with an eligible substitute is replaced. Eligible substitutes are pool
 * nodes that are not already in the parent, with verified nodes preferred.
 * The rewired edges are persisted via `ensureProvisionalEdge` so that the
 * candidate's later contract validation can find them.
 *
 * @returns null when the parent is empty or no parent node has a substitute.
 */
async function applyReplace(ctx: PrimitiveCtx): Promise<PrimitiveOut | null> {
  const { parent } = ctx;
  if (parent.nodeIds.length === 0) return null;
  const members = await rowsForNames(parent.nodeIds);
  const tagsOf = (row: ToolNodeRow): string[] => (row.capabilityTags as string[]) || [];
  const verifiedRank = (row: ToolNodeRow): number => Number(row.status === "verified");

  for (const victim of members) {
    const victimTags = new Set<string>(tagsOf(victim));
    const eligible = ctx.pool.filter(
      (p) =>
        p.id !== victim.id &&
        !parent.nodeIds.includes(p.id) &&
        tagsOf(p).some((t) => victimTags.has(t)),
    );
    eligible.sort((l, r) => verifiedRank(r) - verifiedRank(l));
    const substitute = eligible[0];
    if (!substitute) continue;

    const rename = (name: string): string => (name === victim.name ? substitute.name : name);
    const rewiredEdges = parent.edges.map(({ from, to, relation }) => ({
      from: rename(from),
      to: rename(to),
      relation,
    }));

    // Persist the rewired edges so later contract validation passes.
    const resolve = (name: string): ToolNodeRow | undefined =>
      name === substitute.name ? substitute : members.find((m) => m.name === name);
    for (const edge of rewiredEdges) {
      const fromRow = resolve(edge.from);
      const toRow = resolve(edge.to);
      if (fromRow && toRow) await ensureProvisionalEdge(fromRow, toRow, edge.relation);
    }

    return {
      primitive: "replace",
      subgraph: {
        nodeIds: parent.nodeIds.map((nid) => (nid === victim.id ? substitute.id : nid)),
        edges: rewiredEdges,
      },
    };
  }
  return null;
}
/**
 * `tune` — adjust a structural weight without changing the node set.
 *
 * Sets `weight = 1.0` on every `tool_edges` row matching the parent's first
 * edge (from name, to name, relation). Edges created by
 * `ensureProvisionalEdge` start at 0.5. Returns a structurally-identical
 * copy of the parent, so the evaluator decides via its own scoring rules
 * whether the change is worthwhile.
 *
 * NOTE(review): the weight is written to the live graph immediately,
 * whether or not the tuned candidate ends up winning.
 *
 * @returns null when the parent has no edges or either endpoint name
 *   no longer resolves.
 */
async function applyTune(ctx: PrimitiveCtx): Promise<PrimitiveOut | null> {
  const { nodeIds, edges } = ctx.parent;
  if (edges.length === 0) return null;
  const { from, to, relation } = edges[0]!;

  const nodeByName = async (name: string): Promise<ToolNodeRow | undefined> =>
    (await db.select().from(toolNodes).where(eq(toolNodes.name, name)).limit(1))[0];
  const fromRow = await nodeByName(from);
  const toRow = await nodeByName(to);
  if (!fromRow || !toRow) return null;

  const matchesEdge = and(
    eq(toolEdges.fromNode, fromRow.id),
    eq(toolEdges.toNode, toRow.id),
    eq(toolEdges.relation, relation),
  );
  await db.update(toolEdges).set({ weight: 1.0 }).where(matchesEdge);

  return {
    primitive: "tune",
    subgraph: { nodeIds: nodeIds.slice(), edges: edges.slice() },
  };
}
/**
 * Fetch the `tool_nodes` rows for a list of node *ids*. Despite its name,
 * this helper takes ids, not names.
 *
 * Rows come back in the order their ids first appear in `ids`. Ids with no
 * matching row are skipped, and a duplicated id yields a single row. The
 * primitives depend on this order ("the parent's last node", "the first two
 * nodes"), and an SQL `IN (...)` lookup by itself guarantees no ordering.
 */
async function rowsForNames(ids: string[]): Promise<ToolNodeRow[]> {
  if (ids.length === 0) return [];
  const rows = await db.select().from(toolNodes).where(inArray(toolNodes.id, ids));
  return orderRowsByIds(ids, rows);
}

/** Reorder `rows` to follow the first occurrence of each id in `ids`, dropping ids with no row. */
function orderRowsByIds<T extends { id: string }>(ids: readonly string[], rows: readonly T[]): T[] {
  const byId = new Map(rows.map((r) => [r.id, r] as const));
  const ordered: T[] = [];
  for (const id of new Set(ids)) {
    const row = byId.get(id);
    if (row !== undefined) ordered.push(row);
  }
  return ordered;
}
/**
 * Reduce a freshly upserted node to the `{ id, name }` pair the primitives
 * use, typed as a `ToolNodeRow`.
 *
 * NOTE: this does NOT re-fetch the row. Every other column on the result is
 * `undefined` despite the type. The current callers only read `id`/`name`
 * from it, or pass it to `ensureProvisionalEdge`, which re-reads full rows
 * by id itself.
 */
function toRow(n: { id: string; name: string }): ToolNodeRow {
  const { id, name } = n;
  const partial = { id, name };
  return partial as ToolNodeRow;
}
/**
 * Ensure that a contracted edge `from -[relation]-> to` exists.
 *
 * This is a no-op when a matching edge already carries a contract.
 * Otherwise it:
 * - re-reads both endpoint rows by id, so partial rows (see `toRow`) are fine;
 * - derives an advisory contract from `from.outputSchema` and `to.inputSchema`,
 *   each falling back to `{ type: "object" }`;
 * - writes the edge through `upsertEdge` with weight 0.5.
 */
async function ensureProvisionalEdge(
  from: ToolNodeRow,
  to: ToolNodeRow,
  relation: string,
): Promise<void> {
  const [existing] = await db
    .select()
    .from(toolEdges)
    .where(
      and(
        eq(toolEdges.fromNode, from.id),
        eq(toolEdges.toNode, to.id),
        eq(toolEdges.relation, relation),
      ),
    )
    .limit(1);
  if (existing?.contract) return;

  const loadFullRow = async (id: string): Promise<ToolNodeRow> =>
    (await db.select().from(toolNodes).where(eq(toolNodes.id, id)).limit(1))[0]!;
  const fromFull = await loadFullRow(from.id);
  const toFull = await loadFullRow(to.id);

  const { outputSchema } = (fromFull.specJson || {}) as { outputSchema?: IOSchema };
  const { inputSchema } = (toFull.specJson || {}) as { inputSchema?: IOSchema };
  const contract: ContractSpec = {
    produces: outputSchema || { type: "object" },
    consumes: inputSchema || { type: "object" },
    mappingHints: { advisory: true, source: "mode_b_search" },
  };

  await upsertEdge({
    id: newId("tedge"),
    fromNode: from.id,
    toNode: to.id,
    relation,
    weight: 0.5,
    contract,
  });
}
// ----------------------------- search loop -------------------------------
/** Per-call options for `runGoalSearch`. */
export interface RunSearchOpts {
/** Per-field override of goal.budget, merged over it at run start. */
budget?: GoalSpec["budget"];
/** Evaluator (sync or async) used instead of the registry lookup; handy for tests. */
evaluatorOverride?: EvaluatorFn;
/** Actor recorded on auto-promotion audit rows; defaults to "mode_b_search". */
actor?: string;
}
/** In-memory mirror of a persisted `tool_goal_candidates` row, used by the search loop. */
interface CandidateRow {
id: string;
primitive: Primitive;
/** Parent candidate id; null for the seed candidate. */
parentId: string | null;
subgraph: SubgraphRef;
/** Evaluator metric, or null when evaluation failed. */
metric: number | null;
/** Whether every edge passed contract validation. */
contractOk: boolean;
contractIssues: string[];
}
/**
 * Run the search loop against a goal until the budget is exhausted, and
 * return the persisted ToolGoalRunRow with the best candidate id populated.
 *
 * Each generation works like this:
 * - take the (up to) two best frontier candidates as parents;
 * - apply compose, replace, tune, and one expand per target tag to each;
 * - evaluate and persist every resulting candidate.
 *
 * The loop stops on any of:
 * - the wall-clock deadline;
 * - the candidate budget;
 * - `maxIterations` generations;
 * - a generation that yields nothing.
 *
 * Budget defaults are 8 generations, 32 candidates and 30 s of wall clock.
 * `iterations` on the run row records how many generations actually
 * produced candidates.
 *
 * After the run is marked `completed`, provisional nodes in the winning
 * subgraph are offered to `autoPromoteIfReady`. Failures there are logged
 * and do not fail the run.
 *
 * @throws Error when the goal does not exist (no run row is created). If the
 *   dataset node is missing, or any later step throws, the run row is first
 *   marked `failed` with the error message and the error is rethrown, so a
 *   run is never left `running`.
 */
export async function runGoalSearch(
  goalId: string,
  opts: RunSearchOpts = {},
): Promise<ToolGoalRunRow> {
  await bootstrapToolGoalsSchema();
  const goalRow = (await db.select().from(toolGoals).where(eq(toolGoals.id, goalId)).limit(1))[0];
  if (!goalRow) throw new Error(`goal ${goalId} not found`);
  const goal: GoalSpec = goalRowToSpec(goalRow);
  const budget = { ...goal.budget, ...(opts.budget || {}) };
  const maxIterations = budget.maxIterations ?? 8;
  const maxCandidates = budget.maxCandidates ?? 32;
  const wallClockMs = budget.wallClockMs ?? 30_000;
  const deadline = Date.now() + wallClockMs;
  const runId = newId("trun");
  await db.insert(toolGoalRuns).values({
    id: runId,
    goalId,
    status: "running",
    goalSnapshot: goalRow as unknown as Record<string, unknown>,
  });

  // Higher metric first; failed evaluations (null metric) sink to the end.
  const byMetricDesc = (a: CandidateRow, b: CandidateRow): number =>
    (b.metric ?? -Infinity) - (a.metric ?? -Infinity);

  let best: CandidateRow;
  try {
    // Seed candidate is just [datasetNode] — primitives grow from there.
    const datasetRow = await getNodeRow(goal.datasetNodeId);
    if (!datasetRow) throw new Error("dataset node missing");
    const seed: SubgraphRef = { nodeIds: [datasetRow.id], edges: [] };
    const seedMetric = await safeEval(goal, seed, opts.evaluatorOverride);
    const frontier: CandidateRow[] = [
      await persistCandidate(runId, 0, "seed", null, seed, seedMetric),
    ];
    let totalEvaluated = 1;
    let generationsRun = 0;

    // Pool of nodes available to primitives — avoid deprecated/rejected.
    const searchPool = (await db.select().from(toolNodes)).filter(
      (n) => n.status !== "deprecated" && n.status !== "rejected",
    );
    const targetTags = extractTargetTags(goal, datasetRow);

    for (let gen = 1; gen <= maxIterations; gen++) {
      if (Date.now() > deadline) break;
      if (totalEvaluated >= maxCandidates) break;

      // The two best candidates found so far become this generation's parents.
      frontier.sort(byMetricDesc);
      const parents = frontier.slice(0, 2);
      const newCandidates: CandidateRow[] = [];
      for (const parent of parents) {
        const ctx: PrimitiveCtx = { goal, parent: parent.subgraph, pool: searchPool, runId };
        const outs = (
          await Promise.all([
            applyCompose(ctx),
            applyReplace(ctx),
            applyTune(ctx),
            ...targetTags.map((t) => applyExpand(ctx, t)),
          ])
        ).filter((x): x is PrimitiveOut => x !== null);
        for (const out of outs) {
          if (totalEvaluated >= maxCandidates) break;
          const m = await safeEval(goal, out.subgraph, opts.evaluatorOverride);
          newCandidates.push(
            await persistCandidate(runId, gen, out.primitive, parent.id, out.subgraph, m),
          );
          totalEvaluated += 1;
        }
      }
      if (newCandidates.length === 0) break;
      frontier.push(...newCandidates);
      generationsRun = gen;
    }

    frontier.sort(byMetricDesc);
    best = frontier[0]!;
    await db
      .update(toolGoalRuns)
      .set({
        status: "completed",
        bestMetric: best.metric,
        bestCandidateId: best.id,
        iterations: generationsRun,
        candidatesEvaluated: totalEvaluated,
        finishedAt: new Date(),
      })
      .where(eq(toolGoalRuns.id, runId));
  } catch (err) {
    // Never leave the run row stuck in "running": record the failure, then
    // surface the original error to the caller.
    const message = err instanceof Error ? err.message : String(err);
    try {
      await failRun(runId, message);
    } catch (failErr) {
      logger.warn({ err: failErr, runId }, "failed to mark goal run as failed");
    }
    throw err;
  }

  // Auto-promotion sweep on the winning subgraph's provisional nodes.
  const bestRows = await rowsForNames(best.subgraph.nodeIds);
  for (const n of bestRows) {
    if (n.status !== "provisional") continue;
    try {
      await autoPromoteIfReady(n.id, {
        actor: opts.actor || "mode_b_search",
        runId,
        candidateId: best.id,
      });
    } catch (err) {
      logger.debug({ err, nodeId: n.id }, "auto-promotion check failed");
    }
  }
  return (await db.select().from(toolGoalRuns).where(eq(toolGoalRuns.id, runId)).limit(1))[0]!;
}
/**
 * Project a `tool_goals` row onto GoalSpec.
 * Falsy `budget` / `constraints` jsonb values become `{}`.
 */
function goalRowToSpec(row: ToolGoalRow): GoalSpec {
  const { id, name, description, datasetNodeId, evaluatorNodeId } = row;
  const budget = (row.budget as GoalSpec["budget"]) || {};
  const constraints = (row.constraints as Record<string, unknown>) || {};
  return { id, name, description, datasetNodeId, evaluatorNodeId, budget, constraints };
}
/**
 * Capability tags the `expand` primitive should try to cover.
 *
 * The dataset node's `specJson.targetTags` wins when it is an array.
 * Otherwise the goal's `constraints.targetTags` is used. Domain-agnostic:
 * primitives only ever see a list of strings.
 *
 * Non-string and blank entries are dropped, and duplicates are removed with
 * first-occurrence order kept. Duplicate tags would make the concurrently
 * launched `applyExpand` calls race to create the same provisional node name.
 */
function extractTargetTags(goal: GoalSpec, datasetRow: ToolNodeRow): string[] {
  const spec = (datasetRow.specJson || {}) as { targetTags?: unknown };
  let raw: unknown[] = [];
  if (Array.isArray(spec.targetTags)) {
    raw = spec.targetTags;
  } else {
    const c = goal.constraints as { targetTags?: unknown };
    if (Array.isArray(c.targetTags)) raw = c.targetTags;
  }
  const tags = new Set<string>();
  for (const t of raw) {
    if (typeof t === "string" && t.trim() !== "") tags.add(t);
  }
  return [...tags];
}
/**
 * Evaluate a candidate without ever throwing.
 *
 * Uses `override` when given, otherwise the registered evaluator. Any failure
 * (missing nodes, an unregistered evaluator, an evaluator exception, or a
 * non-finite metric) is logged at debug level and reported as `null`.
 */
async function safeEval(
  goal: GoalSpec,
  candidate: SubgraphRef,
  override?: EvaluatorFn,
): Promise<number | null> {
  try {
    if (!override) {
      return (await evaluate(goal, candidate)).metric;
    }
    const { metric } = await override(await loadEvaluatorContext(goal, candidate));
    return Number.isFinite(metric) ? metric : null;
  } catch (err) {
    logger.debug({ err }, "evaluator failed for candidate");
    return null;
  }
}
/**
 * Validate a candidate's edge contracts and insert it into
 * `tool_goal_candidates`, returning the in-memory CandidateRow.
 * `evaluatorPayload` is always stored as null.
 */
async function persistCandidate(
  runId: string,
  generation: number,
  primitive: Primitive,
  parentCandidateId: string | null,
  subgraph: SubgraphRef,
  metric: number | null,
): Promise<CandidateRow> {
  const { ok: contractOk, issues: contractIssues } = await validateCandidateContracts(subgraph);
  const id = newId("tcand");
  await db.insert(toolGoalCandidates).values({
    id,
    runId,
    generation,
    primitive,
    parentCandidateId,
    subgraph: subgraph as unknown as Record<string, unknown>,
    contractOk: contractOk ? 1 : 0,
    contractIssues: contractIssues as unknown as Record<string, unknown>,
    metric: metric ?? null,
    evaluatorPayload: null,
  });
  return {
    id,
    primitive,
    parentId: parentCandidateId,
    subgraph,
    metric,
    contractOk,
    contractIssues,
  };
}
/** Mark a goal run as `failed`, recording `error` and the finish time. */
async function failRun(runId: string, error: string): Promise<void> {
  const finishedAt = new Date();
  await db
    .update(toolGoalRuns)
    .set({ status: "failed", error, finishedAt })
    .where(eq(toolGoalRuns.id, runId));
}
// ----------------------------- Goal CRUD ---------------------------------
/** Input for `createGoal`. Both node ids must reference existing, verified nodes. */
export interface CreateGoalInput {
name: string;
/** Defaults to "". */
description?: string;
datasetNodeId: string;
evaluatorNodeId: string;
/** Defaults to {} (the search then uses its built-in limits). */
budget?: GoalSpec["budget"];
/** Defaults to {}. */
constraints?: Record<string, unknown>;
/** Defaults to "system". */
createdBy?: string;
}
/**
 * Create an `active` goal and return the stored row.
 *
 * @throws Error "dataset node not found" / "evaluator node not found" when a
 *   node id doesn't resolve. Existence is checked first; then
 *   "dataset node must be verified" / "evaluator node must be verified" is
 *   thrown when either node isn't verified.
 */
export async function createGoal(input: CreateGoalInput): Promise<ToolGoalRow> {
  await bootstrapToolGoalsSchema();

  const datasetNode = await getNodeRow(input.datasetNodeId);
  if (!datasetNode) throw new Error("dataset node not found");
  const evaluatorNode = await getNodeRow(input.evaluatorNodeId);
  if (!evaluatorNode) throw new Error("evaluator node not found");

  const roles = [
    [datasetNode, "dataset"],
    [evaluatorNode, "evaluator"],
  ] as const;
  for (const [node, role] of roles) {
    if (node.status !== "verified") throw new Error(`${role} node must be verified`);
  }

  const goalId = newId("tgoal");
  const row: InsertToolGoalRow = {
    id: goalId,
    name: input.name,
    description: input.description ?? "",
    datasetNodeId: input.datasetNodeId,
    evaluatorNodeId: input.evaluatorNodeId,
    budget: (input.budget ?? {}) as Record<string, unknown>,
    constraints: input.constraints ?? {},
    status: "active",
    createdBy: input.createdBy ?? "system",
  };
  await db.insert(toolGoals).values(row);

  const [stored] = await db.select().from(toolGoals).where(eq(toolGoals.id, goalId)).limit(1);
  return stored!;
}
/** Every goal regardless of status, newest first. */
export async function listGoals(): Promise<ToolGoalRow[]> {
  await bootstrapToolGoalsSchema();
  const newestFirst = desc(toolGoals.createdAt);
  return db.select().from(toolGoals).orderBy(newestFirst);
}
/** Look up one goal by id; `null` when absent. */
export async function getGoal(id: string): Promise<ToolGoalRow | null> {
  await bootstrapToolGoalsSchema();
  const [goal] = await db.select().from(toolGoals).where(eq(toolGoals.id, id)).limit(1);
  return goal ?? null;
}
/**
 * Soft-delete a goal by setting its status to "archived".
 *
 * Also bumps `updated_at`, which the `tool_goals` table carries. Without
 * this, an archived goal still reports its creation time as the last
 * update; the node status updates in this module bump `updatedAt` the same
 * way.
 *
 * @returns true when a goal with that id exists (archived or already archived).
 */
export async function archiveGoal(id: string): Promise<boolean> {
  await bootstrapToolGoalsSchema();
  const archived = await db
    .update(toolGoals)
    .set({ status: "archived", updatedAt: new Date() })
    .where(eq(toolGoals.id, id))
    .returning({ id: toolGoals.id });
  return archived.length > 0;
}
/** All runs of one goal, most recently started first. */
export async function listRuns(goalId: string): Promise<ToolGoalRunRow[]> {
  await bootstrapToolGoalsSchema();
  const forGoal = eq(toolGoalRuns.goalId, goalId);
  return db
    .select()
    .from(toolGoalRuns)
    .where(forGoal)
    .orderBy(desc(toolGoalRuns.startedAt));
}
/**
 * Fetch a run together with its candidates, best first.
 *
 * Candidates are ordered by metric descending, with unscored ones (null
 * metric) last; ties stay in creation order.
 *
 * @returns null when the run id is unknown.
 */
export async function getRun(
  runId: string,
): Promise<{ run: ToolGoalRunRow; candidates: ToolGoalCandidateRow[] } | null> {
  await bootstrapToolGoalsSchema();
  const run = (await db.select().from(toolGoalRuns).where(eq(toolGoalRuns.id, runId)).limit(1))[0];
  if (!run) return null;
  // Sort by metric in JS rather than with `ORDER BY metric DESC`:
  // PostgreSQL defaults to NULLS FIRST for descending sorts, which would
  // list candidates whose evaluation failed at the top.
  const candidates = await db
    .select()
    .from(toolGoalCandidates)
    .where(eq(toolGoalCandidates.runId, runId))
    .orderBy(asc(toolGoalCandidates.createdAt));
  // Array.prototype.sort is stable, so equal metrics keep creation order.
  candidates.sort(compareCandidatesByMetric);
  return { run, candidates };
}

/** Comparator: higher metric first, null metrics last. */
function compareCandidatesByMetric(
  a: { metric: number | null },
  b: { metric: number | null },
): number {
  if (a.metric === b.metric) return 0;
  if (a.metric === null) return 1;
  if (b.metric === null) return -1;
  return b.metric - a.metric;
}
// ----------------------------- diff vs verified --------------------------
/**
 * Result of `diffSubgraphAgainstVerified`. Node entries are node names;
 * edge entries use the same name-based tuples as `SubgraphRef.edges`.
 * `removed` is currently always empty (removals are not computed).
 */
export interface SubgraphDiff {
added: { nodes: string[]; edges: Array<{ from: string; to: string; relation: string }> };
removed: { nodes: string[]; edges: Array<{ from: string; to: string; relation: string }> };
/** Provisional candidate nodes (by node id) that a promotion would flip. */
status_changes: Array<{ nodeId: string; from: ToolNodeStatus; to: "would_promote" }>;
}
/**
 * Diff a candidate subgraph against the currently-verified slice of the live
 * graph: verified nodes, plus the edges whose endpoints are both verified.
 * Used by the admin UI to render pre-promotion review.
 *
 * - `added.nodes`: names of candidate nodes that are not verified.
 * - `added.edges`: candidate edges with no matching (from, relation, to)
 *   edge between verified nodes.
 * - `removed`: not computed; always empty.
 * - `status_changes`: provisional candidate nodes, reported as `would_promote`.
 *
 * @returns null when the candidate id is unknown.
 */
export async function diffSubgraphAgainstVerified(candidateId: string): Promise<SubgraphDiff | null> {
  await bootstrapToolGoalsSchema();
  const cand = (
    await db.select().from(toolGoalCandidates).where(eq(toolGoalCandidates.id, candidateId)).limit(1)
  )[0];
  if (!cand) return null;
  const sg = cand.subgraph as unknown as SubgraphRef;

  const verifiedRows = await db.select().from(toolNodes).where(eq(toolNodes.status, "verified"));
  const verifiedNameById = new Map(verifiedRows.map((r) => [r.id, r.name] as const));
  const candRows = sg.nodeIds.length
    ? await db.select().from(toolNodes).where(inArray(toolNodes.id, sg.nodeIds))
    : [];
  const addedNodes = candRows.filter((r) => !verifiedNameById.has(r.id)).map((r) => r.name);

  const edgeKey = (from: string, relation: string, to: string): string => `${from}|${relation}|${to}`;

  // Only edges with BOTH endpoints verified belong to the verified slice.
  // Edges that merely leave a candidate node must not count: the search
  // persists the edges it proposes (via ensureProvisionalEdge), so counting
  // them would hide every candidate edge from `added.edges`.
  const verifiedEdgeKeys = new Set<string>();
  if (verifiedNameById.size > 0) {
    const edgesFromVerified = await db
      .select()
      .from(toolEdges)
      .where(inArray(toolEdges.fromNode, [...verifiedNameById.keys()]));
    for (const e of edgesFromVerified) {
      const fromName = verifiedNameById.get(e.fromNode);
      const toName = verifiedNameById.get(e.toNode);
      if (fromName === undefined || toName === undefined) continue;
      verifiedEdgeKeys.add(edgeKey(fromName, e.relation, toName));
    }
  }
  const addedEdges = sg.edges.filter((e) => !verifiedEdgeKeys.has(edgeKey(e.from, e.relation, e.to)));

  const statusChanges = candRows
    .filter((r) => r.status === "provisional")
    .map((r) => ({ nodeId: r.id, from: r.status as ToolNodeStatus, to: "would_promote" as const }));

  return {
    added: { nodes: addedNodes, edges: addedEdges },
    removed: { nodes: [], edges: [] },
    status_changes: statusChanges,
  };
}
// ----------------------------- auto-promotion gate -----------------------
/** Outcome of `autoPromoteIfReady`. */
export interface PromotionDecision {
/** True only when this call flipped the node to verified. */
promoted: boolean;
/** Human-readable explanation of the decision. */
reason: string;
/** Counts and thresholds the decision was based on (also stored on the audit row when promoted). */
evidence: Record<string, unknown>;
}
/**
 * Inspect a provisional node's evidence and, if the gates pass, flip its
 * status to verified and append an `auto_promote` audit row.
 *
 * Gates (thresholds come from the env knobs above):
 * - at least `AUTO_PROMOTE_MIN_EVIDENCE` evidence rows;
 * - summed success / (success + failure) ≥ `AUTO_PROMOTE_MIN_SUCCESS_RATIO`.
 * The edge-health threshold is recorded in the snapshot but NOT enforced.
 *
 * Non-provisional nodes are left untouched (`promoted: false`). The flip is a
 * compare-and-set on `status = 'provisional'`. If another caller changes the
 * node between the read and the write, nothing is flipped and no audit row
 * is written, so concurrent sweeps cannot double-promote.
 */
export async function autoPromoteIfReady(
  nodeId: string,
  opts: { actor?: string; runId?: string; candidateId?: string } = {},
): Promise<PromotionDecision> {
  await bootstrapToolGoalsSchema();
  const node = await getNodeRow(nodeId);
  if (!node) return { promoted: false, reason: "node not found", evidence: {} };
  if (node.status !== "provisional") {
    return { promoted: false, reason: `status is ${node.status}`, evidence: { status: node.status } };
  }

  // Evidence success ratio
  const evidence = await db
    .select()
    .from(toolNodeEvidence)
    .where(eq(toolNodeEvidence.nodeId, nodeId));
  let success = 0;
  let failure = 0;
  for (const ev of evidence) {
    success += ev.success ?? 0;
    failure += ev.failure ?? 0;
  }
  const ratio = success + failure === 0 ? 0 : success / (success + failure);

  // Inbound edges (toNode = this node); only their count is recorded.
  const incident = await db
    .select()
    .from(toolEdges)
    .where(eq(toolEdges.toNode, nodeId));

  const minEvidence = autoPromoteMinEvidence();
  const minRatio = autoPromoteMinSuccessRatio();
  const minHealth = autoPromoteMinEdgeHealth();
  const evCount = evidence.length;
  const snapshot = {
    success,
    failure,
    ratio,
    evidence_count: evCount,
    incident_edges: incident.length,
    thresholds: {
      min_evidence: minEvidence,
      min_success_ratio: minRatio,
      min_edge_health: minHealth,
    },
  };
  if (evCount < minEvidence) {
    return { promoted: false, reason: `evidence count ${evCount} < ${minEvidence}`, evidence: snapshot };
  }
  if (ratio < minRatio) {
    return { promoted: false, reason: `success ratio ${ratio.toFixed(3)} < ${minRatio}`, evidence: snapshot };
  }
  // (Edge health: in tests we won't have populated EMAs, so don't gate
  // on an empty population.)

  // Compare-and-set: only flip if the node is still provisional.
  const flipped = await db
    .update(toolNodes)
    .set({ status: "verified", updatedAt: new Date() })
    .where(and(eq(toolNodes.id, nodeId), eq(toolNodes.status, "provisional")))
    .returning({ id: toolNodes.id });
  if (flipped.length === 0) {
    return { promoted: false, reason: "status changed concurrently; not promoted", evidence: snapshot };
  }
  await db.insert(toolPromotionAudit).values({
    id: newId("taudit"),
    nodeId,
    action: "auto_promote",
    fromStatus: "provisional",
    toStatus: "verified",
    actor: opts.actor ?? "system",
    evidenceSnapshot: snapshot,
    runId: opts.runId ?? null,
    candidateId: opts.candidateId ?? null,
  });
  return { promoted: true, reason: "all gates passed", evidence: snapshot };
}
/**
 * Reverse the most recent `auto_promote` / `manual_promote` audit entry for
 * a node. The node's status is restored to that entry's `fromStatus`, and a
 * `rollback` audit row referencing the reversed entry is appended.
 *
 * Returns `ok: false` (and changes nothing) when:
 *  - the node has no promotion in its audit history;
 *  - the node no longer exists;
 *  - the node's current status differs from the promotion's `toStatus`
 *    (for example, because it was already rolled back).
 *
 * @param nodeId id of the node whose last promotion should be undone.
 * @param actor  recorded on the rollback audit row (default "system").
 */
export async function rollbackPromotion(
  nodeId: string,
  actor = "system",
): Promise<{ ok: boolean; reason: string }> {
  await bootstrapToolGoalsSchema();
  const last = (
    await db
      .select()
      .from(toolPromotionAudit)
      .where(
        and(eq(toolPromotionAudit.nodeId, nodeId), inArray(toolPromotionAudit.action, ["auto_promote", "manual_promote"])),
      )
      .orderBy(desc(toolPromotionAudit.createdAt))
      .limit(1)
  )[0];
  if (!last) return { ok: false, reason: "no promotion in audit history" };
  const node = await getNodeRow(nodeId);
  if (!node) return { ok: false, reason: "node not found" };
  if (node.status !== last.toStatus) {
    return { ok: false, reason: `current status ${node.status} doesn't match last promotion target ${last.toStatus}` };
  }
  // Capture the fields used inside the transaction callback.
  const { id: reversedAuditId, fromStatus, toStatus, runId, candidateId } = last;
  // Restore the status and record the rollback atomically, matching
  // autoPromoteIfReady, so the audit trail never disagrees with the node's status.
  await db.transaction(async (tx) => {
    await tx
      .update(toolNodes)
      .set({ status: fromStatus as ToolNodeStatus, updatedAt: new Date() })
      .where(eq(toolNodes.id, nodeId));
    await tx.insert(toolPromotionAudit).values({
      id: newId("taudit"),
      nodeId,
      action: "rollback",
      fromStatus: toStatus,
      toStatus: fromStatus,
      actor,
      evidenceSnapshot: { rolled_back_audit_id: reversedAuditId },
      runId,
      candidateId,
    });
  });
  return { ok: true, reason: `restored to ${fromStatus}` };
}
/**
 * Fetch promotion-audit rows, newest first, capped at 200.
 *
 * @param nodeId optional node id filter; when omitted (or empty) rows for
 *               every node are returned.
 */
export async function listPromotionAudit(nodeId?: string) {
  await bootstrapToolGoalsSchema();
  // Drizzle treats an undefined `where` argument as "no filter".
  const byNode = nodeId ? eq(toolPromotionAudit.nodeId, nodeId) : undefined;
  return db
    .select()
    .from(toolPromotionAudit)
    .where(byNode)
    .orderBy(desc(toolPromotionAudit.createdAt))
    .limit(200);
}
// ----------------------------- Mode A boost ------------------------------
/**
 * Mode A high-confidence-gap path.
 *
 * The gap is always recorded through `recordPlannerGap` first, with
 * `confidence` merged into its context. If `confidence` is finite and at
 * least `highConfidenceGapThreshold()`, the gap signal's invocation count is
 * forced over the `GAP_AUTO_EXTEND_THRESHOLD` count gate and
 * `autoExtendIfNeeded()` runs immediately, so a provisional node can be
 * spawned without waiting for the count to accumulate.
 *
 * @returns the gap signal's `extendedNodeId` after forcing extension, or
 *          `null` on the normal (low-confidence / non-finite) path.
 */
export async function recordHighConfidenceGap(
  capabilityTag: string,
  context: Record<string, unknown>,
  confidence: number,
): Promise<{ provisionalNodeId: string | null }> {
  await recordPlannerGap(capabilityTag, { ...context, confidence });
  // Normal path: leave it to the count threshold to accumulate.
  if (!Number.isFinite(confidence) || confidence < highConfidenceGapThreshold()) {
    return { provisionalNodeId: null };
  }
  const normalizedTag = capabilityTag.toLowerCase();
  const { toolGapSignals } = await import("@workspace/db");
  // NOTE(review): 999 is assumed to exceed GAP_AUTO_EXTEND_THRESHOLD; it also
  // overwrites the accumulated invocation count for this tag.
  await db
    .update(toolGapSignals)
    .set({ invocationCount: 999 })
    .where(eq(toolGapSignals.capabilityTag, normalizedTag));
  await autoExtendIfNeeded();
  // Read back the signal to find the provisional node the extension produced.
  const [signal] = await db
    .select()
    .from(toolGapSignals)
    .where(eq(toolGapSignals.capabilityTag, normalizedTag))
    .limit(1);
  return { provisionalNodeId: signal?.extendedNodeId ?? null };
}
// ----------------------------- toy synthetic seed ------------------------
/**
 * Built-in evaluator name used by the toy seed. It is both the name of the
 * synthetic evaluator node and the key the in-process evaluator is
 * registered under.
 */
export const SYNTHETIC_EVALUATOR_NAME = "synth_eval_tag_coverage";
/** Name of the synthetic dataset node created by the toy seed. */
export const SYNTHETIC_DATASET_NAME = "synth_dataset_tag_coverage";
/**
 * Toy evaluator: scores a candidate by the fraction of `targetTags`
 * (declared on the dataset spec) that are covered by the capability tags of
 * the candidate's nodes. It is domain-agnostic and operates only on tag strings.
 *
 * Returns `{ metric: 0, payload: { reason: "no targetTags" } }` when the
 * dataset spec is missing, has no `targetTags` array, or that array is empty.
 * Otherwise it returns `metric = covered / total` with those counts in the payload.
 */
function syntheticTagCoverageEvaluator(): EvaluatorFn {
  return ({ datasetSpec, nodes }) => {
    // Optional chaining: a null/undefined spec falls through to the
    // "no targetTags" result instead of throwing a TypeError.
    const rawTargets = (datasetSpec as { targetTags?: unknown } | null | undefined)?.targetTags;
    const target = Array.isArray(rawTargets) ? (rawTargets as string[]) : [];
    if (target.length === 0) return { metric: 0, payload: { reason: "no targetTags" } };
    const have = new Set<string>();
    for (const n of nodes.values()) {
      for (const t of (n.capabilityTags as string[]) || []) have.add(t);
    }
    const covered = target.filter((t) => have.has(t)).length;
    return {
      metric: covered / target.length,
      payload: { covered, total: target.length },
    };
  };
}
/**
 * Seed the synthetic Mode B fixture. Nodes are looked up by name first, so
 * re-running does not create duplicates.
 *
 * Creates (when missing):
 *  - a verified dataset node whose spec carries the target tag set;
 *  - a verified evaluator node (handler "in_process");
 *  - two verified starter nodes the search can `replace` against.
 * It also registers the in-process evaluator so `runGoalSearch` can call it.
 * Used by the smoke test and, when env-flagged, at boot.
 */
export async function seedSyntheticToyData(): Promise<{
  datasetNodeId: string;
  evaluatorNodeId: string;
  seedNodeIds: string[];
}> {
  await bootstrapToolGoalsSchema();
  registerEvaluator(SYNTHETIC_EVALUATOR_NAME, syntheticTagCoverageEvaluator());

  // Schema factories return a fresh object per call, so no two specs share state.
  const inputStringSchema = () => ({
    type: "object",
    properties: { input: { type: "string" } },
    required: ["input"],
  });
  const itemsArraySchema = () => ({
    type: "object",
    properties: { items: { type: "array" } },
    required: ["items"],
  });

  // The dataset node holds the "ground truth" tag set the evaluator scores against.
  const datasetNode = await ensureNode({
    name: SYNTHETIC_DATASET_NAME,
    description: "Synthetic toy dataset for Mode B smoke tests (#169). NOT domain content.",
    capabilityTags: ["kind:dataset", "synth:dataset"],
    status: "verified",
    spec: {
      kind: "dataset",
      targetTags: ["synth:tag_a", "synth:tag_b", "synth:tag_c"],
      inputSchema: inputStringSchema(),
      outputSchema: itemsArraySchema(),
    },
  });

  // The evaluator node is dispatched by name to the in-process handler.
  const evaluatorNode = await ensureNode({
    name: SYNTHETIC_EVALUATOR_NAME,
    description: "Synthetic tag-coverage evaluator for Mode B smoke tests (#169).",
    capabilityTags: ["kind:evaluator", "synth:evaluator"],
    status: "verified",
    spec: {
      kind: "evaluator",
      handler: "in_process",
      inputSchema: itemsArraySchema(),
      outputSchema: {
        type: "object",
        properties: { metric: { type: "number" } },
        required: ["metric"],
      },
    },
  });

  // Starter nodes are created sequentially so their ids keep the tag order.
  const seedNodeIds: string[] = [];
  for (const starterTag of ["synth:starter_x", "synth:starter_y"]) {
    const [, suffix] = starterTag.split(":");
    const starter = await ensureNode({
      name: `synth_starter_${suffix}`,
      description: `Synthetic starter node carrying tag '${starterTag}'.`,
      capabilityTags: [starterTag],
      status: "verified",
      spec: {
        inputSchema: inputStringSchema(),
        outputSchema: { type: "object", properties: { output: { type: "string" } }, required: ["output"] },
      },
    });
    seedNodeIds.push(starter.id);
  }

  return {
    datasetNodeId: datasetNode.id,
    evaluatorNodeId: evaluatorNode.id,
    seedNodeIds,
  };
}
/** Arguments for `ensureNode`: the node to create if none with `name` exists. */
interface EnsureNodeArgs {
  // Identity and lookup key: ensureNode matches existing nodes on this.
  name: string;
  // Lifecycle status assigned on creation.
  status: ToolNodeStatus;
  // Descriptive metadata.
  description: string;
  capabilityTags: string[];
  // Stored as the node's `specJson`.
  spec: Record<string, unknown>;
}
/**
 * Return the tool node named `a.name`, creating it first if it does not exist.
 *
 * New nodes get a fresh `tnode` id, JSON input/output kinds, owner process
 * "node", and `createdBy: "synthetic_seed"`. An existing node is returned
 * unchanged; its status, tags, and spec are NOT reconciled with `a`.
 *
 * @throws Error if the node still cannot be found by name after `upsertNode`.
 *         Previously a non-null assertion let `undefined` escape typed as a row.
 */
async function ensureNode(a: EnsureNodeArgs): Promise<ToolNodeRow> {
  // Look up a node by its (expected-unique) name.
  const findByName = async (): Promise<ToolNodeRow | undefined> =>
    (await db.select().from(toolNodes).where(eq(toolNodes.name, a.name)).limit(1))[0];

  const existing = await findByName();
  if (existing) return existing;
  await upsertNode({
    id: newId("tnode"),
    name: a.name,
    description: a.description,
    capabilityTags: a.capabilityTags,
    inputKind: "json",
    outputKind: "json",
    status: a.status,
    ownerProcess: "node",
    specJson: a.spec,
    createdBy: "synthetic_seed",
  });
  const created = await findByName();
  if (!created) {
    throw new Error(`ensureNode: node '${a.name}' not found after upsert`);
  }
  return created;
}
// Register the synthetic evaluator at import time. Runs that find the
// evaluator node in the DB can then dispatch to it even if
// seedSyntheticToyData() has not been called in this process.
// seedSyntheticToyData() registers the same name again.
registerEvaluator(SYNTHETIC_EVALUATOR_NAME, syntheticTagCoverageEvaluator());
// Re-export these imported helpers. This counts as a use of the imports
// (avoiding an unused-import warning) and makes them reachable through this module.
export { approveNode, listNodes };