doatlas-2 / artifacts /api-server /src /lib /tool-graph.ts
Iostream-Li's picture
Add files using upload-large-folder tool
5871090 verified
/**
* tool-graph — cross-process tool capability graph access layer.
*
* Both the Node `api-server` and the Python `research-engine` read and
* write the same Postgres tables (`tool_nodes`, `tool_edges`,
* `tool_node_evidence`, `tool_gap_signals`). This file is the Node-side
* client; `backend/tools/graph_client.py` mirrors the same operations on
* the Python side.
*
* Public surface:
* - resolveSubgraph(intent) — pick a connected slice of verified
* nodes for the current user turn.
* - getNode(idOrName) — single-node lookup.
* - listNodes({status}) — list (used by admin UI + seeders).
* - recordInvocation(...) — telemetry per tool call.
* - recordPlannerGap(tag, context) — telemetry when planner finds no fit.
* - autoExtendIfNeeded() — promote crossed-threshold gap
* signals into provisional nodes.
* - approveNode / rejectNode — admin actions.
*/
import { and, asc, desc, eq, gt, inArray, isNull, lt, lte, sql } from "drizzle-orm";
import {
db,
toolNodes,
toolEdges,
toolNodeEvidence,
toolGapSignals,
toolEdgeHealth,
toolDeprecationCandidates,
toolNodesArchive,
toolEdgesArchive,
toolNodeEvidenceArchive,
toolSummaryPaths,
type ToolNodeRow,
type ToolEdgeRow,
type ToolEdgeHealthRow,
type InsertToolNodeRow,
type InsertToolEdgeRow,
type ToolDeprecationCandidateRow,
type ToolSummaryPathRow,
type InsertToolSummaryPathRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import { evaluateSeam, type SeamSample } from "./seam-health-detector";
import {
searchTemplates,
recordTemplateOffered,
recordTemplateChoice,
recordTemplatePromoteResult,
persistTemplateOnPromote,
applyParameterizationMarkers,
type TemplateChoice,
type TemplateMatch,
type SeamFingerprintInput,
} from "./spawn-templates";
/**
 * Minimum `invocationCount` an open gap signal must reach before
 * `autoExtendIfNeeded` considers spawning a provisional node for its
 * capability tag.
 */
export const GAP_AUTO_EXTEND_THRESHOLD = 3;
/**
 * Node budget for the planner-visible slice built by `resolveSubgraph`;
 * consulted while adding 1-hop neighbours.
 */
export const SUBGRAPH_MAX_NODES = 12;
/**
 * Lifecycle status of a tool node, as stored in `tool_nodes.status`.
 * Only `"verified"` nodes are picked as seeds / neighbours by
 * `resolveSubgraph`, and `listNodes()` defaults to `"verified"`.
 */
export type ToolNodeStatus =
  /** Planner-selectable node. */
  | "verified"
  /**
   * Not (yet) approved. `autoExtendIfNeeded` promotes crossed-threshold
   * gap signals into nodes of this kind, and reviewers can route shadow
   * turns through them via `resolveSubgraph({ includeShadowNodeIds })`.
   */
  | "provisional"
  /** NOTE(review): presumably set by the `rejectNode` admin action — confirm. */
  | "rejected"
  /**
   * `composition_alias` — a node that represents a coarse capability that
   * has been decomposed into a chain of smaller atomic nodes. The alias
   * stays callable for backwards compatibility (old prompts and saved
   * chats keep working) but is **never selected by the planner** —
   * `resolveSubgraph` and `listNodes({status:"verified"})` exclude it
   * automatically because the status is not "verified". The alias's
   * historical evidence rows remain attached so #157 can still see the
   * pre-decomposition behaviour. Introduced in Task #156.
   */
  | "composition_alias"
  /**
   * `deprecated` — a node that a human reviewer has approved for
   * retirement (Task #159). It is excluded from `resolveSubgraph`
   * (planner cannot select it) and from `listNodes({status:"verified"})`,
   * but it still exists in the hot graph so direct references and
   * outstanding evidence remain inspectable for audit. After
   * `DEPRECATION_ARCHIVE_AFTER_DAYS` (default 180) it is moved to the
   * `tool_*_archive` tables by the weekly archive job and the hot rows
   * are deleted in a single transaction.
   */
  | "deprecated";
/**
 * Which process owns a node: `"node"` for this api-server, `"python"`
 * for the research-engine. NOTE(review): presumably the process that
 * hosts the node's handler — confirm against the dispatcher.
 */
export type OwnerProcess = "node" | "python";
/**
 * A minimal JSON Schema subset used to describe seam wires. It covers
 * only the shapes needed for structural compatibility checking; full
 * JSON Schema validation lives elsewhere if/when needed.
 */
export interface IOSchema {
  type?: "object" | "array" | "string" | "number" | "integer" | "boolean" | "null";
  /** Per-field schemas of an object wire. */
  properties?: Record<string, IOSchema>;
  /** Field names that must be present (checked by `validateContract`). */
  required?: string[];
  /** Element schema of an array wire. */
  items?: IOSchema;
  description?: string;
  /** Free-form passthrough for extra hints (e.g. `enum`, `format`). */
  [k: string]: unknown;
}
/**
 * `ContractSpec` is the seam contract attached to an edge:
 * - `produces` — the shape the upstream node emits.
 * - `consumes` — the shape the downstream node requires.
 * The seed-time validator enforces that every required field on
 * `consumes` exists with a compatible type on `produces`.
 */
export interface ContractSpec {
  produces: IOSchema;
  consumes: IOSchema;
  /**
   * Optional reviewer-facing notes about how the upstream's produced
   * fields map onto the downstream's consumed fields, plus advisory
   * flags. Heuristic LLM-tool adjacencies set `advisory:true` so
   * downstream seam-health (#157) can downgrade their structural
   * mismatches from hard failures to non-fatal contractIssues.
   */
  mappingHints?: Record<string, unknown>;
}
/** Planner-facing projection of a `tool_nodes` row (built by `row2node`). */
export interface ResolvedNode {
  id: string;
  name: string;
  description: string;
  /** Always an array; a non-array DB value is normalised to `[]`. */
  capabilityTags: string[];
  status: ToolNodeStatus;
  ownerProcess: OwnerProcess;
  /** The row's `specJson`, or `{}` when that column is empty. */
  spec: Record<string, unknown>;
  costHint: number | null;
  latencyHintMs: number | null;
}
/** Planner-facing projection of a `tool_edges` row (built by `row2edge`). */
export interface ResolvedEdge {
  /** Id (not name) of the upstream node. */
  fromNode: string;
  /** Id (not name) of the downstream node. */
  toNode: string;
  relation: string;
  weight: number;
  /** Optional seam contract; populated for edges added under Task #156. */
  contract: ContractSpec | null;
}
/** Result of `resolveSubgraph` for one user turn. */
export interface ResolvedSubgraph {
  nodes: ResolvedNode[];
  /** Only edges whose endpoints are both in `nodes`. */
  edges: ResolvedEdge[];
  /** Capability tags that drove the resolution. */
  matchedTags: string[];
}
// ---------------------------------------------------------------- helpers
/**
 * Map a raw `tool_nodes` row to the planner-facing `ResolvedNode`.
 *
 * Normalisation:
 * - a non-array `capabilityTags` becomes `[]`;
 * - a falsy `specJson` becomes `{}`;
 * - nullish cost/latency hints become `null`.
 */
function row2node(r: ToolNodeRow): ResolvedNode {
  const rawTags = r.capabilityTags;
  const rawSpec = r.specJson as Record<string, unknown>;
  const node: ResolvedNode = {
    id: r.id,
    name: r.name,
    description: r.description,
    capabilityTags: Array.isArray(rawTags) ? (rawTags as string[]) : [],
    status: r.status as ToolNodeStatus,
    ownerProcess: r.ownerProcess as OwnerProcess,
    spec: rawSpec || {},
    costHint: r.costHint ?? null,
    latencyHintMs: r.latencyHintMs ?? null,
  };
  return node;
}
/**
 * Map a raw `tool_edges` row to a `ResolvedEdge`. A missing contract
 * (null or undefined) is normalised to `null`.
 */
function row2edge(r: ToolEdgeRow): ResolvedEdge {
  const { fromNode, toNode, relation, weight } = r;
  const contract = (r.contract as ContractSpec | null) ?? null;
  return { fromNode, toNode, relation, weight, contract };
}
// ---------------------------------------------------------------- contracts
/** One incompatibility found by `validateContract` for a required field. */
export interface ContractValidationIssue {
  /** Name of the required `consumes` field that failed the check. */
  field: string;
  /**
   * `missing_in_produces` — `produces` declares no such property.
   * `type_mismatch` — both sides declare a `type` and they differ.
   */
  reason: "missing_in_produces" | "type_mismatch";
  /** Consumer-side `type` (set only for `type_mismatch`). */
  expectedType?: string;
  /** Producer-side `type` (set only for `type_mismatch`). */
  actualType?: string;
}
/** Outcome of `validateContract`: `ok` is true exactly when `issues` is empty. */
export interface ContractValidationResult {
  ok: boolean;
  issues: ContractValidationIssue[];
}
/**
 * Own-property lookup of a field schema. A plain `props[key]` would
 * resolve names such as "constructor" or "toString" on
 * `Object.prototype` and wrongly report the field as present.
 */
function ownSchema(
  props: Record<string, IOSchema>,
  key: string,
): IOSchema | undefined {
  return Object.prototype.hasOwnProperty.call(props, key) ? props[key] : undefined;
}

/**
 * Structural compatibility check for a single seam contract.
 *
 * Every field listed in `consumes.required` must exist on `produces`.
 * When both sides declare a `type`, the two must be equal. The check is
 * deliberately shallow: it compares wire shapes and is not a full JSON
 * Schema validator.
 *
 * Leniencies:
 * - a `consumes` with no `required` array trivially passes;
 * - an unspecified `type` on either side is treated as "any".
 *
 * Robustness:
 * - lookups use own properties only, so prototype member names never
 *   count as present;
 * - a `required` value that is not an array (e.g. malformed JSON from
 *   the DB) is treated as "no required fields" instead of being
 *   iterated character by character.
 *
 * Used by `tool-graph-seed.ts` at boot to refuse a structurally broken
 * graph and (eventually, in #157) by the seam health detector to flag
 * drift.
 *
 * @param contract - the edge's produces/consumes pair.
 * @returns `ok` plus one issue per failing required field, in
 *   `required` order.
 */
export function validateContract(
  contract: ContractSpec,
): ContractValidationResult {
  const issues: ContractValidationIssue[] = [];
  const producesProps = contract.produces?.properties ?? {};
  const consumesProps = contract.consumes?.properties ?? {};
  const rawRequired: unknown = contract.consumes?.required;
  const required: string[] = Array.isArray(rawRequired) ? rawRequired : [];
  for (const field of required) {
    const p = ownSchema(producesProps, field);
    if (!p) {
      issues.push({ field, reason: "missing_in_produces" });
      continue;
    }
    const ct = ownSchema(consumesProps, field)?.type;
    const pt = p.type;
    if (ct && pt && ct !== pt) {
      issues.push({
        field,
        reason: "type_mismatch",
        expectedType: String(ct),
        actualType: String(pt),
      });
    }
  }
  return { ok: issues.length === 0, issues };
}
// ---------------------------------------------------------------- intent
/**
* Lightweight semantic-ish capability tagger. Takes the user turn text
* and returns the set of capability tags that look relevant. This is the
* replacement for the regex `looksLikeResearchIntent` gate. Tags are
* intentionally coarse — `resolveSubgraph` then expands the matched tag
* set into a connected node slice via the graph edges.
*/
// Single source of truth: lib/tool-graph/intent-rules.json. Both this file
// and backend/tools/graph_client.py compile their rules from that JSON so
// the Node and Python sides cannot drift. The shared fixture
// `__tests__/fixtures/intent-cases.json` is asserted against both sides.
import intentRulesData from "../../../../lib/tool-graph/intent-rules.json";
/** Shape of one entry in intent-rules.json: a tag plus regex sources. */
interface IntentRuleSpec {
  tag: string;
  patterns: string[];
}
/** Compile one JSON rule's pattern sources into case-insensitive RegExps. */
function compileIntentRule(spec: IntentRuleSpec): { tag: string; patterns: RegExp[] } {
  const compiled: RegExp[] = [];
  for (const source of spec.patterns) compiled.push(new RegExp(source, "i"));
  return { tag: spec.tag, patterns: compiled };
}
/**
 * Intent rules, compiled once at module load. A missing `rules` array
 * yields no rules. An invalid regex source throws here, at import time.
 */
const INTENT_RULES: Array<{ tag: string; patterns: RegExp[] }> = (
  (intentRulesData as { rules: IntentRuleSpec[] }).rules || []
).map(compileIntentRule);
/**
 * Tag a user turn with coarse capability tags.
 *
 * A tag is emitted when ANY of its rule's patterns matches the trimmed
 * text. Tags appear once each, in the order of their first matching rule.
 * Empty or whitespace-only input yields `[]`.
 */
export function tagsForIntent(text: string): string[] {
  const trimmed = (text || "").trim();
  if (trimmed === "") return [];
  const hits = INTENT_RULES.filter(({ patterns }) =>
    patterns.some((re) => re.test(trimmed)),
  ).map(({ tag }) => tag);
  return [...new Set(hits)];
}
// ---------------------------------------------------------------- queries
/**
 * List tool nodes filtered by status. The default is `"verified"`;
 * pass `{ status: "any" }` for every row regardless of status.
 */
export async function listNodes(
  opts: { status?: ToolNodeStatus | "any" } = {},
): Promise<ResolvedNode[]> {
  const wanted = opts.status ?? "verified";
  // `undefined` means "no WHERE clause" to drizzle.
  const filter = wanted === "any" ? undefined : eq(toolNodes.status, wanted);
  const rows = await db.select().from(toolNodes).where(filter);
  return rows.map(row2node);
}
/**
 * List edges. With a non-empty `nodeIds`, only edges with at least one
 * endpoint in that set are returned. With `undefined` or an empty array,
 * EVERY edge is returned.
 */
export async function listEdges(
  nodeIds?: string[],
): Promise<ResolvedEdge[]> {
  if (!nodeIds || nodeIds.length === 0) {
    const everything = await db.select().from(toolEdges);
    return everything.map(row2edge);
  }
  const ids: string[] = nodeIds;
  // Build the parameter list afresh for each IN (...) clause.
  const idList = () => sql.join(ids.map((id) => sql`${id}`), sql`, `);
  const touching = await db
    .select()
    .from(toolEdges)
    .where(
      sql`${toolEdges.fromNode} IN (${idList()}) OR ${toolEdges.toNode} IN (${idList()})`,
    );
  return touching.map(row2edge);
}
/**
 * Look up one node whose id OR name equals `idOrName`. Returns `null`
 * when nothing matches. If both an id and a different node's name match,
 * which row wins is unspecified.
 */
export async function getNode(idOrName: string): Promise<ResolvedNode | null> {
  const matches = await db
    .select()
    .from(toolNodes)
    .where(sql`${toolNodes.id} = ${idOrName} OR ${toolNodes.name} = ${idOrName}`)
    .limit(1);
  const hit = matches[0];
  return hit === undefined ? null : row2node(hit);
}
/** Exact-name node lookup; `null` when no node carries `name`. */
export async function getNodeByName(name: string): Promise<ResolvedNode | null> {
  const [match] = await db
    .select()
    .from(toolNodes)
    .where(eq(toolNodes.name, name))
    .limit(1);
  return match ? row2node(match) : null;
}
// ---------------------------------------------------------------- subgraph
/**
 * Resolve a subgraph for one user turn:
 *   1. Tag the intent text (`tagsForIntent`).
 *   2. Pick all verified nodes whose capability tags overlap the intent
 *      tags ("seed nodes").
 *   3. Expand by 1 hop along edges touching a seed, keeping only verified
 *      neighbours (e.g. literature_search → summarization).
 *   4. Cap the planner-visible slice at `SUBGRAPH_MAX_NODES`. Seeds are
 *      admitted first (in `listNodes` order), then neighbours fill any
 *      remaining room. The cap is checked before every insert, so it is
 *      never exceeded.
 *
 * If `includeShadowNodeIds` is provided, those nodes are appended AFTER
 * the cap (they do not count against it) so a reviewer can drive a
 * shadow turn through them. NOTE(review): shadow nodes are fetched
 * without a status filter.
 *
 * Returned edges are the seed-touching edges whose two endpoints both
 * ended up in the result.
 */
export async function resolveSubgraph(args: {
  intentText: string;
  includeShadowNodeIds?: string[];
}): Promise<ResolvedSubgraph> {
  const matchedTags = tagsForIntent(args.intentText);
  if (matchedTags.length === 0) {
    return { nodes: [], edges: [], matchedTags };
  }
  const wanted = new Set(matchedTags);
  const verified = await listNodes({ status: "verified" });
  const seeds = verified.filter((n) =>
    n.capabilityTags.some((t) => wanted.has(t)),
  );
  if (seeds.length === 0) {
    return { nodes: [], edges: [], matchedTags };
  }
  const seedIds = new Set(seeds.map((n) => n.id));
  // Every edge with a seed on either end; used for expansion and output.
  const allEdges = await listEdges(Array.from(seedIds));

  const finalMap = new Map<string, ResolvedNode>();
  for (const n of seeds) {
    if (finalMap.size >= SUBGRAPH_MAX_NODES) break;
    finalMap.set(n.id, n);
  }

  // Only expand when there is room left. In that case every seed was
  // admitted, so seeds can be excluded from the neighbour set.
  if (finalMap.size < SUBGRAPH_MAX_NODES) {
    const neighbors = new Set<string>();
    for (const e of allEdges) {
      if (seedIds.has(e.fromNode) && !seedIds.has(e.toNode)) neighbors.add(e.toNode);
      if (seedIds.has(e.toNode) && !seedIds.has(e.fromNode)) neighbors.add(e.fromNode);
    }
    const neighborRows = neighbors.size
      ? await db
          .select()
          .from(toolNodes)
          .where(
            and(
              inArray(toolNodes.id, Array.from(neighbors)),
              eq(toolNodes.status, "verified"),
            ),
          )
      : [];
    for (const r of neighborRows) {
      if (finalMap.size >= SUBGRAPH_MAX_NODES) break;
      if (!finalMap.has(r.id)) finalMap.set(r.id, row2node(r));
    }
  }

  // Optional shadow nodes (provisional) for reviewer-driven sessions.
  if (args.includeShadowNodeIds && args.includeShadowNodeIds.length) {
    const shadowRows = await db
      .select()
      .from(toolNodes)
      .where(inArray(toolNodes.id, args.includeShadowNodeIds));
    for (const r of shadowRows) {
      const n = row2node(r);
      if (!finalMap.has(n.id)) finalMap.set(n.id, n);
    }
  }

  const finalIds = new Set(finalMap.keys());
  const finalEdges = allEdges.filter(
    (e) => finalIds.has(e.fromNode) && finalIds.has(e.toNode),
  );
  return {
    nodes: Array.from(finalMap.values()),
    edges: finalEdges,
    matchedTags,
  };
}
/**
 * Render a subgraph as short system-prompt text.
 *
 * Output layout:
 * - one header line listing the matched capabilities;
 * - one line per node (tags joined by "/" or "general", description
 *   cut to 140 chars);
 * - optionally, a "Connections:" section covering at most the first 20
 *   edges. An edge is skipped when either endpoint is not among the
 *   nodes.
 *
 * Returns "" for an empty subgraph.
 */
export function describeSubgraph(sg: ResolvedSubgraph): string {
  if (!sg.nodes.length) return "";
  const nameOf = new Map<string, string>();
  for (const node of sg.nodes) nameOf.set(node.id, node.name);

  const header = `Tool capability subgraph for this turn (capabilities: ${sg.matchedTags.join(", ")}):`;
  const nodeLines = sg.nodes.map((node) => {
    const tags = node.capabilityTags.join("/") || "general";
    return `- ${node.name} [${tags}]: ${node.description.slice(0, 140)}`;
  });

  const edgeLines: string[] = [];
  if (sg.edges.length > 0) {
    edgeLines.push("Connections:");
    for (const edge of sg.edges.slice(0, 20)) {
      const from = nameOf.get(edge.fromNode);
      const to = nameOf.get(edge.toNode);
      if (from && to) edgeLines.push(`- ${from} —[${edge.relation}]→ ${to}`);
    }
  }
  return [header, ...nodeLines, ...edgeLines].join("\n");
}
// ---------------------------------------------------------------- writes
/**
 * Insert or update a tool node, matched by `name`.
 *
 * On update:
 * - description, tags, ownerProcess and specJson are overwritten;
 * - optional hint / kind / handler columns keep their stored value when
 *   the caller omits them;
 * - `status` is left untouched;
 * - `version` is incremented atomically in SQL, so concurrent writers
 *   (this process or the Python side) cannot lose a bump.
 *
 * Both paths return the row as actually persisted (via `RETURNING`).
 * Column defaults such as `status` therefore reflect the database rather
 * than an assumed value.
 *
 * @throws Error if the database returns no row (e.g. the node was
 *   deleted between the lookup and the update).
 */
export async function upsertNode(node: InsertToolNodeRow): Promise<ResolvedNode> {
  const existing = await db
    .select()
    .from(toolNodes)
    .where(eq(toolNodes.name, node.name))
    .limit(1);
  const prev = existing[0];
  if (prev) {
    const updated = await db
      .update(toolNodes)
      .set({
        description: node.description,
        capabilityTags: node.capabilityTags,
        inputKind: node.inputKind ?? prev.inputKind,
        outputKind: node.outputKind ?? prev.outputKind,
        ownerProcess: node.ownerProcess,
        specJson: node.specJson,
        handlerRef: node.handlerRef ?? prev.handlerRef,
        costHint: node.costHint ?? prev.costHint,
        latencyHintMs: node.latencyHintMs ?? prev.latencyHintMs,
        version: sql`${toolNodes.version} + 1`,
        updatedAt: new Date(),
      })
      .where(eq(toolNodes.id, prev.id))
      .returning();
    const fresh = updated[0];
    if (!fresh) {
      throw new Error(`tool-graph: node ${prev.id} vanished during upsert`);
    }
    return row2node(fresh);
  }
  const id = node.id || newId("tnode");
  const inserted = await db
    .insert(toolNodes)
    .values({ ...node, id })
    .returning();
  const created = inserted[0];
  if (!created) {
    throw new Error(`tool-graph: insert of node ${id} returned no row`);
  }
  return row2node(created);
}
/**
 * Insert a directed edge, or refresh an existing one.
 *
 * Edges are unique on (fromNode, toNode, relation). On conflict, only
 * `contract` and `weight` are rewritten. An omitted contract becomes
 * `null` and an omitted weight becomes 1.0, so seed-time contract changes
 * (e.g. #156 deriveContract) propagate to rows that predate contracts.
 */
export async function upsertEdge(edge: InsertToolEdgeRow): Promise<void> {
  const row = { ...edge, id: edge.id || newId("tedge") };
  const conflictKey = [toolEdges.fromNode, toolEdges.toNode, toolEdges.relation];
  await db
    .insert(toolEdges)
    .values(row)
    .onConflictDoUpdate({
      target: conflictKey,
      set: {
        weight: edge.weight ?? 1.0,
        contract: edge.contract ?? null,
      },
    });
}
/**
 * Best-effort per-call telemetry: write one evidence row for the named
 * node.
 *
 * Behaviour:
 * - unknown node names are ignored;
 * - a call with a truthy `shadowUserId` is recorded as `shadow_run`,
 *   otherwise as `invocation`;
 * - `success`/`failure` are 1/0 flags derived from `ok`;
 * - calling a deprecated node still records evidence, but also logs a
 *   warning carrying `spec.replacedBy` so callers can migrate.
 *
 * Any error is logged at debug level and swallowed.
 */
export async function recordInvocation(args: {
  nodeName: string;
  ok: boolean;
  argsSummary: Record<string, unknown> | null;
  resultPreview: unknown;
  errorCode?: string | null;
  durationMs?: number;
  shadowUserId?: string | null;
}): Promise<void> {
  try {
    const node = await getNodeByName(args.nodeName);
    if (node === null) return;

    if (node.status === "deprecated") {
      const spec = node.spec as { replacedBy?: unknown };
      const replacedBy = (spec?.replacedBy as string | undefined) ?? null;
      logger.warn(
        { nodeName: args.nodeName, nodeId: node.id, replacedBy },
        "tool-graph: invocation of deprecated node — caller should switch to replacement",
      );
    }

    const succeeded = args.ok ? 1 : 0;
    await db.insert(toolNodeEvidence).values({
      id: newId("tev"),
      nodeId: node.id,
      kind: args.shadowUserId ? "shadow_run" : "invocation",
      payload: {
        args: args.argsSummary,
        result_preview: redactPreview(args.resultPreview),
        error_code: args.errorCode ?? null,
        duration_ms: args.durationMs ?? null,
      },
      success: succeeded,
      failure: 1 - succeeded,
      shadowUserId: args.shadowUserId ?? null,
    });
  } catch (err) {
    logger.debug({ err }, "tool-graph recordInvocation failed");
  }
}
// ---------------------------------------------------------------- seam health (#157)
/**
 * Scoring-formula version, written into every seam-health sample
 * (`payload.formula_version`) and every `tool_edge_health` row
 * (`formulaVersion`). Bump it whenever the EMA / scoring formula changes.
 */
export const SEAM_HEALTH_FORMULA_VERSION = 1;
/**
 * Smoothing factor for the EMAs in `recomputeEdgeHealth`. Each pass
 * blends in the batch mean with weight EMA_ALPHA and keeps
 * (1 - EMA_ALPHA) of the previous value.
 */
const EMA_ALPHA = 0.2;
/** Arguments for `captureSeamHealth` (one observed seam crossing). */
export interface CaptureSeamHealthArgs {
  /** Downstream consumer node name (the node about to be invoked, or
   * the atomic node whose payload we are observing). */
  downstreamName: string;
  /** Optional upstream producer name. When provided we resolve the
   * exact (upstream → downstream) edge and its contract; when absent
   * we fold the sample against every incoming edge of the downstream
   * node so atomic-step capture still records *something* even when
   * the producer cannot be identified. */
  upstreamName?: string | null;
  /** The actual payload that crossed the seam. May be any JSON. */
  payload: unknown;
  /** Optional invocation correlation id (for cross-row tracing); stored
   * as `payload.invocation_id` on the evidence row. */
  invocationId?: string | null;
}
/**
 * Record one seam-health observation for the seam(s) feeding
 * `args.downstreamName`.
 *
 * Edge selection:
 * - with `upstreamName`: only the edge(s) running upstream → downstream;
 * - without it: every incoming edge of the downstream node.
 *
 * For each matched edge, the payload is scored against that edge's
 * contract and one `seam_health` evidence row is written, attached to
 * the downstream node. The result for the first matched edge is
 * returned.
 *
 * Returns `null` when:
 * - either node is unknown;
 * - no edge matches;
 * - anything throws. Capture is best-effort: errors are logged at
 *   debug level and never reach the caller.
 */
export async function captureSeamHealth(
  args: CaptureSeamHealthArgs,
): Promise<{ edgeId: string; sample: SeamSample } | null> {
  try {
    const downstream = await getNodeByName(args.downstreamName);
    if (!downstream) return null;

    // Resolve the optional producer first so an unknown upstream name
    // short-circuits before any edge query runs.
    let upstreamId: string | null = null;
    if (args.upstreamName) {
      const upstream = await getNodeByName(args.upstreamName);
      if (!upstream) return null;
      upstreamId = upstream.id;
    }

    const intoDownstream = eq(toolEdges.toNode, downstream.id);
    const edgeRows = await db
      .select()
      .from(toolEdges)
      .where(
        upstreamId === null
          ? intoDownstream
          : and(eq(toolEdges.fromNode, upstreamId), intoDownstream),
      );
    if (!edgeRows.length) return null;

    let first: { edgeId: string; sample: SeamSample } | null = null;
    for (const edge of edgeRows) {
      const contract = (edge.contract as ContractSpec | null) ?? null;
      const sample = evaluateSeam(contract, args.payload);
      const clean =
        sample.contractIssues.length === 0 && sample.missingFields.length === 0;
      await db.insert(toolNodeEvidence).values({
        id: newId("tev"),
        nodeId: downstream.id,
        kind: "seam_health",
        payload: {
          edge_id: edge.id,
          upstream_node: edge.fromNode,
          downstream_node: edge.toNode,
          missing_fields: sample.missingFields,
          unused_fields: sample.unusedFields,
          coverage: sample.coverage,
          contract_issues: sample.contractIssues,
          health_score: sample.healthScore,
          payload_preview: redactPreview(args.payload),
          invocation_id: args.invocationId ?? null,
          formula_version: SEAM_HEALTH_FORMULA_VERSION,
        },
        success: clean ? 1 : 0,
        failure: clean ? 0 : 1,
      });
      if (first === null) first = { edgeId: edge.id, sample };
    }
    return first;
  } catch (err) {
    logger.debug({ err }, "tool-graph captureSeamHealth failed (non-blocking)");
    return null;
  }
}
/**
 * Capture seam-health samples for every downstream consumer of
 * `upstreamName`. This is the runtime entry point: when an atomic step
 * completes, its output is folded against each declared consumer's
 * contract.
 *
 * Each distinct downstream node is visited exactly once.
 * `captureSeamHealth(upstream, downstream)` already scores the payload
 * against EVERY edge between that pair (one per relation). Iterating per
 * edge would therefore re-sample the whole pair once per relation and
 * insert duplicate evidence rows.
 *
 * @returns one `captureSeamHealth` result per downstream node that
 *   produced one, in first-edge order. Returns an empty list when there
 *   is nothing to do or on any failure (logged at debug level; never
 *   throws).
 */
export async function captureOutgoingSeamSamples(
  upstreamName: string,
  payload: unknown,
): Promise<Array<{ edgeId: string; sample: SeamSample }>> {
  try {
    const upstream = await getNodeByName(upstreamName);
    if (!upstream) return [];
    const edges = await db
      .select()
      .from(toolEdges)
      .where(eq(toolEdges.fromNode, upstream.id));
    if (edges.length === 0) return [];
    const downstreamIds = Array.from(new Set(edges.map((e) => e.toNode)));
    const downstreamNodes = await db
      .select()
      .from(toolNodes)
      .where(inArray(toolNodes.id, downstreamIds));
    const nameById = new Map(downstreamNodes.map((n) => [n.id, n.name]));
    const out: Array<{ edgeId: string; sample: SeamSample }> = [];
    for (const downstreamId of downstreamIds) {
      const downName = nameById.get(downstreamId);
      if (!downName) continue;
      const r = await captureSeamHealth({
        upstreamName,
        downstreamName: downName,
        payload,
      });
      if (r) out.push(r);
    }
    return out;
  } catch (err) {
    logger.debug({ err }, "tool-graph captureOutgoingSeamSamples failed (non-blocking)");
    return [];
  }
}
/** Aggregate of one batch of `seam_health` samples (see foldSamplesIntoRollup). */
interface SeamRollup {
  /** Number of samples folded. */
  count: number;
  /** Total contract issues across the batch. */
  contractIssues: number;
  /** Total missing fields across the batch. */
  missingFields: number;
  /** Sum of per-sample coverage (a missing value counts as 1). */
  coverageSum: number;
  /** Sum of per-sample health scores (a missing value counts as 1). */
  healthSum: number;
  /** Occurrence count per missing field. */
  topMissing: Map<string, number>;
  /** Occurrence count per contract issue. */
  topIssues: Map<string, number>;
  /** Newest sample `createdAt` seen; `new Date(0)` when nothing folded. */
  lastSampleAt: Date;
  /** ULID of the highest evidence row folded; "" when nothing folded. */
  lastFoldedEvidenceId: string;
}
/**
 * Fold a batch of `seam_health` evidence rows into one `SeamRollup`.
 *
 * For each row:
 * - count the row;
 * - sum the lengths of its missing-field and contract-issue lists;
 * - accumulate coverage and health score (a missing or non-numeric
 *   value counts as 1);
 * - tally each missing field and contract issue;
 * - track the newest `createdAt` and the string-greatest evidence id.
 *
 * Tally keys: string entries are used verbatim; any other entry is keyed
 * by its JSON text. NOTE(review): the payload declares these lists as
 * strings, but the element type of `SeamSample.contractIssues` is not
 * visible here. If it ever holds objects, keying by reference would give
 * every issue its own count of 1, and `topN` would then collapse them
 * onto a single "[object Object]" key.
 */
function foldSamplesIntoRollup(
  rows: Array<{ id: string; payload: unknown; createdAt: Date }>,
): SeamRollup {
  const r: SeamRollup = {
    count: 0,
    contractIssues: 0,
    missingFields: 0,
    coverageSum: 0,
    healthSum: 0,
    topMissing: new Map(),
    topIssues: new Map(),
    lastSampleAt: new Date(0),
    lastFoldedEvidenceId: "",
  };
  const tally = (counts: Map<string, number>, entry: unknown): void => {
    let key: string;
    if (typeof entry === "string") {
      key = entry;
    } else {
      const json: string | undefined = JSON.stringify(entry);
      key = json ?? String(entry);
    }
    counts.set(key, (counts.get(key) ?? 0) + 1);
  };
  for (const row of rows) {
    const p = (row.payload || {}) as {
      missing_fields?: unknown[];
      contract_issues?: unknown[];
      coverage?: number;
      health_score?: number;
    };
    r.count += 1;
    const mf = Array.isArray(p.missing_fields) ? p.missing_fields : [];
    const ci = Array.isArray(p.contract_issues) ? p.contract_issues : [];
    r.missingFields += mf.length;
    r.contractIssues += ci.length;
    r.coverageSum += typeof p.coverage === "number" ? p.coverage : 1;
    r.healthSum += typeof p.health_score === "number" ? p.health_score : 1;
    for (const f of mf) tally(r.topMissing, f);
    for (const c of ci) tally(r.topIssues, c);
    if (row.createdAt > r.lastSampleAt) r.lastSampleAt = row.createdAt;
    // ULIDs sort lexicographically in time order, so the string-greatest
    // id seen is the batch's high-water mark.
    if (row.id > r.lastFoldedEvidenceId) r.lastFoldedEvidenceId = row.id;
  }
  return r;
}
/**
 * Return the `n` highest-count entries of `m` as a plain object. Entries
 * are ordered by descending count; ties keep Map insertion order.
 */
function topN(m: Map<string, number>, n: number): Record<string, number> {
  const ranked = [...m].sort(([, countA], [, countB]) => countB - countA);
  const result: Record<string, number> = {};
  for (const [key, count] of ranked.slice(0, n)) result[key] = count;
  return result;
}
/** Summary returned by `recomputeEdgeHealth`. */
export interface RecomputeEdgeHealthResult {
  /** Number of `tool_edge_health` rows inserted or updated in this pass. */
  edgesUpdated: number;
  /** Number of `seam_health` evidence rows claimed and folded in this pass. */
  samplesFolded: number;
}
/**
 * Fold every not-yet-folded `seam_health` evidence row into its edge's
 * `tool_edge_health` row.
 *
 * Each edge is processed in its own transaction:
 *   1. Take a transaction-scoped advisory lock keyed on the edge id, so
 *      concurrent recomputes of the same edge serialise.
 *   2. Atomically claim the edge's unfolded samples by stamping
 *      `seam_folded_at`. This claim is the exactly-once mechanism: a
 *      sample is folded by whichever pass claims it, and never twice.
 *   3. Add the batch to the cumulative counters and blend the batch means
 *      into the coverage / health EMAs. The whole batch counts as ONE EMA
 *      step of weight EMA_ALPHA, however many samples it holds; an edge's
 *      first health row is seeded with the batch mean.
 *   4. Insert or update the health row.
 *
 * Edges with no unclaimed samples are skipped, so repeated runs are
 * cheap for quiet edges. An error on any edge propagates out of this
 * call, and the remaining edges are not processed.
 *
 * @returns number of health rows written and number of samples folded.
 */
export async function recomputeEdgeHealth(): Promise<RecomputeEdgeHealthResult> {
  const allEdges = await db.select().from(toolEdges);
  if (allEdges.length === 0) return { edgesUpdated: 0, samplesFolded: 0 };
  let updated = 0;
  let folded = 0;
  for (const edge of allEdges) {
    // Per-edge transaction. The advisory lock and the evidence claim
    // below commit (or roll back) together. If anything in the callback
    // throws, the claim is undone and the samples stay unfolded for a
    // later pass.
    await db.transaction(async (tx) => {
      // Serialize on this edge id — works whether or not a
      // tool_edge_health row already exists. `hashtext` yields a 32-bit
      // key, so distinct edge ids can collide; that only causes extra
      // serialisation, never incorrect folding.
      await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${edge.id}))`);
      const existingRows = await tx
        .select()
        .from(toolEdgeHealth)
        .where(eq(toolEdgeHealth.edgeId, edge.id))
        .limit(1);
      const cur = existingRows[0];
      // Exactly-once fold via UPDATE-claim. Stamp every unfolded
      // `seam_health` row for this edge with `seam_folded_at` and return
      // its payload in the same statement.
      // Selection is by `seam_folded_at IS NULL`, not by a time or id
      // cursor. A sample that commits late, even with an older
      // created_at or a smaller ULID, is therefore still picked up by a
      // later pass.
      // NOTE(review): `claimed.rows` assumes a node-postgres-style result
      // object. Filtering on `payload ->> 'edge_id'` has no index visible
      // here, so each edge may scan the evidence table.
      const claimed = await tx.execute(
        sql`
UPDATE tool_node_evidence
SET seam_folded_at = now()
WHERE kind = 'seam_health'
AND seam_folded_at IS NULL
AND payload ->> 'edge_id' = ${edge.id}
RETURNING id, payload, created_at
`,
      );
      const rows = (claimed.rows as Array<{
        id: string;
        payload: unknown;
        created_at: Date;
      }>).map((r) => ({ id: r.id, payload: r.payload, createdAt: r.created_at }));
      if (rows.length === 0) return;
      const rollup = foldSamplesIntoRollup(rows);
      folded += rollup.count;
      // Batch means, blended into the stored EMAs (or used as-is for a
      // brand-new health row).
      const sampleCoverage = rollup.coverageSum / rollup.count;
      const sampleHealth = rollup.healthSum / rollup.count;
      const newEmaCov = cur
        ? cur.emaCoverage * (1 - EMA_ALPHA) + sampleCoverage * EMA_ALPHA
        : sampleCoverage;
      const newEmaHealth = cur
        ? cur.emaHealthScore * (1 - EMA_ALPHA) + sampleHealth * EMA_ALPHA
        : sampleHealth;
      // `lastSampleAt` and `lastFoldedEvidenceId` describe the newest
      // sample in THIS batch (see foldSamplesIntoRollup). `computedAt` is
      // the time of this pass. All three are observability fields only;
      // the UPDATE-claim above is what prevents double folding.
      // NOTE(review): the traversal / issue / missing counters are
      // cumulative, but topMissingFields / topContractIssues are rebuilt
      // from this batch alone and overwrite the stored values. Confirm
      // that "top of the latest batch" is the intended semantics.
      const nextRow = {
        id: cur?.id ?? newId("tedh"),
        edgeId: edge.id,
        traversalCount: (cur?.traversalCount ?? 0) + rollup.count,
        contractIssueCount: (cur?.contractIssueCount ?? 0) + rollup.contractIssues,
        missingFieldCount: (cur?.missingFieldCount ?? 0) + rollup.missingFields,
        emaCoverage: newEmaCov,
        emaHealthScore: newEmaHealth,
        topMissingFields: topN(rollup.topMissing, 5),
        topContractIssues: topN(rollup.topIssues, 5),
        formulaVersion: SEAM_HEALTH_FORMULA_VERSION,
        lastSampleAt: rollup.lastSampleAt,
        lastFoldedEvidenceId: rollup.lastFoldedEvidenceId,
        computedAt: new Date(),
      };
      if (cur) {
        await tx
          .update(toolEdgeHealth)
          .set({
            traversalCount: nextRow.traversalCount,
            contractIssueCount: nextRow.contractIssueCount,
            missingFieldCount: nextRow.missingFieldCount,
            emaCoverage: nextRow.emaCoverage,
            emaHealthScore: nextRow.emaHealthScore,
            topMissingFields: nextRow.topMissingFields,
            topContractIssues: nextRow.topContractIssues,
            formulaVersion: nextRow.formulaVersion,
            lastSampleAt: nextRow.lastSampleAt,
            lastFoldedEvidenceId: nextRow.lastFoldedEvidenceId,
            computedAt: nextRow.computedAt,
          })
          .where(eq(toolEdgeHealth.id, cur.id));
      } else {
        await tx.insert(toolEdgeHealth).values(nextRow);
      }
      updated += 1;
    });
  }
  return { edgesUpdated: updated, samplesFolded: folded };
}
/**
 * API-facing view of one edge's health: the `tool_edge_health` row joined
 * with its edge's endpoints (built by `rowToHealthView`).
 */
export interface EdgeHealthView {
  edgeId: string;
  /** Upstream node id. */
  fromNode: string;
  /** Downstream node id. */
  toNode: string;
  relation: string;
  /** Cumulative number of folded samples. */
  traversalCount: number;
  /** Cumulative contract-issue count. */
  contractIssueCount: number;
  /** Cumulative missing-field count. */
  missingFieldCount: number;
  emaCoverage: number;
  emaHealthScore: number;
  /** Field name → count (`{}` when the column is empty). */
  topMissingFields: Record<string, number>;
  /** Issue → count (`{}` when the column is empty). */
  topContractIssues: Record<string, number>;
  formulaVersion: number;
  /** ISO-8601 timestamp, or null when unset. */
  lastSampleAt: string | null;
  /** ISO-8601 timestamp of the pass that wrote this row. */
  computedAt: string;
}
/**
 * Join a `tool_edge_health` row with its `tool_edges` row into an
 * `EdgeHealthView`. Timestamps become ISO strings, and empty top-N
 * columns become `{}`.
 */
function rowToHealthView(
  h: ToolEdgeHealthRow,
  e: ToolEdgeRow,
): EdgeHealthView {
  const { fromNode, toNode, relation } = e;
  const lastSampleAt = h.lastSampleAt ? h.lastSampleAt.toISOString() : null;
  const view: EdgeHealthView = {
    edgeId: h.edgeId,
    fromNode,
    toNode,
    relation,
    traversalCount: h.traversalCount,
    contractIssueCount: h.contractIssueCount,
    missingFieldCount: h.missingFieldCount,
    emaCoverage: h.emaCoverage,
    emaHealthScore: h.emaHealthScore,
    topMissingFields: (h.topMissingFields as Record<string, number>) || {},
    topContractIssues: (h.topContractIssues as Record<string, number>) || {},
    formulaVersion: h.formulaVersion,
    lastSampleAt,
    computedAt: h.computedAt.toISOString(),
  };
  return view;
}
/**
 * Every stored edge-health row whose edge still exists, as views, in
 * health-table order. Orphaned health rows are omitted.
 */
export async function listEdgeHealth(): Promise<EdgeHealthView[]> {
  const edgeRows = await db.select().from(toolEdges);
  const edgesById = new Map<string, ToolEdgeRow>();
  for (const edgeRow of edgeRows) edgesById.set(edgeRow.id, edgeRow);
  const healthRows = await db.select().from(toolEdgeHealth);
  return healthRows.flatMap((h) => {
    const edgeRow = edgesById.get(h.edgeId);
    return edgeRow ? [rowToHealthView(h, edgeRow)] : [];
  });
}
/**
 * Health detail for one edge.
 *
 * `edge` is null when the edge does not exist or has no health row yet.
 * `recent` holds up to `sampleLimit` `seam_health` evidence rows for the
 * edge, newest first. It is empty when the edge does not exist.
 */
export async function getEdgeHealth(edgeId: string, sampleLimit = 20): Promise<{
  edge: EdgeHealthView | null;
  recent: Array<{ id: string; createdAt: string; payload: Record<string, unknown> }>;
}> {
  const [edgeRow] = await db.select().from(toolEdges).where(eq(toolEdges.id, edgeId)).limit(1);
  if (!edgeRow) return { edge: null, recent: [] };
  const [healthRow] = await db
    .select()
    .from(toolEdgeHealth)
    .where(eq(toolEdgeHealth.edgeId, edgeId))
    .limit(1);
  const sampleRows = await db
    .select()
    .from(toolNodeEvidence)
    .where(
      and(
        eq(toolNodeEvidence.kind, "seam_health"),
        sql`${toolNodeEvidence.payload} ->> 'edge_id' = ${edgeId}`,
      ),
    )
    .orderBy(desc(toolNodeEvidence.createdAt))
    .limit(sampleLimit);
  const edge = healthRow ? rowToHealthView(healthRow, edgeRow) : null;
  const recent = sampleRows.map((s) => ({
    id: s.id,
    createdAt: s.createdAt.toISOString(),
    payload: (s.payload as Record<string, unknown>) || {},
  }));
  return { edge, recent };
}
/**
 * Edges that look unhealthy: at least `minTraversals` samples (default 1)
 * and an EMA health score at or below `maxHealthScore` (default 0.85).
 * Results are sorted worst-first and truncated to `limit` (default 50).
 */
export async function listUnhealthyEdges(opts: {
  minTraversals?: number;
  maxHealthScore?: number;
  limit?: number;
} = {}): Promise<EdgeHealthView[]> {
  const minTraversals = opts.minTraversals ?? 1;
  const ceiling = opts.maxHealthScore ?? 0.85;
  const cap = opts.limit ?? 50;
  const candidates = (await listEdgeHealth()).filter(
    (view) => view.traversalCount >= minTraversals && view.emaHealthScore <= ceiling,
  );
  candidates.sort((left, right) => left.emaHealthScore - right.emaHealthScore);
  return candidates.slice(0, cap);
}
/**
 * Key-name patterns whose VALUES are masked before persistence
 * (case-insensitive substring match on the key).
 *
 * They cover the obvious secret-bearing keys that may surface in tool
 * results: tokens, API keys, passwords / passphrases, credentials,
 * cookies, sessions and authorization headers.
 *
 * The list is conservative by design. A false positive merely masks a
 * harmless string, while a false negative stores the secret in the DB.
 * `credential` and `passphrase` catch keys such as "credentials" or
 * "db_passphrase" that no other pattern matches. A bare `/auth/` is
 * deliberately NOT included: it would also mask common harmless fields
 * such as "authors".
 */
const SENSITIVE_KEY_PATTERNS = [
  /token/i,
  /secret/i,
  /password/i,
  /passwd/i,
  /passphrase/i,
  /api[_-]?key/i,
  /authorization/i,
  /cookie/i,
  /session/i,
  /bearer/i,
  /private[_-]?key/i,
  /access[_-]?key/i,
  /credential/i,
];
/** True when `key` matches any entry of SENSITIVE_KEY_PATTERNS. */
function looksSensitive(key: string): boolean {
  return SENSITIVE_KEY_PATTERNS.some((re) => re.test(key));
}
/**
 * Recursively copy `value`, replacing the value of every key flagged by
 * `looksSensitive` with "[REDACTED]". Arrays are truncated to their
 * first 32 elements.
 *
 * `depth` bounds the recursion. Once it is exhausted, objects and arrays
 * are replaced by "[TRUNCATED]" instead of being returned verbatim.
 * Returning them raw would let secrets nested deeper than the limit
 * reach `JSON.stringify` unmasked. Primitives and null/undefined are
 * returned unchanged at any depth.
 */
function maskDeep(value: unknown, depth: number): unknown {
  if (value == null || typeof value !== "object") return value;
  if (depth <= 0) return "[TRUNCATED]";
  if (Array.isArray(value)) {
    return value.slice(0, 32).map((v) => maskDeep(v, depth - 1));
  }
  const out: Record<string, unknown> = {};
  for (const [k, v] of Object.entries(value as Record<string, unknown>)) {
    out[k] = looksSensitive(k) ? "[REDACTED]" : maskDeep(v, depth - 1);
  }
  return out;
}
/**
 * Cut `s` to at most `max` UTF-16 code units without leaving a dangling
 * high surrogate at the cut. A lone surrogate is not valid Unicode and
 * can make Postgres reject the preview when it is stored as JSON.
 */
function sliceWithoutSplitSurrogate(s: string, max: number): string {
  if (s.length <= max) return s;
  const last = s.charCodeAt(max - 1);
  const end = last >= 0xd800 && last <= 0xdbff ? max - 1 : max;
  return s.slice(0, end);
}

/**
 * Produce a secret-masked, size-bounded preview of an arbitrary value
 * for evidence payloads.
 *
 * - Masking uses `maskDeep` to depth 6.
 * - If the JSON form fits in 800 chars, the parsed (masked) value is
 *   returned.
 * - Otherwise the JSON text is cut at 800 chars (never mid surrogate
 *   pair) and "…" is appended.
 * - null/undefined, and values that do not serialise at all (functions,
 *   symbols), yield null.
 * - Values whose serialisation throws (BigInt, cycles) fall back to
 *   their truncated `String()` form, or null if even that throws.
 */
function redactPreview(value: unknown): unknown {
  if (value === null || value === undefined) return null;
  const maxChars = 800;
  try {
    const json: string | undefined = JSON.stringify(maskDeep(value, 6));
    if (json === undefined) return null;
    if (json.length <= maxChars) return JSON.parse(json);
    return sliceWithoutSplitSurrogate(json, maxChars) + "…";
  } catch {
    try {
      return sliceWithoutSplitSurrogate(String(value), maxChars);
    } catch {
      // e.g. an object with a null prototype has no toString.
      return null;
    }
  }
}
/**
 * Telemetry: note that the planner found no tool for `capabilityTag`.
 *
 * The tag is lower-cased. An existing signal gets its count incremented
 * and its last context / timestamp refreshed; otherwise a new "open"
 * signal is inserted with count 1. Empty tags are ignored.
 *
 * Concurrency:
 * - the counter is incremented atomically in SQL, so concurrent reports
 *   (including from the Python side) are not lost;
 * - `status` is not written at all, so a concurrent status change by
 *   another writer cannot be reverted here.
 *
 * Best-effort: failures are logged at debug level and swallowed.
 */
export async function recordPlannerGap(
  capabilityTag: string,
  context: Record<string, unknown>,
): Promise<void> {
  if (!capabilityTag) return;
  try {
    const tag = capabilityTag.toLowerCase();
    const existing = await db
      .select()
      .from(toolGapSignals)
      .where(eq(toolGapSignals.capabilityTag, tag))
      .limit(1);
    const signal = existing[0];
    if (signal) {
      await db
        .update(toolGapSignals)
        .set({
          invocationCount: sql`${toolGapSignals.invocationCount} + 1`,
          lastContext: context,
          lastSeenAt: new Date(),
        })
        .where(eq(toolGapSignals.id, signal.id));
    } else {
      await db.insert(toolGapSignals).values({
        id: newId("tgap"),
        capabilityTag: tag,
        invocationCount: 1,
        lastContext: context,
        status: "open",
      });
    }
  } catch (err) {
    logger.debug({ err }, "tool-graph recordPlannerGap failed");
  }
}
// ---------------------------------------------------------------- auto-extend
/**
 * Pick the verified row whose capability tags best match `tag`.
 *
 * Scoring: an exact tag match scores 2; a prefix relation in either
 * direction scores 1; anything else scores 0. The first row with the
 * highest score wins. When no row overlaps at all, the first row is still
 * returned (score 0). Returns null only when `verifiedRows` is empty.
 */
function pickNearestVerifiedForTag(
  tag: string,
  verifiedRows: readonly ToolNodeRow[],
): ToolNodeRow | null {
  let nearest: ToolNodeRow | null = null;
  let bestOverlap = -1;
  for (const r of verifiedRows) {
    const tags = Array.isArray(r.capabilityTags)
      ? (r.capabilityTags as string[])
      : [];
    const overlap = tags.includes(tag)
      ? 2
      : tags.some((t) => t.startsWith(tag) || tag.startsWith(t))
        ? 1
        : 0;
    if (overlap > bestOverlap) {
      bestOverlap = overlap;
      nearest = r;
    }
  }
  return nearest;
}

/**
 * Start a fire-and-forget side task without leaving a dangling promise.
 *
 * Node terminates on unhandled rejections by default, so both synchronous
 * throws and async rejections from `task` are caught and logged at debug
 * level.
 */
function runDetachedSideTask(label: string, task: () => unknown): void {
  void Promise.resolve()
    .then(task)
    .catch((err: unknown) => {
      logger.debug({ err }, `tool-graph: ${label} failed (non-blocking)`);
    });
}

/** #161: best-effort template-library lookup; null on miss or on error. */
async function findSpawnTemplate(
  fpInput: SeamFingerprintInput,
): Promise<TemplateMatch | null> {
  try {
    return await searchTemplates(fpInput);
  } catch (err) {
    logger.debug({ err }, "tool-graph: searchTemplates failed (non-blocking)");
    return null;
  }
}

/**
 * #160: every #158 splice may have changed which atoms are reachable from
 * an alias chain. This marks active summary paths pending_rebuild so the
 * admin UI can flag the staleness. The next `seedToolGraph` call
 * (boot-time, idempotent) re-derives them and flips the status back to
 * "active". Never throws; failures are logged at debug level.
 */
async function markSummaryPathsAfterAutoExtend(created: number): Promise<void> {
  try {
    const marked = await markSummaryPathsPendingRebuild();
    if (marked > 0) {
      logger.info(
        { marked, autoExtendCreated: created },
        "tool-graph: marked summary paths pending_rebuild after #158 splice",
      );
    }
  } catch (err) {
    logger.debug(
      { err },
      "tool-graph autoExtendIfNeeded: markSummaryPathsPendingRebuild failed (non-blocking)",
    );
  }
}

/**
 * Promote open gap signals into provisional tool nodes.
 *
 * For every gap signal with status="open" whose count crosses the
 * threshold, this creates a provisional tool node. It attaches the node to
 * the nearest verified node by capability-tag overlap and links the gap
 * signal back to it. Returns the number of provisional nodes created.
 *
 * If processing a gap throws, the error propagates to the caller. Summary
 * paths are still marked pending_rebuild for any nodes created before the
 * failure.
 *
 * NOTE(review): a gap is not claimed before it is processed, so two
 * concurrent runs could both extend the same gap. Presumably callers
 * serialise this — confirm.
 */
export async function autoExtendIfNeeded(): Promise<number> {
  const open = await db
    .select()
    .from(toolGapSignals)
    .where(eq(toolGapSignals.status, "open"));
  const ready = open.filter(
    (gap) => gap.invocationCount >= GAP_AUTO_EXTEND_THRESHOLD,
  );
  if (ready.length === 0) return 0;
  // Only provisional nodes are written below, so the verified set is
  // loop-invariant: load it once instead of once per gap.
  const verifiedRows = await db
    .select()
    .from(toolNodes)
    .where(eq(toolNodes.status, "verified"));
  let created = 0;
  try {
    for (const gap of ready) {
      const tag = gap.capabilityTag;
      const nearest = pickNearestVerifiedForTag(tag, verifiedRows);
      const provName = `auto_${tag}_${gap.id.slice(-6)}`;
      const baseStub = generateHandlerStub(tag, provName);
      // #161: search the cross-seam template library for a structurally
      // identical (or near) past spawn. The match is *advisory only* — we
      // attach it to the provisional node's specJson.proposalContext so
      // the reviewer sees the banner; we never auto-promote.
      const fpInput: SeamFingerprintInput = {
        failureMode: "capability_gap",
        downstreamInputSchema: (baseStub.spec as { parameters?: IOSchema })
          .parameters,
        upstreamOutputSchema:
          (nearest?.specJson as { outputSchema?: IOSchema } | null)?.outputSchema,
        capabilityTag: tag,
      };
      const templateMatch = await findSpawnTemplate(fpInput);
      let stub = baseStub;
      if (templateMatch) {
        // QUARANTINE-CONT-009 — provisional reviewer-side stub: the template
        // skeleton is treated as a placeholder handler for the reviewer to
        // replace. Once the real review flow (B1) is wired up, this branch
        // must be deleted and the quarantine hit withdrawn.
        // @deprecated CONT-009. Provisional reviewer placeholder. Real wiring in B1.
        runDetachedSideTask("CONT-009 quarantine hit", () =>
          import("./quarantine/index.ts").then((q) =>
            q.recordQuarantineHit("CONT-009", {
              kind: "reviewer",
              site: "tool-graph.ts:templateMatch_stub",
              capabilityTag: tag,
              provisionalName: provName,
              templateId: templateMatch.template.id,
              strength: templateMatch.strength,
            }),
          ),
        );
        // Use the template's contract-shaped skeleton as the candidate's
        // starting point. Reviewer will replace placeholders.
        const tplSpec = (templateMatch.template.specSkeleton ||
          {}) as Record<string, unknown>;
        const tplHandler =
          templateMatch.template.handlerSkeleton || baseStub.handler;
        stub = {
          spec: {
            ...baseStub.spec,
            ...tplSpec,
            name: provName,
            // Separator between template id and strength: they previously
            // ran together (e.g. "tpl_x9strong match").
            description: `${baseStub.spec.description as string} (seeded from template ${templateMatch.template.id} — ${templateMatch.strength} match)`,
          },
          handler: applyParameterizationMarkers(tplHandler, templateMatch),
        };
        // Bump offered count + log so the loop is observable.
        runDetachedSideTask("recordTemplateOffered", () =>
          recordTemplateOffered(templateMatch.template.id),
        );
        logger.info(
          {
            gapId: gap.id,
            templateId: templateMatch.template.id,
            strength: templateMatch.strength,
            score: templateMatch.score,
          },
          "tool-graph: spawn-template offered to reviewer",
        );
      }
      const proposalContext = {
        capability_tag: tag,
        gap_signal_id: gap.id,
        invocation_count: gap.invocationCount,
        template_match: templateMatch
          ? {
              template_id: templateMatch.template.id,
              strength: templateMatch.strength,
              score: Number(templateMatch.score.toFixed(3)),
              source_node_name: templateMatch.template.sourceNodeName,
              parameterization_gaps: templateMatch.parameterizationGaps,
              // Surfaced for the reviewer banner so they can judge
              // whether to trust the suggestion. These are snapshot at
              // proposal time; the live counters live on the row.
              offered_count: templateMatch.template.offeredCount,
              reuse_count: templateMatch.template.reuseCount,
              success_count: templateMatch.template.successCount,
              reject_count: templateMatch.template.rejectCount,
            }
          : null,
      };
      const specJson = {
        ...(stub.spec as Record<string, unknown>),
        proposalContext,
      };
      const provisional = await upsertNode({
        id: newId("tnode"),
        name: provName,
        description: `Auto-extended provisional tool for capability '${tag}'. Drafted from ${gap.invocationCount} planner-gap signals; needs human review.`,
        capabilityTags: [tag],
        inputKind: "json",
        outputKind: "json",
        status: "provisional",
        ownerProcess: nearest?.ownerProcess === "python" ? "python" : "node",
        specJson: specJson as Record<string, unknown>,
        handlerStub: stub.handler,
        createdBy: "auto",
      });
      if (nearest) {
        // Per #156 invariant: every edge ships with a contract. Provisional
        // auto-extended edges have unknown true I/O shapes, so we attach
        // an advisory contract derived from the parent's outputSchema /
        // the stub's inputSchema and tag it provisional so reviewers
        // know to refine it during promotion.
        const nearestSpec = (nearest.specJson || {}) as {
          outputSchema?: IOSchema;
        };
        const stubSpec = (stub.spec || {}) as { inputSchema?: IOSchema };
        const provisionalContract: ContractSpec = {
          produces: nearestSpec.outputSchema || { type: "object" },
          consumes: stubSpec.inputSchema || { type: "object" },
          mappingHints: {
            advisory: true,
            source: "auto_extend_provisional",
            gap_signal_id: gap.id,
          },
        };
        await upsertEdge({
          id: newId("tedge"),
          fromNode: nearest.id,
          toNode: provisional.id,
          relation: "alternative_to",
          weight: 0.5,
          contract: provisionalContract,
        });
      }
      await db.insert(toolNodeEvidence).values({
        id: newId("tev"),
        nodeId: provisional.id,
        kind: "auto_extend",
        payload: {
          capability_tag: tag,
          gap_signal_id: gap.id,
          invocation_count: gap.invocationCount,
          last_context: gap.lastContext,
        },
        success: 0,
        failure: 0,
      });
      await db
        .update(toolGapSignals)
        .set({ status: "extended", extendedNodeId: provisional.id })
        .where(eq(toolGapSignals.id, gap.id));
      created += 1;
    }
  } finally {
    // Runs even when a later gap throws, so splices that already happened
    // are never left without their pending_rebuild marker.
    if (created > 0) await markSummaryPathsAfterAutoExtend(created);
  }
  return created;
}
/**
 * QUARANTINE-CONT-009 — Provisional handler stub.
 * Auto-generated when no real handler is available for a capability tag.
 * Real fix in B1 (real wiring of the graph-executor) — the implementer must
 * delete this function + the corresponding KNOWN_CONTAMINATIONS entry.
 *
 * Returns a minimal tool spec (single required `query` string parameter)
 * and a source-code string for a handler that always reports
 * `not_implemented`.
 *
 * `tag` comes from planner-gap signals and is interpolated into generated
 * source. It is therefore emitted as a JSON string literal, and line
 * breaks are neutralised in the comment lines, so a quote or newline in
 * the tag cannot break out of the string or comment. Ordinary tags produce
 * exactly the same text as before.
 * @deprecated CONT-009. Provisional placeholder. Real wiring in B1.
 */
export function generateHandlerStub(tag: string, name: string): {
  spec: Record<string, unknown>;
  handler: string;
} {
  // Fire-and-forget telemetry; the catch prevents an unhandled rejection
  // (fatal in Node by default) if the quarantine module fails to load.
  void import("./quarantine/index.ts")
    .then((q) =>
      q.recordQuarantineHit("CONT-009", {
        kind: "handler",
        site: "tool-graph.ts:generateHandlerStub",
        capabilityTag: tag,
        provisionalName: name,
      }),
    )
    .catch((err: unknown) => {
      logger.debug({ err }, "tool-graph: CONT-009 quarantine hit failed (non-blocking)");
    });
  const spec = {
    name,
    description: `Provisional handler for capability '${tag}'. Replace this stub during review.`,
    parameters: {
      type: "object",
      properties: {
        query: { type: "string", description: "Natural language query." },
      },
      required: ["query"],
    },
  };
  const oneLine = (s: string): string => s.replace(/[\r\n\u2028\u2029]/g, " ");
  const handler = `// Stub handler for ${oneLine(name)} (capability: ${oneLine(tag)}).
// Auto-generated from planner-gap signals. Replace with real implementation
// before approving.
export async function invoke(args: { query: string }) {
return { error: "not_implemented", capability: ${JSON.stringify(tag)} };
}
`;
  return { spec, handler };
}
// ---------------------------------------------------------------- admin
/** Optional inputs for approveNode. */
export interface ApproveNodeOptions {
  /** Reviewer identity, copied into template_ingest evidence ("unknown" if absent). */
  reviewer?: string;
  /**
   * #161: what the reviewer did with an offered template. Leaving it out
   * counts as "fresh" (nothing offered, or the banner was ignored). When a
   * value is given, the source template's counters are updated to match.
   */
  templateChoice?: TemplateChoice;
  /**
   * Explicit source-template id. If omitted, the id comes from the
   * templateMatch stored in the provisional node's
   * specJson.proposalContext.
   */
  sourceTemplateId?: string | null;
}
/**
 * Promote a node to `verified` and run #161 template accounting/ingestion.
 *
 * Returns null when `id` is unknown; otherwise it returns the refreshed
 * node via getNode.
 *
 * Approving a node that is already `verified` (double click, retried
 * request) simply returns it. Re-running the accounting would count the
 * same reuse/success against the source template twice and write a
 * duplicate template_ingest evidence row.
 *
 * Template accounting is best-effort: failures are logged at warn level
 * and never undo the status change.
 */
export async function approveNode(
  id: string,
  options: ApproveNodeOptions = {},
): Promise<ResolvedNode | null> {
  const rows = await db
    .select()
    .from(toolNodes)
    .where(eq(toolNodes.id, id))
    .limit(1);
  const node = rows[0];
  if (!node) return null;
  if (node.status === "verified") return getNode(id);
  await db
    .update(toolNodes)
    .set({ status: "verified", updatedAt: new Date() })
    .where(eq(toolNodes.id, id));
  // ---------- #161: template accounting + ingestion ------------------
  try {
    const spec = (node.specJson || {}) as Record<string, unknown>;
    const ctx = (spec.proposalContext || {}) as {
      capability_tag?: string;
      template_match?: { template_id?: string; strength?: string } | null;
    };
    const tags = Array.isArray(node.capabilityTags)
      ? (node.capabilityTags as string[])
      : [];
    const tag = ctx.capability_tag || tags[0] || "";
    // Determine source template id (explicit override > spec context).
    const matchedTemplateId =
      options.sourceTemplateId !== undefined
        ? options.sourceTemplateId
        : ctx.template_match?.template_id || null;
    // Record reviewer choice if a template was offered.
    // ORDER MATTERS: bump success_count FIRST so the demote check
    // (triggered inside recordTemplateChoice → maybeAutoDemote) sees
    // the post-success success rate. Otherwise a successful reuse
    // could push a borderline template across the demote threshold.
    if (matchedTemplateId) {
      // Default to "fresh" when caller did not supply a choice — this
      // is the safe accounting answer: an older client / scripted call
      // without an explicit reviewer choice should NOT be counted as a
      // template reuse and inflate success metrics.
      const choice: TemplateChoice = options.templateChoice ?? "fresh";
      if (choice !== "fresh") {
        await recordTemplatePromoteResult(matchedTemplateId);
      }
      await recordTemplateChoice({
        templateId: matchedTemplateId,
        choice,
      });
    }
    // Always learn from the promoted node — idempotent on fingerprint.
    // This is the meta-evolution step: every successful spawn becomes a
    // future template.
    if (tag) {
      // Find the alternative_to upstream peer to recover the seam shape.
      let upstreamOutputSchema: IOSchema | undefined;
      try {
        const inEdges = await db
          .select()
          .from(toolEdges)
          .where(eq(toolEdges.toNode, id));
        for (const e of inEdges) {
          if (e.relation !== "alternative_to") continue;
          const up = await db
            .select()
            .from(toolNodes)
            .where(eq(toolNodes.id, e.fromNode))
            .limit(1);
          const upSpec = (up[0]?.specJson || {}) as { outputSchema?: IOSchema };
          if (upSpec.outputSchema) {
            upstreamOutputSchema = upSpec.outputSchema;
            break;
          }
        }
      } catch (err) {
        logger.debug({ err }, "tool-graph approveNode: upstream lookup failed");
      }
      const fpInput: SeamFingerprintInput = {
        failureMode: "capability_gap",
        downstreamInputSchema: spec.parameters as IOSchema | undefined,
        upstreamOutputSchema,
        capabilityTag: tag,
      };
      const persisted = await persistTemplateOnPromote({
        fingerprintInput: fpInput,
        promotedNodeName: node.name,
        promotedInputSchema:
          (spec.inputSchema as IOSchema | undefined) ||
          (spec.parameters as IOSchema | undefined),
        promotedOutputSchema: spec.outputSchema as IOSchema | undefined,
        handlerSkeleton: node.handlerStub || "",
        specSkeleton: spec,
      });
      if (persisted) {
        await db.insert(toolNodeEvidence).values({
          id: newId("tev"),
          nodeId: id,
          kind: "template_ingest",
          payload: {
            template_id: persisted.id,
            fingerprint_hash_prefix: persisted.fingerprintHash.slice(0, 12),
            failure_mode: persisted.failureMode,
            reviewer: options.reviewer || "unknown",
            template_choice: options.templateChoice || "fresh",
            source_template_id: matchedTemplateId,
          },
          success: 1,
          failure: 0,
        });
      }
    }
  } catch (err) {
    logger.warn({ err, nodeId: id }, "tool-graph: template ingest failed (non-fatal)");
  }
  return getNode(id);
}
/** Optional inputs for rejectNode. */
export interface RejectNodeOptions {
  /** Reviewer identity. */
  reviewer?: string;
  /** Overrides the template id recorded in the node's proposalContext. */
  sourceTemplateId?: string | null;
  /** #161: if provided, recorded against the template that was offered. */
  templateChoice?: TemplateChoice;
}
/**
 * Reject (delete) a node and dismiss the gap signal that spawned it.
 *
 * Returns false when `id` is unknown, true once the node is deleted.
 *
 * The gap dismissal and the delete run in one transaction, dismissal
 * first. Either both happen or neither does. The ordering also keeps the
 * `extended_node_id = id` match working if that column is a foreign key
 * with ON DELETE SET NULL (schema not visible here — confirm).
 *
 * #161 template accounting runs only after the delete commits, using the
 * row loaded up front, so a reject that failed is never counted. It is
 * best-effort and logged at debug level on failure.
 */
export async function rejectNode(
  id: string,
  options: RejectNodeOptions = {},
): Promise<boolean> {
  const rows = await db
    .select()
    .from(toolNodes)
    .where(eq(toolNodes.id, id))
    .limit(1);
  const node = rows[0];
  if (!node) return false;
  await db.transaction(async (tx) => {
    await tx
      .update(toolGapSignals)
      .set({ status: "dismissed" })
      .where(eq(toolGapSignals.extendedNodeId, id));
    await tx.delete(toolNodes).where(eq(toolNodes.id, id));
  });
  try {
    const spec = (node.specJson || {}) as Record<string, unknown>;
    const ctx = (spec.proposalContext || {}) as {
      template_match?: { template_id?: string } | null;
    };
    const matchedTemplateId =
      options.sourceTemplateId !== undefined
        ? options.sourceTemplateId
        : ctx.template_match?.template_id || null;
    if (matchedTemplateId) {
      // Default to "fresh" on reject — reviewer chose to discard the
      // template's suggestion entirely. If they had originally picked
      // Use/Use+edit, the caller can still pass that explicitly.
      const choice: TemplateChoice = options.templateChoice ?? "fresh";
      await recordTemplateChoice({
        templateId: matchedTemplateId,
        choice,
      });
    }
  } catch (err) {
    logger.debug({ err, nodeId: id }, "tool-graph rejectNode: template accounting failed");
  }
  return true;
}
/** Evidence rows for `nodeId`, newest first, capped at `limit` rows. */
export async function listNodeEvidence(nodeId: string, limit = 50) {
  const belongsToNode = eq(toolNodeEvidence.nodeId, nodeId);
  const newestFirst = desc(toolNodeEvidence.createdAt);
  return db
    .select()
    .from(toolNodeEvidence)
    .where(belongsToNode)
    .orderBy(newestFirst)
    .limit(limit);
}
/**
 * All gap signals, most recently seen first. If `status` is given, only
 * rows with that status are returned.
 */
export async function listGapSignals(
  status?: "open" | "extended" | "dismissed",
) {
  // drizzle omits the WHERE clause entirely when given undefined.
  const statusFilter = status ? eq(toolGapSignals.status, status) : undefined;
  return db
    .select()
    .from(toolGapSignals)
    .where(statusFilter)
    .orderBy(desc(toolGapSignals.lastSeenAt));
}
// ---------------------------------------------------------------- deprecation (#159)
/**
 * Read a positive integer from environment variable `name`.
 *
 * `fallback` is returned in these cases:
 * - the variable is unset or blank;
 * - it does not start with digits;
 * - it parses to zero or a negative number.
 *
 * Parsing uses `Number.parseInt(…, 10)`, so a leading integer prefix is
 * accepted ("30d" → 30).
 *
 * Used below for the deprecation tunables, all of which are windows in
 * days.
 */
function envInt(name: string, fallback: number): number {
  const text = process.env[name]?.trim() ?? "";
  if (text === "") return fallback;
  const parsed = Number.parseInt(text, 10);
  const isUsable = Number.isFinite(parsed) && parsed > 0;
  return isUsable ? parsed : fallback;
}
/**
 * True when DEPRECATION_DETECTOR_ENABLED is "1", "true", "yes" or "on".
 * The value is trimmed and compared case-insensitively. Unset counts as
 * disabled.
 */
export function deprecationDetectorEnabled(): boolean {
  const flag =
    process.env["DEPRECATION_DETECTOR_ENABLED"]?.trim().toLowerCase() ?? "";
  return ["1", "true", "yes", "on"].includes(flag);
}
/**
 * Current deprecation windows, all in days. Each value comes from the
 * environment via envInt and defaults to:
 * - cold: 60
 * - defer: 30
 * - re-arm: 90
 * - archive-after: 180
 */
export function deprecationConfig(): {
  coldDays: number;
  deferDays: number;
  reArmDays: number;
  archiveAfterDays: number;
} {
  const coldDays = envInt("DEPRECATION_COLD_DAYS", 60);
  const deferDays = envInt("DEPRECATION_DEFER_DAYS", 30);
  const reArmDays = envInt("DEPRECATION_RE_ARM_DAYS", 90);
  const archiveAfterDays = envInt("DEPRECATION_ARCHIVE_AFTER_DAYS", 180);
  return { coldDays, deferDays, reArmDays, archiveAfterDays };
}
/**
 * Detector verdict for a verified node. classifyDeprecationCandidates only
 * ever emits "cold", "superseded" or "redundant"; "active" completes the
 * union.
 */
export type DeprecationClassification = "active" | "cold" | "superseded" | "redundant";
/**
 * One detector finding for a verified node. runDeprecationDetector stores
 * `supportingMetrics` and `dependencies` in the candidate row's
 * `proposal_context`.
 */
export interface DeprecationCandidateProposal {
  nodeId: string;
  nodeName: string;
  classification: DeprecationClassification;
  /**
   * Verified nodes whose outgoing edges still point at this node. This is
   * the blast radius reviewers must understand before approving.
   */
  dependencies: Array<{ nodeId: string; nodeName: string; relation: string }>;
  /**
   * The metrics behind the classification, stored verbatim in
   * `proposal_context` so reviewers can see why the node was flagged.
   */
  supportingMetrics: {
    // Present for every classification.
    lastInvocationAt: string | null;
    invocationCount: number;
    daysSinceLastInvocation: number | null;
    inboundEdgeCount: number;
    outboundEdgeCount: number;
    // "redundant" only — the sibling that subsumes this node.
    redundantWithNodeId?: string;
    redundantWithNodeName?: string;
    siblingTraversalRatio?: number;
    siblingHealthScore?: number;
    selfHealthScore?: number;
    // "superseded" only — replacement node id taken from
    // `spec_json.replacedBy` (written by the Task #158 splice).
    replacedBy?: string;
  };
}
/** Per-node usage aggregate built from `invocation` / `shadow_run` evidence. */
interface NodeMetrics {
  /** Number of such evidence rows for the node. */
  invocationCount: number;
  /** Newest evidence `createdAt`, or null when none was seen. */
  lastInvocationAt: Date | null;
}
/**
 * Fold every `invocation` / `shadow_run` evidence row into per-node
 * metrics: the row count and the newest `createdAt`.
 */
async function gatherInvocationMetrics(): Promise<Map<string, NodeMetrics>> {
  const evidence = await db
    .select({
      nodeId: toolNodeEvidence.nodeId,
      kind: toolNodeEvidence.kind,
      createdAt: toolNodeEvidence.createdAt,
    })
    .from(toolNodeEvidence)
    .where(inArray(toolNodeEvidence.kind, ["invocation", "shadow_run"]));
  const byNode = new Map<string, NodeMetrics>();
  for (const { nodeId, createdAt } of evidence) {
    const seen = byNode.get(nodeId);
    if (seen === undefined) {
      byNode.set(nodeId, { lastInvocationAt: createdAt, invocationCount: 1 });
      continue;
    }
    seen.invocationCount += 1;
    if (!seen.lastInvocationAt || createdAt > seen.lastInvocationAt) {
      seen.lastInvocationAt = createdAt;
    }
  }
  return byNode;
}
/**
 * Pure classifier — no database access. Given the verified-node universe
 * and a metrics snapshot, it returns at most one proposal per node, in the
 * order of `verifiedNodes`. The first matching rule wins:
 *
 * 1. superseded — `spec.replacedBy` is a non-empty string (the #158 splice
 *    writes it when an alternative supersedes the node).
 * 2. redundant — another verified node exists such that:
 *    - (a) they share ≥1 capability tag;
 *    - (b) the sibling has ≥ 10× this node's invocation count;
 *    - (c) this node has ≥1 invocation (never-invoked nodes take the cold
 *      path instead);
 *    - (d) both have edge-health data, with the sibling's mean ≥ 0.85 and
 *      this node's mean ≤ 0.5.
 *    Among qualifying siblings, the one with the highest invocation ratio
 *    is reported; the earliest wins ties.
 * 3. cold — never invoked, or last invoked before `now - coldDays`.
 *    Never-invoked nodes always qualify here. Filtering out nodes that are
 *    simply too new is left to the caller.
 *
 * Thresholds are conservative because a false positive deletes real
 * capability.
 */
export function classifyDeprecationCandidates(args: {
  now: Date;
  coldDays: number;
  verifiedNodes: ResolvedNode[];
  edges: ResolvedEdge[];
  metrics: Map<string, NodeMetrics>;
  edgeHealthByNodeId: Map<string, { count: number; sum: number }>;
}): DeprecationCandidateProposal[] {
  const DAY_MS = 86_400_000;
  const nowMs = args.now.getTime();
  const coldCutoff = new Date(nowMs - args.coldDays * DAY_MS);

  const nodeById = new Map<string, ResolvedNode>();
  for (const n of args.verifiedNodes) nodeById.set(n.id, n);

  const inboundCount = new Map<string, number>();
  const outboundCount = new Map<string, number>();
  const incomingByTarget = new Map<string, Array<{ from: string; relation: string }>>();
  const bump = (counts: Map<string, number>, key: string): void => {
    counts.set(key, (counts.get(key) ?? 0) + 1);
  };
  for (const edge of args.edges) {
    bump(inboundCount, edge.toNode);
    bump(outboundCount, edge.fromNode);
    if (!nodeById.has(edge.toNode)) continue;
    const incoming = incomingByTarget.get(edge.toNode);
    const entry = { from: edge.fromNode, relation: edge.relation };
    if (incoming) incoming.push(entry);
    else incomingByTarget.set(edge.toNode, [entry]);
  }

  // Mean EMA health over the node's edges, or null without data.
  const meanHealth = (nodeId: string): number | null => {
    const agg = args.edgeHealthByNodeId.get(nodeId);
    return agg && agg.count !== 0 ? agg.sum / agg.count : null;
  };

  // Verified upstream nodes with an edge into `nodeId`.
  const dependenciesOf = (
    nodeId: string,
  ): DeprecationCandidateProposal["dependencies"] => {
    const deps: DeprecationCandidateProposal["dependencies"] = [];
    for (const { from, relation } of incomingByTarget.get(nodeId) ?? []) {
      const upstream = nodeById.get(from);
      if (upstream) deps.push({ nodeId: upstream.id, nodeName: upstream.name, relation });
    }
    return deps;
  };

  // Best sibling that makes `node` redundant, or null.
  const findRedundantSibling = (
    node: ResolvedNode,
    self: NodeMetrics,
    selfHealth: number | null,
  ): { sibling: ResolvedNode; ratio: number; health: number } | null => {
    // These checks do not depend on the sibling, so bail out once.
    if (self.invocationCount <= 0 || selfHealth === null || selfHealth > 0.5) {
      return null;
    }
    let best: { sibling: ResolvedNode; ratio: number; health: number } | null = null;
    for (const sib of args.verifiedNodes) {
      if (sib.id === node.id) continue;
      if (!sib.capabilityTags.some((t) => node.capabilityTags.includes(t))) continue;
      const sibMetrics = args.metrics.get(sib.id);
      if (!sibMetrics || sibMetrics.invocationCount < self.invocationCount * 10) continue;
      const sibHealth = meanHealth(sib.id);
      if (sibHealth === null || sibHealth < 0.85) continue;
      const ratio = sibMetrics.invocationCount / Math.max(1, self.invocationCount);
      if (best === null || ratio > best.ratio) {
        best = { sibling: sib, ratio, health: sibHealth };
      }
    }
    return best;
  };

  const proposals: DeprecationCandidateProposal[] = [];
  for (const node of args.verifiedNodes) {
    const usage = args.metrics.get(node.id) ?? { lastInvocationAt: null, invocationCount: 0 };
    const last = usage.lastInvocationAt;
    const baseMetrics = {
      lastInvocationAt: last ? last.toISOString() : null,
      invocationCount: usage.invocationCount,
      daysSinceLastInvocation: last
        ? Math.floor((nowMs - last.getTime()) / DAY_MS)
        : null,
      inboundEdgeCount: inboundCount.get(node.id) ?? 0,
      outboundEdgeCount: outboundCount.get(node.id) ?? 0,
    };
    const dependencies = dependenciesOf(node.id);
    const emit = (
      classification: DeprecationClassification,
      supportingMetrics: DeprecationCandidateProposal["supportingMetrics"],
    ): void => {
      proposals.push({
        nodeId: node.id,
        nodeName: node.name,
        classification,
        supportingMetrics,
        dependencies,
      });
    };

    // superseded — explicit human/system signal, highest priority.
    const replacedBy = (node.spec as { replacedBy?: unknown })?.replacedBy;
    if (typeof replacedBy === "string" && replacedBy !== "") {
      emit("superseded", { ...baseMetrics, replacedBy });
      continue;
    }

    // redundant — conservative sibling overlap.
    const selfHealth = meanHealth(node.id);
    const match = findRedundantSibling(node, usage, selfHealth);
    if (match) {
      emit("redundant", {
        ...baseMetrics,
        redundantWithNodeId: match.sibling.id,
        redundantWithNodeName: match.sibling.name,
        siblingTraversalRatio: match.ratio,
        siblingHealthScore: match.health,
        selfHealthScore: selfHealth ?? undefined,
      });
      continue;
    }

    // cold — never invoked, or last invoked before the cutoff.
    if (last === null || last < coldCutoff) {
      emit("cold", baseMetrics);
    }
  }
  return proposals;
}
/**
 * Run one detector pass. It classifies the verified nodes, inserts open
 * candidate rows for new findings, and refreshes the supporting context on
 * existing rows. Re-running without state changes touches only
 * `updated_at` and the refreshed context.
 *
 * Rules for existing candidate rows:
 * - deferred rows are skipped while their defer window is active;
 * - rejected rows are skipped while their re-arm window is active;
 * - approved rows are never touched;
 * - open rows whose node is no longer flagged are closed out.
 *
 * Only nodes created at least `coldDays` ago may be *proposed*, so
 * never-invoked, freshly-seeded nodes are not flagged on day one. Younger
 * verified nodes still take part in classification. They count as
 * siblings for the redundancy check and as dependents, so the blast radius
 * shown to reviewers is complete.
 *
 * Returns counts so the admin route and tests can assert behaviour
 * without having to re-query.
 */
export async function runDeprecationDetector(args: {
  now?: Date;
  coldDays?: number;
} = {}): Promise<{
  proposalsConsidered: number;
  candidatesCreated: number;
  candidatesRefreshed: number;
  skippedDeferred: number;
  skippedReArm: number;
}> {
  const cfg = deprecationConfig();
  const now = args.now ?? new Date();
  const coldDays = args.coldDays ?? cfg.coldDays;
  // Independent reads — fetch concurrently.
  const [verifiedRows, allEdges, metrics, healthRows] = await Promise.all([
    db.select().from(toolNodes).where(eq(toolNodes.status, "verified")),
    db.select().from(toolEdges),
    gatherInvocationMetrics(),
    db.select().from(toolEdgeHealth),
  ]);
  const ageCutoff = new Date(now.getTime() - coldDays * 86_400_000);
  const proposableNodeIds = new Set(
    verifiedRows.filter((r) => r.createdAt <= ageCutoff).map((r) => r.id),
  );
  // Aggregate edge health by node id (any edge attached to the node
  // contributes; sum + count → mean).
  const healthByEdgeId = new Map(healthRows.map((h) => [h.edgeId, h.emaHealthScore]));
  const edgeHealthByNodeId = new Map<string, { count: number; sum: number }>();
  for (const e of allEdges) {
    const score = healthByEdgeId.get(e.id);
    if (score === undefined) continue;
    for (const id of [e.fromNode, e.toNode]) {
      const cur = edgeHealthByNodeId.get(id) ?? { count: 0, sum: 0 };
      cur.count += 1;
      cur.sum += score;
      edgeHealthByNodeId.set(id, cur);
    }
  }
  // Classify against the full verified universe, then keep only proposals
  // for nodes old enough to be flagged.
  const proposals = classifyDeprecationCandidates({
    now,
    coldDays,
    verifiedNodes: verifiedRows.map(row2node),
    edges: allEdges.map(row2edge),
    metrics,
    edgeHealthByNodeId,
  }).filter((p) => proposableNodeIds.has(p.nodeId));
  const existing = await db.select().from(toolDeprecationCandidates);
  const existingByNodeId = new Map(existing.map((r) => [r.nodeId, r]));
  let created = 0;
  let refreshed = 0;
  let skippedDeferred = 0;
  let skippedReArm = 0;
  const proposedNodeIds = new Set<string>();
  for (const p of proposals) {
    proposedNodeIds.add(p.nodeId);
    const prior = existingByNodeId.get(p.nodeId);
    if (prior) {
      if (prior.status === "deferred" && prior.deferUntil && prior.deferUntil > now) {
        skippedDeferred += 1;
        continue;
      }
      if (prior.status === "rejected" && prior.reArmUntil && prior.reArmUntil > now) {
        skippedReArm += 1;
        continue;
      }
      if (prior.status === "approved") {
        // Already deprecated; nothing to refresh.
        continue;
      }
      // Refresh the open/deferred-but-armed/rejected-but-armed row so
      // the latest metrics are visible to the reviewer. The update is
      // scoped to the status seen in the snapshot at the start of the run.
      // If a reviewer approved/deferred/rejected the candidate after that
      // snapshot, the WHERE clause misses and their decision stands
      // (race-safe — the reviewer wins).
      const updated = await db
        .update(toolDeprecationCandidates)
        .set({
          status: "open",
          classification: p.classification,
          proposalContext: {
            supportingMetrics: p.supportingMetrics,
            dependencies: p.dependencies,
          } as Record<string, unknown>,
          deferUntil: null,
          reArmUntil: null,
          updatedAt: now,
        })
        .where(
          and(
            eq(toolDeprecationCandidates.id, prior.id),
            eq(toolDeprecationCandidates.status, prior.status),
          ),
        )
        .returning({ id: toolDeprecationCandidates.id });
      if (updated.length > 0) refreshed += 1;
      continue;
    }
    await db.insert(toolDeprecationCandidates).values({
      id: newId("tdep"),
      nodeId: p.nodeId,
      classification: p.classification,
      status: "open",
      proposalContext: {
        supportingMetrics: p.supportingMetrics,
        dependencies: p.dependencies,
      } as Record<string, unknown>,
      decidedBy: null,
      decidedAt: null,
      createdAt: now,
      updatedAt: now,
    });
    created += 1;
  }
  // Previously-open candidates whose nodes are no longer flagged (e.g.
  // someone exercised them again) are closed out: they are marked rejected
  // with a re-arm window so the reviewer does not see stale proposals.
  // Deferred / approved rows are never touched by this sweep.
  for (const row of existing) {
    if (row.status !== "open") continue;
    if (proposedNodeIds.has(row.nodeId)) continue;
    // Race-safe: only auto-reject if the row is still "open" at write
    // time. If a reviewer concurrently approved/deferred/rejected it,
    // the WHERE clause misses and their decision wins.
    await db
      .update(toolDeprecationCandidates)
      .set({
        status: "rejected",
        decidedBy: "system",
        decidedAt: now,
        reArmUntil: new Date(now.getTime() + cfg.reArmDays * 86_400_000),
        updatedAt: now,
      })
      .where(
        and(
          eq(toolDeprecationCandidates.id, row.id),
          eq(toolDeprecationCandidates.status, "open"),
        ),
      );
  }
  return {
    proposalsConsidered: proposals.length,
    candidatesCreated: created,
    candidatesRefreshed: refreshed,
    skippedDeferred,
    skippedReArm,
  };
}
/**
 * API-facing shape of a deprecation candidate row. Timestamps are ISO-8601
 * strings. `nodeName` falls back to the node id when the node row is gone.
 */
export interface DeprecationCandidateView {
  id: string;
  nodeId: string;
  nodeName: string;
  classification: DeprecationClassification;
  status: "open" | "deferred" | "rejected" | "approved";
  proposalContext: Record<string, unknown>;
  // Review decision.
  decidedBy: string | null;
  decidedAt: string | null;
  // Suppression windows.
  deferUntil: string | null;
  reArmUntil: string | null;
  // Row bookkeeping.
  createdAt: string;
  updatedAt: string;
}
/**
 * Convert a candidate row into its API view. Dates become ISO strings and
 * the node name is resolved via `nameById`, falling back to the node id.
 * A nullish `proposalContext` becomes `{}`.
 */
function rowToCandidateView(
  r: ToolDeprecationCandidateRow,
  nameById: Map<string, string>,
): DeprecationCandidateView {
  const isoOrNull = (d: Date | null): string | null => (d ? d.toISOString() : null);
  const context = (r.proposalContext as Record<string, unknown>) ?? {};
  return {
    id: r.id,
    nodeId: r.nodeId,
    nodeName: nameById.get(r.nodeId) ?? r.nodeId,
    classification: r.classification as DeprecationClassification,
    status: r.status as DeprecationCandidateView["status"],
    proposalContext: context,
    deferUntil: isoOrNull(r.deferUntil),
    reArmUntil: isoOrNull(r.reArmUntil),
    decidedBy: r.decidedBy,
    decidedAt: isoOrNull(r.decidedAt),
    createdAt: r.createdAt.toISOString(),
    updatedAt: r.updatedAt.toISOString(),
  };
}
/**
 * List deprecation candidates, most recently updated first.
 *
 * `opts.status` defaults to "open"; pass "any" to list every row. Node
 * names are resolved in a single follow-up query.
 */
export async function listDeprecationCandidates(opts: {
  status?: DeprecationCandidateView["status"] | "any";
} = {}): Promise<DeprecationCandidateView[]> {
  const wanted = opts.status ?? "open";
  // drizzle omits the WHERE clause entirely when given undefined.
  const rows = await db
    .select()
    .from(toolDeprecationCandidates)
    .where(wanted === "any" ? undefined : eq(toolDeprecationCandidates.status, wanted))
    .orderBy(desc(toolDeprecationCandidates.updatedAt));
  if (!rows.length) return [];
  const nodeIds = [...new Set(rows.map((r) => r.nodeId))];
  const named = await db
    .select({ id: toolNodes.id, name: toolNodes.name })
    .from(toolNodes)
    .where(inArray(toolNodes.id, nodeIds));
  const nameById = new Map<string, string>();
  for (const n of named) nameById.set(n.id, n.name);
  return rows.map((r) => rowToCandidateView(r, nameById));
}
/**
 * Approve a deprecation candidate. In one transaction this:
 * - flips the candidate row to `approved`;
 * - flips the underlying node to `deprecated`;
 * - records a `deprecation_decision` evidence row.
 *
 * Idempotent: when the candidate is already `approved` — including when a
 * concurrent call approves it first — nothing is written. The original
 * decider, timestamp and evidence are preserved, and the current view is
 * returned. Returns null when the candidate id is unknown.
 */
export async function deprecateNode(
  candidateId: string,
  reviewer: string,
): Promise<DeprecationCandidateView | null> {
  const rows = await db
    .select()
    .from(toolDeprecationCandidates)
    .where(eq(toolDeprecationCandidates.id, candidateId))
    .limit(1);
  const cand = rows[0];
  if (!cand) return null;
  if (cand.status !== "approved") {
    const now = new Date();
    await db.transaction(async (tx) => {
      const flipped = await tx
        .update(toolDeprecationCandidates)
        .set({
          status: "approved",
          decidedBy: reviewer,
          decidedAt: now,
          deferUntil: null,
          reArmUntil: null,
          updatedAt: now,
        })
        .where(
          and(
            eq(toolDeprecationCandidates.id, candidateId),
            // Only non-approved rows: a concurrent approval keeps its
            // decision and we skip the node/evidence writes.
            inArray(toolDeprecationCandidates.status, ["open", "deferred", "rejected"]),
          ),
        )
        .returning({ id: toolDeprecationCandidates.id });
      if (flipped.length === 0) return;
      await tx
        .update(toolNodes)
        .set({ status: "deprecated", updatedAt: now })
        .where(eq(toolNodes.id, cand.nodeId));
      await tx.insert(toolNodeEvidence).values({
        id: newId("tev"),
        nodeId: cand.nodeId,
        kind: "deprecation_decision",
        payload: {
          decision: "deprecate",
          candidate_id: candidateId,
          classification: cand.classification,
          reviewer,
        },
        success: 0,
        failure: 0,
      });
    });
  }
  const out = await db
    .select()
    .from(toolDeprecationCandidates)
    .where(eq(toolDeprecationCandidates.id, candidateId))
    .limit(1);
  const current = out[0];
  if (!current) return null;
  const node = await db
    .select({ id: toolNodes.id, name: toolNodes.name })
    .from(toolNodes)
    .where(eq(toolNodes.id, cand.nodeId))
    .limit(1);
  const nameById = new Map(node.map((n) => [n.id, n.name]));
  return rowToCandidateView(current, nameById);
}
/**
 * Defer a deprecation candidate for `days` days (default: the configured
 * defer window), recording a `deprecation_decision` evidence row. The
 * status change and the evidence row are written in one transaction.
 *
 * If `days` is not a finite, non-negative number, the configured window
 * is used instead; an invalid value would otherwise produce an Invalid
 * Date.
 *
 * An already-`approved` candidate is left untouched, because its node is
 * already deprecated. In that case the current view is returned unchanged.
 * Returns null when the candidate id is unknown.
 */
export async function deferDeprecationCandidate(
  candidateId: string,
  reviewer: string,
  days?: number,
): Promise<DeprecationCandidateView | null> {
  const cfg = deprecationConfig();
  const windowDays =
    typeof days === "number" && Number.isFinite(days) && days >= 0
      ? days
      : cfg.deferDays;
  const now = new Date();
  const until = new Date(now.getTime() + windowDays * 86_400_000);
  const deferred = await db.transaction(async (tx) => {
    const updated = await tx
      .update(toolDeprecationCandidates)
      .set({
        status: "deferred",
        decidedBy: reviewer,
        decidedAt: now,
        deferUntil: until,
        reArmUntil: null,
        updatedAt: now,
      })
      .where(
        and(
          eq(toolDeprecationCandidates.id, candidateId),
          inArray(toolDeprecationCandidates.status, ["open", "deferred", "rejected"]),
        ),
      )
      .returning();
    const row = updated[0];
    if (!row) return null;
    await tx.insert(toolNodeEvidence).values({
      id: newId("tev"),
      nodeId: row.nodeId,
      kind: "deprecation_decision",
      payload: {
        decision: "defer",
        candidate_id: candidateId,
        classification: row.classification,
        reviewer,
        defer_until: until.toISOString(),
      },
      success: 0,
      failure: 0,
    });
    return row;
  });
  let cand = deferred;
  if (!cand) {
    // Either the id is unknown or the candidate is already approved.
    const current = await db
      .select()
      .from(toolDeprecationCandidates)
      .where(eq(toolDeprecationCandidates.id, candidateId))
      .limit(1);
    const existing = current[0];
    if (!existing) return null;
    cand = existing;
  }
  const node = await db
    .select({ id: toolNodes.id, name: toolNodes.name })
    .from(toolNodes)
    .where(eq(toolNodes.id, cand.nodeId))
    .limit(1);
  const nameById = new Map(node.map((n) => [n.id, n.name]));
  return rowToCandidateView(cand, nameById);
}
/**
 * Reject a deprecation candidate and suppress re-flagging for a while.
 *
 * Sets the candidate to `rejected` with `reArmUntil = now + days`
 * (default `deprecationConfig().reArmDays`), clears `deferUntil`, and
 * appends a `deprecation_decision` evidence row to the candidate's node.
 * The status change and the evidence row are written in one transaction
 * (matching the approve path) so a decision is never persisted without
 * its audit record.
 *
 * @param candidateId Id of the `tool_deprecation_candidates` row.
 * @param reviewer Recorded as `decidedBy` and in the evidence payload.
 * @param days Optional override for the re-arm window, in days.
 * @returns The updated candidate view, or `null` when no candidate has
 *   that id.
 */
export async function rejectDeprecationCandidate(
  candidateId: string,
  reviewer: string,
  days?: number,
): Promise<DeprecationCandidateView | null> {
  const cfg = deprecationConfig();
  const windowDays = days ?? cfg.reArmDays;
  const now = new Date();
  const until = new Date(now.getTime() + windowDays * 86_400_000);
  const cand = await db.transaction(async (tx) => {
    const updated = await tx
      .update(toolDeprecationCandidates)
      .set({
        status: "rejected",
        decidedBy: reviewer,
        decidedAt: now,
        deferUntil: null,
        reArmUntil: until,
        updatedAt: now,
      })
      .where(eq(toolDeprecationCandidates.id, candidateId))
      .returning();
    const row = updated[0];
    if (!row) return null;
    // Audit row commits (or rolls back) together with the status flip.
    await tx.insert(toolNodeEvidence).values({
      id: newId("tev"),
      nodeId: row.nodeId,
      kind: "deprecation_decision",
      payload: {
        decision: "reject",
        candidate_id: candidateId,
        classification: row.classification,
        reviewer,
        re_arm_until: until.toISOString(),
      },
      success: 0,
      failure: 0,
    });
    return row;
  });
  if (!cand) return null;
  const node = await db
    .select({ id: toolNodes.id, name: toolNodes.name })
    .from(toolNodes)
    .where(eq(toolNodes.id, cand.nodeId))
    .limit(1);
  const nameById = new Map(node.map((n) => [n.id, n.name]));
  return rowToCandidateView(cand, nameById);
}
/**
 * Look up a node by name for a direct (by-name) reference.
 *
 * - Unknown name → `null`.
 * - Node with status `deprecated` → an error envelope carrying the node
 *   name and the `replacedBy` string from its spec (Task #158 splice),
 *   or `null` when the spec has no string `replacedBy`.
 * - Any other status → `{ deprecated: false, node }`.
 */
export async function resolveNodeForDirectReference(
  name: string,
): Promise<
  | { deprecated: false; node: ResolvedNode }
  | { deprecated: true; nodeName: string; replacedBy: string | null }
  | null
> {
  const found = await getNodeByName(name);
  if (!found) return null;
  if (found.status !== "deprecated") {
    return { deprecated: false, node: found };
  }
  const successor = (found.spec as { replacedBy?: unknown })?.replacedBy;
  const replacedBy = typeof successor === "string" ? successor : null;
  return { deprecated: true, nodeName: found.name, replacedBy };
}
/**
 * Run the weekly archive job: any node that has been `deprecated` for
 * ≥ archiveAfterDays (measured from `tool_nodes.updated_at`, which is
 * stamped when `deprecateNode` flips the status) is moved into the
 * archive tables in a single transaction per node. The hot rows
 * (node, all attached edges, all evidence rows, the candidate rows) are
 * deleted only after the archive INSERTs succeed.
 *
 * Candidate rows are deleted but NOT mirrored: this function writes no
 * candidate archive table, so `candidatesArchived` counts deleted
 * `tool_deprecation_candidates` rows.
 *
 * Nodes are processed sequentially. A failure inside one node's
 * transaction rolls back that node and propagates out of the loop, so
 * the remaining nodes are left for the next run.
 *
 * NOTE(review): the target list is read outside any transaction, and the
 * archived node row is that snapshot. A node whose status changes between
 * the snapshot and its transaction would still be archived. Consider
 * re-checking status inside the transaction.
 *
 * @param args.now Reference time (default `new Date()`); also stored as
 *   `archivedAt` on every archived row.
 * @param args.archiveAfterDays Overrides `deprecationConfig().archiveAfterDays`.
 * @returns Totals summed across all nodes archived in this run. Safe to
 *   re-run; only nodes selected with status `deprecated` are touched.
 */
export async function runArchiveJob(args: {
  now?: Date;
  archiveAfterDays?: number;
} = {}): Promise<{
  nodesArchived: number;
  edgesArchived: number;
  evidenceArchived: number;
  candidatesArchived: number;
}> {
  const cfg = deprecationConfig();
  const now = args.now ?? new Date();
  const after = args.archiveAfterDays ?? cfg.archiveAfterDays;
  const cutoff = new Date(now.getTime() - after * 86_400_000);
  // Snapshot of eligible nodes; read outside any transaction.
  const targets = await db
    .select()
    .from(toolNodes)
    .where(
      and(eq(toolNodes.status, "deprecated"), lte(toolNodes.updatedAt, cutoff)),
    );
  let nodesArchived = 0;
  let edgesArchived = 0;
  let evidenceArchived = 0;
  let candidatesArchived = 0;
  for (const node of targets) {
    // One transaction per node: a throw rolls back this node only and
    // then propagates, ending the loop.
    await db.transaction(async (tx) => {
      // Every edge touching the node, from either end.
      const edges = await tx
        .select()
        .from(toolEdges)
        .where(
          sql`${toolEdges.fromNode} = ${node.id} OR ${toolEdges.toNode} = ${node.id}`,
        );
      const evidence = await tx
        .select()
        .from(toolNodeEvidence)
        .where(eq(toolNodeEvidence.nodeId, node.id));
      // Read only so they can be counted and deleted; not archived.
      const candidates = await tx
        .select()
        .from(toolDeprecationCandidates)
        .where(eq(toolDeprecationCandidates.nodeId, node.id));
      // Mirror inserts first; if any of these throws we abort and the
      // hot rows are untouched.
      await tx.insert(toolNodesArchive).values({
        id: node.id,
        name: node.name,
        description: node.description,
        capabilityTags: node.capabilityTags,
        inputKind: node.inputKind,
        outputKind: node.outputKind,
        status: node.status,
        ownerProcess: node.ownerProcess,
        specJson: node.specJson,
        createdBy: node.createdBy,
        handlerRef: node.handlerRef ?? null,
        handlerStub: node.handlerStub ?? null,
        costHint: node.costHint ?? null,
        latencyHintMs: node.latencyHintMs ?? null,
        version: node.version,
        createdAt: node.createdAt,
        updatedAt: node.updatedAt,
        archivedAt: now,
      });
      if (edges.length > 0) {
        await tx.insert(toolEdgesArchive).values(
          edges.map((e) => ({
            id: e.id,
            fromNode: e.fromNode,
            toNode: e.toNode,
            relation: e.relation,
            weight: e.weight,
            contract: e.contract,
            createdAt: e.createdAt,
            archivedAt: now,
          })),
        );
      }
      if (evidence.length > 0) {
        await tx.insert(toolNodeEvidenceArchive).values(
          evidence.map((ev) => ({
            id: ev.id,
            nodeId: ev.nodeId,
            kind: ev.kind,
            payload: ev.payload,
            success: ev.success,
            failure: ev.failure,
            shadowUserId: ev.shadowUserId ?? null,
            createdAt: ev.createdAt,
            seamFoldedAt: ev.seamFoldedAt ?? null,
            archivedAt: now,
          })),
        );
      }
      // Now delete hot rows. Edges/evidence/candidates cascade off
      // tool_nodes via FK, but we delete explicitly to keep the order
      // deterministic and to capture counts.
      if (candidates.length > 0) {
        await tx
          .delete(toolDeprecationCandidates)
          .where(eq(toolDeprecationCandidates.nodeId, node.id));
      }
      if (edges.length > 0) {
        await tx
          .delete(toolEdges)
          .where(
            sql`${toolEdges.fromNode} = ${node.id} OR ${toolEdges.toNode} = ${node.id}`,
          );
      }
      if (evidence.length > 0) {
        await tx
          .delete(toolNodeEvidence)
          .where(eq(toolNodeEvidence.nodeId, node.id));
      }
      await tx.delete(toolNodes).where(eq(toolNodes.id, node.id));
      // Totals accumulate across nodes; the function returns sums.
      nodesArchived += 1;
      edgesArchived += edges.length;
      evidenceArchived += evidence.length;
      candidatesArchived += candidates.length;
    });
  }
  return { nodesArchived, edgesArchived, evidenceArchived, candidatesArchived };
}
// ---------------------------------------------------------------- archive read API (#159)
/**
 * Read-only listing of archived nodes for the admin "View archive"
 * surface. Each node comes with its incident archived edges and its most
 * recent archived evidence rows. Results are ordered newest-archived
 * first.
 *
 * @param opts.limit Maximum nodes to return. Clamped to [1, 500] and
 *   floored; defaults to 100 (also used when the value is not finite).
 * @param opts.search Case-insensitive name substring filter. It is
 *   evaluated in SQL, so matches are found across the whole archive, not
 *   only among the most recently archived rows.
 * @returns Up to `limit` nodes. Each has every archived edge touching it
 *   and at most 5 newest archived evidence rows.
 */
export async function listArchive(opts: {
  limit?: number;
  search?: string;
}): Promise<{
  nodes: Array<{
    id: string;
    name: string;
    description: string | null;
    capabilityTags: string[];
    ownerProcess: string;
    archivedAt: string;
    edges: Array<{
      id: string;
      fromNode: string;
      toNode: string;
      relation: string;
    }>;
    recentEvidence: Array<{
      id: string;
      kind: string;
      success: number;
      failure: number;
      createdAt: string;
    }>;
  }>;
}> {
  const EVIDENCE_PER_NODE = 5;
  const requested = opts.limit ?? 100;
  const limit = Number.isFinite(requested)
    ? Math.min(Math.max(Math.floor(requested), 1), 500)
    : 100;
  const search = opts.search?.toLowerCase();

  // Filter in SQL: filtering after a LIMIT would silently miss older
  // matches. strpos() avoids LIKE-wildcard escaping; the value is bound
  // as a parameter.
  const archivedNodes = await db
    .select()
    .from(toolNodesArchive)
    .where(
      search
        ? sql`strpos(lower(${toolNodesArchive.name}), ${search}) > 0`
        : undefined,
    )
    .orderBy(desc(toolNodesArchive.archivedAt))
    .limit(limit);
  if (archivedNodes.length === 0) return { nodes: [] };
  const nodeIds = archivedNodes.map((n) => n.id);

  const archivedEdges = await db
    .select()
    .from(toolEdgesArchive)
    .where(
      sql`(${inArray(toolEdgesArchive.fromNode, nodeIds)} OR ${inArray(toolEdgesArchive.toNode, nodeIds)})`,
    );
  type ArchivedEdge = (typeof archivedEdges)[number];
  const edgesByNode = new Map<string, ArchivedEdge[]>();
  const attachEdge = (nodeId: string, edge: ArchivedEdge): void => {
    const bucket = edgesByNode.get(nodeId);
    if (bucket) bucket.push(edge);
    else edgesByNode.set(nodeId, [edge]);
  };
  for (const edge of archivedEdges) {
    attachEdge(edge.fromNode, edge);
    // Self-loops must appear once, as with a from/to filter.
    if (edge.toNode !== edge.fromNode) attachEdge(edge.toNode, edge);
  }

  // Rank evidence per node so each node gets its own N newest rows; a
  // single global LIMIT would let one busy node crowd out the others.
  const rankedEvidence = db
    .select({
      id: toolNodeEvidenceArchive.id,
      nodeId: toolNodeEvidenceArchive.nodeId,
      kind: toolNodeEvidenceArchive.kind,
      success: toolNodeEvidenceArchive.success,
      failure: toolNodeEvidenceArchive.failure,
      createdAt: toolNodeEvidenceArchive.createdAt,
      rank: sql<number>`row_number() over (partition by ${toolNodeEvidenceArchive.nodeId} order by ${toolNodeEvidenceArchive.createdAt} desc)`.as(
        "ev_rank",
      ),
    })
    .from(toolNodeEvidenceArchive)
    .where(inArray(toolNodeEvidenceArchive.nodeId, nodeIds))
    .as("ranked_evidence");
  const recentEvidence = await db
    .select()
    .from(rankedEvidence)
    .where(lte(rankedEvidence.rank, EVIDENCE_PER_NODE))
    .orderBy(desc(rankedEvidence.createdAt));
  const evByNode = new Map<string, typeof recentEvidence>();
  for (const ev of recentEvidence) {
    const bucket = evByNode.get(ev.nodeId);
    if (bucket) bucket.push(ev);
    else evByNode.set(ev.nodeId, [ev]);
  }

  return {
    nodes: archivedNodes.map((n) => ({
      id: n.id,
      name: n.name,
      description: n.description ?? null,
      capabilityTags: Array.isArray(n.capabilityTags)
        ? (n.capabilityTags as string[])
        : [],
      ownerProcess: n.ownerProcess,
      archivedAt: n.archivedAt.toISOString(),
      edges: (edgesByNode.get(n.id) ?? []).map((e) => ({
        id: e.id,
        fromNode: e.fromNode,
        toNode: e.toNode,
        relation: e.relation,
      })),
      recentEvidence: (evByNode.get(n.id) ?? []).map((ev) => ({
        id: ev.id,
        kind: ev.kind,
        success: ev.success,
        failure: ev.failure,
        createdAt: ev.createdAt.toISOString(),
      })),
    })),
  };
}
// ---------------------------------------------------------------- scheduler (#159)
/**
 * Lightweight in-process scheduler. When `deprecationDetectorEnabled()`
 * is true, it fires the detector daily and the archive job weekly by
 * default. Both jobs are described as idempotent, so running them while
 * a manual run is in flight is expected to be safe.
 *
 * - Each job is guarded against overlapping its own previous run. A tick
 *   that fires while the prior run is still pending is skipped.
 * - Intervals are normalized for Node timers. Non-finite values fall back
 *   to the defaults, and values are clamped to [1, 2^31 - 1] ms. Node
 *   silently turns larger delays (and NaN) into 1 ms, which would hammer
 *   the database.
 * - Timers are `unref()`ed so they never keep the process alive.
 *
 * @param opts.detectorIntervalMs Detector period; default 24 h.
 * @param opts.archiveIntervalMs Archive period; default 7 days.
 * @returns A stop() handle; tests and graceful shutdown should call it.
 *   stop() cancels future ticks but does not abort a run in flight.
 */
export function startDeprecationScheduler(opts?: {
  detectorIntervalMs?: number;
  archiveIntervalMs?: number;
}): { stop: () => void } {
  if (!deprecationDetectorEnabled()) {
    logger.info(
      "tool-graph deprecation scheduler: disabled (DEPRECATION_DETECTOR_ENABLED unset)",
    );
    return { stop: () => {} };
  }
  // Largest delay Node timers honour; anything above becomes 1 ms.
  const MAX_TIMER_DELAY_MS = 2_147_483_647;
  const normalizeInterval = (
    requested: number | undefined,
    fallback: number,
  ): number => {
    const ms = requested ?? fallback;
    if (!Number.isFinite(ms)) return fallback;
    return Math.min(Math.max(Math.floor(ms), 1), MAX_TIMER_DELAY_MS);
  };
  const detectorMs = normalizeInterval(
    opts?.detectorIntervalMs,
    24 * 60 * 60 * 1000,
  );
  const archiveMs = normalizeInterval(
    opts?.archiveIntervalMs,
    7 * 24 * 60 * 60 * 1000,
  );
  let stopped = false;

  // Runs `job` every `intervalMs`, skipping ticks while the previous run
  // is still pending, and logs success/failure with the given messages.
  const every = <T extends object>(
    intervalMs: number,
    job: () => Promise<T>,
    kind: string,
    okMsg: string,
    failMsg: string,
  ) => {
    let running = false;
    const timer = setInterval(() => {
      if (stopped) return;
      if (running) {
        logger.debug(
          { kind },
          "tool-graph deprecation scheduler: previous run still in flight; skipping tick",
        );
        return;
      }
      running = true;
      void (async () => {
        try {
          const summary = await job();
          logger.info({ ...summary, kind }, okMsg);
        } catch (err) {
          logger.error({ err }, failMsg);
        } finally {
          running = false;
        }
      })();
    }, intervalMs);
    // Don't keep the event loop alive just for these timers; the HTTP
    // server is the lifeline.
    timer.unref();
    return timer;
  };

  const detectorTick = every(
    detectorMs,
    () => runDeprecationDetector(),
    "deprecation_detector_tick",
    "tool-graph deprecation scheduler: detector ran",
    "tool-graph deprecation scheduler: detector failed",
  );
  const archiveTick = every(
    archiveMs,
    () => runArchiveJob(),
    "deprecation_archive_tick",
    "tool-graph deprecation scheduler: archive ran",
    "tool-graph deprecation scheduler: archive failed",
  );
  logger.info(
    { detectorMs, archiveMs },
    "tool-graph deprecation scheduler: started",
  );
  return {
    stop: () => {
      stopped = true;
      clearInterval(detectorTick);
      clearInterval(archiveTick);
    },
  };
}
// ---------------------------------------------------------------- summary paths (#160)
/**
 * Planner-facing view of one `tool_summary_paths` row. It presents a
 * `composition_alias` chain to the planner as a single named abstraction.
 * `description` is the text the LLM sees; it equals
 * `descriptionOverride ?? descriptionDerived` (trimmed by the row mapper).
 */
export interface ResolvedSummaryPath {
  // --- identity / lifecycle
  id: string;
  name: string;
  aliasNodeId: string | null;
  status: "active" | "pending_rebuild";
  version: number;
  /** ISO-8601 timestamp of the last row update. */
  updatedAt: string;
  // --- text shown to the planner
  description: string;
  descriptionDerived: string;
  /** Admin-edited text; `null` when not overridden. */
  descriptionOverride: string | null;
  // --- chain shape
  expansionNodeNames: string[];
  headAtomName: string | null;
  tailAtomName: string | null;
  capabilityTags: string[];
  // --- hints and telemetry
  estCostHint: number | null;
  estLatencyMsHint: number | null;
  traversalCount: number;
}
/**
 * Map a `tool_summary_paths` row to its planner-facing view.
 * JSON list columns that are not arrays collapse to `[]`. The runtime
 * `description` is the override when non-null, otherwise the derived
 * text, trimmed. Missing counters default to 0 traversals and version 1.
 */
function row2summaryPath(r: ToolSummaryPathRow): ResolvedSummaryPath {
  const toStringList = (value: unknown): string[] =>
    Array.isArray(value) ? (value as string[]) : [];
  const derived = r.descriptionDerived ?? "";
  const override = r.descriptionOverride ?? null;
  const view: ResolvedSummaryPath = {
    id: r.id,
    name: r.name,
    aliasNodeId: r.aliasNodeId ?? null,
    descriptionDerived: derived,
    descriptionOverride: override,
    description: (override ?? derived).trim(),
    expansionNodeNames: toStringList(r.expansionNodeNames),
    headAtomName: r.headAtomName ?? null,
    tailAtomName: r.tailAtomName ?? null,
    capabilityTags: toStringList(r.capabilityTags),
    status: (r.status as "active" | "pending_rebuild") ?? "active",
    estCostHint: r.estCostHint ?? null,
    estLatencyMsHint: r.estLatencyMsHint ?? null,
    traversalCount: r.traversalCount ?? 0,
    version: r.version ?? 1,
    updatedAt: r.updatedAt.toISOString(),
  };
  return view;
}
/**
 * List summary paths, optionally restricted to one status.
 *
 * @param opts.status `"active"`, `"pending_rebuild"`, or `"any"` (default)
 *   for no status filter.
 * @returns Every matching row as a `ResolvedSummaryPath`.
 */
export async function listSummaryPaths(opts: {
  status?: "active" | "pending_rebuild" | "any";
} = {}): Promise<ResolvedSummaryPath[]> {
  const status = opts.status ?? "any";
  // `undefined` tells drizzle to emit no WHERE clause at all.
  const statusFilter =
    status === "any" ? undefined : eq(toolSummaryPaths.status, status);
  const rows = await db.select().from(toolSummaryPaths).where(statusFilter);
  return rows.map(row2summaryPath);
}
/**
 * Idempotent upsert keyed on `name`. Correctness properties:
 *  1. **Concurrency-safe insert**: the insert path uses Postgres
 *     `ON CONFLICT (name)`, so two parallel seed/rebuild calls cannot
 *     race past the unique-name check and double-insert.
 *  2. **No version churn**: `version` only bumps when the *material*
 *     fields (description_derived, expansion, head/tail, tags, alias
 *     node) actually change. Re-running `seedToolGraph` on an unchanged
 *     graph is a no-op for `version` and `updated_at`, apart from the
 *     status-back-to-active flip on rows that were pending_rebuild.
 *  3. **Hints persist**: a change to only `est_cost_hint` /
 *     `est_latency_ms_hint` is written (without a version bump) rather
 *     than being dropped by the no-op short-circuit.
 *
 * On the update path, fields that are `undefined`/`null` on `row` keep
 * their stored value. `description_override` is admin-edited; the seed
 * never touches it unless the caller explicitly passes a string or `null`.
 *
 * @param row Desired state; `row.id` is only used when inserting.
 * @returns The persisted row as a `ResolvedSummaryPath`.
 * @throws Error if the row disappears between the read and the update.
 */
export async function upsertSummaryPath(
  row: InsertToolSummaryPathRow,
): Promise<ResolvedSummaryPath> {
  const id = row.id && row.id.length > 0 ? row.id : newId("tsum");
  const existing = await db
    .select()
    .from(toolSummaryPaths)
    .where(eq(toolSummaryPaths.name, row.name))
    .limit(1);
  const cur = existing[0];
  if (cur) {
    // Hints may round-trip through a float column; compare with a small
    // relative tolerance so an unchanged hint is not mistaken for an edit.
    const hintDiffers = (
      next: number | null | undefined,
      prev: number | null | undefined,
    ): boolean => {
      if (next == null || prev == null) return (next == null) !== (prev == null);
      return (
        Math.abs(next - prev) >
        1e-6 * Math.max(1, Math.abs(next), Math.abs(prev))
      );
    };
    const newDesc = row.descriptionDerived ?? cur.descriptionDerived;
    const newExp = (row.expansionNodeNames ??
      cur.expansionNodeNames) as unknown[];
    const newTags = (row.capabilityTags ?? cur.capabilityTags) as unknown[];
    const newHead = row.headAtomName ?? cur.headAtomName;
    const newTail = row.tailAtomName ?? cur.tailAtomName;
    const newAlias = row.aliasNodeId ?? cur.aliasNodeId;
    const newCost = row.estCostHint ?? cur.estCostHint;
    const newLatency = row.estLatencyMsHint ?? cur.estLatencyMsHint;
    const material =
      newDesc !== cur.descriptionDerived ||
      JSON.stringify(newExp) !== JSON.stringify(cur.expansionNodeNames) ||
      newHead !== cur.headAtomName ||
      newTail !== cur.tailAtomName ||
      JSON.stringify(newTags) !== JSON.stringify(cur.capabilityTags) ||
      newAlias !== cur.aliasNodeId;
    const hintsChanged =
      hintDiffers(newCost, cur.estCostHint) ||
      hintDiffers(newLatency, cur.estLatencyMsHint);
    const desiredStatus = row.status ?? "active";
    const statusFlip = desiredStatus !== cur.status;
    if (
      !material &&
      !hintsChanged &&
      !statusFlip &&
      row.descriptionOverride === undefined
    ) {
      // True no-op rebuild: preserve version + updated_at so admins can
      // see that nothing changed.
      return row2summaryPath(cur);
    }
    const updated = await db
      .update(toolSummaryPaths)
      .set({
        aliasNodeId: newAlias,
        descriptionDerived: newDesc,
        descriptionOverride:
          row.descriptionOverride !== undefined
            ? row.descriptionOverride
            : cur.descriptionOverride,
        expansionNodeNames: newExp as never,
        headAtomName: newHead,
        tailAtomName: newTail,
        capabilityTags: newTags as never,
        estCostHint: newCost,
        estLatencyMsHint: newLatency,
        status: desiredStatus,
        // Hint-only and status-only writes do not bump the version.
        version: material ? cur.version + 1 : cur.version,
        updatedAt: new Date(),
      })
      .where(eq(toolSummaryPaths.id, cur.id))
      .returning();
    const saved = updated[0];
    if (!saved) {
      throw new Error(
        `upsertSummaryPath: summary path "${row.name}" disappeared during update`,
      );
    }
    return row2summaryPath(saved);
  }
  // Insert path. ON CONFLICT (name) guards against the
  // select-then-insert race when two seeders run concurrently.
  const written = await db
    .insert(toolSummaryPaths)
    .values({ ...row, id })
    .onConflictDoUpdate({
      target: toolSummaryPaths.name,
      set: {
        aliasNodeId: row.aliasNodeId,
        descriptionDerived: row.descriptionDerived,
        expansionNodeNames: row.expansionNodeNames as never,
        headAtomName: row.headAtomName,
        tailAtomName: row.tailAtomName,
        capabilityTags: row.capabilityTags as never,
        estCostHint: row.estCostHint,
        estLatencyMsHint: row.estLatencyMsHint,
        status: row.status ?? "active",
        updatedAt: new Date(),
      },
    })
    .returning();
  const saved = written[0];
  if (!saved) {
    throw new Error(
      `upsertSummaryPath: insert of summary path "${row.name}" returned no row`,
    );
  }
  return row2summaryPath(saved);
}
/**
 * Delete summary-path rows whose name is absent from `keepNames`.
 * Called at the end of a rebuild, so an alias that was removed from the
 * seed (or renamed) does not leave a dangling summary row behind.
 *
 * @param keepNames Canonical set of names to retain. An empty list is
 *   treated as a caller bug and deletes nothing.
 * @returns Number of rows deleted.
 */
export async function pruneSummaryPathsNotIn(
  keepNames: string[],
): Promise<number> {
  // Never truncate the table: an empty canonical set almost certainly
  // means the caller failed to build it, not that it wants everything gone.
  if (!keepNames.length) return 0;
  const keep = new Set(keepNames);
  const rows = await db
    .select({ id: toolSummaryPaths.id, name: toolSummaryPaths.name })
    .from(toolSummaryPaths);
  const staleIds: string[] = [];
  for (const { id, name } of rows) {
    if (!keep.has(name)) staleIds.push(id);
  }
  if (!staleIds.length) return 0;
  await db
    .delete(toolSummaryPaths)
    .where(inArray(toolSummaryPaths.id, staleIds));
  return staleIds.length;
}
/**
 * Set or clear the admin description override for one summary path.
 *
 * A string is trimmed and capped at 4000 characters; an empty result (or
 * `null`) clears the override.
 *
 * @param id Summary path id.
 * @param override New override text, or `null` to clear.
 * @returns The updated view, or `null` when no row has that id.
 */
export async function setSummaryPathOverride(
  id: string,
  override: string | null,
): Promise<ResolvedSummaryPath | null> {
  let normalized: string | null = null;
  if (typeof override === "string") {
    const clipped = override.trim().slice(0, 4000);
    if (clipped.length > 0) normalized = clipped;
  }
  const [saved] = await db
    .update(toolSummaryPaths)
    .set({ descriptionOverride: normalized, updatedAt: new Date() })
    .where(eq(toolSummaryPaths.id, id))
    .returning();
  if (!saved) return null;
  return row2summaryPath(saved);
}
/**
 * Flip every `active` summary path to `pending_rebuild`.
 *
 * Called after the #158 splice path (autoExtendIfNeeded) lands a new
 * node/edge. The admin UI can then flag that cached chains may be stale
 * until the next `seedToolGraph` run rebuilds them.
 *
 * @returns Number of rows flipped.
 */
export async function markSummaryPathsPendingRebuild(): Promise<number> {
  const onlyActive = eq(toolSummaryPaths.status, "active");
  const flipped = await db
    .update(toolSummaryPaths)
    .set({ status: "pending_rebuild", updatedAt: new Date() })
    .where(onlyActive)
    .returning({ id: toolSummaryPaths.id });
  return flipped.length;
}
/**
 * Increment the traversal counter of the summary path named `name`.
 *
 * Telemetry only: the increment is done in SQL (no read-modify-write),
 * and any failure is logged at debug level and swallowed so the caller
 * never breaks on it.
 */
export async function recordSummaryPathTraversal(
  name: string,
): Promise<void> {
  try {
    const bumped = sql`${toolSummaryPaths.traversalCount} + 1`;
    await db
      .update(toolSummaryPaths)
      .set({ traversalCount: bumped, updatedAt: new Date() })
      .where(eq(toolSummaryPaths.name, name));
  } catch (err) {
    logger.debug({ err, name }, "recordSummaryPathTraversal failed (non-blocking)");
  }
}
/**
 * Resolve summary paths for one user turn under the hierarchy planner flag.
 *
 * Selects every path, whether `active` or `pending_rebuild`, whose
 * `capabilityTags` overlap `matchedTags`. Active paths rank first, so a
 * healthy chain wins over a stale one; stale chains stay visible. Within
 * a status, higher `traversalCount` ranks first.
 *
 * @param matchedTags Capability tags matched for this turn.
 * @param cap Maximum paths returned; defaults to `PLANNER_MAX_SUMMARIES`.
 * @returns Ranked paths; an empty array when nothing overlaps. The caller
 *   should then fall back to the flat atomic listing.
 */
export async function resolveSummaryPathsForTags(
  matchedTags: string[],
  cap = PLANNER_MAX_SUMMARIES,
): Promise<ResolvedSummaryPath[]> {
  if (matchedTags.length === 0) return [];
  const wanted = new Set(matchedTags);
  const everything = await listSummaryPaths({ status: "any" });
  const overlapping = everything.filter((path) =>
    path.capabilityTags.some((tag) => wanted.has(tag)),
  );
  overlapping.sort((a, b) => {
    if (a.status === b.status) return b.traversalCount - a.traversalCount;
    return a.status === "active" ? -1 : 1;
  });
  return overlapping.slice(0, Math.max(0, cap));
}
/**
 * Render summary paths as a compact planner-prompt section.
 *
 * Output is a `preferred chains:` header followed by one line per path:
 * `- name: a -> b -> c`. Paths in `pending_rebuild` get a trailing `*` on
 * the name, so the planner can still reference a chain while its rebuild
 * is pending. Full descriptions are intentionally omitted: the planner
 * already sees the atomic tool descriptions in the flat catalog, and
 * repeating them would exceed the 15% net-overhead budget.
 *
 * @returns The rendered section, or `""` for an empty list.
 */
export function describeSummaryPaths(paths: ResolvedSummaryPath[]): string {
  if (paths.length === 0) return "";
  const entries = paths.map((path) => {
    const marker = path.status === "pending_rebuild" ? "*" : "";
    const chain = path.expansionNodeNames.join(" -> ");
    return `- ${path.name}${marker}: ${chain}`;
  });
  return ["preferred chains:", ...entries].join("\n");
}
// ---------------------------------------------------------------- feature flag
/**
 * Whether the tool graph is enabled. `TOOL_GRAPH_ENABLED` counts as on
 * when it is `1`, `true`, `yes`, or `on` (case-insensitive, surrounding
 * whitespace ignored); anything else, including unset, is off.
 */
export function toolGraphEnabled(): boolean {
  const raw = process.env["TOOL_GRAPH_ENABLED"] ?? "";
  return ["1", "true", "yes", "on"].includes(raw.trim().toLowerCase());
}
/**
 * Hierarchical planner kill-switch (#160). Default OFF.
 *
 * When `PLANNER_HIERARCHY_ENABLED` is unset (or not one of `1`, `true`,
 * `yes`, `on`, case-insensitive), the planner sees the flat atomic catalog
 * exactly as before. That preserves the both-mode-equivalence invariant.
 *
 * When enabled, the planner prompt is augmented with resolved summary
 * paths from `tool_summary_paths`. The underlying tool list and dispatch
 * path are unchanged, so flipping the flag mid-flight cannot diverge
 * outputs.
 */
export function plannerHierarchyEnabled(): boolean {
  const flag = process.env["PLANNER_HIERARCHY_ENABLED"];
  const normalized = (flag ?? "").trim().toLowerCase();
  switch (normalized) {
    case "1":
    case "true":
    case "yes":
    case "on":
      return true;
    default:
      return false;
  }
}
/**
 * Cap on summary-path entries injected into a single planner prompt.
 * Read once at module load from `PLANNER_MAX_SUMMARIES`. The value is
 * floored; non-numeric or non-positive values fall back to 10.
 */
export const PLANNER_MAX_SUMMARIES = (() => {
  const fallback = 10;
  const parsed = Number(process.env["PLANNER_MAX_SUMMARIES"] ?? "10");
  if (!Number.isFinite(parsed) || parsed <= 0) return fallback;
  return Math.floor(parsed);
})();