/**
 * tool-network — registry + executor for first-class tool networks
 * (Task #176, Wave A).
 *
 * A tool network is a curated, end-to-end runnable subgraph that solves
 * a problem class. The planner LLM sees a single synthetic
 * `run_<name>` service card whose JSON-Schema parameters are the
 * network's input contract — internal atomic tool calls happen inside
 * this module's executor and are invisible to the planner.
 *
 * This file owns:
 *   - CRUD against tool_networks / network_versions / network_promotions
 *     / problem_classes.
 *   - Active-variant resolution (single primary + zero-or-more shadows).
 *   - Internal subgraph registration (executor lookup table).
 *   - `runNetwork()` — entry point used by the synthetic tool dispatcher.
 *
 * The executor is intentionally a small dispatch layer; per-network
 * runners (e.g. drugclip/runner) register themselves at boot via
 * `registerNetworkRunner()` and the registry hands them validated input
 * + the chosen variant config.
 */
import { and, desc, eq, isNull } from "drizzle-orm";
import { createHash } from "node:crypto";
import {
  db,
  toolNetworks,
  networkVersions,
  networkPromotions,
  problemClasses,
  networkDispatchViolations,
  type ToolNetworkRow,
  type NetworkVersionRow,
  type ProblemClassRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import type { LlmTool } from "../llm/types";
import { validate as ajvValidate } from "@workspace/networks";
import {
  executeByGraph,
  toNetworkRunResult,
  type StepHandler,
} from "./tool-network/graph-executor.js";

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

export interface NetworkInternalGraph {
  /** Atomic tool node names this network executes, in declared order. */
  nodes: string[];
  /** Optional edges between nodes (informational, executor uses `nodes`). */
  edges?: Array<{ from: string; to: string; relation?: string }>;
}

export interface NetworkRuntimeContext {
  conversationId?: string | null;
  messageId?: string | null;
  ownerUserId?: string | null;
  /** Optional planId, populated when invoked through the blueprint runner. */
  planId?: string | null;
  /** Free-form per-call metadata propagated to evidence rows. */
  meta?: Record<string, unknown>;
  /**
   * Optional cooperative-cancellation signal. The shadow A/B sampler
   * (`evolution/shadow.executeShadowSample`) plumbs an `AbortSignal`
   * here that fires when the 1.5× wall-clock budget cap is breached.
   * Runners and atomic tools SHOULD check `signal?.aborted` (or pass
   * `signal` into `fetch`/timeouts) at I/O boundaries so an over-budget
   * shadow run actually stops billing instead of merely being ignored
   * by the caller. Runners that do not honour the signal still get
   * timeout-bookkeeping protection — the shadow row is marked
   * `budget_skipped` regardless — but the underlying work continues.
   */
  signal?: AbortSignal;
}

export interface NetworkRunResult {
  output: Record<string, unknown>;
  /** Per-step traces for log + reviewer. */
  steps: Array<{
    name: string;
    durationMs: number;
    status: "ok" | "error";
    summary?: string;
  }>;
  /** Optional path to a primary artifact this run produced. */
  artifactPath?: string | null;
  /** Total executor latency in ms. */
  durationMs: number;
  /**
   * Optional runner-side metrics. The Reviewer pipeline reads two
   * conventional fields here:
   *
   *   - `runnerScore` — a 0..1 self-evaluation proxy (e.g. retrieval
   *     coherence, evidence consistency, recall@K). Networks define
   *     their own normalisation; the framework just feeds it into the
   *     `runner_signal` Reviewer channel via `gradeChannels`.
   *   - `channelBreakdown` — pre-graded channel scores the runner
   *     wants to override the framework defaults with. Keys SHOULD
   *     match `REVIEWER_CHANNEL_KEYS` from `lib/reviewer/channels.ts`;
   *     unknown keys are still merged but the fitness gate ignores
   *     them unless the matching `problem_classes.reviewer_weights`
   *     entry exists.
   *
   * Other arbitrary metrics (e.g. raw `aggregateEf1` for audit) MAY
   * be carried alongside but are not consumed by the gate.
   */
  metrics?: {
    runnerScore?: number;
    channelBreakdown?: Record<string, number>;
    [k: string]: unknown;
  };
}

export type NetworkRunner = (
  input: Record<string, unknown>,
  variant: NetworkVersionRow,
  ctx: NetworkRuntimeContext,
) => Promise<NetworkRunResult>;

/** Per-network runner table, keyed by network name. */
const runners = new Map<string, NetworkRunner>();

/**
 * Register an executor for a given network name. Called at boot from
 * each network's seed module (e.g. drugclip/seed). A second registration
 * under the same name overwrites the first and logs a warning.
 */
export function registerNetworkRunner(name: string, runner: NetworkRunner): void {
  if (runners.has(name)) {
    logger.warn({ name }, "tool-network: runner re-registered (overwriting)");
  }
  runners.set(name, runner);
}

/** Names of all registered per-network runners, sorted. */
export function listRegisteredRunners(): string[] {
  return [...runners.keys()].sort();
}

// ---------------------------------------------------------------------------
// Task #242 (B1) — graph-executor step handler registry
// ---------------------------------------------------------------------------

/**
 * Fix for gap 1: the graph-executor path. When
 * `variant.config.executor === 'graph'`, runNetwork goes through
 * `executeByGraph(internalGraph, ...)` and looks each node name up in
 * this map to find its step handler. This registry is orthogonal to
 * `runners` (per-network runners): one step handler can be reused by
 * several capabilities/networks (e.g. "fetch_pocket" shared by drugclip
 * and a future dockingv2).
 *
 * drugclip only migrates to the graph executor after B8; this task just
 * wires the path through.
 */
const graphStepHandlers = new Map<string, StepHandler>();

export function registerGraphStepHandler(name: string, handler: StepHandler): void {
  if (graphStepHandlers.has(name)) {
    logger.warn(
      { name },
      "tool-network: graph step handler re-registered (overwriting)",
    );
  }
  graphStepHandlers.set(name, handler);
}

export function listRegisteredGraphStepHandlers(): string[] {
  return [...graphStepHandlers.keys()].sort();
}

export function clearGraphStepHandlersForTest(): void {
  graphStepHandlers.clear();
}

/**
 * Read a boolean feature flag from the environment.
 *
 * Unset → `fallback`. Otherwise the flag is ON unless its (trimmed,
 * case-insensitive) value is "0" or "false".
 */
function readBooleanEnv(name: string, fallback: boolean): boolean {
  const raw = process.env[name];
  if (raw === undefined) return fallback;
  const normalized = raw.trim().toLowerCase();
  return normalized !== "0" && normalized !== "false";
}

/** Graph executor kill-switch; enabled unless explicitly turned off. */
function isGraphExecutorEnabled(): boolean {
  return readBooleanEnv("DOATLAS_GRAPH_EXECUTOR_ENABLED", true);
}

/**
 * Input-contract enforcement flag. When unset, defaults to enforced
 * outside production and observe-only in production (soft launch, per
 * the task #242 plan).
 */
function isInputContractEnforced(): boolean {
  return readBooleanEnv(
    "DOATLAS_INPUT_CONTRACT_ENFORCE",
    process.env["NODE_ENV"] !== "production",
  );
}

// ---------------------------------------------------------------------------
// problem_classes
// ---------------------------------------------------------------------------

export interface UpsertProblemClassInput {
  path: string;
  parentPath?: string | null;
  label: string;
  description?: string;
  capabilityTags?: string[];
  /**
   * Task #227 — accepts two jsonb shapes:
   *   - legacy flat: `{contract_validity: 0.25, ...}` (channel → weight)
   *   - new dual-set: `{weightsLabelFree: {...}, weightsWithTruth: {...}}`,
   *     switched by fitness.weightedScore according to the per-row
   *     metric ledger state.
   * Typed as `unknown` rather than `number` because the new shape's
   * values are nested objects; resolveChannelWeights narrows / falls
   * back when reading.
   */
  reviewerWeights?: Record<string, unknown> | null;
}

/**
 * Insert or update a problem class keyed by `path`.
 *
 * On update, omitted optional fields keep their stored value.
 * NOTE(review): because `??` is used, passing an explicit `null` for
 * `parentPath` / `reviewerWeights` also keeps the stored value rather
 * than clearing it.
 */
export async function upsertProblemClass(
  input: UpsertProblemClassInput,
): Promise<ProblemClassRow> {
  const existing = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, input.path))
    .limit(1);
  const current = existing[0];

  if (current) {
    const [updated] = await db
      .update(problemClasses)
      .set({
        parentPath: input.parentPath ?? current.parentPath,
        label: input.label,
        description: input.description ?? current.description,
        capabilityTags:
          input.capabilityTags ?? (current.capabilityTags as string[]),
        reviewerWeights: input.reviewerWeights ?? current.reviewerWeights,
        updatedAt: new Date(),
      })
      .where(eq(problemClasses.id, current.id))
      .returning();
    return updated!;
  }

  const [inserted] = await db
    .insert(problemClasses)
    .values({
      id: newId("pcls"),
      path: input.path,
      parentPath: input.parentPath ?? null,
      label: input.label,
      description: input.description ?? "",
      capabilityTags: input.capabilityTags ?? [],
      reviewerWeights: input.reviewerWeights ?? null,
    })
    .returning();
  return inserted!;
}

export async function listProblemClasses(): Promise<ProblemClassRow[]> {
  return db.select().from(problemClasses);
}

export async function getProblemClassByPath(
  path: string,
): Promise<ProblemClassRow | null> {
  const rows = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, path))
    .limit(1);
  return rows[0] ?? null;
}

// ---------------------------------------------------------------------------
// tool_networks + network_versions
// ---------------------------------------------------------------------------

export interface UpsertNetworkInput {
  name: string;
  problemClassPath: string;
  description?: string;
  inputContract: Record<string, unknown>;
  outputContract: Record<string, unknown>;
  internalGraph: NetworkInternalGraph;
  capabilityTags?: string[];
  costHint?: number | null;
  latencyHintMs?: number | null;
  builderModelTier?: "weak" | "strong";
  legacyAliasNodeId?: string | null;
  /**
   * Initial variant config — when supplied (and no active variant
   * exists) this is persisted as v1 and promoted to active.
   */
  initialVariant?: {
    versionLabel: string;
    config: Record<string, unknown>;
  };
}

/**
 * Idempotent upsert of a network row + (optional) initial variant.
 * Returns the network row with `activeVariantId` populated.
 */
export async function upsertNetwork(
  input: UpsertNetworkInput,
): Promise<ToolNetworkRow> {
  const existing = await db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.name, input.name))
    .limit(1);
  const current = existing[0];

  let row: ToolNetworkRow;
  if (current) {
    const [updated] = await db
      .update(toolNetworks)
      .set({
        problemClassPath: input.problemClassPath,
        description: input.description ?? current.description,
        inputContract: input.inputContract,
        outputContract: input.outputContract,
        internalGraph: input.internalGraph,
        capabilityTags:
          input.capabilityTags ?? (current.capabilityTags as string[]),
        costHint: input.costHint ?? current.costHint,
        latencyHintMs: input.latencyHintMs ?? current.latencyHintMs,
        builderModelTier: input.builderModelTier ?? current.builderModelTier,
        legacyAliasNodeId: input.legacyAliasNodeId ?? current.legacyAliasNodeId,
        updatedAt: new Date(),
      })
      .where(eq(toolNetworks.id, current.id))
      .returning();
    row = updated!;
  } else {
    const [inserted] = await db
      .insert(toolNetworks)
      .values({
        id: newId("tnet"),
        name: input.name,
        problemClassPath: input.problemClassPath,
        description: input.description ?? "",
        inputContract: input.inputContract,
        outputContract: input.outputContract,
        internalGraph: input.internalGraph,
        capabilityTags: input.capabilityTags ?? [],
        costHint: input.costHint ?? null,
        latencyHintMs: input.latencyHintMs ?? null,
        builderModelTier: input.builderModelTier ?? "strong",
        legacyAliasNodeId: input.legacyAliasNodeId ?? null,
        status: "active",
      })
      .returning();
    row = inserted!;
  }

  if (!row.activeVariantId && input.initialVariant) {
    const variant = await createVariant({
      networkId: row.id,
      versionLabel: input.initialVariant.versionLabel,
      config: input.initialVariant.config,
      internalGraph: input.internalGraph,
      builderModelTier: input.builderModelTier ?? "strong",
      promote: true,
      reason: "initial_seed",
    });
    row = { ...row, activeVariantId: variant.id };
  }
  return row;
}

export async function getNetworkByName(
  name: string,
): Promise<ToolNetworkRow | null> {
  const rows = await db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.name, name))
    .limit(1);
  return rows[0] ?? null;
}

export async function listActiveNetworks(): Promise<ToolNetworkRow[]> {
  return db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.status, "active"));
}

export async function listNetworksForClass(
  problemClassPath: string,
): Promise<ToolNetworkRow[]> {
  return db
    .select()
    .from(toolNetworks)
    .where(
      and(
        eq(toolNetworks.problemClassPath, problemClassPath),
        eq(toolNetworks.status, "active"),
      ),
    );
}

/**
 * Resolve the active variant for a network, with optional user-private
 * namespace override.
 *
 * Task #242 (B1) — fix for gap 4: the `network_versions.private_namespace`
 * column already existed but was ignored here, so user-forked private
 * variants could never be dispatched. Current policy:
 *
 *   1. When `userId` is given and non-empty, first look up a row with
 *      `private_namespace = userId AND status = 'active' AND
 *      network_id = network.id`; a hit is returned (private override).
 *   2. Otherwise (or on a private miss), fall back to the public
 *      `network.activeVariantId`.
 *
 * Private variants never update `tool_networks.active_variant_id` (that
 * is the globally shared slot); they are active only within their own
 * namespace.
 */
export async function getActiveVariant(
  network: ToolNetworkRow,
  userId?: string | null,
): Promise<NetworkVersionRow | null> {
  if (userId) {
    const privateRows = await db
      .select()
      .from(networkVersions)
      .where(
        and(
          eq(networkVersions.networkId, network.id),
          eq(networkVersions.privateNamespace, userId),
          eq(networkVersions.status, "active"),
        ),
      )
      .limit(1);
    if (privateRows[0]) return privateRows[0];
  }
  if (!network.activeVariantId) return null;
  const rows = await db
    .select()
    .from(networkVersions)
    .where(eq(networkVersions.id, network.activeVariantId))
    .limit(1);
  return rows[0] ?? null;
}

/**
 * Task #242 (B1) — companion API for gap 4: on first fork, clone the
 * public active variant into the user's private namespace. Used by B9.
 *
 *   - If an active private row `(networkId, privateNamespace = userId)`
 *     already exists, it is returned as-is.
 *   - Otherwise the public active variant's config + internalGraph are
 *     cloned into a new row with `status = 'active'` and
 *     `privateNamespace = userId`. `tool_networks.active_variant_id` is
 *     **not** touched.
 *
 * @throws when `userId` is empty, the network does not exist, or it has
 *         no public active variant to clone from.
 */
export async function getOrCreateUserPrivateVariant(
  networkId: string,
  userId: string,
  reason: string,
): Promise<NetworkVersionRow> {
  if (!userId) {
    throw new Error("getOrCreateUserPrivateVariant: userId required");
  }
  // Architect review: real concurrency-safe idempotency. Two requests
  // racing for the same (networkId, userId) must converge on a single
  // active private variant. We rely on the partial unique index
  // `network_versions_uniq_active_private` (created in raw SQL, see
  // schema/toolNetwork.ts) so the DB rejects the second insert; the
  // loser retries and re-selects the winner's row.
  //
  // Postgres marks a whole transaction aborted once a statement fails,
  // so the follow-up select cannot run inside the same tx — the race
  // has to be retried outside any transaction. At most two attempts:
  // the first does select-then-insert, the second must hit the winner.
  for (let attempt = 0; attempt < 2; attempt += 1) {
    const existing = await db
      .select()
      .from(networkVersions)
      .where(
        and(
          eq(networkVersions.networkId, networkId),
          eq(networkVersions.privateNamespace, userId),
          eq(networkVersions.status, "active"),
        ),
      )
      .limit(1);
    if (existing[0]) return existing[0];

    const [network] = await db
      .select()
      .from(toolNetworks)
      .where(eq(toolNetworks.id, networkId))
      .limit(1);
    if (!network) {
      throw new Error(
        `getOrCreateUserPrivateVariant: network ${networkId} not found`,
      );
    }
    if (!network.activeVariantId) {
      throw new Error(
        `getOrCreateUserPrivateVariant: network ${networkId} has no public active variant to clone from`,
      );
    }
    const [publicActive] = await db
      .select()
      .from(networkVersions)
      .where(eq(networkVersions.id, network.activeVariantId))
      .limit(1);
    if (!publicActive) {
      throw new Error(
        `getOrCreateUserPrivateVariant: public active variant ${network.activeVariantId} not found`,
      );
    }

    const versionLabel = `${publicActive.versionLabel}-priv-${userId.slice(0, 8)}-${Date.now()}-${attempt}`;
    try {
      const [inserted] = await db
        .insert(networkVersions)
        .values({
          id: newId("nver"),
          networkId,
          versionLabel,
          internalGraph: publicActive.internalGraph,
          config: publicActive.config,
          status: "active",
          builtBy: `user:${userId}`,
          builderModelTier: publicActive.builderModelTier,
          privateNamespace: userId,
        })
        .returning();
      logger.info(
        {
          networkId,
          userId,
          fromVariantId: publicActive.id,
          newVariantId: inserted!.id,
          reason,
        },
        "tool-network: created user-private variant",
      );
      return inserted!;
    } catch (err) {
      // Unique violation on the partial index (Postgres SQLSTATE 23505)
      // means a concurrent caller won the race. Loop back to the
      // initial select; that path will return the winner's row.
      const code = (err as { code?: string }).code;
      if (code !== "23505" || attempt >= 1) throw err;
      logger.info(
        { networkId, userId, reason },
        "tool-network: race detected on user-private variant, retrying select",
      );
    }
  }
  // Defensive — shouldn't reach here because the second attempt's select
  // must succeed if a 23505 fired on the first.
  throw new Error(
    "getOrCreateUserPrivateVariant: race resolution failed after retry",
  );
}

export interface CreateVariantInput {
  networkId: string;
  versionLabel: string;
  config: Record<string, unknown>;
  internalGraph: NetworkInternalGraph;
  builderModelTier: "weak" | "strong";
  promote: boolean;
  reason: string;
}

/**
 * Create (or reuse) a variant identified by `(networkId, versionLabel)`.
 * New rows start as "active" when `promote` is set, otherwise "shadow";
 * when `promote` is set the variant is also promoted via `promoteVariant`.
 */
export async function createVariant(
  input: CreateVariantInput,
): Promise<NetworkVersionRow> {
  // Idempotent on (network_id, version_label)
  const existing = await db
    .select()
    .from(networkVersions)
    .where(
      and(
        eq(networkVersions.networkId, input.networkId),
        eq(networkVersions.versionLabel, input.versionLabel),
      ),
    )
    .limit(1);

  let variant: NetworkVersionRow;
  if (existing[0]) {
    variant = existing[0];
  } else {
    const [inserted] = await db
      .insert(networkVersions)
      .values({
        id: newId("nver"),
        networkId: input.networkId,
        versionLabel: input.versionLabel,
        internalGraph: input.internalGraph,
        config: input.config,
        status: input.promote ? "active" : "shadow",
        builtBy: "system",
        builderModelTier: input.builderModelTier,
      })
      .returning();
    variant = inserted!;
  }

  if (input.promote) {
    await promoteVariant(input.networkId, variant.id, input.reason);
  }
  return variant;
}

/**
 * Promote a variant to active.
 *
 * Task #242 (B1) — fix for gap 3: the whole operation runs inside
 * `db.transaction`, so we can never end up with a `network_promotions`
 * row written while `active_variant_id` was left unchanged. On failure
 * everything rolls back and a `promotion_failed` audit row is recorded
 * (best-effort) before the original error is rethrown.
 *
 * User-private variants (non-null `privateNamespace`) never take part in
 * the global active slot: the transaction rejects them as promotion
 * targets and only demotes the previous active row if it is public.
 */
export async function promoteVariant(
  networkId: string,
  toVariantId: string,
  reason: string,
): Promise<void> {
  try {
    await _promoteVariantTx(networkId, toVariantId, reason);
  } catch (err) {
    // Architect review: after a rollback we must leave a promotion_failed
    // audit row; otherwise a failed promotion neither changes
    // active_variant_id nor leaves any trace and ops is blind to it.
    // Reuses capability_lifecycle_events (capability_id is free-form
    // text; "network:" + networkId distinguishes it from real capability
    // events).
    try {
      const { capabilityLifecycleEvents } = await import("@workspace/db");
      await db.insert(capabilityLifecycleEvents).values({
        id: newId("clcyev"),
        capabilityId: `network:${networkId}`,
        fromState: null,
        toState: "promotion_failed",
        reason: "promotion_failed",
        payload: {
          networkId,
          toVariantId,
          requestedReason: reason,
          error: err instanceof Error ? err.message : String(err),
        },
      });
    } catch (auditErr) {
      logger.error(
        { auditErr, originalErr: err, networkId, toVariantId },
        "tool-network: failed to write promotion_failed audit (original promotion already rolled back)",
      );
    }
    throw err;
  }
}

/**
 * Transactional body of `promoteVariant`: validates the target, writes
 * the promotion record, flips variant statuses and repoints
 * `tool_networks.active_variant_id`. No-op when the target is already
 * the active variant.
 */
async function _promoteVariantTx(
  networkId: string,
  toVariantId: string,
  reason: string,
): Promise<void> {
  await db.transaction(async (tx) => {
    const [network] = await tx
      .select()
      .from(toolNetworks)
      .where(eq(toolNetworks.id, networkId))
      .limit(1);
    if (!network) throw new Error(`tool-network: ${networkId} not found`);
    if (network.activeVariantId === toVariantId) return;

    // Verify target exists, is public (not user-private) and belongs to
    // this network.
    const [target] = await tx
      .select()
      .from(networkVersions)
      .where(eq(networkVersions.id, toVariantId))
      .limit(1);
    if (!target) {
      throw new Error(
        `tool-network: variant ${toVariantId} not found (cannot promote)`,
      );
    }
    if (target.privateNamespace) {
      throw new Error(
        `tool-network: variant ${toVariantId} is user-private (namespace=${target.privateNamespace}); ` +
          `cannot promote to global active slot`,
      );
    }
    if (target.networkId !== networkId) {
      throw new Error(
        `tool-network: variant ${toVariantId} belongs to network ${target.networkId}, not ${networkId}`,
      );
    }

    await tx.insert(networkPromotions).values({
      id: newId("nprm"),
      networkId,
      fromVariantId: network.activeVariantId,
      toVariantId,
      reason,
      decidedBy: "system",
    });

    await tx
      .update(networkVersions)
      .set({ status: "active" })
      .where(eq(networkVersions.id, toVariantId));

    if (network.activeVariantId) {
      // Demote the previous active *only* if it is the public row
      // (private variants never sit in active_variant_id, so the
      // namespace filter is defensive).
      await tx
        .update(networkVersions)
        .set({ status: "shadow" })
        .where(
          and(
            eq(networkVersions.id, network.activeVariantId),
            isNull(networkVersions.privateNamespace),
          ),
        );
    }

    await tx
      .update(toolNetworks)
      .set({ activeVariantId: toVariantId, updatedAt: new Date() })
      .where(eq(toolNetworks.id, networkId));
  });
}

/** Most recent promotions for a network, newest first. */
export async function listPromotions(
  networkId: string,
  limit = 20,
): Promise<Array<typeof networkPromotions.$inferSelect>> {
  return db
    .select()
    .from(networkPromotions)
    .where(eq(networkPromotions.networkId, networkId))
    .orderBy(desc(networkPromotions.createdAt))
    .limit(limit);
}

// ---------------------------------------------------------------------------
// LLM-visible tool synthesis
// ---------------------------------------------------------------------------

/**
 * Convert a network row into the synthetic LLM tool the planner sees.
 * The tool's parameters mirror the network's input contract (or an
 * open object schema when none is stored); the description carries
 * problem class + cost/latency hints.
 */
export function networkToLlmTool(network: ToolNetworkRow): LlmTool {
  const params =
    typeof network.inputContract === "object" && network.inputContract
      ? (network.inputContract as Record<string, unknown>)
      : { type: "object", properties: {}, additionalProperties: true };

  const hintParts: string[] = [`problem_class=${network.problemClassPath}`];
  if (network.costHint != null) hintParts.push(`cost~${network.costHint}`);
  if (network.latencyHintMs != null) {
    hintParts.push(`latency~${network.latencyHintMs}ms`);
  }

  return {
    name: `run_${network.name}`,
    description:
      (network.description || `Run the ${network.name} tool network.`) +
      `\n[${hintParts.join(", ")}]`,
    parameters: params,
  };
}

// ---------------------------------------------------------------------------
// Executor entry point
// ---------------------------------------------------------------------------

export interface RunNetworkArgs {
  networkName: string;
  input: Record<string, unknown>;
  ctx?: NetworkRuntimeContext;
  /**
   * Wave B — when set, run a specific variant instead of the network's
   * current `active_variant_id`. Used by the pre-promotion regression
   * suite and by shadow-A/B harnesses that need to exercise a candidate
   * without flipping the active slot.
   */
  variantOverride?: string;
  /**
   * Optional audit string forwarded through evidence rows as
   * `ctx.meta.actor` (an `actor` already present in `ctx.meta` wins).
   * Set explicitly when the call originates from automation
   * (e.g. "regression_suite", "shadow_executor").
   */
  actor?: string;
  /**
   * Wave B / #182 — when true and no `variantOverride` is set, the
   * runner fires a fire-and-forget shadow A/B execution after the
   * active call returns. The chat synthetic-tool dispatcher passes
   * `true` so production traffic feeds `network_shadow_samples` and
   * unblocks the auto-promote N≥30 / CI gates. Regression-suite calls
   * leave it unset so they do not pollute live shadow data.
   */
  enableShadow?: boolean;
  /**
   * Override the default `SHADOW_SAMPLE_RATE`. Useful for tests
   * (`shadowSampleRate: 1` to force) and for per-network tuning if a
   * particular network needs richer or sparser sampling.
   */
  shadowSampleRate?: number;
}

/**
 * Build the runtime context handed to runners.
 *
 * - No caller context: a fresh context whose `meta` carries `actor`
 *   when one was given.
 * - Caller context: returned untouched unless `actor` needs to be added
 *   to `meta`; in that case a shallow copy is made so the caller's
 *   object is never mutated (and `signal` etc. are preserved).
 */
function buildRuntimeContext(
  ctx: NetworkRuntimeContext | undefined,
  actor: string | undefined,
): NetworkRuntimeContext {
  if (!ctx) {
    return { meta: actor ? { actor } : {} };
  }
  if (!actor || ctx.meta?.["actor"] !== undefined) {
    return ctx;
  }
  return { ...ctx, meta: { ...(ctx.meta ?? {}), actor } };
}

/**
 * Pick the variant a run should execute: the explicit `variantOverride`
 * (which must exist and belong to this network) or else the active
 * variant, honouring the caller's private namespace (Task #242 gap 4).
 */
async function resolveVariantForRun(
  network: ToolNetworkRow,
  args: RunNetworkArgs,
): Promise<NetworkVersionRow> {
  if (args.variantOverride) {
    const rows = await db
      .select()
      .from(networkVersions)
      .where(eq(networkVersions.id, args.variantOverride))
      .limit(1);
    const override = rows[0];
    if (!override) {
      throw new Error(
        `tool-network: variantOverride ${args.variantOverride} not found for '${args.networkName}'`,
      );
    }
    if (override.networkId !== network.id) {
      throw new Error(
        `tool-network: variantOverride ${args.variantOverride} does not belong to ${args.networkName}`,
      );
    }
    return override;
  }

  const active = await getActiveVariant(network, args.ctx?.ownerUserId ?? null);
  if (!active) {
    throw new Error(
      `tool-network: '${args.networkName}' has no active variant`,
    );
  }
  return active;
}

/**
 * Execute a network end-to-end and return its result.
 *
 * @throws when the network or variant cannot be resolved, when the
 *         chosen executor path is not wired (missing graph nodes /
 *         unregistered runner), or when the executor itself throws.
 */
export async function runNetwork(args: RunNetworkArgs): Promise<NetworkRunResult> {
  const network = await getNetworkByName(args.networkName);
  if (!network) {
    throw new Error(`tool-network: '${args.networkName}' not registered`);
  }

  const variant = await resolveVariantForRun(network, args);

  const start = Date.now();
  const ctx = buildRuntimeContext(args.ctx, args.actor);

  // Task #242 (B1) — gap 1: dual-path executor switch. When
  // `variant.config.executor === 'graph'` and the feature flag is not
  // disabled, run the step sequence via
  // `executeByGraph(internalGraph, handlers)`; otherwise keep the legacy
  // `runners.get(network.name)` path (backward compatible).
  const variantConfig = (variant.config ?? {}) as Record<string, unknown>;
  const useGraph =
    variantConfig["executor"] === "graph" && isGraphExecutorEnabled();

  // Hoisted: shadow sampling below needs `runner` for the legacy path; on
  // the graph executor path it stays null and shadow is skipped (B6 will
  // lift the shadow harness onto graphs explicitly).
  const runner = useGraph ? null : runners.get(network.name);

  let result: NetworkRunResult;
  if (useGraph) {
    const graph = variant.internalGraph as
      | { nodes?: string[]; edges?: Array<{ from: string; to: string; relation?: string }> }
      | null;
    if (!graph || !Array.isArray(graph.nodes)) {
      throw new Error(
        `tool-network: '${args.networkName}' variant ${variant.id} has executor='graph' ` +
          `but internalGraph.nodes is missing`,
      );
    }
    // Build the per-call handler map from the global registry, narrowed
    // to nodes this graph references. Unknown node names surface as
    // `GraphExecutorError` from `executeByGraph`.
    const handlers: Record<string, StepHandler> = {};
    for (const name of graph.nodes) {
      const handler = graphStepHandlers.get(name);
      if (handler) handlers[name] = handler;
    }
    try {
      const graphResult = await executeByGraph({
        graph: { nodes: graph.nodes, edges: graph.edges },
        input: args.input,
        ctx,
        handlers,
      });
      result = toNetworkRunResult(graphResult);
      result.durationMs = result.durationMs ?? Date.now() - start;
    } catch (err) {
      logger.error(
        { err, network: network.name, variantId: variant.id, executor: "graph" },
        "tool-network: graph executor threw",
      );
      throw err;
    }
  } else {
    if (!runner) {
      throw new Error(
        `tool-network: '${args.networkName}' runner not registered (call registerNetworkRunner at boot)`,
      );
    }
    try {
      const runnerResult = await runner(args.input, variant, ctx);
      result = {
        ...runnerResult,
        durationMs: runnerResult.durationMs ?? Date.now() - start,
      };
    } catch (err) {
      logger.error(
        { err, network: network.name, variantId: variant.id },
        "tool-network: runner threw",
      );
      throw err;
    }
  }

  // Fire-and-forget shadow sampling. Gated on the caller (chat path
  // sets `enableShadow=true`) and only ever runs against the *live*
  // active variant — when a `variantOverride` is in play the caller is
  // already exercising a non-active variant and a shadow on top would
  // double-charge the budget without producing comparable data.
  if (args.enableShadow && !args.variantOverride && runner) {
    const activeResult = result;
    // Lazy import keeps tool-network ↔ shadow free of a cyclic edge.
    // Skipped on the graph-executor path (runner is null) — B6 will port
    // the shadow harness to the graph executor explicitly.
    void import("./evolution/shadow")
      .then(({ maybeRecordShadowFromActive }) =>
        maybeRecordShadowFromActive({
          network,
          activeVariant: variant,
          input: args.input,
          ctx,
          activeResult,
          runner,
          sampleRate: args.shadowSampleRate,
        }),
      )
      .catch((err) =>
        logger.warn(
          { err, network: network.name },
          "tool-network: shadow sampling threw (suppressed)",
        ),
      );
  }

  return result;
}

/**
 * Convenience wrapper for the chat synthetic-tool dispatcher (#178).
 * Forces `enableShadow: true` so every chat-driven network call has a
 * chance to feed `network_shadow_samples`. Equivalent to
 * `runNetwork({ ...args, enableShadow: true, actor: args.actor ?? "chat" })`,
 * but kept as a named entry point so the chat code stays declarative
 * about its intent.
 */
export async function runNetworkForChat(
  args: Omit<RunNetworkArgs, "enableShadow">,
): Promise<NetworkRunResult> {
  return runNetwork({
    ...args,
    actor: args.actor ?? "chat",
    enableShadow: true,
  });
}

// ---------------------------------------------------------------------------
// Chat dispatcher hookup (#182 / #178 hand-off)
// ---------------------------------------------------------------------------

/**
 * Convention: tools surfaced to the LLM for tool-networks are named
 * `run_<network.name>` (see `networkToLlmTool`). The chat dispatcher
 * uses this prefix to detect a network call and route it through
 * `runNetworkForChat` (which forces `enableShadow: true`) instead of
 * looking the name up in the static `TOOLS` array.
 */
export const NETWORK_TOOL_PREFIX = "run_";

export interface DispatchNetworkToolResult {
  /** True when the tool name matched a registered network. */
  matched: boolean;
  /** Network result when matched and successful. */
  result?: NetworkRunResult;
  /** Error payload when matched but the runner threw. */
  error?: { error: string; error_code: string; retryable: boolean };
  /** Wall-clock duration of the active call in ms (for chat metrics). */
  durationMs?: number;
}

/**
 * Chat-router hookup: if `toolName` matches the `run_<network>`
 * convention AND the named network is registered with an active
 * variant, dispatch via `runNetworkForChat` (which fires the shadow
 * A/B sample post-active). Returns `{matched: false}` otherwise — also
 * when the network lookup itself fails (logged) — so the caller can
 * fall back to the static tool registry.
 *
 * Never throws — runner errors are folded into a structured `error`
 * payload that the dispatcher can surface to the LLM the same way
 * `findTool().invoke()` failures are.
 */
export async function dispatchNetworkTool(
  toolName: string,
  args: Record<string, unknown>,
  ctx?: NetworkRuntimeContext,
  /**
   * Optional per-call shadow sample rate. Production callers omit this
   * and inherit `SHADOW_SAMPLE_RATE` (env-tunable). Tests pass `1` to
   * force-sample so the assertion is deterministic.
   */
  shadowSampleRate?: number,
): Promise<DispatchNetworkToolResult> {
  if (!toolName.startsWith(NETWORK_TOOL_PREFIX)) {
    return { matched: false };
  }
  const networkName = toolName.slice(NETWORK_TOOL_PREFIX.length);

  let network: ToolNetworkRow | null;
  try {
    network = await getNetworkByName(networkName);
  } catch (err) {
    // Best-effort: a lookup failure degrades to "not a network tool" so
    // the static registry can still answer, but it must not be silent.
    logger.warn(
      { err, networkName },
      "tool-network: network lookup failed during dispatch; falling back to static tools",
    );
    return { matched: false };
  }
  if (!network || !network.activeVariantId) {
    return { matched: false };
  }

  // Task #242 (B1) — fix for gap 2: validate args against
  // network.input_contract with ajv before dispatching. On failure:
  //   - write a network_dispatch_violations audit row;
  //   - DOATLAS_INPUT_CONTRACT_ENFORCE=true (dev default): return a
  //     structured INPUT_CONTRACT_VIOLATION error and never enter the
  //     runner;
  //   - DOATLAS_INPUT_CONTRACT_ENFORCE=false (prod default):
  //     observed_only mode — record and keep dispatching, so legacy
  //     networks do not suddenly start failing.
  const enforce = isInputContractEnforced();
  const contract = network.inputContract as Record<string, unknown> | null;
  if (contract && typeof contract === "object") {
    const validation = ajvValidate(contract, args);
    if (!validation.valid) {
      const argsHash = createHash("sha256")
        .update(JSON.stringify(args ?? {}))
        .digest("hex")
        .slice(0, 16);
      try {
        await db.insert(networkDispatchViolations).values({
          id: newId("ndv"),
          networkName,
          toolName,
          argsHash,
          detailsJson: {
            errors: validation.errors,
            contractTitle: contract["title"] ?? null,
          },
          mode: enforce ? "enforced" : "observed_only",
          conversationId: ctx?.conversationId ?? null,
          ownerUserId: ctx?.ownerUserId ?? null,
        });
      } catch (err) {
        // Don't fail dispatch on telemetry write — log only.
        logger.warn(
          { err, networkName },
          "tool-network: failed to persist dispatch violation",
        );
      }
      if (enforce) {
        return {
          matched: true,
          error: {
            error: `INPUT_CONTRACT_VIOLATION: ${validation.errors
              .map((e) => `${e.path}: ${e.message}`)
              .join("; ")}`,
            error_code: "INPUT_CONTRACT_VIOLATION",
            retryable: false,
          },
          durationMs: 0,
        };
      }
      // observed_only: fall through and continue dispatch.
      logger.warn(
        { networkName, errors: validation.errors },
        "tool-network: input_contract violation observed but not enforced",
      );
    }
  }

  const start = Date.now();
  try {
    const result = await runNetworkForChat({
      networkName,
      input: args,
      ctx,
      shadowSampleRate,
    });
    return { matched: true, result, durationMs: Date.now() - start };
  } catch (err) {
    return {
      matched: true,
      error: {
        error: err instanceof Error ? err.message : String(err),
        error_code: "network_runner_threw",
        retryable: false,
      },
      durationMs: Date.now() - start,
    };
  }
}