/** * tool-graph — cross-process tool capability graph access layer. * * Both the Node `api-server` and the Python `research-engine` read and * write the same Postgres tables (`tool_nodes`, `tool_edges`, * `tool_node_evidence`, `tool_gap_signals`). This file is the Node-side * client; `backend/tools/graph_client.py` mirrors the same operations on * the Python side. * * Public surface: * - resolveSubgraph(intent) — pick a connected slice of verified * nodes for the current user turn. * - getNode(idOrName) — single-node lookup. * - listNodes({status}) — list (used by admin UI + seeders). * - recordInvocation(...) — telemetry per tool call. * - recordPlannerGap(tag, context) — telemetry when planner finds no fit. * - autoExtendIfNeeded() — promote crossed-threshold gap * signals into provisional nodes. * - approveNode / rejectNode — admin actions. */ import { and, asc, desc, eq, gt, inArray, isNull, lt, lte, sql } from "drizzle-orm"; import { db, toolNodes, toolEdges, toolNodeEvidence, toolGapSignals, toolEdgeHealth, toolDeprecationCandidates, toolNodesArchive, toolEdgesArchive, toolNodeEvidenceArchive, toolSummaryPaths, type ToolNodeRow, type ToolEdgeRow, type ToolEdgeHealthRow, type InsertToolNodeRow, type InsertToolEdgeRow, type ToolDeprecationCandidateRow, type ToolSummaryPathRow, type InsertToolSummaryPathRow, } from "@workspace/db"; import { newId } from "./ids"; import { logger } from "./logger"; import { evaluateSeam, type SeamSample } from "./seam-health-detector"; import { searchTemplates, recordTemplateOffered, recordTemplateChoice, recordTemplatePromoteResult, persistTemplateOnPromote, applyParameterizationMarkers, type TemplateChoice, type TemplateMatch, type SeamFingerprintInput, } from "./spawn-templates"; export const GAP_AUTO_EXTEND_THRESHOLD = 3; export const SUBGRAPH_MAX_NODES = 12; export type ToolNodeStatus = | "verified" | "provisional" | "rejected" /** * `composition_alias` — a node that represents a coarse capability that * has been decomposed into a chain of smaller atomic nodes. The alias * stays callable for backwards compatibility (old prompts and saved * chats keep working) but is **never selected by the planner** — * `resolveSubgraph` and `listNodes({status:"verified"})` exclude it * automatically because the status is not "verified". The alias's * historical evidence rows remain attached so #157 can still see the * pre-decomposition behaviour. Introduced in Task #156. */ | "composition_alias" /** * `deprecated` — a node that a human reviewer has approved for * retirement (Task #159). It is excluded from `resolveSubgraph` * (planner cannot select it) and from `listNodes({status:"verified"})`, * but it still exists in the hot graph so direct references and * outstanding evidence remain inspectable for audit. After * `DEPRECATION_ARCHIVE_AFTER_DAYS` (default 180) it is moved to the * `tool_*_archive` tables by the weekly archive job and the hot rows * are deleted in a single transaction. */ | "deprecated"; export type OwnerProcess = "node" | "python"; /** * A minimal JSON Schema subset we use to describe seam wires. Only the * shapes we care about for structural compatibility checking — full * JSON Schema validation lives elsewhere if/when needed. */ export interface IOSchema { type?: "object" | "array" | "string" | "number" | "integer" | "boolean" | "null"; properties?: Record; required?: string[]; items?: IOSchema; description?: string; /** Free-form passthrough for extra hints (e.g. `enum`, `format`). */ [k: string]: unknown; } /** * `ContractSpec` is the seam contract attached to an edge: * - `produces` — the shape the upstream node emits. * - `consumes` — the shape the downstream node requires. * The seed-time validator enforces that every required field on * `consumes` exists with a compatible type on `produces`. */ export interface ContractSpec { produces: IOSchema; consumes: IOSchema; /** * Optional reviewer-facing notes about how the upstream's produced * fields map onto the downstream's consumed fields, plus advisory * flags. Heuristic LLM-tool adjacencies set `advisory:true` so * downstream seam-health (#157) can downgrade their structural * mismatches from hard failures to non-fatal contractIssues. */ mappingHints?: Record; } export interface ResolvedNode { id: string; name: string; description: string; capabilityTags: string[]; status: ToolNodeStatus; ownerProcess: OwnerProcess; spec: Record; costHint: number | null; latencyHintMs: number | null; } export interface ResolvedEdge { fromNode: string; toNode: string; relation: string; weight: number; /** Optional seam contract; populated for edges added under Task #156. */ contract: ContractSpec | null; } export interface ResolvedSubgraph { nodes: ResolvedNode[]; edges: ResolvedEdge[]; /** Capability tags that drove the resolution. */ matchedTags: string[]; } // ---------------------------------------------------------------- helpers function row2node(r: ToolNodeRow): ResolvedNode { return { id: r.id, name: r.name, description: r.description, capabilityTags: Array.isArray(r.capabilityTags) ? (r.capabilityTags as string[]) : [], status: r.status as ToolNodeStatus, ownerProcess: r.ownerProcess as OwnerProcess, spec: (r.specJson as Record) || {}, costHint: r.costHint ?? null, latencyHintMs: r.latencyHintMs ?? null, }; } function row2edge(r: ToolEdgeRow): ResolvedEdge { return { fromNode: r.fromNode, toNode: r.toNode, relation: r.relation, weight: r.weight, contract: (r.contract as ContractSpec | null) ?? null, }; } // ---------------------------------------------------------------- contracts export interface ContractValidationIssue { field: string; reason: "missing_in_produces" | "type_mismatch"; expectedType?: string; actualType?: string; } export interface ContractValidationResult { ok: boolean; issues: ContractValidationIssue[]; } /** * Structural compatibility check for a single seam contract: * every field marked `required` on the consumes side must exist on the * produces side, and the declared `type` (when both sides specify one) * must match. We deliberately keep this shallow — it's a wire-shape * compatibility check, not a full JSON Schema validator. Object types * with no `required` array trivially pass, and unspecified types are * treated as "any". Used by `tool-graph-seed.ts` at boot to refuse a * structurally broken graph and (eventually, in #157) by the seam * health detector to flag drift. */ export function validateContract( contract: ContractSpec, ): ContractValidationResult { const issues: ContractValidationIssue[] = []; const producesProps = contract.produces?.properties ?? {}; const consumesProps = contract.consumes?.properties ?? {}; const required = contract.consumes?.required ?? []; for (const field of required) { const c = consumesProps[field]; const p = producesProps[field]; if (!p) { issues.push({ field, reason: "missing_in_produces" }); continue; } const ct = c?.type; const pt = p?.type; if (ct && pt && ct !== pt) { issues.push({ field, reason: "type_mismatch", expectedType: String(ct), actualType: String(pt), }); } } return { ok: issues.length === 0, issues }; } // ---------------------------------------------------------------- intent /** * Lightweight semantic-ish capability tagger. Takes the user turn text * and returns the set of capability tags that look relevant. This is the * replacement for the regex `looksLikeResearchIntent` gate. Tags are * intentionally coarse — `resolveSubgraph` then expands the matched tag * set into a connected node slice via the graph edges. */ // Single source of truth: lib/tool-graph/intent-rules.json. Both this file // and backend/tools/graph_client.py compile their rules from that JSON so // the Node and Python sides cannot drift. The shared fixture // `__tests__/fixtures/intent-cases.json` is asserted against both sides. import intentRulesData from "../../../../lib/tool-graph/intent-rules.json"; interface IntentRuleSpec { tag: string; patterns: string[]; } const INTENT_RULES: Array<{ tag: string; patterns: RegExp[] }> = ( (intentRulesData as { rules: IntentRuleSpec[] }).rules || [] ).map((r) => ({ tag: r.tag, patterns: r.patterns.map((p) => new RegExp(p, "i")), })); export function tagsForIntent(text: string): string[] { const t = (text || "").trim(); if (!t) return []; const tags = new Set(); for (const rule of INTENT_RULES) { if (rule.patterns.some((re) => re.test(t))) tags.add(rule.tag); } return Array.from(tags); } // ---------------------------------------------------------------- queries export async function listNodes( opts: { status?: ToolNodeStatus | "any" } = {}, ): Promise { const status = opts.status ?? "verified"; const rows = status === "any" ? await db.select().from(toolNodes) : await db.select().from(toolNodes).where(eq(toolNodes.status, status)); return rows.map(row2node); } export async function listEdges( nodeIds?: string[], ): Promise { const rows = nodeIds && nodeIds.length ? await db .select() .from(toolEdges) .where( sql`${toolEdges.fromNode} IN (${sql.join( nodeIds.map((id) => sql`${id}`), sql`, `, )}) OR ${toolEdges.toNode} IN (${sql.join( nodeIds.map((id) => sql`${id}`), sql`, `, )})`, ) : await db.select().from(toolEdges); return rows.map(row2edge); } export async function getNode(idOrName: string): Promise { const rows = await db .select() .from(toolNodes) .where(sql`${toolNodes.id} = ${idOrName} OR ${toolNodes.name} = ${idOrName}`) .limit(1); return rows[0] ? row2node(rows[0]) : null; } export async function getNodeByName(name: string): Promise { const rows = await db .select() .from(toolNodes) .where(eq(toolNodes.name, name)) .limit(1); return rows[0] ? row2node(rows[0]) : null; } // ---------------------------------------------------------------- subgraph /** * Resolve a subgraph for one user turn: * 1. Tag the intent text. * 2. Pick all verified nodes whose capability tags overlap with the * intent tags ("seed nodes"). * 3. Expand by 1 hop along the edges so the model sees the immediate * connections (e.g. literature_search → summarization). * 4. Cap to `SUBGRAPH_MAX_NODES`. * * If `includeShadowNodeIds` is provided, those provisional nodes are * appended to the result so a reviewer can drive a shadow turn through * them. */ export async function resolveSubgraph(args: { intentText: string; includeShadowNodeIds?: string[]; }): Promise { const matchedTags = tagsForIntent(args.intentText); if (matchedTags.length === 0) { return { nodes: [], edges: [], matchedTags }; } const verified = await listNodes({ status: "verified" }); const seeds = verified.filter((n) => n.capabilityTags.some((t) => matchedTags.includes(t)), ); if (seeds.length === 0) { return { nodes: [], edges: [], matchedTags }; } const seedIds = new Set(seeds.map((n) => n.id)); // 1-hop expansion via edges. const allEdges = await listEdges(Array.from(seedIds)); const neighbors = new Set(); for (const e of allEdges) { if (seedIds.has(e.fromNode)) neighbors.add(e.toNode); if (seedIds.has(e.toNode)) neighbors.add(e.fromNode); } const neighborRows = neighbors.size ? await db .select() .from(toolNodes) .where( and( inArray(toolNodes.id, Array.from(neighbors)), eq(toolNodes.status, "verified"), ), ) : []; const finalMap = new Map(); for (const n of seeds) finalMap.set(n.id, n); for (const r of neighborRows) { const n = row2node(r); if (!finalMap.has(n.id)) finalMap.set(n.id, n); if (finalMap.size >= SUBGRAPH_MAX_NODES) break; } // Optional shadow nodes (provisional) for reviewer-driven sessions. if (args.includeShadowNodeIds && args.includeShadowNodeIds.length) { const shadowRows = await db .select() .from(toolNodes) .where(inArray(toolNodes.id, args.includeShadowNodeIds)); for (const r of shadowRows) { const n = row2node(r); if (!finalMap.has(n.id)) finalMap.set(n.id, n); } } const finalIds = new Set(finalMap.keys()); const finalEdges = allEdges.filter( (e) => finalIds.has(e.fromNode) && finalIds.has(e.toNode), ); return { nodes: Array.from(finalMap.values()), edges: finalEdges, matchedTags, }; } /** * Render a short textual description of a subgraph for inclusion in the * system prompt — e.g. "literature_search (search_pubmed) → * summarization (summarize_literature_bucket)". */ export function describeSubgraph(sg: ResolvedSubgraph): string { if (sg.nodes.length === 0) return ""; const byId = new Map(sg.nodes.map((n) => [n.id, n.name])); const lines: string[] = []; lines.push( `Tool capability subgraph for this turn (capabilities: ${sg.matchedTags.join(", ")}):`, ); for (const n of sg.nodes) { const tagStr = n.capabilityTags.join("/") || "general"; lines.push(`- ${n.name} [${tagStr}]: ${n.description.slice(0, 140)}`); } if (sg.edges.length) { lines.push("Connections:"); for (const e of sg.edges.slice(0, 20)) { const a = byId.get(e.fromNode); const b = byId.get(e.toNode); if (a && b) lines.push(`- ${a} —[${e.relation}]→ ${b}`); } } return lines.join("\n"); } // ---------------------------------------------------------------- writes export async function upsertNode(node: InsertToolNodeRow): Promise { const existing = await db .select() .from(toolNodes) .where(eq(toolNodes.name, node.name)) .limit(1); if (existing[0]) { await db .update(toolNodes) .set({ description: node.description, capabilityTags: node.capabilityTags, inputKind: node.inputKind ?? existing[0].inputKind, outputKind: node.outputKind ?? existing[0].outputKind, ownerProcess: node.ownerProcess, specJson: node.specJson, handlerRef: node.handlerRef ?? existing[0].handlerRef, costHint: node.costHint ?? existing[0].costHint, latencyHintMs: node.latencyHintMs ?? existing[0].latencyHintMs, version: existing[0].version + 1, updatedAt: new Date(), }) .where(eq(toolNodes.id, existing[0].id)); const fresh = await db .select() .from(toolNodes) .where(eq(toolNodes.id, existing[0].id)) .limit(1); return row2node(fresh[0]!); } const id = node.id || newId("tnode"); await db.insert(toolNodes).values({ ...node, id }); return row2node({ ...(node as ToolNodeRow), id, createdAt: new Date(), updatedAt: new Date(), version: 1, status: node.status ?? "verified" } as ToolNodeRow); } export async function upsertEdge(edge: InsertToolEdgeRow): Promise { const id = edge.id || newId("tedge"); // Edges are keyed by (fromNode, toNode, relation) under a unique // constraint. On conflict we re-set the contract jsonb so seed // changes (e.g. #156 deriveContract) propagate to existing rows // instead of leaving them with their pre-contract `NULL` shape. await db .insert(toolEdges) .values({ ...edge, id }) .onConflictDoUpdate({ target: [toolEdges.fromNode, toolEdges.toNode, toolEdges.relation], set: { contract: edge.contract ?? null, weight: edge.weight ?? 1.0, }, }); } export async function recordInvocation(args: { nodeName: string; ok: boolean; argsSummary: Record | null; resultPreview: unknown; errorCode?: string | null; durationMs?: number; shadowUserId?: string | null; }): Promise { try { const node = await getNodeByName(args.nodeName); if (!node) return; if (node.status === "deprecated") { // Direct-reference invocation of a deprecated node. We still // record telemetry (so we can see how often the runtime is // hitting deprecated names), but we surface a structured warn // so operators / agents can route to the replacement. const replacedBy = ((node.spec as { replacedBy?: unknown })?.replacedBy as | string | undefined) ?? null; logger.warn( { nodeName: args.nodeName, nodeId: node.id, replacedBy, }, "tool-graph: invocation of deprecated node — caller should switch to replacement", ); } await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: node.id, kind: args.shadowUserId ? "shadow_run" : "invocation", payload: { args: args.argsSummary, result_preview: redactPreview(args.resultPreview), error_code: args.errorCode ?? null, duration_ms: args.durationMs ?? null, }, success: args.ok ? 1 : 0, failure: args.ok ? 0 : 1, shadowUserId: args.shadowUserId ?? null, }); } catch (err) { logger.debug({ err }, "tool-graph recordInvocation failed"); } } // ---------------------------------------------------------------- seam health (#157) /** Bumped whenever the EMA / scoring formula changes. */ export const SEAM_HEALTH_FORMULA_VERSION = 1; /** Smoothing factor for the EMAs in `recomputeEdgeHealth`. */ const EMA_ALPHA = 0.2; export interface CaptureSeamHealthArgs { /** Downstream consumer node name (the node about to be invoked, or * the atomic node whose payload we are observing). */ downstreamName: string; /** Optional upstream producer name. When provided we resolve the * exact (upstream → downstream) edge and its contract; when absent * we fold the sample against every incoming edge of the downstream * node so atomic-step capture still records *something* even when * the producer cannot be identified. */ upstreamName?: string | null; /** The actual payload that crossed the seam. May be any JSON. */ payload: unknown; /** Optional invocation correlation id (for cross-row tracing). */ invocationId?: string | null; } /** * Capture one seam-health sample. Best-effort: every error path is * swallowed so capture cannot break the calling tool flow. Returns the * computed sample (or `null` when nothing was persisted) so callers and * tests can introspect. */ export async function captureSeamHealth( args: CaptureSeamHealthArgs, ): Promise<{ edgeId: string; sample: SeamSample } | null> { try { const downstream = await getNodeByName(args.downstreamName); if (!downstream) return null; let edges: Array<{ id: string; fromNode: string; toNode: string; contract: ContractSpec | null }> = []; if (args.upstreamName) { const upstream = await getNodeByName(args.upstreamName); if (!upstream) return null; const rows = await db .select() .from(toolEdges) .where(and(eq(toolEdges.fromNode, upstream.id), eq(toolEdges.toNode, downstream.id))); edges = rows.map((r) => ({ id: r.id, fromNode: r.fromNode, toNode: r.toNode, contract: (r.contract as ContractSpec | null) ?? null, })); } else { const rows = await db .select() .from(toolEdges) .where(eq(toolEdges.toNode, downstream.id)); edges = rows.map((r) => ({ id: r.id, fromNode: r.fromNode, toNode: r.toNode, contract: (r.contract as ContractSpec | null) ?? null, })); } if (edges.length === 0) return null; let firstResult: { edgeId: string; sample: SeamSample } | null = null; for (const e of edges) { const sample = evaluateSeam(e.contract, args.payload); await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: downstream.id, kind: "seam_health", payload: { edge_id: e.id, upstream_node: e.fromNode, downstream_node: e.toNode, missing_fields: sample.missingFields, unused_fields: sample.unusedFields, coverage: sample.coverage, contract_issues: sample.contractIssues, health_score: sample.healthScore, payload_preview: redactPreview(args.payload), invocation_id: args.invocationId ?? null, formula_version: SEAM_HEALTH_FORMULA_VERSION, }, success: sample.contractIssues.length === 0 && sample.missingFields.length === 0 ? 1 : 0, failure: sample.contractIssues.length > 0 || sample.missingFields.length > 0 ? 1 : 0, }); if (!firstResult) firstResult = { edgeId: e.id, sample }; } return firstResult; } catch (err) { logger.debug({ err }, "tool-graph captureSeamHealth failed (non-blocking)"); return null; } } /** * Capture seam-health samples for *every* outgoing edge from * `upstreamName`. This is the convenience entry point used by the * runtime: when an atomic step completes, fold its output against each * declared downstream consumer's contract. Returns the list of captured * results (or empty list on no-op / failure). Always non-blocking. */ export async function captureOutgoingSeamSamples( upstreamName: string, payload: unknown, ): Promise> { try { const upstream = await getNodeByName(upstreamName); if (!upstream) return []; const edges = await db .select() .from(toolEdges) .where(eq(toolEdges.fromNode, upstream.id)); if (edges.length === 0) return []; const downstreamNodes = await db .select() .from(toolNodes) .where(inArray(toolNodes.id, edges.map((e) => e.toNode))); const nameById = new Map(downstreamNodes.map((n) => [n.id, n.name])); const out: Array<{ edgeId: string; sample: SeamSample }> = []; for (const e of edges) { const downName = nameById.get(e.toNode); if (!downName) continue; const r = await captureSeamHealth({ upstreamName, downstreamName: downName, payload, }); if (r) out.push(r); } return out; } catch (err) { logger.debug({ err }, "tool-graph captureOutgoingSeamSamples failed (non-blocking)"); return []; } } interface SeamRollup { count: number; contractIssues: number; missingFields: number; coverageSum: number; healthSum: number; topMissing: Map; topIssues: Map; lastSampleAt: Date; /** ULID of the highest evidence row folded; "" when nothing folded. */ lastFoldedEvidenceId: string; } function foldSamplesIntoRollup( rows: Array<{ id: string; payload: unknown; createdAt: Date }>, ): SeamRollup { const r: SeamRollup = { count: 0, contractIssues: 0, missingFields: 0, coverageSum: 0, healthSum: 0, topMissing: new Map(), topIssues: new Map(), lastSampleAt: new Date(0), lastFoldedEvidenceId: "", }; for (const row of rows) { const p = (row.payload || {}) as { missing_fields?: string[]; contract_issues?: string[]; coverage?: number; health_score?: number; }; r.count += 1; const mf = Array.isArray(p.missing_fields) ? p.missing_fields : []; const ci = Array.isArray(p.contract_issues) ? p.contract_issues : []; r.missingFields += mf.length; r.contractIssues += ci.length; r.coverageSum += typeof p.coverage === "number" ? p.coverage : 1; r.healthSum += typeof p.health_score === "number" ? p.health_score : 1; for (const f of mf) r.topMissing.set(f, (r.topMissing.get(f) ?? 0) + 1); for (const c of ci) r.topIssues.set(c, (r.topIssues.get(c) ?? 0) + 1); if (row.createdAt > r.lastSampleAt) r.lastSampleAt = row.createdAt; // ULIDs sort lexicographically in time order. We pick the // string-greatest id we have seen as the watermark so the next // recompute pass can do a precise `id > lastFoldedEvidenceId` // filter instead of a millisecond-truncated timestamp compare. if (row.id > r.lastFoldedEvidenceId) r.lastFoldedEvidenceId = row.id; } return r; } function topN(m: Map, n: number): Record { const out: Record = {}; const sorted = Array.from(m.entries()).sort((a, b) => b[1] - a[1]).slice(0, n); for (const [k, v] of sorted) out[k] = v; return out; } export interface RecomputeEdgeHealthResult { edgesUpdated: number; samplesFolded: number; } /** * Walk new `seam_health` evidence rows added since each edge's stored * `computed_at`, fold them into per-edge rolling EMAs, and upsert * `tool_edge_health`. Designed to be safe to run repeatedly — it never * re-reads samples it has already folded. Edges with no contract and * no samples are simply skipped. */ export async function recomputeEdgeHealth(): Promise { const allEdges = await db.select().from(toolEdges); if (allEdges.length === 0) return { edgesUpdated: 0, samplesFolded: 0 }; let updated = 0; let folded = 0; for (const edge of allEdges) { // Per-edge transaction. Acquires `FOR UPDATE` on the existing // health row (or an advisory lock when there is no row yet) so that // concurrent recomputes serialize on each edge — preventing both // (a) lost samples from racing reads and (b) double-folded samples // from clobbering writes. The watermark we advance to is the max // `created_at` of the rows we actually folded — never wall-clock // `now()` — so any sample that lands during the read/write window // is guaranteed to be picked up by the next pass. await db.transaction(async (tx) => { // Serialize on this edge id — works regardless of whether a // tool_edge_health row already exists. The hash is a stable 32-bit // projection of the edge id (postgres `hashtext` is ok for locks). await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${edge.id}))`); const existingRows = await tx .select() .from(toolEdgeHealth) .where(eq(toolEdgeHealth.edgeId, edge.id)) .limit(1); const cur = existingRows[0]; // Exactly-once fold via UPDATE-claim: atomically stamp every // unfolded `seam_health` row for this edge with `seam_folded_at` // and return the row payload in the same statement. This avoids // the time-cursor / ULID-cursor race entirely — a writer that // commits *after* the SELECT but with `created_at` < the // cursor (or with a smaller ULID) is still picked up here on // the next pass because its `seam_folded_at` is still NULL. const claimed = await tx.execute( sql` UPDATE tool_node_evidence SET seam_folded_at = now() WHERE kind = 'seam_health' AND seam_folded_at IS NULL AND payload ->> 'edge_id' = ${edge.id} RETURNING id, payload, created_at `, ); const rows = (claimed.rows as Array<{ id: string; payload: unknown; created_at: Date; }>).map((r) => ({ id: r.id, payload: r.payload, createdAt: r.created_at })); if (rows.length === 0) return; const rollup = foldSamplesIntoRollup(rows); folded += rollup.count; const sampleCoverage = rollup.coverageSum / rollup.count; const sampleHealth = rollup.healthSum / rollup.count; const newEmaCov = cur ? cur.emaCoverage * (1 - EMA_ALPHA) + sampleCoverage * EMA_ALPHA : sampleCoverage; const newEmaHealth = cur ? cur.emaHealthScore * (1 - EMA_ALPHA) + sampleHealth * EMA_ALPHA : sampleHealth; // Watermark = the most recent sample we actually folded. Any // sample written concurrently with `created_at <= since` will be // skipped (already counted), and anything strictly later than // `lastSampleAt` will be picked up next pass. We also require // `gt(...)` (not `gte`) so equal-timestamp duplicates don't get // counted twice — Postgres `now()` resolution is microseconds and // the writers use server-side `now()`, so collisions are real. // (computedAt remains an observability timestamp; the *real* // watermark is `lastFoldedEvidenceId` — see foldSamplesIntoRollup.) const nextRow = { id: cur?.id ?? newId("tedh"), edgeId: edge.id, traversalCount: (cur?.traversalCount ?? 0) + rollup.count, contractIssueCount: (cur?.contractIssueCount ?? 0) + rollup.contractIssues, missingFieldCount: (cur?.missingFieldCount ?? 0) + rollup.missingFields, emaCoverage: newEmaCov, emaHealthScore: newEmaHealth, topMissingFields: topN(rollup.topMissing, 5), topContractIssues: topN(rollup.topIssues, 5), formulaVersion: SEAM_HEALTH_FORMULA_VERSION, lastSampleAt: rollup.lastSampleAt, lastFoldedEvidenceId: rollup.lastFoldedEvidenceId, computedAt: new Date(), }; if (cur) { await tx .update(toolEdgeHealth) .set({ traversalCount: nextRow.traversalCount, contractIssueCount: nextRow.contractIssueCount, missingFieldCount: nextRow.missingFieldCount, emaCoverage: nextRow.emaCoverage, emaHealthScore: nextRow.emaHealthScore, topMissingFields: nextRow.topMissingFields, topContractIssues: nextRow.topContractIssues, formulaVersion: nextRow.formulaVersion, lastSampleAt: nextRow.lastSampleAt, lastFoldedEvidenceId: nextRow.lastFoldedEvidenceId, computedAt: nextRow.computedAt, }) .where(eq(toolEdgeHealth.id, cur.id)); } else { await tx.insert(toolEdgeHealth).values(nextRow); } updated += 1; }); } return { edgesUpdated: updated, samplesFolded: folded }; } export interface EdgeHealthView { edgeId: string; fromNode: string; toNode: string; relation: string; traversalCount: number; contractIssueCount: number; missingFieldCount: number; emaCoverage: number; emaHealthScore: number; topMissingFields: Record; topContractIssues: Record; formulaVersion: number; lastSampleAt: string | null; computedAt: string; } function rowToHealthView( h: ToolEdgeHealthRow, e: ToolEdgeRow, ): EdgeHealthView { return { edgeId: h.edgeId, fromNode: e.fromNode, toNode: e.toNode, relation: e.relation, traversalCount: h.traversalCount, contractIssueCount: h.contractIssueCount, missingFieldCount: h.missingFieldCount, emaCoverage: h.emaCoverage, emaHealthScore: h.emaHealthScore, topMissingFields: (h.topMissingFields as Record) || {}, topContractIssues: (h.topContractIssues as Record) || {}, formulaVersion: h.formulaVersion, lastSampleAt: h.lastSampleAt ? h.lastSampleAt.toISOString() : null, computedAt: h.computedAt.toISOString(), }; } export async function listEdgeHealth(): Promise { const edges = await db.select().from(toolEdges); const edgeById = new Map(edges.map((e) => [e.id, e])); const health = await db.select().from(toolEdgeHealth); const out: EdgeHealthView[] = []; for (const h of health) { const e = edgeById.get(h.edgeId); if (e) out.push(rowToHealthView(h, e)); } return out; } export async function getEdgeHealth(edgeId: string, sampleLimit = 20): Promise<{ edge: EdgeHealthView | null; recent: Array<{ id: string; createdAt: string; payload: Record }>; }> { const edges = await db.select().from(toolEdges).where(eq(toolEdges.id, edgeId)).limit(1); if (!edges[0]) return { edge: null, recent: [] }; const h = await db.select().from(toolEdgeHealth).where(eq(toolEdgeHealth.edgeId, edgeId)).limit(1); const samples = await db .select() .from(toolNodeEvidence) .where( and( eq(toolNodeEvidence.kind, "seam_health"), sql`${toolNodeEvidence.payload} ->> 'edge_id' = ${edgeId}`, ), ) .orderBy(desc(toolNodeEvidence.createdAt)) .limit(sampleLimit); return { edge: h[0] ? rowToHealthView(h[0], edges[0]) : null, recent: samples.map((s) => ({ id: s.id, createdAt: s.createdAt.toISOString(), payload: (s.payload as Record) || {}, })), }; } export async function listUnhealthyEdges(opts: { minTraversals?: number; maxHealthScore?: number; limit?: number; } = {}): Promise { const minT = opts.minTraversals ?? 1; const maxScore = opts.maxHealthScore ?? 0.85; const limit = opts.limit ?? 50; const all = await listEdgeHealth(); return all .filter((h) => h.traversalCount >= minT && h.emaHealthScore <= maxScore) .sort((a, b) => a.emaHealthScore - b.emaHealthScore) .slice(0, limit); } /** * Keys whose VALUES are masked before persistence (case-insensitive * substring match on the key name). Catches the obvious tokens / API * keys / passwords / cookies / authorization headers that may surface * in tool results. Conservative by design: false positives just mask a * harmless string; false negatives would store the secret in the DB. */ const SENSITIVE_KEY_PATTERNS = [ /token/i, /secret/i, /password/i, /passwd/i, /api[_-]?key/i, /authorization/i, /cookie/i, /session/i, /bearer/i, /private[_-]?key/i, /access[_-]?key/i, ]; function looksSensitive(key: string): boolean { for (const re of SENSITIVE_KEY_PATTERNS) if (re.test(key)) return true; return false; } function maskDeep(value: unknown, depth: number): unknown { if (depth <= 0 || value == null) return value; if (Array.isArray(value)) { return value.slice(0, 32).map((v) => maskDeep(v, depth - 1)); } if (typeof value === "object") { const out: Record = {}; for (const [k, v] of Object.entries(value as Record)) { out[k] = looksSensitive(k) ? "[REDACTED]" : maskDeep(v, depth - 1); } return out; } return value; } function redactPreview(value: unknown): unknown { if (value === null || value === undefined) return null; try { const masked = maskDeep(value, 6); const s = JSON.stringify(masked); if (s.length <= 800) return JSON.parse(s); return s.slice(0, 800) + "…"; } catch { return String(value).slice(0, 800); } } export async function recordPlannerGap( capabilityTag: string, context: Record, ): Promise { if (!capabilityTag) return; try { const tag = capabilityTag.toLowerCase(); const existing = await db .select() .from(toolGapSignals) .where(eq(toolGapSignals.capabilityTag, tag)) .limit(1); if (existing[0]) { await db .update(toolGapSignals) .set({ invocationCount: existing[0].invocationCount + 1, lastContext: context, lastSeenAt: new Date(), status: existing[0].status === "dismissed" ? "dismissed" : existing[0].status, }) .where(eq(toolGapSignals.id, existing[0].id)); } else { await db.insert(toolGapSignals).values({ id: newId("tgap"), capabilityTag: tag, invocationCount: 1, lastContext: context, status: "open", }); } } catch (err) { logger.debug({ err }, "tool-graph recordPlannerGap failed"); } } // ---------------------------------------------------------------- auto-extend /** * For every gap signal with status="open" whose count crosses the * threshold, create a provisional tool node, attach it to the nearest * verified node by capability-tag overlap, and link the gap signal back * to it. Returns the number of provisional nodes created. */ export async function autoExtendIfNeeded(): Promise { const open = await db .select() .from(toolGapSignals) .where(eq(toolGapSignals.status, "open")); let created = 0; for (const gap of open) { if (gap.invocationCount < GAP_AUTO_EXTEND_THRESHOLD) continue; const tag = gap.capabilityTag; const verifiedRows = await db .select() .from(toolNodes) .where(eq(toolNodes.status, "verified")); // pick nearest verified by tag overlap let nearest: ToolNodeRow | null = null; let bestOverlap = -1; for (const r of verifiedRows) { const tags = Array.isArray(r.capabilityTags) ? (r.capabilityTags as string[]) : []; const overlap = tags.includes(tag) ? 2 : tags.some((t) => t.startsWith(tag) || tag.startsWith(t)) ? 1 : 0; if (overlap > bestOverlap) { bestOverlap = overlap; nearest = r; } } const provName = `auto_${tag}_${gap.id.slice(-6)}`; const baseStub = generateHandlerStub(tag, provName); // #161: search the cross-seam template library for a structurally // identical (or near) past spawn. The match is *advisory only* — we // attach it to the provisional node's specJson.proposalContext so // the reviewer sees the banner; we never auto-promote. const fpInput: SeamFingerprintInput = { failureMode: "capability_gap", downstreamInputSchema: (baseStub.spec as { parameters?: IOSchema }) .parameters, upstreamOutputSchema: (nearest?.specJson as { outputSchema?: IOSchema } | null)?.outputSchema, capabilityTag: tag, }; let templateMatch: TemplateMatch | null = null; try { templateMatch = await searchTemplates(fpInput); } catch (err) { logger.debug({ err }, "tool-graph: searchTemplates failed (non-blocking)"); } let stub = baseStub; if (templateMatch) { // QUARANTINE-CONT-009 — provisional reviewer-side stub: 把 template // skeleton 当成 reviewer 待替换的占位 handler;真实 review 流程 B1 // 接通后必须删除这一支并撤掉 hit。 // @deprecated CONT-009. Provisional reviewer placeholder. Real wiring in B1. void import("./quarantine/index.ts").then((q) => q.recordQuarantineHit("CONT-009", { kind: "reviewer", site: "tool-graph.ts:templateMatch_stub", capabilityTag: tag, provisionalName: provName, templateId: templateMatch.template.id, strength: templateMatch.strength, }), ); // Use the template's contract-shaped skeleton as the candidate's // starting point. Reviewer will replace placeholders. const tplSpec = (templateMatch.template.specSkeleton || {}) as Record; const tplHandler = templateMatch.template.handlerSkeleton || baseStub.handler; stub = { spec: { ...baseStub.spec, ...tplSpec, name: provName, description: `${baseStub.spec.description as string} (seeded from template ${templateMatch.template.id} — ${templateMatch.strength} match)`, }, handler: applyParameterizationMarkers(tplHandler, templateMatch), }; // Bump offered count + log so the loop is observable. void recordTemplateOffered(templateMatch.template.id); logger.info( { gapId: gap.id, templateId: templateMatch.template.id, strength: templateMatch.strength, score: templateMatch.score, }, "tool-graph: spawn-template offered to reviewer", ); } const proposalContext = { capability_tag: tag, gap_signal_id: gap.id, invocation_count: gap.invocationCount, template_match: templateMatch ? { template_id: templateMatch.template.id, strength: templateMatch.strength, score: Number(templateMatch.score.toFixed(3)), source_node_name: templateMatch.template.sourceNodeName, parameterization_gaps: templateMatch.parameterizationGaps, // Surfaced for the reviewer banner so they can judge // whether to trust the suggestion. These are snapshot at // proposal time; the live counters live on the row. offered_count: templateMatch.template.offeredCount, reuse_count: templateMatch.template.reuseCount, success_count: templateMatch.template.successCount, reject_count: templateMatch.template.rejectCount, } : null, }; const specJson = { ...(stub.spec as Record), proposalContext, }; const provisional = await upsertNode({ id: newId("tnode"), name: provName, description: `Auto-extended provisional tool for capability '${tag}'. Drafted from ${gap.invocationCount} planner-gap signals; needs human review.`, capabilityTags: [tag], inputKind: "json", outputKind: "json", status: "provisional", ownerProcess: nearest?.ownerProcess === "python" ? "python" : "node", specJson: specJson as Record, handlerStub: stub.handler, createdBy: "auto", }); if (nearest) { // Per #156 invariant: every edge ships with a contract. Provisional // auto-extended edges have unknown true I/O shapes, so we attach // an advisory contract derived from the parent's outputSchema / // the stub's inputSchema and tag it provisional so reviewers // know to refine it during promotion. const nearestSpec = (nearest.specJson || {}) as { outputSchema?: IOSchema; }; const stubSpec = (stub.spec || {}) as { inputSchema?: IOSchema }; const provisionalContract: ContractSpec = { produces: nearestSpec.outputSchema || { type: "object" }, consumes: stubSpec.inputSchema || { type: "object" }, mappingHints: { advisory: true, source: "auto_extend_provisional", gap_signal_id: gap.id, }, }; await upsertEdge({ id: newId("tedge"), fromNode: nearest.id, toNode: provisional.id, relation: "alternative_to", weight: 0.5, contract: provisionalContract, }); } await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: provisional.id, kind: "auto_extend", payload: { capability_tag: tag, gap_signal_id: gap.id, invocation_count: gap.invocationCount, last_context: gap.lastContext, }, success: 0, failure: 0, }); await db .update(toolGapSignals) .set({ status: "extended", extendedNodeId: provisional.id }) .where(eq(toolGapSignals.id, gap.id)); created += 1; } // #160: every #158 splice may have changed which atoms are reachable // from an alias chain. Mark active summary paths pending_rebuild so // the admin UI can flag the staleness; the next `seedToolGraph` call // (boot-time, idempotent) re-derives them and flips the status back // to "active". if (created > 0) { try { const marked = await markSummaryPathsPendingRebuild(); if (marked > 0) { logger.info( { marked, autoExtendCreated: created }, "tool-graph: marked summary paths pending_rebuild after #158 splice", ); } } catch (err) { logger.debug( { err }, "tool-graph autoExtendIfNeeded: markSummaryPathsPendingRebuild failed (non-blocking)", ); } } return created; } /** * QUARANTINE-CONT-009 — Provisional handler stub. * Auto-generated when no real handler is available for a capability tag. * Real fix in B1 (graph-executor真接通) — implementer must delete this * function + the corresponding KNOWN_CONTAMINATIONS entry. * @deprecated CONT-009. Provisional placeholder. Real wiring in B1. */ export function generateHandlerStub(tag: string, name: string): { spec: Record; handler: string; } { void import("./quarantine/index.ts").then((q) => q.recordQuarantineHit("CONT-009", { kind: "handler", site: "tool-graph.ts:generateHandlerStub", capabilityTag: tag, provisionalName: name, }), ); const spec = { name, description: `Provisional handler for capability '${tag}'. Replace this stub during review.`, parameters: { type: "object", properties: { query: { type: "string", description: "Natural language query." }, }, required: ["query"], }, }; const handler = `// Stub handler for ${name} (capability: ${tag}). // Auto-generated from planner-gap signals. Replace with real implementation // before approving. export async function invoke(args: { query: string }) { return { error: "not_implemented", capability: "${tag}" }; } `; return { spec, handler }; } // ---------------------------------------------------------------- admin export interface ApproveNodeOptions { /** * #161: reviewer's choice w.r.t. an offered template. When omitted, * we treat it as "fresh" (no template was offered, or reviewer * ignored the banner). When set, we update the source template's * counters accordingly. */ templateChoice?: TemplateChoice; /** * Override the source template id. Defaults to the templateMatch * recorded in the provisional node's specJson.proposalContext. */ sourceTemplateId?: string | null; reviewer?: string; } export async function approveNode( id: string, options: ApproveNodeOptions = {}, ): Promise { const rows = await db .select() .from(toolNodes) .where(eq(toolNodes.id, id)) .limit(1); if (!rows[0]) return null; const node = rows[0]; await db .update(toolNodes) .set({ status: "verified", updatedAt: new Date() }) .where(eq(toolNodes.id, id)); // ---------- #161: template accounting + ingestion ------------------ try { const spec = (node.specJson || {}) as Record; const ctx = (spec.proposalContext || {}) as { capability_tag?: string; template_match?: { template_id?: string; strength?: string } | null; }; const tags = Array.isArray(node.capabilityTags) ? (node.capabilityTags as string[]) : []; const tag = ctx.capability_tag || tags[0] || ""; // Determine source template id (explicit override > spec context). const matchedTemplateId = options.sourceTemplateId !== undefined ? options.sourceTemplateId : ctx.template_match?.template_id || null; // Record reviewer choice if a template was offered. // ORDER MATTERS: bump success_count FIRST so the demote check // (triggered inside recordTemplateChoice → maybeAutoDemote) sees // the post-success success rate. Otherwise a successful reuse // could push a borderline template across the demote threshold. if (matchedTemplateId) { // Default to "fresh" when caller did not supply a choice — this // is the safe accounting answer: an older client / scripted call // without an explicit reviewer choice should NOT be counted as a // template reuse and inflate success metrics. const choice: TemplateChoice = options.templateChoice ?? "fresh"; if (choice !== "fresh") { await recordTemplatePromoteResult(matchedTemplateId); } await recordTemplateChoice({ templateId: matchedTemplateId, choice, }); } // Always learn from the promoted node — idempotent on fingerprint. // This is the meta-evolution step: every successful spawn becomes a // future template. if (tag) { // Find the alternative_to upstream peer to recover the seam shape. let upstreamOutputSchema: IOSchema | undefined; try { const inEdges = await db .select() .from(toolEdges) .where(eq(toolEdges.toNode, id)); for (const e of inEdges) { if (e.relation !== "alternative_to") continue; const up = await db .select() .from(toolNodes) .where(eq(toolNodes.id, e.fromNode)) .limit(1); const upSpec = (up[0]?.specJson || {}) as { outputSchema?: IOSchema }; if (upSpec.outputSchema) { upstreamOutputSchema = upSpec.outputSchema; break; } } } catch (err) { logger.debug({ err }, "tool-graph approveNode: upstream lookup failed"); } const fpInput: SeamFingerprintInput = { failureMode: "capability_gap", downstreamInputSchema: spec.parameters as IOSchema | undefined, upstreamOutputSchema, capabilityTag: tag, }; const persisted = await persistTemplateOnPromote({ fingerprintInput: fpInput, promotedNodeName: node.name, promotedInputSchema: (spec.inputSchema as IOSchema | undefined) || (spec.parameters as IOSchema | undefined), promotedOutputSchema: spec.outputSchema as IOSchema | undefined, handlerSkeleton: node.handlerStub || "", specSkeleton: spec, }); if (persisted) { await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: id, kind: "template_ingest", payload: { template_id: persisted.id, fingerprint_hash_prefix: persisted.fingerprintHash.slice(0, 12), failure_mode: persisted.failureMode, reviewer: options.reviewer || "unknown", template_choice: options.templateChoice || "fresh", source_template_id: matchedTemplateId, }, success: 1, failure: 0, }); } } } catch (err) { logger.warn({ err, nodeId: id }, "tool-graph: template ingest failed (non-fatal)"); } return getNode(id); } export interface RejectNodeOptions { /** #161: when set, recorded against the originating template. */ templateChoice?: TemplateChoice; sourceTemplateId?: string | null; reviewer?: string; } export async function rejectNode( id: string, options: RejectNodeOptions = {}, ): Promise { const rows = await db .select() .from(toolNodes) .where(eq(toolNodes.id, id)) .limit(1); if (!rows[0]) return false; const node = rows[0]; // #161: account against the offered template *before* deleting the node // so we still have access to the proposalContext. try { const spec = (node.specJson || {}) as Record; const ctx = (spec.proposalContext || {}) as { template_match?: { template_id?: string } | null; }; const matchedTemplateId = options.sourceTemplateId !== undefined ? options.sourceTemplateId : ctx.template_match?.template_id || null; if (matchedTemplateId) { // Default to "fresh" on reject — reviewer chose to discard the // template's suggestion entirely. If they had originally picked // Use/Use+edit, the caller can still pass that explicitly. const choice: TemplateChoice = options.templateChoice ?? "fresh"; await recordTemplateChoice({ templateId: matchedTemplateId, choice, }); } } catch (err) { logger.debug({ err, nodeId: id }, "tool-graph rejectNode: template accounting failed"); } await db.delete(toolNodes).where(eq(toolNodes.id, id)); // mark related gap signal as dismissed (best-effort) await db .update(toolGapSignals) .set({ status: "dismissed" }) .where(eq(toolGapSignals.extendedNodeId, id)); return true; } export async function listNodeEvidence(nodeId: string, limit = 50) { return db .select() .from(toolNodeEvidence) .where(eq(toolNodeEvidence.nodeId, nodeId)) .orderBy(desc(toolNodeEvidence.createdAt)) .limit(limit); } export async function listGapSignals( status?: "open" | "extended" | "dismissed", ) { if (status) { return db .select() .from(toolGapSignals) .where(eq(toolGapSignals.status, status)) .orderBy(desc(toolGapSignals.lastSeenAt)); } return db .select() .from(toolGapSignals) .orderBy(desc(toolGapSignals.lastSeenAt)); } // ---------------------------------------------------------------- deprecation (#159) /** * Deprecation tunables. All windows are in days. Defaults are conservative * — the detector is opt-in via DEPRECATION_DETECTOR_ENABLED, and even when * enabled it only proposes; no node is auto-deprecated. */ function envInt(name: string, fallback: number): number { const raw = (process.env[name] ?? "").trim(); if (!raw) return fallback; const n = Number.parseInt(raw, 10); return Number.isFinite(n) && n > 0 ? n : fallback; } export function deprecationDetectorEnabled(): boolean { const v = (process.env["DEPRECATION_DETECTOR_ENABLED"] ?? "").trim().toLowerCase(); return v === "1" || v === "true" || v === "yes" || v === "on"; } export function deprecationConfig(): { coldDays: number; deferDays: number; reArmDays: number; archiveAfterDays: number; } { return { coldDays: envInt("DEPRECATION_COLD_DAYS", 60), deferDays: envInt("DEPRECATION_DEFER_DAYS", 30), reArmDays: envInt("DEPRECATION_RE_ARM_DAYS", 90), archiveAfterDays: envInt("DEPRECATION_ARCHIVE_AFTER_DAYS", 180), }; } export type DeprecationClassification = | "active" | "cold" | "superseded" | "redundant"; export interface DeprecationCandidateProposal { nodeId: string; nodeName: string; classification: DeprecationClassification; /** Snapshot of the metrics that drove the classification. Stored * verbatim in `proposal_context` so reviewers see why the detector * flagged the node. */ supportingMetrics: { lastInvocationAt: string | null; invocationCount: number; daysSinceLastInvocation: number | null; inboundEdgeCount: number; outboundEdgeCount: number; /** For "redundant": the sibling node that subsumes this one. */ redundantWithNodeId?: string; redundantWithNodeName?: string; siblingTraversalRatio?: number; siblingHealthScore?: number; selfHealthScore?: number; /** For "superseded": the splice/replacement node id from * `spec_json.replacedBy` (set by Task #158). */ replacedBy?: string; }; /** Other verified nodes whose outgoing edges still point at this node. * Reviewers must understand the blast radius before approving. */ dependencies: Array<{ nodeId: string; nodeName: string; relation: string }>; } interface NodeMetrics { lastInvocationAt: Date | null; invocationCount: number; } async function gatherInvocationMetrics(): Promise> { const out = new Map(); const rows = await db .select({ nodeId: toolNodeEvidence.nodeId, kind: toolNodeEvidence.kind, createdAt: toolNodeEvidence.createdAt, }) .from(toolNodeEvidence) .where(inArray(toolNodeEvidence.kind, ["invocation", "shadow_run"])); for (const r of rows) { const cur = out.get(r.nodeId) ?? { lastInvocationAt: null, invocationCount: 0 }; cur.invocationCount += 1; if (!cur.lastInvocationAt || r.createdAt > cur.lastInvocationAt) { cur.lastInvocationAt = r.createdAt; } out.set(r.nodeId, cur); } return out; } /** * Pure classifier — given the verified-node universe and the metric * snapshot, return one proposal per non-active node. Does not touch the * database. The detector wraps this with idempotent persistence. * * Heuristics: * - cold: no invocation/shadow_run row newer than `now - coldDays`. * - superseded: `spec_json.replacedBy` is set (the #158 splice writes * this when an alternative supersedes the node). * - redundant: there exists a verified sibling node such that * (a) capability_tags overlap by ≥1 tag, * (b) sibling.invocationCount ≥ 10 × self.invocationCount, * (c) self has ≥1 invocation (do not flag totally cold nodes — * those go through the cold path with cleaner signal), * (d) sibling EMA health ≥ 0.85 AND self EMA health ≤ 0.5 * (only meaningful when both have edges; otherwise skip). * The thresholds are conservative because false positives delete * real capability. */ export function classifyDeprecationCandidates(args: { now: Date; coldDays: number; verifiedNodes: ResolvedNode[]; edges: ResolvedEdge[]; metrics: Map; edgeHealthByNodeId: Map; }): DeprecationCandidateProposal[] { const { now, coldDays, verifiedNodes, edges, metrics, edgeHealthByNodeId } = args; const coldCutoff = new Date(now.getTime() - coldDays * 86_400_000); const byId = new Map(verifiedNodes.map((n) => [n.id, n])); const inboundCount = new Map(); const outboundCount = new Map(); const dependents = new Map>(); for (const e of edges) { inboundCount.set(e.toNode, (inboundCount.get(e.toNode) ?? 0) + 1); outboundCount.set(e.fromNode, (outboundCount.get(e.fromNode) ?? 0) + 1); if (byId.has(e.toNode)) { const arr = dependents.get(e.toNode) ?? []; arr.push({ from: e.fromNode, relation: e.relation }); dependents.set(e.toNode, arr); } } function healthOf(nodeId: string): number | null { const h = edgeHealthByNodeId.get(nodeId); if (!h || h.count === 0) return null; return h.sum / h.count; } const out: DeprecationCandidateProposal[] = []; for (const node of verifiedNodes) { const m = metrics.get(node.id) ?? { lastInvocationAt: null, invocationCount: 0 }; const inbound = inboundCount.get(node.id) ?? 0; const outbound = outboundCount.get(node.id) ?? 0; const deps = (dependents.get(node.id) ?? []) .map((d) => { const dep = byId.get(d.from); return dep ? { nodeId: dep.id, nodeName: dep.name, relation: d.relation } : null; }) .filter((x): x is { nodeId: string; nodeName: string; relation: string } => !!x); const baseMetrics = { lastInvocationAt: m.lastInvocationAt ? m.lastInvocationAt.toISOString() : null, invocationCount: m.invocationCount, daysSinceLastInvocation: m.lastInvocationAt ? Math.floor((now.getTime() - m.lastInvocationAt.getTime()) / 86_400_000) : null, inboundEdgeCount: inbound, outboundEdgeCount: outbound, }; // ---- superseded (highest priority — explicit human/system signal) const replacedBy = (node.spec as { replacedBy?: unknown })?.replacedBy; if (typeof replacedBy === "string" && replacedBy.length > 0) { out.push({ nodeId: node.id, nodeName: node.name, classification: "superseded", supportingMetrics: { ...baseMetrics, replacedBy }, dependencies: deps, }); continue; } // ---- redundant (conservative sibling overlap) let redundantWith: ResolvedNode | null = null; let siblingRatio = 0; let siblingHealth: number | null = null; const selfHealth = healthOf(node.id); if (m.invocationCount > 0) { for (const sib of verifiedNodes) { if (sib.id === node.id) continue; const overlap = sib.capabilityTags.some((t) => node.capabilityTags.includes(t)); if (!overlap) continue; const sm = metrics.get(sib.id); if (!sm) continue; if (sm.invocationCount < m.invocationCount * 10) continue; const sh = healthOf(sib.id); // Both sides need health data and the sibling must be much healthier. if (sh === null || selfHealth === null) continue; if (sh < 0.85) continue; if (selfHealth > 0.5) continue; const ratio = sm.invocationCount / Math.max(1, m.invocationCount); if (!redundantWith || ratio > siblingRatio) { redundantWith = sib; siblingRatio = ratio; siblingHealth = sh; } } } if (redundantWith) { out.push({ nodeId: node.id, nodeName: node.name, classification: "redundant", supportingMetrics: { ...baseMetrics, redundantWithNodeId: redundantWith.id, redundantWithNodeName: redundantWith.name, siblingTraversalRatio: siblingRatio, siblingHealthScore: siblingHealth ?? undefined, selfHealthScore: selfHealth ?? undefined, }, dependencies: deps, }); continue; } // ---- cold const isCold = m.lastInvocationAt === null ? // Never invoked: only flag as cold if the node has been around // longer than coldDays. We can't see node createdAt here, but // the detector caller filters on it; in this pure step, a // never-invoked node always qualifies. true : m.lastInvocationAt < coldCutoff; if (isCold) { out.push({ nodeId: node.id, nodeName: node.name, classification: "cold", supportingMetrics: baseMetrics, dependencies: deps, }); continue; } } return out; } /** * Run one detector pass: builds proposals, persists open candidate * rows, and refreshes the supporting context on existing open rows. * Idempotent: re-running without state changes makes no row updates * beyond `updated_at`. Respects defer/re-arm windows. * * Returns counts so the admin route and tests can assert behaviour * without having to re-query. */ export async function runDeprecationDetector(args: { now?: Date; coldDays?: number; } = {}): Promise<{ proposalsConsidered: number; candidatesCreated: number; candidatesRefreshed: number; skippedDeferred: number; skippedReArm: number; }> { const cfg = deprecationConfig(); const now = args.now ?? new Date(); const coldDays = args.coldDays ?? cfg.coldDays; // Filter to nodes older than coldDays — never-invoked freshly-seeded // nodes should not be proposed on day 1. const allVerifiedRows = await db .select() .from(toolNodes) .where(eq(toolNodes.status, "verified")); const ageCutoff = new Date(now.getTime() - coldDays * 86_400_000); const verifiedRows = allVerifiedRows.filter( (r) => r.createdAt <= ageCutoff, ); const verifiedNodes = verifiedRows.map(row2node); const allEdges = await db.select().from(toolEdges); const resolvedEdges = allEdges.map(row2edge); const metrics = await gatherInvocationMetrics(); // Aggregate edge health by node id (any edge attached to the node // contributes; sum + count → mean). const healthRows = await db.select().from(toolEdgeHealth); const healthByEdgeId = new Map(healthRows.map((h) => [h.edgeId, h.emaHealthScore])); const edgeHealthByNodeId = new Map(); for (const e of allEdges) { const score = healthByEdgeId.get(e.id); if (score === undefined) continue; for (const id of [e.fromNode, e.toNode]) { const cur = edgeHealthByNodeId.get(id) ?? { count: 0, sum: 0 }; cur.count += 1; cur.sum += score; edgeHealthByNodeId.set(id, cur); } } const proposals = classifyDeprecationCandidates({ now, coldDays, verifiedNodes, edges: resolvedEdges, metrics, edgeHealthByNodeId, }); const existing = await db.select().from(toolDeprecationCandidates); const existingByNodeId = new Map(existing.map((r) => [r.nodeId, r])); let created = 0; let refreshed = 0; let skippedDeferred = 0; let skippedReArm = 0; const proposedNodeIds = new Set(); for (const p of proposals) { proposedNodeIds.add(p.nodeId); const prior = existingByNodeId.get(p.nodeId); if (prior) { if (prior.status === "deferred" && prior.deferUntil && prior.deferUntil > now) { skippedDeferred += 1; continue; } if (prior.status === "rejected" && prior.reArmUntil && prior.reArmUntil > now) { skippedReArm += 1; continue; } if (prior.status === "approved") { // Already deprecated; nothing to refresh. continue; } // Refresh the open/deferred-but-armed/rejected-but-armed row so // the latest metrics are visible to the reviewer. We scope the // update to the snapshot status we observed at the start of the // run; if a reviewer concurrently approved/deferred/rejected the // candidate after our snapshot, the WHERE clause misses and we // leave their decision intact (race-safe — the reviewer wins). const updated = await db .update(toolDeprecationCandidates) .set({ status: "open", classification: p.classification, proposalContext: { supportingMetrics: p.supportingMetrics, dependencies: p.dependencies, } as Record, deferUntil: null, reArmUntil: null, updatedAt: now, }) .where( and( eq(toolDeprecationCandidates.id, prior.id), eq(toolDeprecationCandidates.status, prior.status), ), ) .returning({ id: toolDeprecationCandidates.id }); if (updated.length > 0) refreshed += 1; continue; } await db.insert(toolDeprecationCandidates).values({ id: newId("tdep"), nodeId: p.nodeId, classification: p.classification, status: "open", proposalContext: { supportingMetrics: p.supportingMetrics, dependencies: p.dependencies, } as Record, decidedBy: null, decidedAt: null, createdAt: now, updatedAt: now, }); created += 1; } // For previously-open candidates whose nodes are no longer flagged // (e.g. someone exercised them again), close the open row out by // marking it rejected with a short re-arm window so the reviewer // does not see stale proposals. Deferred / approved rows are never // touched by this sweep. for (const row of existing) { if (row.status !== "open") continue; if (proposedNodeIds.has(row.nodeId)) continue; // Race-safe: only auto-reject if the row is still "open" at write // time. If a reviewer concurrently approved/deferred/rejected it, // the WHERE clause misses and their decision wins. await db .update(toolDeprecationCandidates) .set({ status: "rejected", decidedBy: "system", decidedAt: now, reArmUntil: new Date(now.getTime() + cfg.reArmDays * 86_400_000), updatedAt: now, }) .where( and( eq(toolDeprecationCandidates.id, row.id), eq(toolDeprecationCandidates.status, "open"), ), ); } return { proposalsConsidered: proposals.length, candidatesCreated: created, candidatesRefreshed: refreshed, skippedDeferred, skippedReArm, }; } export interface DeprecationCandidateView { id: string; nodeId: string; nodeName: string; classification: DeprecationClassification; status: "open" | "deferred" | "rejected" | "approved"; proposalContext: Record; deferUntil: string | null; reArmUntil: string | null; decidedBy: string | null; decidedAt: string | null; createdAt: string; updatedAt: string; } function rowToCandidateView( r: ToolDeprecationCandidateRow, nameById: Map, ): DeprecationCandidateView { return { id: r.id, nodeId: r.nodeId, nodeName: nameById.get(r.nodeId) ?? r.nodeId, classification: r.classification as DeprecationClassification, status: r.status as DeprecationCandidateView["status"], proposalContext: (r.proposalContext as Record) ?? {}, deferUntil: r.deferUntil ? r.deferUntil.toISOString() : null, reArmUntil: r.reArmUntil ? r.reArmUntil.toISOString() : null, decidedBy: r.decidedBy, decidedAt: r.decidedAt ? r.decidedAt.toISOString() : null, createdAt: r.createdAt.toISOString(), updatedAt: r.updatedAt.toISOString(), }; } export async function listDeprecationCandidates(opts: { status?: DeprecationCandidateView["status"] | "any"; } = {}): Promise { const status = opts.status ?? "open"; const rows = status === "any" ? await db .select() .from(toolDeprecationCandidates) .orderBy(desc(toolDeprecationCandidates.updatedAt)) : await db .select() .from(toolDeprecationCandidates) .where(eq(toolDeprecationCandidates.status, status)) .orderBy(desc(toolDeprecationCandidates.updatedAt)); if (rows.length === 0) return []; const nodeIds = Array.from(new Set(rows.map((r) => r.nodeId))); const nodes = await db .select({ id: toolNodes.id, name: toolNodes.name }) .from(toolNodes) .where(inArray(toolNodes.id, nodeIds)); const nameById = new Map(nodes.map((n) => [n.id, n.name])); return rows.map((r) => rowToCandidateView(r, nameById)); } /** * Approve a deprecation candidate: flips the candidate row to * `approved` and the underlying node to `deprecated`. Idempotent — a * second call on an already-approved row is a no-op. Returns null when * the candidate id is unknown. */ export async function deprecateNode( candidateId: string, reviewer: string, ): Promise { const rows = await db .select() .from(toolDeprecationCandidates) .where(eq(toolDeprecationCandidates.id, candidateId)) .limit(1); const cand = rows[0]; if (!cand) return null; const now = new Date(); await db.transaction(async (tx) => { await tx .update(toolDeprecationCandidates) .set({ status: "approved", decidedBy: reviewer, decidedAt: now, deferUntil: null, reArmUntil: null, updatedAt: now, }) .where(eq(toolDeprecationCandidates.id, candidateId)); await tx .update(toolNodes) .set({ status: "deprecated", updatedAt: now }) .where(eq(toolNodes.id, cand.nodeId)); await tx.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: cand.nodeId, kind: "deprecation_decision", payload: { decision: "deprecate", candidate_id: candidateId, classification: cand.classification, reviewer, }, success: 0, failure: 0, }); }); const out = await db .select() .from(toolDeprecationCandidates) .where(eq(toolDeprecationCandidates.id, candidateId)) .limit(1); if (!out[0]) return null; const node = await db .select({ id: toolNodes.id, name: toolNodes.name }) .from(toolNodes) .where(eq(toolNodes.id, cand.nodeId)) .limit(1); const nameById = new Map(node.map((n) => [n.id, n.name])); return rowToCandidateView(out[0], nameById); } export async function deferDeprecationCandidate( candidateId: string, reviewer: string, days?: number, ): Promise { const cfg = deprecationConfig(); const window = days ?? cfg.deferDays; const now = new Date(); const until = new Date(now.getTime() + window * 86_400_000); const updated = await db .update(toolDeprecationCandidates) .set({ status: "deferred", decidedBy: reviewer, decidedAt: now, deferUntil: until, reArmUntil: null, updatedAt: now, }) .where(eq(toolDeprecationCandidates.id, candidateId)) .returning(); if (updated.length === 0) return null; const cand = updated[0]!; await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: cand.nodeId, kind: "deprecation_decision", payload: { decision: "defer", candidate_id: candidateId, classification: cand.classification, reviewer, defer_until: until.toISOString(), }, success: 0, failure: 0, }); const node = await db .select({ id: toolNodes.id, name: toolNodes.name }) .from(toolNodes) .where(eq(toolNodes.id, cand.nodeId)) .limit(1); const nameById = new Map(node.map((n) => [n.id, n.name])); return rowToCandidateView(cand, nameById); } export async function rejectDeprecationCandidate( candidateId: string, reviewer: string, days?: number, ): Promise { const cfg = deprecationConfig(); const window = days ?? cfg.reArmDays; const now = new Date(); const until = new Date(now.getTime() + window * 86_400_000); const updated = await db .update(toolDeprecationCandidates) .set({ status: "rejected", decidedBy: reviewer, decidedAt: now, deferUntil: null, reArmUntil: until, updatedAt: now, }) .where(eq(toolDeprecationCandidates.id, candidateId)) .returning(); if (updated.length === 0) return null; const cand = updated[0]!; await db.insert(toolNodeEvidence).values({ id: newId("tev"), nodeId: cand.nodeId, kind: "deprecation_decision", payload: { decision: "reject", candidate_id: candidateId, classification: cand.classification, reviewer, re_arm_until: until.toISOString(), }, success: 0, failure: 0, }); const node = await db .select({ id: toolNodes.id, name: toolNodes.name }) .from(toolNodes) .where(eq(toolNodes.id, cand.nodeId)) .limit(1); const nameById = new Map(node.map((n) => [n.id, n.name])); return rowToCandidateView(cand, nameById); } /** * Resolve a node by name and, if it has been deprecated, return a * machine-readable error envelope including any `replacedBy` set on the * spec_json (Task #158 splice). Returns `{ deprecated: false, node }` * for active/provisional/alias nodes. Returns `null` when the node * doesn't exist. */ export async function resolveNodeForDirectReference( name: string, ): Promise< | { deprecated: false; node: ResolvedNode } | { deprecated: true; nodeName: string; replacedBy: string | null } | null > { const n = await getNodeByName(name); if (!n) return null; if (n.status === "deprecated") { const replacedByRaw = (n.spec as { replacedBy?: unknown })?.replacedBy; return { deprecated: true, nodeName: n.name, replacedBy: typeof replacedByRaw === "string" ? replacedByRaw : null, }; } return { deprecated: false, node: n }; } /** * Run the weekly archive job: any node that has been `deprecated` for * ≥ archiveAfterDays (measured from `tool_nodes.updated_at`, which is * stamped when `deprecateNode` flips the status) is moved into the * archive tables in a single transaction per node. The hot rows * (node, all attached edges, all evidence rows, the candidate row) are * deleted only after the archive INSERTs succeed. * * Returns per-node counts. Safe to re-run; never operates on * non-deprecated nodes. */ export async function runArchiveJob(args: { now?: Date; archiveAfterDays?: number; } = {}): Promise<{ nodesArchived: number; edgesArchived: number; evidenceArchived: number; candidatesArchived: number; }> { const cfg = deprecationConfig(); const now = args.now ?? new Date(); const after = args.archiveAfterDays ?? cfg.archiveAfterDays; const cutoff = new Date(now.getTime() - after * 86_400_000); const targets = await db .select() .from(toolNodes) .where( and(eq(toolNodes.status, "deprecated"), lte(toolNodes.updatedAt, cutoff)), ); let nodesArchived = 0; let edgesArchived = 0; let evidenceArchived = 0; let candidatesArchived = 0; for (const node of targets) { await db.transaction(async (tx) => { const edges = await tx .select() .from(toolEdges) .where( sql`${toolEdges.fromNode} = ${node.id} OR ${toolEdges.toNode} = ${node.id}`, ); const evidence = await tx .select() .from(toolNodeEvidence) .where(eq(toolNodeEvidence.nodeId, node.id)); const candidates = await tx .select() .from(toolDeprecationCandidates) .where(eq(toolDeprecationCandidates.nodeId, node.id)); // Mirror inserts first; if any of these throws we abort and the // hot rows are untouched. await tx.insert(toolNodesArchive).values({ id: node.id, name: node.name, description: node.description, capabilityTags: node.capabilityTags, inputKind: node.inputKind, outputKind: node.outputKind, status: node.status, ownerProcess: node.ownerProcess, specJson: node.specJson, createdBy: node.createdBy, handlerRef: node.handlerRef ?? null, handlerStub: node.handlerStub ?? null, costHint: node.costHint ?? null, latencyHintMs: node.latencyHintMs ?? null, version: node.version, createdAt: node.createdAt, updatedAt: node.updatedAt, archivedAt: now, }); if (edges.length > 0) { await tx.insert(toolEdgesArchive).values( edges.map((e) => ({ id: e.id, fromNode: e.fromNode, toNode: e.toNode, relation: e.relation, weight: e.weight, contract: e.contract, createdAt: e.createdAt, archivedAt: now, })), ); } if (evidence.length > 0) { await tx.insert(toolNodeEvidenceArchive).values( evidence.map((ev) => ({ id: ev.id, nodeId: ev.nodeId, kind: ev.kind, payload: ev.payload, success: ev.success, failure: ev.failure, shadowUserId: ev.shadowUserId ?? null, createdAt: ev.createdAt, seamFoldedAt: ev.seamFoldedAt ?? null, archivedAt: now, })), ); } // Now delete hot rows. Edges/evidence/candidates cascade off // tool_nodes via FK, but we delete explicitly to keep the order // deterministic and to capture counts. if (candidates.length > 0) { await tx .delete(toolDeprecationCandidates) .where(eq(toolDeprecationCandidates.nodeId, node.id)); } if (edges.length > 0) { await tx .delete(toolEdges) .where( sql`${toolEdges.fromNode} = ${node.id} OR ${toolEdges.toNode} = ${node.id}`, ); } if (evidence.length > 0) { await tx .delete(toolNodeEvidence) .where(eq(toolNodeEvidence.nodeId, node.id)); } await tx.delete(toolNodes).where(eq(toolNodes.id, node.id)); nodesArchived += 1; edgesArchived += edges.length; evidenceArchived += evidence.length; candidatesArchived += candidates.length; }); } return { nodesArchived, edgesArchived, evidenceArchived, candidatesArchived }; } // ---------------------------------------------------------------- archive read API (#159) /** * Read-only listing of archived nodes (and a small slice of their * incident archived edges + most-recent archived evidence) for the * admin "View archive" surface. `search` is a case-insensitive name * substring filter. Results are ordered newest-archived first. */ export async function listArchive(opts: { limit?: number; search?: string; }): Promise<{ nodes: Array<{ id: string; name: string; description: string | null; capabilityTags: string[]; ownerProcess: string; archivedAt: string; edges: Array<{ id: string; fromNode: string; toNode: string; relation: string; }>; recentEvidence: Array<{ id: string; kind: string; success: number; failure: number; createdAt: string; }>; }>; }> { const limit = Math.min(Math.max(opts.limit ?? 100, 1), 500); const search = opts.search?.toLowerCase(); const all = await db .select() .from(toolNodesArchive) .orderBy(desc(toolNodesArchive.archivedAt)) .limit(limit * 2); const filtered = search ? all.filter((n) => n.name.toLowerCase().includes(search)) : all; const slice = filtered.slice(0, limit); if (slice.length === 0) return { nodes: [] }; const nodeIds = slice.map((n) => n.id); const archivedEdges = await db .select() .from(toolEdgesArchive) .where( sql`${toolEdgesArchive.fromNode} IN (${sql.join( nodeIds.map((i) => sql`${i}`), sql`, `, )}) OR ${toolEdgesArchive.toNode} IN (${sql.join( nodeIds.map((i) => sql`${i}`), sql`, `, )})`, ); const archivedEv = await db .select() .from(toolNodeEvidenceArchive) .where(inArray(toolNodeEvidenceArchive.nodeId, nodeIds)) .orderBy(desc(toolNodeEvidenceArchive.createdAt)) .limit(5 * slice.length); const evByNode = new Map(); for (const ev of archivedEv) { const arr = evByNode.get(ev.nodeId) ?? []; if (arr.length < 5) arr.push(ev); evByNode.set(ev.nodeId, arr); } return { nodes: slice.map((n) => ({ id: n.id, name: n.name, description: n.description ?? null, capabilityTags: (n.capabilityTags as string[]) ?? [], ownerProcess: n.ownerProcess, archivedAt: n.archivedAt.toISOString(), edges: archivedEdges .filter((e) => e.fromNode === n.id || e.toNode === n.id) .map((e) => ({ id: e.id, fromNode: e.fromNode, toNode: e.toNode, relation: e.relation, })), recentEvidence: (evByNode.get(n.id) ?? []).map((ev) => ({ id: ev.id, kind: ev.kind, success: ev.success, failure: ev.failure, createdAt: ev.createdAt.toISOString(), })), })), }; } // ---------------------------------------------------------------- scheduler (#159) /** * Lightweight in-process scheduler. When DEPRECATION_DETECTOR_ENABLED=1 * this fires the detector daily and the archive job weekly. Both jobs * are idempotent so running them while a manual run is in flight is * safe (the second run is a no-op if the first already advanced state). * * Returns a stop() handle; tests and graceful shutdown should call it. */ export function startDeprecationScheduler(opts?: { detectorIntervalMs?: number; archiveIntervalMs?: number; }): { stop: () => void } { if (!deprecationDetectorEnabled()) { logger.info( "tool-graph deprecation scheduler: disabled (DEPRECATION_DETECTOR_ENABLED unset)", ); return { stop: () => {} }; } const detectorMs = opts?.detectorIntervalMs ?? 24 * 60 * 60 * 1000; const archiveMs = opts?.archiveIntervalMs ?? 7 * 24 * 60 * 60 * 1000; let stopped = false; const detectorTick = setInterval(() => { if (stopped) return; runDeprecationDetector() .then((s) => logger.info( { ...s, kind: "deprecation_detector_tick" }, "tool-graph deprecation scheduler: detector ran", ), ) .catch((err) => logger.error( { err }, "tool-graph deprecation scheduler: detector failed", ), ); }, detectorMs); const archiveTick = setInterval(() => { if (stopped) return; runArchiveJob() .then((s) => logger.info( { ...s, kind: "deprecation_archive_tick" }, "tool-graph deprecation scheduler: archive ran", ), ) .catch((err) => logger.error( { err }, "tool-graph deprecation scheduler: archive failed", ), ); }, archiveMs); // Don't keep the event loop alive just for these timers; the HTTP // server is the lifeline. detectorTick.unref(); archiveTick.unref(); logger.info( { detectorMs, archiveMs }, "tool-graph deprecation scheduler: started", ); return { stop: () => { stopped = true; clearInterval(detectorTick); clearInterval(archiveTick); }, }; } // ---------------------------------------------------------------- summary paths (#160) /** * Resolved summary path view: a `composition_alias` chain rendered as a * first-class planner abstraction. The runtime description shown to the * LLM is `descriptionOverride ?? descriptionDerived`. */ export interface ResolvedSummaryPath { id: string; name: string; aliasNodeId: string | null; description: string; descriptionDerived: string; descriptionOverride: string | null; expansionNodeNames: string[]; headAtomName: string | null; tailAtomName: string | null; capabilityTags: string[]; status: "active" | "pending_rebuild"; estCostHint: number | null; estLatencyMsHint: number | null; traversalCount: number; version: number; updatedAt: string; } function row2summaryPath(r: ToolSummaryPathRow): ResolvedSummaryPath { const exp = Array.isArray(r.expansionNodeNames) ? (r.expansionNodeNames as string[]) : []; const tags = Array.isArray(r.capabilityTags) ? (r.capabilityTags as string[]) : []; return { id: r.id, name: r.name, aliasNodeId: r.aliasNodeId ?? null, descriptionDerived: r.descriptionDerived ?? "", descriptionOverride: r.descriptionOverride ?? null, description: (r.descriptionOverride ?? r.descriptionDerived ?? "").trim(), expansionNodeNames: exp, headAtomName: r.headAtomName ?? null, tailAtomName: r.tailAtomName ?? null, capabilityTags: tags, status: (r.status as "active" | "pending_rebuild") ?? "active", estCostHint: r.estCostHint ?? null, estLatencyMsHint: r.estLatencyMsHint ?? null, traversalCount: r.traversalCount ?? 0, version: r.version ?? 1, updatedAt: r.updatedAt.toISOString(), }; } export async function listSummaryPaths(opts: { status?: "active" | "pending_rebuild" | "any"; } = {}): Promise { const status = opts.status ?? "any"; const rows = status === "any" ? await db.select().from(toolSummaryPaths) : await db .select() .from(toolSummaryPaths) .where(eq(toolSummaryPaths.status, status)); return rows.map(row2summaryPath); } /** * Idempotent upsert. Two correctness properties: * 1. **Concurrency-safe** — uses Postgres `ON CONFLICT (name)` so two * parallel seed/rebuild calls cannot race past the unique-name * check and double-insert. * 2. **No version churn** — `version` only bumps when the *material* * fields (description_derived, expansion, head/tail, tags) actually * change. Re-running `seedToolGraph` on an unchanged graph is a * no-op for `version` and `updated_at` (other than the * status-back-to-active flip on rows that were pending_rebuild). * * `description_override` is admin-edited; the seed never touches it * unless the caller explicitly passes a string or `null`. */ export async function upsertSummaryPath( row: InsertToolSummaryPathRow, ): Promise { const id = row.id && row.id.length > 0 ? row.id : newId("tsum"); const existing = await db .select() .from(toolSummaryPaths) .where(eq(toolSummaryPaths.name, row.name)) .limit(1); if (existing[0]) { const cur = existing[0]; const newDesc = row.descriptionDerived ?? cur.descriptionDerived; const newExp = (row.expansionNodeNames ?? cur.expansionNodeNames) as unknown[]; const newTags = (row.capabilityTags ?? cur.capabilityTags) as unknown[]; const material = newDesc !== cur.descriptionDerived || JSON.stringify(newExp) !== JSON.stringify(cur.expansionNodeNames) || (row.headAtomName ?? cur.headAtomName) !== cur.headAtomName || (row.tailAtomName ?? cur.tailAtomName) !== cur.tailAtomName || JSON.stringify(newTags) !== JSON.stringify(cur.capabilityTags) || (row.aliasNodeId ?? cur.aliasNodeId) !== cur.aliasNodeId; const desiredStatus = row.status ?? "active"; const statusFlip = desiredStatus !== cur.status; if (!material && !statusFlip && row.descriptionOverride === undefined) { // True no-op rebuild — preserve version + updated_at so admins can // see that nothing changed. return row2summaryPath(cur); } await db .update(toolSummaryPaths) .set({ aliasNodeId: row.aliasNodeId ?? cur.aliasNodeId, descriptionDerived: newDesc, descriptionOverride: row.descriptionOverride !== undefined ? row.descriptionOverride : cur.descriptionOverride, expansionNodeNames: newExp as never, headAtomName: row.headAtomName ?? cur.headAtomName, tailAtomName: row.tailAtomName ?? cur.tailAtomName, capabilityTags: newTags as never, estCostHint: row.estCostHint ?? cur.estCostHint, estLatencyMsHint: row.estLatencyMsHint ?? cur.estLatencyMsHint, status: desiredStatus, version: material ? cur.version + 1 : cur.version, updatedAt: new Date(), }) .where(eq(toolSummaryPaths.id, cur.id)); const fresh = await db .select() .from(toolSummaryPaths) .where(eq(toolSummaryPaths.id, cur.id)) .limit(1); return row2summaryPath(fresh[0]!); } // Insert path. ON CONFLICT (name) guards against the // select-then-insert race when two seeders run concurrently. await db .insert(toolSummaryPaths) .values({ ...row, id }) .onConflictDoUpdate({ target: toolSummaryPaths.name, set: { aliasNodeId: row.aliasNodeId, descriptionDerived: row.descriptionDerived, expansionNodeNames: row.expansionNodeNames as never, headAtomName: row.headAtomName, tailAtomName: row.tailAtomName, capabilityTags: row.capabilityTags as never, estCostHint: row.estCostHint, estLatencyMsHint: row.estLatencyMsHint, status: row.status ?? "active", updatedAt: new Date(), }, }); const fresh = await db .select() .from(toolSummaryPaths) .where(eq(toolSummaryPaths.name, row.name)) .limit(1); return row2summaryPath(fresh[0]!); } /** * Prune summary path rows whose name is not in the canonical set * passed in. Called at the end of a rebuild so removing an alias from * the seed (or renaming it) cleans up its dangling summary row. * Returns the number of rows deleted. */ export async function pruneSummaryPathsNotIn( keepNames: string[], ): Promise { if (keepNames.length === 0) { // Defensive: if the canonical set is empty we still don't truncate // — that's almost certainly a bug in the caller, not an intent. return 0; } const all = await db .select({ id: toolSummaryPaths.id, name: toolSummaryPaths.name }) .from(toolSummaryPaths); const keep = new Set(keepNames); const stale = all.filter((r) => !keep.has(r.name)).map((r) => r.id); if (stale.length === 0) return 0; await db .delete(toolSummaryPaths) .where(inArray(toolSummaryPaths.id, stale)); return stale.length; } export async function setSummaryPathOverride( id: string, override: string | null, ): Promise { const trimmed = typeof override === "string" ? override.trim().slice(0, 4000) : null; const updated = await db .update(toolSummaryPaths) .set({ descriptionOverride: trimmed && trimmed.length > 0 ? trimmed : null, updatedAt: new Date(), }) .where(eq(toolSummaryPaths.id, id)) .returning(); return updated[0] ? row2summaryPath(updated[0]) : null; } /** * Mark every active summary path as `pending_rebuild`. Called after the * #158 splice path (autoExtendIfNeeded) lands a new node/edge so the * admin UI can flag that the cached chains may be stale until the next * `seedToolGraph` run rebuilds them. */ export async function markSummaryPathsPendingRebuild(): Promise { const updated = await db .update(toolSummaryPaths) .set({ status: "pending_rebuild", updatedAt: new Date() }) .where(eq(toolSummaryPaths.status, "active")) .returning({ id: toolSummaryPaths.id }); return updated.length; } export async function recordSummaryPathTraversal( name: string, ): Promise { try { await db .update(toolSummaryPaths) .set({ traversalCount: sql`${toolSummaryPaths.traversalCount} + 1`, updatedAt: new Date(), }) .where(eq(toolSummaryPaths.name, name)); } catch (err) { logger.debug({ err, name }, "recordSummaryPathTraversal failed (non-blocking)"); } } /** * Resolve summary paths for one user turn under the hierarchy planner * flag. Picks every `active` path whose `capabilityTags` overlap with * `matchedTags`, capped at `PLANNER_MAX_SUMMARIES`. Empty array when * no overlap (caller should fall back to flat atomic listing). */ export async function resolveSummaryPathsForTags( matchedTags: string[], cap = PLANNER_MAX_SUMMARIES, ): Promise { if (!matchedTags.length) return []; const all = await listSummaryPaths({ status: "any" }); const tagSet = new Set(matchedTags); const filtered = all .filter((s) => s.capabilityTags.some((t) => tagSet.has(t))) // Active paths first; pending_rebuild last so admins see the chain // even when stale, but a healthier alternative wins ranking ties. .sort((a, b) => { if (a.status !== b.status) return a.status === "active" ? -1 : 1; return b.traversalCount - a.traversalCount; }); return filtered.slice(0, Math.max(0, cap)); } export function describeSummaryPaths(paths: ResolvedSummaryPath[]): string { if (!paths.length) return ""; // Compact format — one line per path: `name: a → b → c`. The full // description is intentionally omitted; the planner already has the // atomic tool descriptions in the flat catalog below, so duplicating // them here would blow the 15% net-overhead budget. Stale chains are // marked with a trailing `*` so the planner can still reference them // while the admin rebuild is pending. const lines: string[] = ["preferred chains:"]; for (const p of paths) { const stale = p.status === "pending_rebuild" ? "*" : ""; lines.push(`- ${p.name}${stale}: ${p.expansionNodeNames.join(" -> ")}`); } return lines.join("\n"); } // ---------------------------------------------------------------- feature flag export function toolGraphEnabled(): boolean { const v = (process.env["TOOL_GRAPH_ENABLED"] ?? "").trim().toLowerCase(); return v === "1" || v === "true" || v === "yes" || v === "on"; } /** * Hierarchical planner kill-switch (#160). Default OFF — when unset the * planner sees the flat atomic catalog exactly as before, guaranteeing * the both-mode-equivalence invariant. When set to "1"/"true" the * planner prompt is augmented with the resolved summary paths from * `tool_summary_paths`; the underlying tool list and dispatch path are * unchanged so flipping the flag mid-flight cannot diverge outputs. */ export function plannerHierarchyEnabled(): boolean { const v = (process.env["PLANNER_HIERARCHY_ENABLED"] ?? "") .trim() .toLowerCase(); return v === "1" || v === "true" || v === "yes" || v === "on"; } /** Cap on summary-path entries injected into a single planner prompt. */ export const PLANNER_MAX_SUMMARIES = (() => { const raw = Number(process.env["PLANNER_MAX_SUMMARIES"] ?? "10"); return Number.isFinite(raw) && raw > 0 ? Math.floor(raw) : 10; })();