doatlas-2 / artifacts /api-server /src /lib /tool-network.ts
Iostream-Li's picture
Add files using upload-large-folder tool
5871090 verified
/**
* tool-network — registry + executor for first-class tool networks
* (Task #176, Wave A).
*
* A tool network is a curated, end-to-end runnable subgraph that solves
* a problem class. The planner LLM sees a single synthetic
* `run_<network_name>` service card whose JSON-Schema parameters are the
* network's input contract — internal atomic tool calls happen inside
* this module's executor and are invisible to the planner.
*
* This file owns:
* - CRUD against tool_networks / network_versions / network_promotions
* / problem_classes.
* - Active-variant resolution (single primary + zero-or-more shadows).
* - Internal subgraph registration (executor lookup table).
* - `runNetwork()` — entry point used by the synthetic tool dispatcher.
*
* The executor is intentionally a small dispatch layer; per-network
* runners (e.g. drugclip/runner) register themselves at boot via
* `registerNetworkRunner()` and the registry hands them validated input
* + the chosen variant config.
*/
import { and, desc, eq, isNull } from "drizzle-orm";
import { createHash } from "node:crypto";
import {
db,
toolNetworks,
networkVersions,
networkPromotions,
problemClasses,
networkDispatchViolations,
type ToolNetworkRow,
type NetworkVersionRow,
type ProblemClassRow,
} from "@workspace/db";
import { newId } from "./ids";
import { logger } from "./logger";
import type { LlmTool } from "../llm/types";
import { validate as ajvValidate } from "@workspace/networks";
import {
executeByGraph,
toNetworkRunResult,
type StepHandler,
} from "./tool-network/graph-executor.js";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** Declarative description of the subgraph a network runs internally. */
export interface NetworkInternalGraph {
  /** Names of the atomic tool nodes the network executes, listed in declared order. */
  nodes: string[];
  /** Edges between nodes. Optional and informational only — the executor works from `nodes`. */
  edges?: { from: string; to: string; relation?: string }[];
}
/** Per-call context handed to runners and graph step handlers. */
export interface NetworkRuntimeContext {
  conversationId?: string | null;
  messageId?: string | null;
  ownerUserId?: string | null;
  /** Set when the call comes through the blueprint runner; otherwise absent. */
  planId?: string | null;
  /** Free-form metadata for this call; propagated to evidence rows. */
  meta?: Record<string, unknown>;
  /**
   * Cooperative-cancellation signal (optional).
   *
   * The shadow A/B sampler (`evolution/shadow.executeShadowSample`)
   * supplies an `AbortSignal` that fires once the 1.5× wall-clock
   * budget cap is breached. Runners and atomic tools SHOULD consult
   * `signal?.aborted` — or forward `signal` to `fetch`/timeouts — at
   * I/O boundaries, so that an over-budget shadow run really stops
   * billing rather than just being ignored by the caller.
   *
   * A runner that ignores the signal is still covered by the timeout
   * bookkeeping (the shadow row is marked `budget_skipped` either
   * way), but its underlying work keeps running.
   */
  signal?: AbortSignal;
}
/** What a network execution hands back to the dispatcher. */
export interface NetworkRunResult {
  output: Record<string, unknown>;
  /** One trace entry per executed step, for logging and the reviewer. */
  steps: {
    name: string;
    durationMs: number;
    status: "ok" | "error";
    summary?: string;
  }[];
  /** Path of the primary artifact produced by this run, when there is one. */
  artifactPath?: string | null;
  /** Overall executor latency, in milliseconds. */
  durationMs: number;
  /**
   * Runner-side metrics (optional). Two fields are conventional and
   * read by the Reviewer pipeline:
   *
   * - `runnerScore`: a self-evaluation proxy in 0..1 (retrieval
   *   coherence, evidence consistency, recall@K, ...). Each network
   *   picks its own normalisation; the framework only forwards the
   *   value to the `runner_signal` Reviewer channel via
   *   `gradeChannels`.
   * - `channelBreakdown`: channel scores the runner has already
   *   graded and wants used instead of the framework defaults. Keys
   *   SHOULD come from `REVIEWER_CHANNEL_KEYS` in
   *   `lib/reviewer/channels.ts`. Unknown keys are merged anyway, but
   *   the fitness gate disregards them unless a matching
   *   `problem_classes.reviewer_weights` entry exists.
   *
   * Any further metrics (for example a raw `aggregateEf1` kept for
   * audit) MAY ride along; the gate does not consume them.
   */
  metrics?: {
    runnerScore?: number;
    channelBreakdown?: Record<string, number>;
    [k: string]: unknown;
  };
}
/**
* Signature of a per-network executor stored in the `runners` registry.
*
* `runNetwork()` invokes it as `runner(input, variant, ctx)`:
* - `input` — the caller-supplied arguments for this run.
* - `variant` — the `network_versions` row selected for this call.
* - `ctx` — per-call runtime context (ids, metadata, optional abort signal).
*
* Resolves to the run's `NetworkRunResult`.
*/
export type NetworkRunner = (
input: Record<string, unknown>,
variant: NetworkVersionRow,
ctx: NetworkRuntimeContext,
) => Promise<NetworkRunResult>;
// Module-level registry of per-network runners, keyed by network name.
// Written by `registerNetworkRunner()`; read by `runNetwork()` when the
// chosen variant does not use the graph executor.
const runners = new Map<string, NetworkRunner>();
/**
 * Installs `runner` as the executor for the network called `name`.
 * Intended to be invoked at boot by each network's seed module
 * (e.g. drugclip/seed).
 *
 * Registering a name twice is allowed: a warning is logged and the
 * newer runner replaces the older one.
 */
export function registerNetworkRunner(name: string, runner: NetworkRunner): void {
  const alreadyPresent = runners.has(name);
  if (alreadyPresent) {
    logger.warn({ name }, "tool-network: runner re-registered (overwriting)");
  }
  runners.set(name, runner);
}
/** Returns the names of all registered network runners, sorted ascending. */
export function listRegisteredRunners(): string[] {
  const names = Array.from(runners.keys());
  names.sort();
  return names;
}
// ---------------------------------------------------------------------------
// Task #242 (B1) — graph-executor step handler registry
// ---------------------------------------------------------------------------
/**
* Gap 1 fix: the graph-executor path. When
* `variant.config.executor === 'graph'`, `runNetwork` goes through
* `executeByGraph(internalGraph, ...)` and looks up each step handler in
* this map by node name. This registry is orthogonal to `runners` (the
* per-network runners): a single step handler may be reused by several
* capabilities/networks (for example "fetch_pocket" being used by both
* drugclip and a future dockingv2).
*
* drugclip only migrates to the graph executor after B8; this task merely
* opens up the path.
*/
const graphStepHandlers = new Map<string, StepHandler>();
/**
 * Registers `handler` as the graph-executor step handler for the node
 * called `name`. A repeated registration logs a warning and replaces
 * the previous handler.
 */
export function registerGraphStepHandler(name: string, handler: StepHandler): void {
  const isOverwrite = graphStepHandlers.has(name);
  if (isOverwrite) {
    logger.warn({ name }, "tool-network: graph step handler re-registered (overwriting)");
  }
  graphStepHandlers.set(name, handler);
}
/** Returns the node names that have a registered graph step handler, sorted ascending. */
export function listRegisteredGraphStepHandlers(): string[] {
  const nodeNames = Array.from(graphStepHandlers.keys());
  nodeNames.sort();
  return nodeNames;
}
/**
 * Empties the graph step handler registry. As the name says, this is a
 * test helper: it lets a test start from a clean registry.
 */
export function clearGraphStepHandlersForTest(): void {
  graphStepHandlers.clear();
}
/**
 * Feature flag for the graph executor, read from
 * `DOATLAS_GRAPH_EXECUTOR_ENABLED`. The flag is on when the variable is
 * unset; it is off only for the exact value "0" or any casing of
 * "false". Every other value (including the empty string) leaves it on.
 */
function isGraphExecutorEnabled(): boolean {
  const raw = process.env["DOATLAS_GRAPH_EXECUTOR_ENABLED"];
  if (raw === undefined) {
    return true;
  }
  const explicitlyOff = raw === "0" || raw.toLowerCase() === "false";
  return !explicitlyOff;
}
/**
 * Decides whether input-contract violations block dispatch.
 *
 * `DOATLAS_INPUT_CONTRACT_ENFORCE`, when set, wins: "0" or any casing
 * of "false" disables enforcement, anything else enables it. When the
 * variable is unset the default follows `NODE_ENV`: enforced everywhere
 * except production (soft launch, per the task #242 plan).
 */
function isInputContractEnforced(): boolean {
  const raw = process.env["DOATLAS_INPUT_CONTRACT_ENFORCE"];
  if (raw !== undefined) {
    const explicitlyOff = raw === "0" || raw.toLowerCase() === "false";
    return !explicitlyOff;
  }
  // No explicit setting: dev => enforce, prod => observe only.
  const isProduction = process.env["NODE_ENV"] === "production";
  return !isProduction;
}
// ---------------------------------------------------------------------------
// problem_classes
// ---------------------------------------------------------------------------
/** Arguments accepted by `upsertProblemClass()`. */
export interface UpsertProblemClassInput {
  path: string;
  parentPath?: string | null;
  label: string;
  description?: string;
  capabilityTags?: string[];
  /**
   * Task #227 — two jsonb shapes are accepted:
   *
   * - legacy flat: `{contract_validity: 0.25, ...}` (channel → weight
   *   directly);
   * - new dual set: `{weightsLabelFree: {...}, weightsWithTruth: {...}}`,
   *   which `fitness.weightedScore` switches between according to the
   *   metric's row-level ledger state.
   *
   * The value type is `unknown` rather than `number` because the new
   * shape holds nested objects; `resolveChannelWeights` performs the
   * narrowing / fallback when the weights are read back.
   */
  reviewerWeights?: Record<string, unknown> | null;
}
/**
 * Creates or updates the `problem_classes` row identified by
 * `input.path` and returns the stored row.
 *
 * On update, `label` is always overwritten; every optional field that
 * is omitted (or nullish) keeps the value already stored. On insert,
 * omitted optional fields fall back to `null` / `""` / `[]`.
 */
export async function upsertProblemClass(
  input: UpsertProblemClassInput,
): Promise<ProblemClassRow> {
  const [current] = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, input.path))
    .limit(1);

  if (!current) {
    // No row for this path yet — create one with defaults for omitted fields.
    const [created] = await db
      .insert(problemClasses)
      .values({
        id: newId("pcls"),
        path: input.path,
        parentPath: input.parentPath ?? null,
        label: input.label,
        description: input.description ?? "",
        capabilityTags: input.capabilityTags ?? [],
        reviewerWeights: input.reviewerWeights ?? null,
      })
      .returning();
    return created!;
  }

  // Row exists — patch it, preserving stored values for omitted fields.
  const [saved] = await db
    .update(problemClasses)
    .set({
      parentPath: input.parentPath ?? current.parentPath,
      label: input.label,
      description: input.description ?? current.description,
      capabilityTags: input.capabilityTags ?? (current.capabilityTags as string[]),
      reviewerWeights: input.reviewerWeights ?? current.reviewerWeights,
      updatedAt: new Date(),
    })
    .where(eq(problemClasses.id, current.id))
    .returning();
  return saved!;
}
/** Returns every row of `problem_classes`, unfiltered and in database order. */
export async function listProblemClasses(): Promise<ProblemClassRow[]> {
  const allClasses = await db.select().from(problemClasses);
  return allClasses;
}
/** Looks up a problem class by its `path`; resolves to `null` when none matches. */
export async function getProblemClassByPath(
  path: string,
): Promise<ProblemClassRow | null> {
  const [match] = await db
    .select()
    .from(problemClasses)
    .where(eq(problemClasses.path, path))
    .limit(1);
  return match ?? null;
}
// ---------------------------------------------------------------------------
// tool_networks + network_versions
// ---------------------------------------------------------------------------
/** Arguments accepted by `upsertNetwork()`. */
export interface UpsertNetworkInput {
  name: string;
  problemClassPath: string;
  description?: string;
  inputContract: Record<string, unknown>;
  outputContract: Record<string, unknown>;
  internalGraph: NetworkInternalGraph;
  capabilityTags?: string[];
  costHint?: number | null;
  latencyHintMs?: number | null;
  builderModelTier?: "weak" | "strong";
  legacyAliasNodeId?: string | null;
  /**
   * Config for the first variant. It is only used when the network has
   * no active variant yet: in that case it is stored under
   * `versionLabel` and promoted to active.
   */
  initialVariant?: {
    versionLabel: string;
    config: Record<string, unknown>;
  };
}
/**
 * Creates or updates the `tool_networks` row named `input.name`, then —
 * if the row has no active variant and `input.initialVariant` is given —
 * creates that variant and promotes it.
 *
 * On update the contracts, graph and problem class path are always
 * overwritten, while omitted optional fields keep their stored values.
 * New rows are inserted with `status: "active"`.
 *
 * @returns the network row; `activeVariantId` reflects a freshly
 *   promoted initial variant when one was created here.
 */
export async function upsertNetwork(
  input: UpsertNetworkInput,
): Promise<ToolNetworkRow> {
  const [current] = await db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.name, input.name))
    .limit(1);

  let network: ToolNetworkRow;
  if (!current) {
    const [created] = await db
      .insert(toolNetworks)
      .values({
        id: newId("tnet"),
        name: input.name,
        problemClassPath: input.problemClassPath,
        description: input.description ?? "",
        inputContract: input.inputContract,
        outputContract: input.outputContract,
        internalGraph: input.internalGraph,
        capabilityTags: input.capabilityTags ?? [],
        costHint: input.costHint ?? null,
        latencyHintMs: input.latencyHintMs ?? null,
        builderModelTier: input.builderModelTier ?? "strong",
        legacyAliasNodeId: input.legacyAliasNodeId ?? null,
        status: "active",
      })
      .returning();
    network = created!;
  } else {
    const [saved] = await db
      .update(toolNetworks)
      .set({
        problemClassPath: input.problemClassPath,
        description: input.description ?? current.description,
        inputContract: input.inputContract,
        outputContract: input.outputContract,
        internalGraph: input.internalGraph,
        capabilityTags: input.capabilityTags ?? (current.capabilityTags as string[]),
        costHint: input.costHint ?? current.costHint,
        latencyHintMs: input.latencyHintMs ?? current.latencyHintMs,
        builderModelTier: input.builderModelTier ?? current.builderModelTier,
        legacyAliasNodeId: input.legacyAliasNodeId ?? current.legacyAliasNodeId,
        updatedAt: new Date(),
      })
      .where(eq(toolNetworks.id, current.id))
      .returning();
    network = saved!;
  }

  const seed = input.initialVariant;
  if (network.activeVariantId || !seed) {
    return network;
  }

  // First seed for this network: persist the variant and promote it.
  const seeded = await createVariant({
    networkId: network.id,
    versionLabel: seed.versionLabel,
    config: seed.config,
    internalGraph: input.internalGraph,
    builderModelTier: input.builderModelTier ?? "strong",
    promote: true,
    reason: "initial_seed",
  });
  return { ...network, activeVariantId: seeded.id };
}
/** Fetches the `tool_networks` row with the given name, or `null` when there is none. */
export async function getNetworkByName(
  name: string,
): Promise<ToolNetworkRow | null> {
  const [match] = await db
    .select()
    .from(toolNetworks)
    .where(eq(toolNetworks.name, name))
    .limit(1);
  return match ?? null;
}
/** Returns all networks whose `status` column equals "active". */
export async function listActiveNetworks(): Promise<ToolNetworkRow[]> {
  const onlyActive = eq(toolNetworks.status, "active");
  const activeRows = await db.select().from(toolNetworks).where(onlyActive);
  return activeRows;
}
/**
 * Returns the networks attached to `problemClassPath` whose `status`
 * is "active". The path is matched exactly.
 */
export async function listNetworksForClass(
  problemClassPath: string,
): Promise<ToolNetworkRow[]> {
  const inClass = eq(toolNetworks.problemClassPath, problemClassPath);
  const onlyActive = eq(toolNetworks.status, "active");
  const matches = await db
    .select()
    .from(toolNetworks)
    .where(and(inClass, onlyActive));
  return matches;
}
/**
 * Picks the variant a call on `network` should run, honouring an
 * optional user-private namespace.
 *
 * Task #242 (B1) — gap 4 fix: the `network_versions.private_namespace`
 * column already existed, but this function used to ignore it, so a
 * private variant forked by a user could never be dispatched. The
 * resolution order is now:
 *
 *   1. If `userId` is non-empty, look for a row with
 *      `private_namespace = userId AND status = 'active' AND
 *      networkId = network.id`; return it when found (private override).
 *   2. Otherwise — or when no private row matched — fall back to the
 *      public `network.activeVariantId`.
 *
 * Note: a private variant never updates
 * `tool_networks.active_variant_id` (that is the globally shared slot);
 * it is active only inside its own namespace.
 *
 * @returns the resolved variant row, or `null` when neither a private
 *   nor a public active variant can be found.
 */
export async function getActiveVariant(
  network: ToolNetworkRow,
  userId?: string | null,
): Promise<NetworkVersionRow | null> {
  if (userId) {
    const [privateVariant] = await db
      .select()
      .from(networkVersions)
      .where(
        and(
          eq(networkVersions.networkId, network.id),
          eq(networkVersions.privateNamespace, userId),
          eq(networkVersions.status, "active"),
        ),
      )
      .limit(1);
    if (privateVariant) {
      return privateVariant;
    }
  }

  const publicVariantId = network.activeVariantId;
  if (!publicVariantId) {
    return null;
  }
  const [publicVariant] = await db
    .select()
    .from(networkVersions)
    .where(eq(networkVersions.id, publicVariantId))
    .limit(1);
  return publicVariant ?? null;
}
/**
 * Task #242 (B1) — companion API for gap 4: on a user's first fork,
 * clone the public active variant into that user's private namespace.
 * Used by B9.
 *
 * - If a private row `(networkId, privateNamespace = userId,
 *   status = 'active')` already exists, it is returned as-is.
 * - Otherwise the public active variant's `config` and `internalGraph`
 *   are cloned into a new row with `status = 'active'` and
 *   `privateNamespace = userId`. `tool_networks.active_variant_id` is
 *   **not** touched.
 *
 * @param networkId id of the network to fork from
 * @param userId owner of the private namespace (must be non-empty)
 * @param reason free-form audit string; only written to the log
 * @returns the existing or newly created private variant row
 * @throws when `userId` is empty, the network does not exist, the
 *   network has no public active variant to clone from, or the insert
 *   fails for a reason other than a unique-violation race
 */
export async function getOrCreateUserPrivateVariant(
  networkId: string,
  userId: string,
  reason: string,
): Promise<NetworkVersionRow> {
  if (!userId) {
    throw new Error("getOrCreateUserPrivateVariant: userId required");
  }
  // Architect review: real concurrency-safe idempotency. Two requests
  // racing for the same (networkId, userId) must converge on a single
  // active private variant. We rely on the partial unique index
  // `network_versions_uniq_active_private` (created in raw SQL, see
  // schema/toolNetwork.ts) so the DB rejects the second insert; the
  // loser retries and re-selects the winner's row.
  //
  // Note: after a failed statement Postgres marks the whole transaction
  // as aborted, so no further select can run inside it. Race detection
  // therefore has to retry outside any transaction. At most two passes:
  // pass 1 does select-then-insert, pass 2 is expected to find the
  // winner's row.
  for (let attempt = 0; attempt < 2; attempt += 1) {
    const existing = await db
      .select()
      .from(networkVersions)
      .where(
        and(
          eq(networkVersions.networkId, networkId),
          eq(networkVersions.privateNamespace, userId),
          eq(networkVersions.status, "active"),
        ),
      )
      .limit(1);
    if (existing[0]) return existing[0];

    const [network] = await db
      .select()
      .from(toolNetworks)
      .where(eq(toolNetworks.id, networkId))
      .limit(1);
    if (!network) {
      throw new Error(
        `getOrCreateUserPrivateVariant: network ${networkId} not found`,
      );
    }
    if (!network.activeVariantId) {
      throw new Error(
        `getOrCreateUserPrivateVariant: network ${networkId} has no public active variant to clone from`,
      );
    }

    const [publicActive] = await db
      .select()
      .from(networkVersions)
      .where(eq(networkVersions.id, network.activeVariantId))
      .limit(1);
    if (!publicActive) {
      throw new Error(
        `getOrCreateUserPrivateVariant: public active variant ${network.activeVariantId} not found`,
      );
    }

    // Timestamp + attempt keep the label unique per try.
    const versionLabel = `${publicActive.versionLabel}-priv-${userId.slice(0, 8)}-${Date.now()}-${attempt}`;
    try {
      const [inserted] = await db
        .insert(networkVersions)
        .values({
          id: newId("nver"),
          networkId,
          versionLabel,
          internalGraph: publicActive.internalGraph,
          config: publicActive.config,
          status: "active",
          builtBy: `user:${userId}`,
          builderModelTier: publicActive.builderModelTier,
          privateNamespace: userId,
        })
        .returning();
      logger.info(
        { networkId, userId, fromVariantId: publicActive.id, newVariantId: inserted!.id, reason },
        "tool-network: created user-private variant",
      );
      return inserted!;
    } catch (err) {
      // Unique violation on the partial index (Postgres SQLSTATE 23505)
      // means a concurrent caller won the race. Loop back to the
      // initial select; that path will return the winner's row.
      //
      // The SQLSTATE is read through `_sqlStateOf` because the ORM may
      // wrap the driver error and expose the original only via `cause`;
      // reading `err.code` alone would then miss the race and rethrow.
      const code = _sqlStateOf(err);
      if (code !== "23505" || attempt >= 1) throw err;
      logger.info(
        { networkId, userId, reason },
        "tool-network: race detected on user-private variant, retrying select",
      );
    }
  }
  // Defensive — shouldn't reach here because the second attempt's select
  // must succeed if a 23505 fired on the first.
  throw new Error(
    "getOrCreateUserPrivateVariant: race resolution failed after retry",
  );
}

/**
 * Returns the first string `code` found on `err` or on its `cause`
 * chain (bounded depth), or `undefined` when there is none. Lets
 * callers recognise a Postgres SQLSTATE whether the driver error is
 * thrown directly or wrapped by the query layer.
 */
function _sqlStateOf(err: unknown): string | undefined {
  let cursor: unknown = err;
  for (let depth = 0; depth < 4; depth += 1) {
    if (typeof cursor !== "object" || cursor === null) return undefined;
    const code = (cursor as { code?: unknown }).code;
    if (typeof code === "string") return code;
    cursor = (cursor as { cause?: unknown }).cause;
  }
  return undefined;
}
/** Arguments accepted by `createVariant()`. */
export interface CreateVariantInput {
  /** Network the variant belongs to. */
  networkId: string;
  /** Label of the variant; `createVariant()` is idempotent on (networkId, versionLabel). */
  versionLabel: string;
  config: Record<string, unknown>;
  internalGraph: NetworkInternalGraph;
  builderModelTier: "weak" | "strong";
  /** When true the variant is promoted to active after creation; otherwise it is stored as "shadow". */
  promote: boolean;
  /** Reason string handed to `promoteVariant()` when `promote` is true. */
  reason: string;
}
/**
 * Returns the variant identified by (`networkId`, `versionLabel`),
 * inserting it first when it does not exist yet. A newly inserted row
 * gets `status` "active" when `input.promote` is set and "shadow"
 * otherwise, with `builtBy: "system"`.
 *
 * When `input.promote` is true, `promoteVariant()` is called for the
 * variant regardless of whether it was just created or already existed.
 */
export async function createVariant(
  input: CreateVariantInput,
): Promise<NetworkVersionRow> {
  // Idempotency key: (network_id, version_label).
  const [prior] = await db
    .select()
    .from(networkVersions)
    .where(
      and(
        eq(networkVersions.networkId, input.networkId),
        eq(networkVersions.versionLabel, input.versionLabel),
      ),
    )
    .limit(1);

  let variant: NetworkVersionRow | undefined = prior;
  if (!variant) {
    const [created] = await db
      .insert(networkVersions)
      .values({
        id: newId("nver"),
        networkId: input.networkId,
        versionLabel: input.versionLabel,
        internalGraph: input.internalGraph,
        config: input.config,
        status: input.promote ? "active" : "shadow",
        builtBy: "system",
        builderModelTier: input.builderModelTier,
      })
      .returning();
    variant = created!;
  }

  if (input.promote) {
    await promoteVariant(input.networkId, variant.id, input.reason);
  }
  return variant;
}
/**
 * Promotes `toVariantId` to be the active variant of `networkId`.
 *
 * Task #242 (B1) — gap 3 fix: the promotion itself runs inside a single
 * `db.transaction` (see `_promoteVariantTx`), which rules out the
 * inconsistent state "network_promotions row written but
 * active_variant_id not updated"; any failure rolls everything back.
 *
 * Rows with a non-null `privateNamespace` are user-private variants and
 * do not take part in the global active election. `toVariantId` is
 * expected to be a public row; callers (builder / admin) are
 * responsible for passing the right id.
 *
 * On failure a `promotion_failed` audit row is written (best effort)
 * and the original error is rethrown.
 */
export async function promoteVariant(
  networkId: string,
  toVariantId: string,
  reason: string,
): Promise<void> {
  try {
    await _promoteVariantTx(networkId, toVariantId, reason);
    return;
  } catch (promotionErr) {
    // Architect review: once the transaction has rolled back we must
    // leave a promotion_failed audit row behind. Otherwise a failed
    // promotion neither changes active_variant_id nor leaves any trace,
    // and operations would be completely blind to it.
    // capability_lifecycle_events is reused for this: its capability_id
    // column is free-form text, so "network:" + networkId is stored to
    // tell these rows apart from real capability events.
    try {
      const dbModule = await import("@workspace/db");
      const failureMessage =
        promotionErr instanceof Error ? promotionErr.message : String(promotionErr);
      await db.insert(dbModule.capabilityLifecycleEvents).values({
        id: newId("clcyev"),
        capabilityId: `network:${networkId}`,
        fromState: null,
        toState: "promotion_failed",
        reason: "promotion_failed",
        payload: {
          networkId,
          toVariantId,
          requestedReason: reason,
          error: failureMessage,
        },
      });
    } catch (auditErr) {
      // The audit write is best effort; never mask the original failure.
      logger.error(
        { auditErr, originalErr: promotionErr, networkId, toVariantId },
        "tool-network: failed to write promotion_failed audit (original promotion already rolled back)",
      );
    }
    throw promotionErr;
  }
}
/**
* Transactional core of `promoteVariant()`. Everything below runs inside
* one `db.transaction`, so a thrown error rolls back all writes.
*
* Steps, in order:
* 1. Load the network; throw if it does not exist. Return early (no
* writes) when `toVariantId` is already the active variant.
* 2. Load the target variant and reject it when it is missing, is
* user-private, or belongs to a different network.
* 3. Insert a `network_promotions` row recording the from → to change.
* 4. Mark the target variant "active" and, if there was a previous
* active variant, mark it "shadow".
* 5. Point `tool_networks.active_variant_id` at the target.
*
* @param networkId network whose active slot is being changed
* @param toVariantId variant to promote
* @param reason stored on the `network_promotions` row
*/
async function _promoteVariantTx(
networkId: string,
toVariantId: string,
reason: string,
): Promise<void> {
await db.transaction(async (tx) => {
const [network] = await tx
.select()
.from(toolNetworks)
.where(eq(toolNetworks.id, networkId))
.limit(1);
if (!network) throw new Error(`tool-network: ${networkId} not found`);
// Already active: nothing to record, nothing to change.
if (network.activeVariantId === toVariantId) return;
// Verify target exists and is public (not user-private).
const [target] = await tx
.select()
.from(networkVersions)
.where(eq(networkVersions.id, toVariantId))
.limit(1);
if (!target) {
throw new Error(
`tool-network: variant ${toVariantId} not found (cannot promote)`,
);
}
if (target.privateNamespace) {
throw new Error(
`tool-network: variant ${toVariantId} is user-private (namespace=${target.privateNamespace}); ` +
`cannot promote to global active slot`,
);
}
// Guard against promoting a variant into a network it does not belong to.
if (target.networkId !== networkId) {
throw new Error(
`tool-network: variant ${toVariantId} belongs to network ${target.networkId}, not ${networkId}`,
);
}
// Audit row first; it is rolled back with everything else on failure.
await tx.insert(networkPromotions).values({
id: newId("nprm"),
networkId,
fromVariantId: network.activeVariantId,
toVariantId,
reason,
decidedBy: "system",
});
await tx
.update(networkVersions)
.set({ status: "active" })
.where(eq(networkVersions.id, toVariantId));
if (network.activeVariantId) {
// Demote the previous active *only* if it is the public row
// (private variants never sit in active_variant_id, so this is
// a defensive no-op when the previous slot was unset).
await tx
.update(networkVersions)
.set({ status: "shadow" })
.where(
and(
eq(networkVersions.id, network.activeVariantId),
isNull(networkVersions.privateNamespace),
),
);
}
// Finally flip the shared active slot on the network row.
await tx
.update(toolNetworks)
.set({ activeVariantId: toVariantId, updatedAt: new Date() })
.where(eq(toolNetworks.id, networkId));
});
}
/**
 * Returns the promotion history of a network, newest first.
 *
 * @param networkId network whose `network_promotions` rows are listed
 * @param limit maximum number of rows to return (default 20)
 */
export async function listPromotions(
  networkId: string,
  limit = 20,
): Promise<Array<typeof networkPromotions.$inferSelect>> {
  const forNetwork = eq(networkPromotions.networkId, networkId);
  const newestFirst = desc(networkPromotions.createdAt);
  const history = await db
    .select()
    .from(networkPromotions)
    .where(forNetwork)
    .orderBy(newestFirst)
    .limit(limit);
  return history;
}
// ---------------------------------------------------------------------------
// LLM-visible tool synthesis
// ---------------------------------------------------------------------------
/**
 * Builds the synthetic `run_<network_name>` tool that the planner LLM
 * sees for a network.
 *
 * - `parameters` is the network's input contract when that is a
 *   non-null object; otherwise a permissive empty object schema.
 * - `description` is the network description (or a generic sentence
 *   when it is empty) followed by a bracketed hint line carrying the
 *   problem class and, when present, the cost and latency hints.
 */
export function networkToLlmTool(network: ToolNetworkRow): LlmTool {
  const { inputContract, costHint, latencyHintMs } = network;

  let parameters: Record<string, unknown>;
  if (typeof inputContract === "object" && inputContract) {
    parameters = inputContract as Record<string, unknown>;
  } else {
    parameters = { type: "object", properties: {}, additionalProperties: true };
  }

  const hints: string[] = [`problem_class=${network.problemClassPath}`];
  if (costHint != null) {
    hints.push(`cost~${costHint}`);
  }
  if (latencyHintMs != null) {
    hints.push(`latency~${latencyHintMs}ms`);
  }

  const summary = network.description || `Run the ${network.name} tool network.`;
  return {
    name: `run_${network.name}`,
    description: summary + `\n[${hints.join(", ")}]`,
    parameters,
  };
}
// ---------------------------------------------------------------------------
// Executor entry point
// ---------------------------------------------------------------------------
/** Arguments accepted by `runNetwork()`. */
export interface RunNetworkArgs {
  /** Name of the network to execute. */
  networkName: string;
  /** Arguments handed to the runner / graph executor. */
  input: Record<string, unknown>;
  /** Per-call runtime context; optional. */
  ctx?: NetworkRuntimeContext;
  /**
   * Wave B — id of a specific variant to run in place of the network's
   * current `active_variant_id`. The pre-promotion regression suite and
   * the shadow-A/B harnesses use it to exercise a candidate without
   * flipping the active slot.
   */
  variantOverride?: string;
  /**
   * Audit string forwarded through evidence rows (optional). Defaults
   * to "user"; automation should set it explicitly, e.g.
   * "regression_suite" or "shadow_executor".
   */
  actor?: string;
  /**
   * Wave B / #182 — if true and `variantOverride` is not set, a
   * fire-and-forget shadow A/B execution is started once the active
   * call has returned. The chat synthetic-tool dispatcher passes `true`
   * so that production traffic feeds `network_shadow_samples` and
   * unblocks the auto-promote N≥30 / CI gates. Regression-suite calls
   * leave it unset to avoid polluting live shadow data.
   */
  enableShadow?: boolean;
  /**
   * Replaces the default `SHADOW_SAMPLE_RATE` for this call. Handy in
   * tests (`shadowSampleRate: 1` forces a sample) and for per-network
   * tuning when a network needs denser or sparser sampling.
   */
  shadowSampleRate?: number;
}
/**
 * Executes a network and returns its result. This is the entry point
 * used by the synthetic tool dispatcher.
 *
 * Flow:
 * 1. Resolve the network by name and pick the variant: the explicit
 *    `variantOverride` when given (it must belong to the network),
 *    otherwise the active variant for `ctx.ownerUserId`.
 * 2. Build the runtime context. `args.actor` is recorded as
 *    `ctx.meta.actor`; an actor the caller already put into `ctx.meta`
 *    is left untouched.
 * 3. Run through the graph executor when the variant's config says
 *    `executor: "graph"` and the feature flag is on, otherwise through
 *    the registered per-network runner.
 * 4. Optionally kick off fire-and-forget shadow sampling (runner path
 *    only).
 *
 * @throws when the network is unknown, the override belongs to another
 *   network, no variant can be resolved, the graph is malformed, no
 *   runner is registered, or the executor itself throws (the error is
 *   logged and rethrown).
 */
export async function runNetwork(args: RunNetworkArgs): Promise<NetworkRunResult> {
  const network = await getNetworkByName(args.networkName);
  if (!network) {
    throw new Error(`tool-network: '${args.networkName}' not registered`);
  }

  let variant: NetworkVersionRow | null = null;
  if (args.variantOverride) {
    const rows = await db
      .select()
      .from(networkVersions)
      .where(eq(networkVersions.id, args.variantOverride))
      .limit(1);
    variant = rows[0] ?? null;
    if (variant && variant.networkId !== network.id) {
      throw new Error(
        `tool-network: variantOverride ${args.variantOverride} does not belong to ${args.networkName}`,
      );
    }
  } else {
    // Task #242 (B1) — gap 4: dispatch honours the user's private namespace.
    variant = await getActiveVariant(network, args.ctx?.ownerUserId ?? null);
  }
  if (!variant) {
    throw new Error(
      `tool-network: '${args.networkName}' has no active variant`,
    );
  }
  // Narrowed, immutable handle so closures below need no `!` assertion.
  const activeVariant: NetworkVersionRow = variant;

  const start = Date.now();

  // Previously `args.actor` was only honoured when no ctx was supplied,
  // so callers passing both (the chat path) silently lost the actor.
  // Merge it into a copy of the context instead of mutating the
  // caller's object; a `meta.actor` the caller set explicitly wins.
  let ctx: NetworkRuntimeContext;
  if (!args.ctx) {
    ctx = { meta: args.actor ? { actor: args.actor } : {} };
  } else if (args.actor && args.ctx.meta?.["actor"] === undefined) {
    ctx = { ...args.ctx, meta: { ...args.ctx.meta, actor: args.actor } };
  } else {
    ctx = args.ctx;
  }

  // Task #242 (B1) — gap 1: dual-path switch for the graph executor.
  // `variant.config.executor === 'graph'` with the feature flag not
  // turned off → run the step sequence through
  // `executeByGraph(internalGraph, handlers)`; otherwise keep using the
  // legacy `runners.get(network.name)` path (backward compatible).
  const variantConfig = (activeVariant.config ?? {}) as Record<string, unknown>;
  const useGraph =
    variantConfig["executor"] === "graph" && isGraphExecutorEnabled();

  // Hoisted: shadow sampling below needs `runner` for the legacy path; on
  // graph executor path we leave it null and skip shadow (B6 will lift the
  // shadow harness onto graphs explicitly).
  const runner = useGraph ? null : runners.get(network.name);

  let result: NetworkRunResult;
  if (useGraph) {
    const graph = activeVariant.internalGraph as
      | { nodes?: string[]; edges?: Array<{ from: string; to: string; relation?: string }> }
      | null;
    if (!graph || !Array.isArray(graph.nodes)) {
      throw new Error(
        `tool-network: '${args.networkName}' variant ${activeVariant.id} has executor='graph' ` +
          `but internalGraph.nodes is missing`,
      );
    }
    // Build per-call handlers map from the registered global registry,
    // narrowed to nodes this graph references. Unknown node names
    // surface as `GraphExecutorError` from `executeByGraph`.
    const handlers: Record<string, StepHandler> = {};
    for (const name of graph.nodes) {
      const h = graphStepHandlers.get(name);
      if (h) handlers[name] = h;
    }
    try {
      const ge = await executeByGraph({
        graph: { nodes: graph.nodes, edges: graph.edges },
        input: args.input,
        ctx,
        handlers,
      });
      result = toNetworkRunResult(ge);
      // Fall back to wall-clock time when the executor reported none.
      result.durationMs = result.durationMs ?? Date.now() - start;
    } catch (err) {
      logger.error(
        { err, network: network.name, variantId: activeVariant.id, executor: "graph" },
        "tool-network: graph executor threw",
      );
      throw err;
    }
  } else {
    if (!runner) {
      throw new Error(
        `tool-network: '${args.networkName}' runner not registered (call registerNetworkRunner at boot)`,
      );
    }
    try {
      const r = await runner(args.input, activeVariant, ctx);
      result = {
        ...r,
        durationMs: r.durationMs ?? Date.now() - start,
      };
    } catch (err) {
      logger.error(
        { err, network: network.name, variantId: activeVariant.id },
        "tool-network: runner threw",
      );
      throw err;
    }
  }

  // Fire-and-forget shadow sampling. Gated on the caller (chat path
  // sets `enableShadow=true`) and only ever runs against the *live*
  // active variant — when a `variantOverride` is in play the caller is
  // already exercising a non-active variant and a shadow on top would
  // double-charge the budget without producing comparable data.
  if (args.enableShadow && !args.variantOverride && runner) {
    // Lazy import keeps tool-network ↔ shadow free of a cyclic edge.
    // Skipped on graph-executor path (runner is null) — B6 will port the
    // shadow harness to the graph executor explicitly.
    void import("./evolution/shadow")
      .then(({ maybeRecordShadowFromActive }) =>
        maybeRecordShadowFromActive({
          network,
          activeVariant,
          input: args.input,
          ctx,
          activeResult: result,
          runner,
          sampleRate: args.shadowSampleRate,
        }),
      )
      .catch((err) =>
        logger.warn(
          { err, network: network.name },
          "tool-network: shadow sampling threw (suppressed)",
        ),
      );
  }
  return result;
}
/**
 * Entry point for the chat synthetic-tool dispatcher (#178).
 *
 * It always turns shadow sampling on, so each chat-driven network call
 * gets a chance to feed `network_shadow_samples`, and it labels the
 * call with actor "chat" unless the caller supplied another actor. In
 * other words it is
 * `runNetwork({ ...args, enableShadow: true, actor: args.actor ?? "chat" })`
 * under a name that keeps the chat code explicit about what it wants.
 */
export async function runNetworkForChat(
  args: Omit<RunNetworkArgs, "enableShadow">,
): Promise<NetworkRunResult> {
  const chatArgs: RunNetworkArgs = {
    ...args,
    actor: args.actor ?? "chat",
    enableShadow: true,
  };
  return runNetwork(chatArgs);
}
// ---------------------------------------------------------------------------
// Chat dispatcher hookup (#182 / #178 hand-off)
// ---------------------------------------------------------------------------
/**
 * Name prefix shared by every LLM-visible tool that stands for a tool
 * network: such tools are called `run_<network_name>` (that is what
 * `networkToLlmTool` emits). The chat dispatcher checks for this prefix
 * to recognise a network call and send it through `runNetworkForChat`
 * — which forces `enableShadow: true` — rather than searching the
 * static `TOOLS` array for the name.
 */
export const NETWORK_TOOL_PREFIX = "run_";
/** Outcome of `dispatchNetworkTool()`. */
export interface DispatchNetworkToolResult {
  /** Whether the tool name resolved to a registered network. */
  matched: boolean;
  /** The network's result; present when the call matched and succeeded. */
  result?: NetworkRunResult;
  /** Structured error; present when the call matched but failed. */
  error?: { error: string; error_code: string; retryable: boolean };
  /** Wall-clock time of the active call in milliseconds, for chat metrics. */
  durationMs?: number;
}
/**
 * Chat-router hookup: if `toolName` matches the `run_<network_name>`
 * convention AND the named network is registered with an active
 * variant, dispatch via `runNetworkForChat` (which fires the shadow
 * A/B sample post-active). Returns `{matched: false}` otherwise so
 * the caller can fall back to the static tool registry.
 *
 * Before dispatching, `args` are validated against the network's input
 * contract. A violation is always recorded (best effort) in
 * `network_dispatch_violations`; whether it also blocks the call
 * depends on `isInputContractEnforced()`.
 *
 * Runner errors are folded into a structured `error` payload that the
 * dispatcher can surface to the LLM the same way `findTool().invoke()`
 * failures are. A failed network lookup is logged and reported as
 * `{matched: false}`.
 *
 * @param toolName tool name as emitted by the LLM
 * @param args arguments for the network call
 * @param ctx per-call runtime context, forwarded to the runner
 * @param shadowSampleRate optional per-call shadow sample rate.
 *   Production callers omit this and inherit `SHADOW_SAMPLE_RATE`
 *   (env-tunable, default 0.1). Tests pass `1` to force-sample so the
 *   assertion is deterministic.
 */
export async function dispatchNetworkTool(
  toolName: string,
  args: Record<string, unknown>,
  ctx?: NetworkRuntimeContext,
  shadowSampleRate?: number,
): Promise<DispatchNetworkToolResult> {
  if (!toolName.startsWith(NETWORK_TOOL_PREFIX)) {
    return { matched: false };
  }
  const networkName = toolName.slice(NETWORK_TOOL_PREFIX.length);

  let network: ToolNetworkRow | null = null;
  try {
    network = await getNetworkByName(networkName);
  } catch (err) {
    // Same outcome as before (unmatched → caller falls back to the
    // static registry), but the lookup failure is no longer invisible.
    logger.warn(
      { err, networkName },
      "tool-network: network lookup failed during dispatch; treating tool as unmatched",
    );
    return { matched: false };
  }
  if (!network || !network.activeVariantId) {
    return { matched: false };
  }

  // Task #242 (B1) — gap 2 fix: before dispatching, validate `args`
  // against network.input_contract with ajv. On failure:
  //   - write one audit row to network_dispatch_violations;
  //   - DOATLAS_INPUT_CONTRACT_ENFORCE=true (dev default): return a
  //     structured INPUT_CONTRACT_VIOLATION error and do **not** enter
  //     the runner;
  //   - DOATLAS_INPUT_CONTRACT_ENFORCE=false (prod default):
  //     observed_only mode — record only and keep dispatching (so that
  //     existing networks do not suddenly start failing).
  // NOTE(review): `ajvValidate` is called outside any try/catch; if it
  // can throw (e.g. on a malformed contract) that error propagates to
  // the caller — confirm against the @workspace/networks wrapper.
  const enforce = isInputContractEnforced();
  const contract = network.inputContract as Record<string, unknown> | null;
  if (contract && typeof contract === "object") {
    const v = ajvValidate(contract, args);
    if (!v.valid) {
      try {
        // Hashing lives inside the best-effort block: JSON.stringify
        // throws on circular structures / BigInt, and telemetry must
        // never be the reason a dispatch fails.
        const argsHash = createHash("sha256")
          .update(JSON.stringify(args ?? {}))
          .digest("hex")
          .slice(0, 16);
        await db.insert(networkDispatchViolations).values({
          id: newId("ndv"),
          networkName,
          toolName,
          argsHash,
          detailsJson: { errors: v.errors, contractTitle: contract["title"] ?? null },
          mode: enforce ? "enforced" : "observed_only",
          conversationId: ctx?.conversationId ?? null,
          ownerUserId: ctx?.ownerUserId ?? null,
        });
      } catch (err) {
        // Don't fail dispatch on telemetry write — log only.
        logger.warn(
          { err, networkName },
          "tool-network: failed to persist dispatch violation",
        );
      }
      if (enforce) {
        return {
          matched: true,
          error: {
            error: `INPUT_CONTRACT_VIOLATION: ${v.errors
              .map((e) => `${e.path}: ${e.message}`)
              .join("; ")}`,
            error_code: "INPUT_CONTRACT_VIOLATION",
            retryable: false,
          },
          durationMs: 0,
        };
      }
      // observed_only: fall through and continue dispatch.
      logger.warn(
        { networkName, errors: v.errors },
        "tool-network: input_contract violation observed but not enforced",
      );
    }
  }

  const start = Date.now();
  try {
    const result = await runNetworkForChat({
      networkName,
      input: args,
      ctx,
      shadowSampleRate,
    });
    return { matched: true, result, durationMs: Date.now() - start };
  } catch (err) {
    return {
      matched: true,
      error: {
        error: err instanceof Error ? err.message : String(err),
        error_code: "network_runner_threw",
        retryable: false,
      },
      durationMs: Date.now() - start,
    };
  }
}