import type { LlmTool } from "../llm/types";
import {
  recordInvocation,
  captureOutgoingSeamSamples,
  resolveNodeForDirectReference,
} from "../lib/tool-graph";
import { logger } from "../lib/logger";

/**
 * MCP-style biomedical tool host.
 *
 * The public-API lookups defined here are keyless — they call open APIs
 * (NCBI E-utilities, UniProt REST, Open Targets GraphQL). Network errors
 * and non-200 responses are converted into a structured payload
 * (`{ error, error_code, retryable, hint, ... }`) instead of being thrown,
 * so the model can recover gracefully on the next turn. The exported
 * `TOOLS` catalog additionally merges in atomic and research-engine tools
 * imported from sibling modules, plus a web-search tool.
 */

/**
 * One entry in the tool catalog: the schema advertised to the LLM plus
 * the handler that runs when the model calls the tool by name.
 */
export interface ToolDef {
  schema: LlmTool;
  /**
   * Execute the tool. `args` comes from the model's tool call and is not
   * validated beforehand, so handlers must coerce/check it themselves.
   */
  invoke(args: Record<string, unknown>): Promise<unknown>;
}

/** NCBI E-utilities base URL (esearch / esummary endpoints). */
const PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils";
/** UniProt REST API base URL. */
const UNIPROT_BASE = "https://rest.uniprot.org";
/** Open Targets Platform GraphQL endpoint (queried via POST). */
const OPENTARGETS_BASE = "https://api.platform.opentargets.org/api/v4/graphql";

/**
 * HTTP-aware error so callers (and the LLM, via the structured tool
 * result) can distinguish a transient upstream issue (rate-limit, 5xx,
 * network blip, timeout) from a terminal one (400/404). Retryable errors
 * have already been retried internally — the flag tells the model that
 * an alternative query or tool may succeed where a verbatim retry will
 * not.
*/
class FetchError extends Error {
  readonly status: number | null;
  readonly retryable: boolean;
  readonly attempts: number;
  /** Set only for failures that are neither an HTTP status nor a transport error. */
  readonly reason: FetchErrorReason | null;

  constructor(
    message: string,
    opts: {
      status: number | null;
      retryable: boolean;
      attempts: number;
      reason?: FetchErrorReason;
    },
  ) {
    super(message);
    this.name = "FetchError";
    this.status = opts.status;
    this.retryable = opts.retryable;
    this.attempts = opts.attempts;
    this.reason = opts.reason ?? null;
  }
}

/**
 * Non-HTTP failure categories carried by `FetchError.reason`:
 * `invalid_response` = a 2xx whose body is not valid JSON;
 * `aborted` = cancelled through the caller-supplied `AbortSignal`.
 */
type FetchErrorReason = "invalid_response" | "aborted";

/** Per-attempt wall-clock budget, covering both response headers and body. */
const FETCH_TIMEOUT_MS = 15_000;
/** Total attempts (initial request + retries) for transient failures. */
const FETCH_MAX_ATTEMPTS = 4;
/** Wait after failed attempt N (1-based); 3000 ms is used past the end. */
const FETCH_BACKOFF_MS = [400, 1200, 3000];
/** Upper bound applied to a server-supplied numeric `Retry-After`. */
const FETCH_MAX_RETRY_AFTER_MS = 10_000;

/** Statuses worth retrying: request timeout, too early, rate limit, any 5xx. */
function isTransientStatus(status: number): boolean {
  return status === 408 || status === 425 || status === 429 || status >= 500;
}

/** Human-readable message for an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** Backoff to apply after failed attempt `attempt` (1-based). */
function backoffMs(attempt: number): number {
  return FETCH_BACKOFF_MS[attempt - 1] ?? 3000;
}

/**
 * Numeric `Retry-After` (seconds) converted to a capped delay in ms, or
 * null when the header is absent, non-numeric (e.g. HTTP-date form) or
 * non-positive.
 */
function retryAfterMs(res: Response): number | null {
  const seconds = Number(res.headers.get("retry-after"));
  return Number.isFinite(seconds) && seconds > 0
    ? Math.min(FETCH_MAX_RETRY_AFTER_MS, seconds * 1000)
    : null;
}

/** Release an unread response body so its connection can be reused. */
function discardBody(res: Response): void {
  res.body?.cancel().catch(() => undefined);
}

/**
 * Resolve after `ms`, or early as soon as `signal` aborts, so a caller's
 * cancellation is not held up by a retry backoff.
 */
function sleep(ms: number, signal: AbortSignal | null): Promise<void> {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    const onAbort = (): void => {
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}

/** Terminal, non-retryable error for a caller-initiated abort. */
function abortedByCaller(
  url: string,
  status: number | null,
  attempts: number,
): FetchError {
  return new FetchError(`request to ${url} aborted by caller`, {
    status,
    retryable: false,
    attempts,
    reason: "aborted",
  });
}

/**
 * Read a successful response body and parse it as JSON. Body-read
 * (transport) failures propagate unchanged so the retry loop can treat
 * them as transient; a malformed body becomes a non-retryable FetchError.
 */
async function readJsonBody(
  res: Response,
  url: string,
  attempt: number,
): Promise<unknown> {
  const text = await res.text();
  try {
    return JSON.parse(text) as unknown;
  } catch (err) {
    throw new FetchError(`invalid JSON from ${url}: ${errorMessage(err)}`, {
      status: res.status,
      retryable: false,
      attempts: attempt,
      reason: "invalid_response",
    });
  }
}

/**
 * Fetch `url` and parse the body as JSON, retrying transient failures.
 *
 * - Each attempt is bounded by `FETCH_TIMEOUT_MS`, including the body read.
 * - Transient HTTP statuses and transport failures (DNS, reset, timeout)
 *   are retried up to `FETCH_MAX_ATTEMPTS` total attempts, waiting for a
 *   numeric `Retry-After` (capped) or the `FETCH_BACKOFF_MS` schedule.
 * - An abort through `init.signal` is terminal: no further attempts.
 *   The per-attempt timeout still applies when a caller signal is given.
 * - Caller-supplied headers override the default `Accept`/`User-Agent`.
 *
 * @returns the parsed JSON body, unvalidated — callers narrow it.
 * @throws FetchError on a terminal HTTP status, exhausted retries, a
 *   non-JSON 2xx body, or caller cancellation.
 */
async function fetchJson(url: string, init?: RequestInit): Promise<unknown> {
  const callerSignal = init?.signal ?? null;
  // `new Headers` accepts every HeadersInit form; spreading a Headers
  // instance into an object literal would silently drop its entries.
  const headers = new Headers(init?.headers);
  if (!headers.has("Accept")) headers.set("Accept", "application/json");
  if (!headers.has("User-Agent")) {
    headers.set("User-Agent", "DoAtlas/1.0 (research workbench)");
  }

  let lastErr: unknown = null;
  let lastStatus: number | null = null;
  for (let attempt = 1; attempt <= FETCH_MAX_ATTEMPTS; attempt++) {
    if (callerSignal?.aborted) {
      throw abortedByCaller(url, lastStatus, attempt - 1);
    }
    const ctl = new AbortController();
    const timer = setTimeout(() => ctl.abort(), FETCH_TIMEOUT_MS);
    // Forward caller cancellation into this attempt's controller so the
    // timeout and the caller's signal both apply to the request.
    const forwardAbort = (): void => ctl.abort();
    callerSignal?.addEventListener("abort", forwardAbort, { once: true });
    let waitMs = 0;
    try {
      const res = await fetch(url, { ...init, headers, signal: ctl.signal });
      // `return await` keeps the body read inside the timeout window and
      // routes body-read failures through the catch below.
      if (res.ok) return await readJsonBody(res, url, attempt);

      lastStatus = res.status;
      discardBody(res);
      const transient = isTransientStatus(res.status);
      if (!transient || attempt >= FETCH_MAX_ATTEMPTS) {
        throw new FetchError(`HTTP ${res.status} from ${url}`, {
          status: res.status,
          retryable: transient,
          attempts: attempt,
        });
      }
      lastErr = new Error(`HTTP ${res.status} from ${url}`);
      waitMs = retryAfterMs(res) ?? backoffMs(attempt);
    } catch (err) {
      if (err instanceof FetchError) throw err;
      if (callerSignal?.aborted) {
        throw abortedByCaller(url, lastStatus, attempt);
      }
      lastErr = err;
      // Network-level failures (DNS, ECONNRESET, fetch failed, our own
      // timeout) are assumed transient. Name the timeout explicitly in the
      // message surfaced to the model.
      const msg = ctl.signal.aborted
        ? `timed out after ${FETCH_TIMEOUT_MS}ms`
        : errorMessage(err);
      if (attempt >= FETCH_MAX_ATTEMPTS) {
        throw new FetchError(
          `network error after ${attempt} attempts: ${msg}`,
          { status: lastStatus, retryable: true, attempts: attempt },
        );
      }
      waitMs = backoffMs(attempt);
    } finally {
      clearTimeout(timer);
      callerSignal?.removeEventListener("abort", forwardAbort);
    }
    await sleep(waitMs, callerSignal);
  }
  // Unreachable in practice: the final attempt always returns or throws.
  // Kept so the function provably never falls through.
  throw new FetchError(`exhausted retries: ${errorMessage(lastErr)}`, {
    status: lastStatus,
    retryable: true,
    attempts: FETCH_MAX_ATTEMPTS,
  });
}

/**
 * Error thrown by `runAtomicStep` when a step resolves to a deprecated
 * graph node; `replacedBy` names the canonical replacement, if any.
 */
type DeprecatedToolError = Error & {
  code: "TOOL_DEPRECATED";
  nodeName: string;
  replacedBy: string | null;
};

function deprecatedToolError(
  nodeName: string,
  replacedBy: string | null,
): DeprecatedToolError {
  return Object.assign(
    new Error(
      `tool '${nodeName}' is deprecated; use '${replacedBy ?? "(no replacement set)"}' instead`,
    ),
    { code: "TOOL_DEPRECATED" as const, nodeName, replacedBy },
  );
}

function isDeprecatedToolError(err: unknown): err is DeprecatedToolError {
  return (
    err instanceof Error &&
    (err as { code?: unknown }).code === "TOOL_DEPRECATED"
  );
}

/** Structured failure payload returned to the model instead of throwing. */
interface ToolErrorPayload {
  error: string;
  error_code: string;
  retryable: boolean;
  attempts: number;
  status: number | null;
  hint: string;
  /** Present only for `tool_deprecated`: the canonical replacement node. */
  replaced_by?: string | null;
}

/** Map a FetchError to its model-facing `error_code` and recovery hint. */
function classifyFetchError(err: FetchError): { code: string; hint: string } {
  if (err.reason === "invalid_response") {
    return {
      code: "invalid_response",
      hint: "Upstream returned a body that is not valid JSON. Try again later or use an alternative tool.",
    };
  }
  if (err.reason === "aborted") {
    return {
      code: "aborted",
      hint: "The request was cancelled before it completed.",
    };
  }
  if (err.status === 429) {
    return {
      code: "rate_limited",
      hint: "Upstream rate limit hit even after backoff retries. Try a narrower or differently-worded query, or fall back to a different source tool.",
    };
  }
  if (err.status && err.status >= 500) {
    return {
      code: "upstream_unavailable",
      hint: "Upstream service is failing. Wait briefly or try an alternative tool.",
    };
  }
  if (!err.status) {
    return {
      code: "network_error",
      hint: "Network/timeout error. Retry with a smaller scope, or switch to an alternative tool.",
    };
  }
  return {
    code: `http_${err.status}`,
    hint: "Upstream rejected the request. Adjust the query and try again, or use a different tool.",
  };
}

/**
 * Convert any error thrown inside a tool handler into the structured
 * payload the model sees. FetchErrors and deprecation refusals keep their
 * specific classification; anything else is reported as `unknown`.
 */
function fetchErrorPayload(err: unknown): ToolErrorPayload {
  if (isDeprecatedToolError(err)) {
    return {
      error: err.message,
      error_code: "tool_deprecated",
      retryable: false,
      attempts: 0,
      status: null,
      hint: err.replacedBy
        ? `Step '${err.nodeName}' is deprecated (replacement: '${err.replacedBy}'). A verbatim retry will fail; use the replacement or an alternative tool.`
        : `Step '${err.nodeName}' is deprecated and has no registered replacement; use an alternative tool.`,
      replaced_by: err.replacedBy,
    };
  }
  if (err instanceof FetchError) {
    const { code, hint } = classifyFetchError(err);
    return {
      error: err.message,
      error_code: code,
      retryable: err.retryable,
      attempts: err.attempts,
      status: err.status,
      hint,
    };
  }
  return {
    error: errorMessage(err),
    error_code: "unknown",
    retryable: false,
    attempts: 1,
    status: null,
    hint: "Inspect the error message; consider an alternative tool or query.",
  };
}

/** Treat a non-object `args` (null, primitive) as an empty argument bag. */
function safe<T extends Record<string, unknown>>(args: unknown): T {
  return (args && typeof args === "object" ? args : {}) as T;
}

/** Trimmed string argument, or "" when absent or not a string. */
function stringArg(value: unknown): string {
  return typeof value === "string" ? value.trim() : "";
}

/**
 * Integer argument truncated and clamped to [min, max]. Returns `fallback`
 * when the value is absent, non-numeric or 0 (mirroring `value || fallback`).
 * Numeric strings are accepted because models sometimes quote numbers.
 */
function intArg(
  value: unknown,
  min: number,
  max: number,
  fallback: number,
): number {
  const n =
    typeof value === "number"
      ? value
      : typeof value === "string"
        ? Number(value)
        : Number.NaN;
  if (!Number.isFinite(n) || n === 0) return fallback;
  return Math.max(min, Math.min(max, Math.trunc(n)));
}

/**
 * Run one atomic step inside a composite handler and record an evidence
 * row against the matching atomic graph node. The result of `fn` is
 * returned as-is so the caller's flow is unchanged. Recording is
 * best-effort: a missing graph node, an offline DB or any logging error
 * never propagates into the tool's normal error path. Used by the
 * decomposition introduced in Task #156 — the LLM still calls the
 * composite alias, but each internal substep accumulates its own
 * success/failure counter so #157 can compute per-seam health.
 *
 * @throws DeprecatedToolError (code `TOOL_DEPRECATED`) when `nodeName`
 *   resolves to a deprecated node; `fn` is not called in that case.
 * @throws whatever `fn` throws, after recording the failure.
 */
async function runAtomicStep<T>(
  nodeName: string,
  argsSummary: Record<string, unknown> | null,
  fn: () => Promise<T>,
): Promise<T> {
  const start = Date.now();
  // Pre-dispatch resolution (#159): if a direct-reference name resolves
  // to a node whose status is `deprecated`, throw a structured error
  // with the canonical replacement. The agent / caller is expected to
  // re-dispatch against `replacedBy`. Resolver failure is best-effort
  // (debug log) and never blocks dispatch.
  let resolved: Awaited<ReturnType<typeof resolveNodeForDirectReference>> =
    null;
  try {
    resolved = await resolveNodeForDirectReference(nodeName);
  } catch (err) {
    logger.debug(
      { err, nodeName },
      "runAtomicStep deprecation pre-resolve failed",
    );
  }
  if (resolved && resolved.deprecated) {
    logger.warn(
      {
        nodeName: resolved.nodeName,
        replacedBy: resolved.replacedBy,
        kind: "tool_dispatch_deprecated",
      },
      "tool dispatch: deprecated node invoked by direct reference — refusing dispatch",
    );
    throw deprecatedToolError(resolved.nodeName, resolved.replacedBy);
  }
  try {
    const result = await fn();
    void recordInvocation({
      nodeName,
      ok: true,
      argsSummary,
      resultPreview: result,
      durationMs: Date.now() - start,
    }).catch((err) =>
      logger.debug({ err, nodeName }, "runAtomicStep ok-record failed"),
    );
    // Seam-health capture (Task #157): fold the actual result against
    // every declared downstream consumer's contract. Best-effort —
    // capture failure never propagates into the tool flow.
    void captureOutgoingSeamSamples(nodeName, result).catch((err) =>
      logger.debug({ err, nodeName }, "runAtomicStep seam-capture failed"),
    );
    return result;
  } catch (err) {
    void recordInvocation({
      nodeName,
      ok: false,
      argsSummary,
      resultPreview: null,
      errorCode: err instanceof Error ? err.name : "error",
      durationMs: Date.now() - start,
    }).catch((logErr) =>
      logger.debug(
        { err: logErr, nodeName },
        "runAtomicStep err-record failed",
      ),
    );
    throw err;
  }
}

/**
 * `search_pubmed`: esearch for matching PMIDs, then esummary for their
 * metadata. esummary returns no abstracts, so none are included.
 */
const searchPubmed: ToolDef = {
  schema: {
    name: "search_pubmed",
    description:
      "在 NCBI PubMed 检索生物医学文献,返回前 N 条命中(含 PMID、标题、作者、期刊、年份、DOI、PubMed 链接;不含摘要)。仅检索英文文献。",
    parameters: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description:
            "PubMed 检索语句,支持 MeSH、字段限定、布尔运算,例如 'HER2 AND trastuzumab AND 2024[dp]'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 20,
          default: 10,
        },
      },
      required: ["query"],
    },
  },
  async invoke(args) {
    const a = safe<{ query?: unknown; max_results?: unknown }>(args);
    const q = stringArg(a.query);
    if (!q) return { error: "query is required" };
    const max = intArg(a.max_results, 1, 20, 10);
    try {
      const search = (await fetchJson(
        `${PUBMED_BASE}/esearch.fcgi?db=pubmed&retmode=json&retmax=${max}&term=${encodeURIComponent(q)}`,
      )) as {
        esearchresult?: { idlist?: string[]; count?: string };
      };
      const ids = search.esearchresult?.idlist || [];
      if (ids.length === 0) {
        return { count: 0, hits: [], source: "pubmed" };
      }
      const summary = (await fetchJson(
        `${PUBMED_BASE}/esummary.fcgi?db=pubmed&retmode=json&id=${ids.join(",")}`,
      )) as { result?: Record<string, unknown> };
      const result = summary.result || {};
      const hits = ids.map((pmid) => {
        const it = (result[pmid] || {}) as {
          title?: string;
          source?: string;
          pubdate?: string;
          authors?: Array<{ name: string }>;
          articleids?: Array<{ idtype: string; value: string }>;
        };
        const doi = (it.articleids || []).find(
          (id) => id.idtype === "doi",
        )?.value;
        return {
          pmid,
          title: it.title || "",
          journal: it.source || "",
          // pubdate looks like "2024 Jan 15"; keep only the year token.
          year: (it.pubdate || "").split(" ")[0] || "",
          authors: (it.authors || []).slice(0, 6).map((au) => au.name),
          doi,
          url: `https://pubmed.ncbi.nlm.nih.gov/${pmid}/`,
        };
      });
      return {
        count: Number(search.esearchresult?.count || ids.length),
        hits,
        source: "pubmed",
      };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};

/** `lookup_uniprot`: UniProtKB search returning a compact per-entry summary. */
const lookupUniprot: ToolDef = {
  schema: {
    name: "lookup_uniprot",
    description:
      "在 UniProt 检索蛋白质条目,返回基础信息(accession、推荐名、基因名、物种、序列长度、功能描述)。",
    parameters: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description:
            "UniProt 检索语句,例如 'gene:BRCA1 AND organism_id:9606' 或 'P38398'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 10,
          default: 5,
        },
      },
      required: ["query"],
    },
  },
  async invoke(args) {
    const a = safe<{ query?: unknown; max_results?: unknown }>(args);
    const q = stringArg(a.query);
    if (!q) return { error: "query is required" };
    const size = intArg(a.max_results, 1, 10, 5);
    try {
      const data = (await fetchJson(
        `${UNIPROT_BASE}/uniprotkb/search?query=${encodeURIComponent(q)}&format=json&size=${size}&fields=accession,id,protein_name,gene_names,organism_name,length,cc_function`,
      )) as {
        results?: Array<{
          primaryAccession?: string;
          uniProtkbId?: string;
          proteinDescription?: {
            recommendedName?: { fullName?: { value?: string } };
          };
          genes?: Array<{ geneName?: { value?: string } }>;
          organism?: { scientificName?: string };
          sequence?: { length?: number };
          comments?: Array<{
            commentType?: string;
            texts?: Array<{ value?: string }>;
          }>;
        }>;
      };
      const hits = (data.results || []).map((it) => ({
        accession: it.primaryAccession,
        entry_name: it.uniProtkbId,
        recommended_name:
          it.proteinDescription?.recommendedName?.fullName?.value,
        gene: (it.genes || []).map((g) => g.geneName?.value).filter(Boolean),
        organism: it.organism?.scientificName,
        length: it.sequence?.length,
        // Concatenate all FUNCTION comment texts, truncated to 800 chars.
        function: (it.comments || [])
          .filter((c) => c.commentType === "FUNCTION")
          .flatMap((c) => (c.texts || []).map((t) => t.value))
          .filter(Boolean)
          .join(" ")
          .slice(0, 800),
        url: it.primaryAccession
          ? `https://www.uniprot.org/uniprotkb/${it.primaryAccession}`
          : undefined,
      }));
      return { count: hits.length, hits, source: "uniprot" };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};

/**
 * `query_opentargets`: composite of two instrumented atomic steps —
 * symbol → Ensembl id, then Ensembl id → top disease associations.
 */
const queryOpenTargets: ToolDef = {
  schema: {
    name: "query_opentargets",
    description:
      "在 Open Targets Platform 查询「靶点 ↔ 疾病」证据:给定基因符号,返回与该靶点关联最强的疾病列表(含整体打分、数据来源数)。",
    parameters: {
      type: "object",
      properties: {
        gene_symbol: {
          type: "string",
          description: "HGNC 基因符号,例如 'BRCA1'、'EGFR'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 25,
          default: 10,
        },
      },
      required: ["gene_symbol"],
    },
  },
  async invoke(args) {
    const a = safe<{ gene_symbol?: unknown; max_results?: unknown }>(args);
    const g = stringArg(a.gene_symbol);
    if (!g) return { error: "gene_symbol is required" };
    const size = intArg(a.max_results, 1, 25, 10);
    try {
      // Atomic step 1: resolve gene symbol → Ensembl id (one GraphQL call).
      const target = await runAtomicStep(
        "opentargets_find_target_id",
        { gene_symbol: g },
        () => opentargetsFindTargetId(g),
      );
      if (!target) return { error: `No target found for gene ${g}` };
      // Atomic step 2: fetch associations by Ensembl id (one GraphQL call).
      const assoc = await runAtomicStep(
        "opentargets_get_target_associations",
        { ensembl_id: target.id, size },
        () => opentargetsGetAssociations(target.id, size),
      );
      const t = assoc.data?.target;
      if (!t) {
        // GraphQL reports query failures in `errors` with an HTTP 200;
        // surface the first message so the model can adjust.
        const detail = assoc.errors?.[0]?.message;
        return {
          error: detail
            ? `Failed to fetch associations: ${detail}`
            : "Failed to fetch associations",
        };
      }
      return {
        target: {
          ensembl_id: t.id,
          symbol: t.approvedSymbol,
          name: t.approvedName,
        },
        count: t.associatedDiseases?.count || 0,
        diseases: (t.associatedDiseases?.rows || []).map((r) => ({
          id: r.disease.id,
          name: r.disease.name,
          score: r.score,
          datasources: r.datasourceScores?.length ?? 0,
          url: `https://platform.opentargets.org/disease/${r.disease.id}`,
        })),
        source: "opentargets",
      };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};

// --- query_opentargets atomic constituents --------------------------------
// These are invoked inline by the composite handler above and instrumented
// via `runAtomicStep` so each one accumulates its own evidence row in the
// tool capability graph (Task #156). They are intentionally not exported
// as standalone ToolDefs in this task — the LLM keeps calling the alias.

const OT_FIND_QUERY = `query($q: String!) {
  search(queryString: $q, entityNames: ["target"]) {
    hits { id name entity }
  }
}`;

const OT_ASSOC_QUERY = `query($id: String!, $size: Int!) {
  target(ensemblId: $id) {
    id
    approvedSymbol
    approvedName
    associatedDiseases(page: { index: 0, size: $size }) {
      count
      rows {
        score
        datasourceScores { id score }
        disease { id name therapeuticAreas { id name } }
      }
    }
  }
}`;

/**
 * Resolve a gene symbol to an Open Targets target hit. Full-text search can
 * rank a related target first, so a hit whose `name` equals the symbol
 * (case-insensitive) is preferred; otherwise the top hit is used.
 */
async function opentargetsFindTargetId(
  gene_symbol: string,
): Promise<{ id: string; name: string } | null> {
  const lookup = (await fetchJson(OPENTARGETS_BASE, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      query: OT_FIND_QUERY,
      variables: { q: gene_symbol },
    }),
  })) as {
    data?: { search?: { hits?: Array<{ id: string; name: string }> } };
  };
  const hits = lookup.data?.search?.hits ?? [];
  const wanted = gene_symbol.toUpperCase();
  return (
    hits.find((hit) => hit.name?.toUpperCase() === wanted) ?? hits[0] ?? null
  );
}

/** Shape of the OT_ASSOC_QUERY response fields read by this module. */
interface OpentargetsAssociationsResponse {
  data?: {
    target?: {
      id: string;
      approvedSymbol: string;
      approvedName: string;
      associatedDiseases?: {
        count: number;
        rows: Array<{
          score: number;
          datasourceScores?: Array<{ id: string; score: number }>;
          disease: { id: string; name: string };
        }>;
      };
    };
  };
  /** GraphQL-level errors (delivered with HTTP 200). */
  errors?: Array<{ message?: string }>;
}

/** Fetch the top `size` disease associations for an Ensembl gene id. */
async function opentargetsGetAssociations(
  ensembl_id: string,
  size: number,
): Promise<OpentargetsAssociationsResponse> {
  return (await fetchJson(OPENTARGETS_BASE, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      query: OT_ASSOC_QUERY,
      variables: { id: ensembl_id, size },
    }),
  })) as OpentargetsAssociationsResponse;
}

// Bucket-summary tool: surfaces the meta-analytic pooled estimate
// (θ̂, CI, method, I²) for a single literature bucket so the chat
// agent can cite "the literature pools to X with CI [a,b], I²=Y%"
// directly in its answers. Sources from PostgresCausalNetwork via
// causalGetBucket; falls back to the legacy JSONL bucket store when
// the persisted bucket isn't yet ingested. Failures (store errors,
// deprecated steps) are returned as a structured error payload, like
// the other public tools, rather than thrown.
const summarizeLiteratureBucket: ToolDef = {
  schema: {
    name: "summarize_literature_bucket",
    description:
      "Get the meta-analytic pooled literature estimate for a single bucket (θ̂, CI, method, I², supporting paper/edge counts). Use this to cite the pooled effect across the literature for a (X, Y, population, measure, time) bucket.",
    parameters: {
      type: "object",
      properties: {
        bucket_key: {
          type: "string",
          description:
            "Bucket id from PostgresCausalNetwork (e.g. 'B:...') or composite bucket_key from the legacy JSONL store.",
        },
      },
      required: ["bucket_key"],
    },
  },
  async invoke(args) {
    const bucket_key = String(args?.bucket_key || "").trim();
    if (!bucket_key) return { error: "bucket_key is required" };
    try {
      const { causalGetBucket } = await import("../lib/causal-store");
      const { getBucketByKey, ensureLoaded } = await import(
        "../lib/research-data"
      );
      try {
        await ensureLoaded();
      } catch {
        // ensureLoaded throws when the JSONL fixtures are absent; that's
        // fine — we still try the persisted store below.
      }
      // Atomic step: persisted bucket read. Telemetry attributes to the
      // seeded atomic node `aggregate_bucket_summary` so per-atom
      // evidence aligns with the contract graph (per #156: invocation
      // node names must match seeded atomic node names).
      const pg = await runAtomicStep(
        "aggregate_bucket_summary",
        { bucket_key },
        () => causalGetBucket(bucket_key),
      );
      if (pg) {
        return {
          bucket_id: pg.bucket_id,
          bucket_key: pg.bucket_key,
          supporting_edges: pg.supporting_edges,
          paper_count: pg.paper_ids.length,
          status: pg.status,
          pooled_estimate: pg.pooled_estimate,
        };
      }
      // Atomic step: legacy JSONL fallback read. Same seeded atomic
      // node — both paths produce a bucket-summary record; telemetry
      // distinguishes them via the args/result payload, not the node
      // name (which must align with the contract graph).
      const local = await runAtomicStep(
        "aggregate_bucket_summary",
        { bucket_key, source: "local_jsonl" },
        async () => getBucketByKey(bucket_key),
      );
      if (local) {
        return {
          bucket_id: local.bucket_key,
          bucket_key: {
            kappa_pi: local.population_bucket,
            X: local.x_norm,
            Y: local.y_norm,
            tau_hat: local.time_index,
            ctype: local.equation_type,
            kappa_mu: `${local.measure_type}:${local.measure_scale}`,
          },
          supporting_edges: local.edge_count,
          paper_count: local.paper_count,
          status: "review_required",
          pooled_estimate: null,
          note: "No pooled estimate available — bucket has not been ingested into the persisted causal network yet.",
        };
      }
      return { error: "Bucket not found", bucket_key };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};

// The keyless biomedical lookups (kept available alongside the engine tools).
export const PUBLIC_API_TOOLS: ToolDef[] = [
  searchPubmed,
  lookupUniprot,
  queryOpenTargets,
  summarizeLiteratureBucket,
];

// Research-engine tool wrappers (HMAC-signed bridge invocations). Imported
// here so the chat ReAct loop sees the full unified catalog through
// TOOL_SCHEMAS / findTool without callers having to know about two sources.
import { RESEARCH_ENGINE_TOOLS } from "./research-engine-tools";
import { ATOMIC_TOOLS } from "./atomic-tools";
import { performWebSearch } from "./web-search";

// Unified web search — replaces the previous per-provider native search
// (e.g. MiniMax `{type:"web_search"}`) with a single function tool every
// adapter can call. Two schemas point at the same handler:
//   - `web_search`        — the canonical name used by every provider.
//   - `plugin_web_search` — alias kept for MiniMax M2, which emits this
//                           name back to us when its prompt mentions web grounding.
const webSearchInvoker = async (
  args: Record<string, unknown>,
): Promise<unknown> => {
  // Accept either the canonical `query` or MiniMax's `query_key` /
  // `query_tag_list` shape so both schemas hit the same backend.
  const query =
    typeof args["query"] === "string"
      ? (args["query"] as string)
      : typeof args["query_key"] === "string"
        ? (args["query_key"] as string)
        : Array.isArray(args["query_tag_list"])
          ? (args["query_tag_list"] as unknown[]).map(String).join(" ")
          : "";
  // Clamp to the 1–10 range both schemas advertise; default 6.
  const max = intArg(args["max_results"], 1, 10, 6);
  return performWebSearch({ query, max_results: max });
};

const webSearch: ToolDef = {
  schema: {
    name: "web_search",
    description:
      "Search the live web and return a list of {title, url, snippet, source} results. Use for current events, recent papers, drug news, regulatory filings, or anything where the in-house biomedical APIs do not have an answer.",
    parameters: {
      type: "object",
      properties: {
        query: { type: "string", description: "Search query in natural language." },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 10,
          description: "Number of results to return; default 6.",
        },
      },
      required: ["query"],
      additionalProperties: false,
    },
  },
  invoke: webSearchInvoker,
};

const pluginWebSearch: ToolDef = {
  schema: {
    name: "plugin_web_search",
    description:
      "MiniMax-compatible alias of `web_search`. Same semantics; provided so MiniMax M2 can call its expected tool name without a separate handler.",
    parameters: {
      type: "object",
      properties: {
        query_key: { type: "string", description: "Search query string." },
        query_tag_list: {
          type: "array",
          items: { type: "string" },
          description: "Optional list of keywords; joined into one query.",
        },
        max_results: { type: "integer", minimum: 1, maximum: 10 },
      },
      additionalProperties: true,
    },
  },
  invoke: webSearchInvoker,
};

/** Full catalog exposed to the chat loop (public, atomic, engine, web). */
export const TOOLS: ToolDef[] = [
  ...PUBLIC_API_TOOLS,
  ...ATOMIC_TOOLS,
  ...RESEARCH_ENGINE_TOOLS,
  webSearch,
  pluginWebSearch,
];

/** Schemas advertised to the LLM, in catalog order. */
export const TOOL_SCHEMAS: LlmTool[] = TOOLS.map((t) => t.schema);

/** Look up a tool by its schema name; first match wins. */
export function findTool(name: string): ToolDef | undefined {
  return TOOLS.find((t) => t.schema.name === name);
}