/**
 * atomic-tools — Node-side LLM-visible atomic tools (Task #156).
 *
 * These are the planner-facing decompositions of the previously coarse
 * `summarize_literature_bucket` and `query_opentargets` composites. The
 * coarse composites stay registered as `composition_alias` in the graph
 * (so legacy callers and saved chats keep working) but the planner now
 * sees the longer atomic chain instead.
 *
 * Each handler:
 *   - Conforms to the inputSchema / outputSchema declared on the
 *     corresponding `tool_nodes` row in `tool-graph-seed.ts`.
 *   - Is purely a function of its declared input — no reaching into
 *     a sibling step's hidden state. If a step needs something a prior
 *     step had, the contract on the `feeds` edge carries it.
 *   - Wraps itself in `runAtomicStep` so each call accumulates an
 *     evidence row (consumed by #157 seam-health).
 *
 * Python-owned atomic steps (validation / fulltext / task / drug joins)
 * are NOT exposed here — they remain graph-only labels until the
 * research-engine bridge gains atomic endpoints (#163). Their alias
 * composites continue to satisfy callers in the meantime.
 */

import type { ToolDef } from "./tools";
import {
  recordInvocation,
  resolveNodeForDirectReference,
} from "../lib/tool-graph";
import { logger } from "../lib/logger";

const OPENTARGETS_BASE = "https://api.platform.opentargets.org/api/v4/graphql";
const FETCH_TIMEOUT_MS = 15_000;

/** Upper bound on how many paper_ids a single extract_paper_summary call reads. */
const MAX_PAPERS_PER_BUCKET = 200;

// --- shared helpers ---------------------------------------------------------

/** Structured error thrown when a deprecated node is dispatched by direct reference. */
type DeprecatedToolError = Error & {
  code: "TOOL_DEPRECATED";
  nodeName: string;
  replacedBy: string | null;
};

/**
 * Return the message of an `{ error }` result payload, or null when the
 * result is not an error payload. Handlers in this file report expected
 * failures (missing arguments, not-found, upstream HTTP errors) by
 * returning such a payload rather than throwing.
 */
function resultErrorMessage(result: unknown): string | null {
  if (result === null || typeof result !== "object" || !("error" in result)) {
    return null;
  }
  const { error } = result as { error: unknown };
  return typeof error === "string" ? error : String(error);
}

/**
 * Fire-and-forget evidence recording. Neither a rejected promise nor a
 * synchronous throw from `recordInvocation` may affect the step's outcome.
 */
function recordBestEffort(entry: Parameters<typeof recordInvocation>[0]): void {
  try {
    void recordInvocation(entry).catch(() => undefined);
  } catch {
    // Evidence recording is best-effort; never let it break dispatch.
  }
}

/**
 * Run one atomic tool step: refuse dispatch for deprecated nodes, execute
 * `fn`, and record an evidence row for the invocation.
 *
 * @param nodeName - Graph node name of the atomic tool.
 * @param argsSummary - Compact, loggable summary of the call's arguments.
 * @param fn - The step body.
 * @returns Whatever `fn` resolves to.
 * @throws DeprecatedToolError (`code: "TOOL_DEPRECATED"`) when the node is
 *   deprecated; otherwise rethrows anything `fn` throws.
 *
 * A result shaped like `{ error }` is recorded as `ok: false` so seam-health
 * sees soft failures (e.g. upstream HTTP errors caught inside the step).
 */
async function runAtomicStep<T>(
  nodeName: string,
  argsSummary: Record<string, unknown> | null,
  fn: () => Promise<T>,
): Promise<T> {
  const start = Date.now();

  // Pre-dispatch deprecation pre-check (#159). Mirrors the same hook
  // in src/mcp/tools.ts runAtomicStep — throws a structured error
  // referring to `replacedBy` when the node is deprecated. Resolver
  // failure is best-effort; only the deprecation case blocks dispatch.
  let resolved: Awaited<ReturnType<typeof resolveNodeForDirectReference>> | null =
    null;
  try {
    resolved = await resolveNodeForDirectReference(nodeName);
  } catch (err) {
    logger.debug(
      { err, nodeName },
      "atomic-tools runAtomicStep deprecation pre-resolve failed",
    );
  }
  if (resolved?.deprecated) {
    logger.warn(
      {
        nodeName: resolved.nodeName,
        replacedBy: resolved.replacedBy,
        kind: "tool_dispatch_deprecated",
      },
      "tool dispatch: deprecated node invoked by direct reference — refusing dispatch",
    );
    const err: DeprecatedToolError = Object.assign(
      new Error(
        `tool '${resolved.nodeName}' is deprecated; use '${resolved.replacedBy ?? "(no replacement set)"}' instead`,
      ),
      {
        code: "TOOL_DEPRECATED" as const,
        nodeName: resolved.nodeName,
        replacedBy: resolved.replacedBy,
      },
    );
    throw err;
  }

  let result: T;
  try {
    result = await fn();
  } catch (err) {
    recordBestEffort({
      nodeName,
      ok: false,
      argsSummary,
      resultPreview: null,
      errorCode: err instanceof Error ? err.name : "error",
      durationMs: Date.now() - start,
    });
    throw err;
  }

  // Recording happens outside the try above so a recording problem can
  // never be misreported as a step failure.
  const softError = resultErrorMessage(result);
  recordBestEffort({
    nodeName,
    ok: softError === null,
    argsSummary,
    resultPreview: result,
    ...(softError === null ? {} : { errorCode: "tool_error" }),
    durationMs: Date.now() - start,
  });
  return result;
}

/**
 * POST a JSON body and parse the JSON response.
 *
 * @throws Error on a non-2xx status (`HTTP <status> from <url>`), on a
 *   timeout after FETCH_TIMEOUT_MS, or on network / JSON-parse failure.
 */
async function postJson<T>(url: string, body: unknown): Promise<T> {
  const ctl = new AbortController();
  const timer = setTimeout(() => ctl.abort(), FETCH_TIMEOUT_MS);
  try {
    const res = await fetch(url, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "application/json",
        "User-Agent": "DoAtlas/1.0 (research workbench)",
      },
      body: JSON.stringify(body),
      signal: ctl.signal,
    });
    if (!res.ok) {
      // Release the unread body so the underlying connection is not held open.
      await res.body?.cancel().catch(() => undefined);
      throw new Error(`HTTP ${res.status} from ${url}`);
    }
    // The abort timer stays armed until the body is fully parsed, so a
    // stalled response body is also covered by the timeout.
    return (await res.json()) as T;
  } catch (err) {
    if (ctl.signal.aborted) {
      throw new Error(`Timed out after ${FETCH_TIMEOUT_MS}ms waiting for ${url}`);
    }
    throw err;
  } finally {
    clearTimeout(timer);
  }
}

/**
 * Coerce an untrusted `paper_ids` argument into trimmed, non-empty id
 * strings, keeping at most `limit` entries (in input order).
 */
function toPaperIds(raw: unknown, limit = Infinity): string[] {
  if (!Array.isArray(raw)) return [];
  return raw
    .map((p) => String(p).trim())
    .filter((p) => p.length > 0)
    .slice(0, limit);
}

/**
 * Total number of edges across a `paper_evidence` array as produced by
 * extract_paper_summary (`[{ paper_id, edges: [...] }, ...]`). Returns null
 * when no evidence array was supplied so callers can choose a fallback.
 */
function countEvidenceEdges(paperEvidence: unknown): number | null {
  if (!Array.isArray(paperEvidence)) return null;
  let total = 0;
  for (const row of paperEvidence) {
    const edges =
      row !== null && typeof row === "object"
        ? (row as { edges?: unknown }).edges
        : undefined;
    if (Array.isArray(edges)) total += edges.length;
  }
  return total;
}

// --- summarize_literature_bucket atomic decomposition -----------------------

const selectPapersForBucket: ToolDef = {
  schema: {
    name: "select_papers_for_bucket",
    description:
      "Atomic step 1/3 of summarize_literature_bucket: resolve a bucket_key to its bucket_id and supporting paper_ids. Single read against the persisted causal network (PostgresCausalNetwork) with a JSONL fallback for buckets that haven't been ingested yet. Use this to enumerate the papers in a bucket before per-paper extraction.",
    parameters: {
      type: "object",
      properties: {
        bucket_key: {
          type: "string",
          description:
            "Either a bucket_id (e.g. 'B:...') or a composite bucket_key from the legacy JSONL store.",
        },
      },
      required: ["bucket_key"],
    },
  },
  async invoke(args) {
    const bucket_key = String(args?.bucket_key || "").trim();
    if (!bucket_key) return { error: "bucket_key is required" };
    return runAtomicStep("select_papers_for_bucket", { bucket_key }, async () => {
      const { causalGetBucket } = await import("../lib/causal-store");
      const { getBucketByKey, ensureLoaded } = await import(
        "../lib/research-data"
      );
      try {
        await ensureLoaded();
      } catch {
        // JSONL fixtures absent — fall through to the persisted store.
      }

      // Persisted store wins when the bucket has been ingested.
      const pg = await causalGetBucket(bucket_key);
      if (pg) {
        return {
          bucket_id: pg.bucket_id,
          bucket_key: pg.bucket_key,
          paper_ids: pg.paper_ids,
          supporting_edges: pg.supporting_edges,
        };
      }

      // Legacy JSONL fallback: rebuild the composite key; no paper ids are
      // available from this source.
      const local = getBucketByKey(bucket_key);
      if (local) {
        return {
          bucket_id: local.bucket_key,
          bucket_key: {
            kappa_pi: local.population_bucket,
            X: local.x_norm,
            Y: local.y_norm,
            tau_hat: local.time_index,
            ctype: local.equation_type,
            kappa_mu: `${local.measure_type}:${local.measure_scale}`,
          },
          paper_ids: [],
          supporting_edges: local.edge_count,
        };
      }
      return { error: "Bucket not found", bucket_key };
    });
  },
};

const extractPaperSummary: ToolDef = {
  schema: {
    name: "extract_paper_summary",
    description:
      "Atomic step 2/3 of summarize_literature_bucket: for each paper_id in the bucket, read its supporting causal-edge rows from the persisted store. Returns the per-paper evidence array. Pure function of (bucket_id, paper_ids).",
    parameters: {
      type: "object",
      properties: {
        bucket_id: { type: "string" },
        paper_ids: { type: "array", items: { type: "string" } },
      },
      required: ["bucket_id", "paper_ids"],
    },
  },
  async invoke(args) {
    const bucket_id = String(args?.bucket_id || "").trim();
    // Blank ids are dropped before the cap so it counts real papers only.
    const paper_ids = toPaperIds(args?.paper_ids, MAX_PAPERS_PER_BUCKET);
    if (!bucket_id) return { error: "bucket_id is required" };
    return runAtomicStep(
      "extract_paper_summary",
      { bucket_id, paper_count: paper_ids.length },
      async () => {
        const { causalEdgesForPaper } = await import("../lib/causal-store");
        const paper_evidence: Array<{ paper_id: string; edges: unknown[] }> = [];
        // Reads run one at a time. NOTE(review): presumably deliberate to
        // avoid saturating the store's connection pool — confirm before
        // parallelizing.
        for (const pid of paper_ids) {
          try {
            const edges = await causalEdgesForPaper(pid);
            paper_evidence.push({ paper_id: pid, edges: edges ?? [] });
          } catch (err) {
            // A single failed read degrades to "no edges" rather than
            // failing the whole bucket.
            logger.debug({ err, pid }, "extract_paper_summary: per-paper read failed");
            paper_evidence.push({ paper_id: pid, edges: [] });
          }
        }
        return { bucket_id, paper_ids, paper_evidence };
      },
    );
  },
};

const aggregateBucketSummary: ToolDef = {
  schema: {
    name: "aggregate_bucket_summary",
    description:
      "Atomic step 3/3 of summarize_literature_bucket: combine the per-paper evidence into the bucket-level pooled estimate (θ̂, CI, method, I²). Reads the persisted bucket row to retrieve the canonical pooled estimate; falls back to a counts-only summary when the bucket has not been ingested yet.",
    parameters: {
      type: "object",
      properties: {
        bucket_id: { type: "string" },
        paper_ids: { type: "array", items: { type: "string" } },
        paper_evidence: { type: "array" },
      },
      required: ["bucket_id", "paper_ids"],
    },
  },
  async invoke(args) {
    const bucket_id = String(args?.bucket_id || "").trim();
    const paper_ids = toPaperIds(args?.paper_ids);
    if (!bucket_id) return { error: "bucket_id is required" };
    // Counts-only fallback: count actual edges from the step-2 evidence when
    // it was supplied; only legacy callers without evidence get the paper count.
    const fallbackEdgeCount =
      countEvidenceEdges(args?.paper_evidence) ?? paper_ids.length;
    return runAtomicStep(
      "aggregate_bucket_summary",
      { bucket_id, paper_count: paper_ids.length },
      async () => {
        const { causalGetBucket } = await import("../lib/causal-store");
        const pg = await causalGetBucket(bucket_id);
        if (pg) {
          return {
            bucket_id: pg.bucket_id,
            supporting_edges: pg.supporting_edges,
            status: pg.status,
            pooled_estimate: pg.pooled_estimate,
          };
        }
        return {
          bucket_id,
          supporting_edges: fallbackEdgeCount,
          status: "review_required",
          pooled_estimate: null,
          note: "No pooled estimate available — bucket has not been ingested into the persisted causal network yet.",
        };
      },
    );
  },
};

// --- query_opentargets atomic decomposition --------------------------------

const OT_FIND_QUERY = `query($q: String!) {
  search(queryString: $q, entityNames: ["target"]) {
    hits { id name entity }
  }
}`;

const OT_ASSOC_QUERY = `query($id: String!, $size: Int!) {
  target(ensemblId: $id) {
    id
    approvedSymbol
    approvedName
    associatedDiseases(page: { index: 0, size: $size }) {
      count
      rows {
        score
        datasourceScores { id score }
        disease { id name therapeuticAreas { id name } }
      }
    }
  }
}`;

/** GraphQL error entries as returned alongside (or instead of) `data`. */
type GraphqlErrors = { errors?: Array<{ message?: string }> };

/**
 * Join the messages of a GraphQL response's `errors` array, or return null
 * when there are none. A GraphQL server can reject a query with HTTP 200,
 * so these would otherwise be indistinguishable from an empty result.
 */
function graphqlErrorMessage(resp: GraphqlErrors): string | null {
  const messages = (resp.errors ?? [])
    .map((e) => e?.message)
    .filter((m): m is string => typeof m === "string" && m.length > 0);
  return messages.length > 0
    ? `Open Targets GraphQL error: ${messages.join("; ")}`
    : null;
}

const opentargetsFindTargetId: ToolDef = {
  schema: {
    name: "opentargets_find_target_id",
    description:
      "Atomic step 1/2 of query_opentargets: one Open Targets GraphQL `search` request that resolves an HGNC gene symbol to an Ensembl target id. Returns {error} when no target matches.",
    parameters: {
      type: "object",
      properties: {
        gene_symbol: { type: "string" },
      },
      required: ["gene_symbol"],
    },
  },
  async invoke(args) {
    const g = String(args?.gene_symbol || "").trim();
    if (!g) return { error: "gene_symbol is required" };
    return runAtomicStep("opentargets_find_target_id", { gene_symbol: g }, async () => {
      try {
        const lookup = await postJson<
          GraphqlErrors & {
            data?: {
              search?: {
                hits?: Array<{ id: string; name: string; entity?: string }>;
              };
            };
          }
        >(OPENTARGETS_BASE, {
          query: OT_FIND_QUERY,
          variables: { q: g },
        });
        const hits = lookup.data?.search?.hits ?? [];
        // Search ranking is fuzzy; prefer a hit whose name is exactly the
        // requested symbol (case-insensitive) before taking the top hit.
        const wanted = g.toUpperCase();
        const hit =
          hits.find((h) => typeof h.name === "string" && h.name.toUpperCase() === wanted) ??
          hits[0];
        if (!hit) {
          return {
            error: graphqlErrorMessage(lookup) ?? `No target found for gene ${g}`,
          };
        }
        return { ensembl_id: hit.id, name: hit.name };
      } catch (err) {
        return { error: err instanceof Error ? err.message : String(err) };
      }
    });
  },
};

const opentargetsGetAssociations: ToolDef = {
  schema: {
    name: "opentargets_get_target_associations",
    description:
      "Atomic step 2/2 of query_opentargets: one Open Targets GraphQL request that fetches target↔disease associations for a known Ensembl id. Pair with opentargets_find_target_id when starting from a gene symbol.",
    parameters: {
      type: "object",
      properties: {
        ensembl_id: { type: "string" },
        size: { type: "integer", minimum: 1, maximum: 25, default: 10 },
      },
      required: ["ensembl_id"],
    },
  },
  async invoke(args) {
    const id = String(args?.ensembl_id || "").trim();
    // Clamp to [1, 25] and floor: the GraphQL variable is `Int!`, which
    // rejects fractional values.
    const size = Math.max(1, Math.min(25, Math.floor(Number(args?.size) || 10)));
    if (!id) return { error: "ensembl_id is required" };
    return runAtomicStep(
      "opentargets_get_target_associations",
      { ensembl_id: id, size },
      async () => {
        try {
          const resp = await postJson<
            GraphqlErrors & {
              data?: {
                target?: {
                  id: string;
                  approvedSymbol: string;
                  approvedName: string;
                  associatedDiseases?: {
                    count: number;
                    rows: Array<{
                      score: number;
                      datasourceScores?: Array<{ id: string; score: number }>;
                      disease: { id: string; name: string };
                    }>;
                  };
                } | null;
              };
            }
          >(OPENTARGETS_BASE, {
            query: OT_ASSOC_QUERY,
            variables: { id, size },
          });
          const t = resp.data?.target;
          if (!t) {
            return {
              error: graphqlErrorMessage(resp) ?? "Failed to fetch associations",
            };
          }
          return {
            ensembl_id: t.id,
            approvedSymbol: t.approvedSymbol,
            approvedName: t.approvedName,
            diseases: (t.associatedDiseases?.rows || []).map((r) => ({
              id: r.disease.id,
              name: r.disease.name,
              score: r.score,
              datasources: r.datasourceScores?.length ?? 0,
              url: `https://platform.opentargets.org/disease/${r.disease.id}`,
            })),
          };
        } catch (err) {
          return { error: err instanceof Error ? err.message : String(err) };
        }
      },
    );
  },
};

// --- create_research_task atom 1/3 (Node-side input validator) -------------

const TASK_MODES = new Set<string>([
  "target_discovery",
  "target_ranking",
  "evidence_review",
  "fulltext_upgrade",
  "candidate_compare",
]);

/** Allowed values for `evidence_preference`; also feeds the schema enum. */
const EVIDENCE_PREFERENCES = new Set<string>(["any", "human_only", "prefer_human"]);

/** Optional string fields copied (trimmed) into the validated spec. */
const OPTIONAL_TASK_FIELDS = [
  "disease",
  "drug",
  "target",
  "mechanism",
  "evidence_preference",
  "notes",
] as const;

const validateTaskInputs: ToolDef = {
  schema: {
    name: "validate_task_inputs",
    description:
      "Atomic step 1/3 of create_research_task: validate a ResearchTaskSpec against the canonical schema before any side effect. Pure function — no I/O. Returns {validated, warnings}; on failure returns {error}. Use this to pre-flight a task spec before persisting it.",
    parameters: {
      type: "object",
      properties: {
        task_mode: { type: "string" },
        disease: { type: "string" },
        drug: { type: "string" },
        target: { type: "string" },
        mechanism: { type: "string" },
        evidence_preference: {
          type: "string",
          enum: Array.from(EVIDENCE_PREFERENCES),
        },
        notes: { type: "string" },
      },
      required: ["task_mode"],
    },
  },
  async invoke(args) {
    const task_mode = String(args?.task_mode || "").trim();
    if (!TASK_MODES.has(task_mode)) {
      return {
        error: `task_mode must be one of ${Array.from(TASK_MODES).join(", ")}`,
      };
    }
    // Enforce the declared enum; blank values are simply omitted below.
    const pref = args?.evidence_preference;
    if (
      typeof pref === "string" &&
      pref.trim() &&
      !EVIDENCE_PREFERENCES.has(pref.trim())
    ) {
      return {
        error: `evidence_preference must be one of ${Array.from(EVIDENCE_PREFERENCES).join(", ")}`,
      };
    }
    return runAtomicStep("validate_task_inputs", { task_mode }, async () => {
      const validated: Record<string, string> = { task_mode };
      const warnings: string[] = [];
      for (const k of OPTIONAL_TASK_FIELDS) {
        const v = args?.[k];
        if (typeof v === "string" && v.trim()) validated[k] = v.trim();
      }
      // Soft expectations: these are warnings, not validation failures.
      if (
        (task_mode === "target_discovery" || task_mode === "evidence_review") &&
        !validated.disease
      ) {
        warnings.push(
          "task_mode '" + task_mode + "' typically expects a disease label",
        );
      }
      if (task_mode === "candidate_compare" && !validated.drug) {
        warnings.push(
          "task_mode 'candidate_compare' typically expects a drug field",
        );
      }
      return { validated, warnings };
    });
  },
};

export const ATOMIC_TOOLS: ToolDef[] = [
  selectPapersForBucket,
  extractPaperSummary,
  aggregateBucketSummary,
  opentargetsFindTargetId,
  opentargetsGetAssociations,
  validateTaskInputs,
];