doatlas-2 / artifacts /api-server /src /mcp /atomic-tools.ts
Iostream-Li's picture
Add files using upload-large-folder tool
6d1fe92 verified
/**
* atomic-tools — Node-side LLM-visible atomic tools (Task #156).
*
* These are the planner-facing decompositions of the previously coarse
* `summarize_literature_bucket` and `query_opentargets` composites. The
* coarse composites stay registered as `composition_alias` in the graph
* (so legacy callers and saved chats keep working) but the planner now
* sees the longer atomic chain instead.
*
* Each handler:
* - Conforms to the inputSchema / outputSchema declared on the
* corresponding `tool_nodes` row in `tool-graph-seed.ts`.
* - Is purely a function of its declared input — no reaching into
* a sibling step's hidden state. If a step needs something a prior
* step had, the contract on the `feeds` edge carries it.
* - Wraps itself in `runAtomicStep` so each call accumulates an
* evidence row (consumed by #157 seam-health).
*
* Python-owned atomic steps (validation / fulltext / task / drug joins)
* are NOT exposed here — they remain graph-only labels until the
* research-engine bridge gains atomic endpoints (#163). Their alias
* composites continue to satisfy callers in the meantime.
*/
import type { ToolDef } from "./tools";
import {
recordInvocation,
resolveNodeForDirectReference,
} from "../lib/tool-graph";
import { logger } from "../lib/logger";
// GraphQL endpoint that the opentargets_* atomic tools in this file POST to.
const OPENTARGETS_BASE = "https://api.platform.opentargets.org/api/v4/graphql";
// Milliseconds postJson waits before aborting an outbound request (15 seconds).
const FETCH_TIMEOUT_MS = 15_000;
// --- shared helpers ---------------------------------------------------------
/**
 * Runs one atomic tool step and reports its outcome through `recordInvocation`.
 *
 * Sequence:
 *  1. Resolve `nodeName` via `resolveNodeForDirectReference` (#159 pre-check).
 *     A resolver failure is only logged at debug level and does not block the
 *     step. A node flagged `deprecated` is refused: a warning is logged and an
 *     Error with `code = "TOOL_DEPRECATED"`, `nodeName` and `replacedBy` is thrown.
 *  2. Await `fn()`.
 *  3. Fire-and-forget a `recordInvocation` call describing success or failure.
 *     Rejections of that recording promise are swallowed.
 *
 * The clock starts before the resolver call, so `durationMs` includes the
 * time spent on the deprecation pre-check.
 *
 * @param nodeName    Tool-graph node name used for resolution and recording.
 * @param argsSummary Compact argument summary stored alongside the record.
 * @param fn          The step body to execute.
 * @returns The value `fn` resolves to.
 * @throws The TOOL_DEPRECATED error described above, or whatever `fn` throws
 *         (re-thrown unchanged after the failure has been recorded).
 */
async function runAtomicStep<T>(
  nodeName: string,
  argsSummary: Record<string, unknown> | null,
  fn: () => Promise<T>,
): Promise<T> {
  const startedAt = Date.now();
  // Recording is best-effort: a failed write must never affect the step result.
  const ignoreRecordingFailure = (): undefined => undefined;

  let node: Awaited<ReturnType<typeof resolveNodeForDirectReference>> = null;
  try {
    node = await resolveNodeForDirectReference(nodeName);
  } catch (err) {
    logger.debug(
      { err, nodeName },
      "atomic-tools runAtomicStep deprecation pre-resolve failed",
    );
  }

  if (node?.deprecated) {
    const { nodeName: deprecatedName, replacedBy } = node;
    logger.warn(
      {
        nodeName: deprecatedName,
        replacedBy,
        kind: "tool_dispatch_deprecated",
      },
      "tool dispatch: deprecated node invoked by direct reference — refusing dispatch",
    );
    throw Object.assign(
      new Error(
        `tool '${deprecatedName}' is deprecated; use '${replacedBy ?? "(no replacement set)"}' instead`,
      ),
      {
        code: "TOOL_DEPRECATED",
        nodeName: deprecatedName,
        replacedBy,
      },
    );
  }

  try {
    const outcome = await fn();
    void recordInvocation({
      nodeName,
      ok: true,
      argsSummary,
      resultPreview: outcome,
      durationMs: Date.now() - startedAt,
    }).catch(ignoreRecordingFailure);
    return outcome;
  } catch (failure) {
    const errorCode = failure instanceof Error ? failure.name : "error";
    void recordInvocation({
      nodeName,
      ok: false,
      argsSummary,
      resultPreview: null,
      errorCode,
      durationMs: Date.now() - startedAt,
    }).catch(ignoreRecordingFailure);
    throw failure;
  }
}
/**
 * POSTs `body` as JSON to `url` and returns the parsed JSON response.
 *
 * The request is aborted once FETCH_TIMEOUT_MS elapses. The abort timer is
 * cleared in `finally`, i.e. after the response body has been read or the
 * call has failed.
 *
 * @param url  Absolute URL to POST to.
 * @param body Value serialized with `JSON.stringify` as the request body.
 * @returns The parsed response body, cast (not validated) to `T`.
 * @throws Error `HTTP <status> from <url>` when the response is not ok;
 *         fetch, abort and JSON parsing errors propagate unchanged.
 */
async function postJson<T>(url: string, body: unknown): Promise<T> {
  const controller = new AbortController();
  const abortTimer = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);
  try {
    const requestHeaders = {
      "Content-Type": "application/json",
      Accept: "application/json",
      "User-Agent": "DoAtlas/1.0 (research workbench)",
    };
    const response = await fetch(url, {
      method: "POST",
      headers: requestHeaders,
      body: JSON.stringify(body),
      signal: controller.signal,
    });
    if (response.ok) {
      return (await response.json()) as T;
    }
    throw new Error(`HTTP ${response.status} from ${url}`);
  } finally {
    clearTimeout(abortTimer);
  }
}
// --- summarize_literature_bucket atomic decomposition -----------------------
/**
 * Tool `select_papers_for_bucket` (step 1/3 of summarize_literature_bucket).
 *
 * Looks the bucket up in the persisted causal store first; when that returns
 * nothing, falls back to the research-data lookup (`getBucketByKey`). The
 * fallback result always carries an empty `paper_ids` list and a `bucket_key`
 * object assembled from the local row's fields.
 *
 * Returns `{ error }` when `bucket_key` is blank (before any evidence row is
 * recorded) or when neither source knows the bucket.
 */
const selectPapersForBucket: ToolDef = {
  schema: {
    name: "select_papers_for_bucket",
    description:
      "Atomic step 1/3 of summarize_literature_bucket: resolve a bucket_key to its bucket_id and supporting paper_ids. Single read against the persisted causal network (PostgresCausalNetwork) with a JSONL fallback for buckets that haven't been ingested yet. Use this to enumerate the papers in a bucket before per-paper extraction.",
    parameters: {
      type: "object",
      properties: {
        bucket_key: {
          type: "string",
          description:
            "Either a bucket_id (e.g. 'B:...') or a composite bucket_key from the legacy JSONL store.",
        },
      },
      required: ["bucket_key"],
    },
  },
  async invoke(args) {
    const requestedKey = String(args?.bucket_key || "").trim();
    if (requestedKey === "") {
      return { error: "bucket_key is required" };
    }

    return runAtomicStep(
      "select_papers_for_bucket",
      { bucket_key: requestedKey },
      async () => {
        const { causalGetBucket } = await import("../lib/causal-store");
        const { getBucketByKey, ensureLoaded } = await import(
          "../lib/research-data"
        );

        try {
          await ensureLoaded();
        } catch {
          // JSONL fixtures are missing; the persisted store is still consulted below.
        }

        // Preferred source: the persisted causal store.
        const persisted = await causalGetBucket(requestedKey);
        if (persisted) {
          const { bucket_id, bucket_key, paper_ids, supporting_edges } = persisted;
          return { bucket_id, bucket_key, paper_ids, supporting_edges };
        }

        // Fallback source: the locally loaded research data.
        const fixture = getBucketByKey(requestedKey);
        if (!fixture) {
          return { error: "Bucket not found", bucket_key: requestedKey };
        }

        const compositeKey = {
          kappa_pi: fixture.population_bucket,
          X: fixture.x_norm,
          Y: fixture.y_norm,
          tau_hat: fixture.time_index,
          ctype: fixture.equation_type,
          kappa_mu: `${fixture.measure_type}:${fixture.measure_scale}`,
        };
        return {
          bucket_id: fixture.bucket_key,
          bucket_key: compositeKey,
          paper_ids: [],
          supporting_edges: fixture.edge_count,
        };
      },
    );
  },
};
/**
 * Tool `extract_paper_summary` (step 2/3 of summarize_literature_bucket).
 *
 * Reads the causal-edge rows for each supplied paper id, one paper at a time.
 * At most the first 200 ids are processed; the returned `paper_ids` is that
 * truncated list. A failed read for a single paper is logged at debug level
 * and reported as an empty `edges` array instead of failing the whole step.
 *
 * Returns `{ error }` when `bucket_id` is blank.
 */
const extractPaperSummary: ToolDef = {
  schema: {
    name: "extract_paper_summary",
    description:
      "Atomic step 2/3 of summarize_literature_bucket: for each paper_id in the bucket, read its supporting causal-edge rows from the persisted store. Returns the per-paper evidence array. Pure function of (bucket_id, paper_ids).",
    parameters: {
      type: "object",
      properties: {
        bucket_id: { type: "string" },
        paper_ids: { type: "array", items: { type: "string" } },
      },
      required: ["bucket_id", "paper_ids"],
    },
  },
  async invoke(args) {
    const bucket_id = String(args?.bucket_id || "").trim();

    let paper_ids: string[] = [];
    if (Array.isArray(args?.paper_ids)) {
      const stringified = (args.paper_ids as unknown[]).map(String);
      paper_ids = stringified.slice(0, 200);
    }

    if (!bucket_id) {
      return { error: "bucket_id is required" };
    }

    const argsSummary = { bucket_id, paper_count: paper_ids.length };
    return runAtomicStep("extract_paper_summary", argsSummary, async () => {
      const { causalEdgesForPaper } = await import("../lib/causal-store");

      // Reads one paper; never throws, so one bad paper cannot sink the batch.
      const readPaper = async (
        paperId: string,
      ): Promise<{ paper_id: string; edges: unknown[] }> => {
        try {
          const rows = await causalEdgesForPaper(paperId);
          return { paper_id: paperId, edges: rows ?? [] };
        } catch (err) {
          logger.debug(
            { err, pid: paperId },
            "extract_paper_summary: per-paper read failed",
          );
          return { paper_id: paperId, edges: [] };
        }
      };

      const paper_evidence: Array<{ paper_id: string; edges: unknown[] }> = [];
      for (const paperId of paper_ids) {
        // Sequential on purpose: each read completes before the next starts.
        paper_evidence.push(await readPaper(paperId));
      }

      return { bucket_id, paper_ids, paper_evidence };
    });
  },
};
/**
 * Tool `aggregate_bucket_summary` (step 3/3 of summarize_literature_bucket).
 *
 * Returns the pooled estimate stored on the persisted bucket row. When the
 * bucket is not in the persisted store, returns a placeholder summary with
 * `status: "review_required"`, a null estimate, and `supporting_edges` set
 * to the number of supplied paper ids.
 *
 * The schema accepts `paper_evidence`, but the handler does not read it.
 *
 * Returns `{ error }` when `bucket_id` is blank.
 */
const aggregateBucketSummary: ToolDef = {
  schema: {
    name: "aggregate_bucket_summary",
    description:
      "Atomic step 3/3 of summarize_literature_bucket: combine the per-paper evidence into the bucket-level pooled estimate (θ̂, CI, method, I²). Reads the persisted bucket row to retrieve the canonical pooled estimate; falls back to a counts-only summary when the bucket has not been ingested yet.",
    parameters: {
      type: "object",
      properties: {
        bucket_id: { type: "string" },
        paper_ids: { type: "array", items: { type: "string" } },
        paper_evidence: { type: "array" },
      },
      required: ["bucket_id", "paper_ids"],
    },
  },
  async invoke(args) {
    const bucket_id = String(args?.bucket_id || "").trim();

    let paper_ids: string[] = [];
    if (Array.isArray(args?.paper_ids)) {
      paper_ids = (args.paper_ids as unknown[]).map(String);
    }

    if (!bucket_id) {
      return { error: "bucket_id is required" };
    }

    const argsSummary = { bucket_id, paper_count: paper_ids.length };
    return runAtomicStep("aggregate_bucket_summary", argsSummary, async () => {
      const { causalGetBucket } = await import("../lib/causal-store");
      const persisted = await causalGetBucket(bucket_id);

      if (!persisted) {
        // Bucket not in the persisted store: report counts only.
        return {
          bucket_id,
          supporting_edges: paper_ids.length,
          status: "review_required",
          pooled_estimate: null,
          note: "No pooled estimate available — bucket has not been ingested into the persisted causal network yet.",
        };
      }

      const { supporting_edges, status, pooled_estimate } = persisted;
      return {
        bucket_id: persisted.bucket_id,
        supporting_edges,
        status,
        pooled_estimate,
      };
    });
  },
};
// --- query_opentargets atomic decomposition --------------------------------
// GraphQL document used by opentargets_find_target_id: a `search` restricted
// to the "target" entity, returning id / name / entity for each hit.
const OT_FIND_QUERY =
`query($q: String!) { search(queryString: $q, entityNames: ["target"]) { hits { id name entity } } }`;
// GraphQL document used by opentargets_get_target_associations: fetches one
// target by Ensembl id together with the first page (index 0, `$size` rows)
// of its associated diseases. `$size` is declared `Int!`, so the variable
// sent with the request must be an integer.
const OT_ASSOC_QUERY = `query($id: String!, $size: Int!) {
target(ensemblId: $id) {
id
approvedSymbol
approvedName
associatedDiseases(page: { index: 0, size: $size }) {
count
rows {
score
datasourceScores { id score }
disease { id name therapeuticAreas { id name } }
}
}
}
}`;
/**
 * Tool `opentargets_find_target_id` (step 1/2 of query_opentargets).
 *
 * Sends OT_FIND_QUERY for the given gene symbol and returns the id and name
 * of the first search hit. Request failures are caught inside the step and
 * returned as `{ error: <message> }` rather than thrown.
 *
 * NOTE(review): the LLM-facing description says a miss "returns null", but
 * the handler returns `{ error: "No target found for gene …" }` — confirm
 * which form the declared output schema expects.
 */
const opentargetsFindTargetId: ToolDef = {
  schema: {
    name: "opentargets_find_target_id",
    description:
      "Atomic step 1/2 of query_opentargets: one Open Targets GraphQL `search` request that resolves an HGNC gene symbol to an Ensembl target id. Returns null when no target matches.",
    parameters: {
      type: "object",
      properties: {
        gene_symbol: { type: "string" },
      },
      required: ["gene_symbol"],
    },
  },
  async invoke(args) {
    const symbol = String(args?.gene_symbol || "").trim();
    if (symbol === "") {
      return { error: "gene_symbol is required" };
    }

    // Only the fields of the search response that this handler reads.
    type SearchResponse = {
      data?: { search?: { hits?: Array<{ id: string; name: string }> } };
    };

    return runAtomicStep(
      "opentargets_find_target_id",
      { gene_symbol: symbol },
      async () => {
        try {
          const payload = await postJson<SearchResponse>(OPENTARGETS_BASE, {
            query: OT_FIND_QUERY,
            variables: { q: symbol },
          });
          const firstHit = payload.data?.search?.hits?.[0];
          if (firstHit) {
            return { ensembl_id: firstHit.id, name: firstHit.name };
          }
          return { error: `No target found for gene ${symbol}` };
        } catch (err) {
          const message = err instanceof Error ? err.message : String(err);
          return { error: message };
        }
      },
    );
  },
};
/**
 * Tool `opentargets_get_target_associations` (step 2/2 of query_opentargets).
 *
 * Sends OT_ASSOC_QUERY for the given Ensembl id and returns the target's
 * symbol/name plus a flattened list of associated diseases (id, name, score,
 * number of datasource scores, platform URL).
 *
 * `size` handling: a missing, zero or non-numeric value becomes 10, the value
 * is clamped to [1, 25], and then floored. The floor matters because `$size`
 * is declared `Int!` in the query; a fractional value such as 2.5 would be
 * rejected by GraphQL variable coercion and fail the whole request.
 *
 * Returns `{ error }` when `ensembl_id` is blank, when the response has no
 * `target`, or when the request throws (the message is returned, not thrown).
 */
const opentargetsGetAssociations: ToolDef = {
  schema: {
    name: "opentargets_get_target_associations",
    description:
      "Atomic step 2/2 of query_opentargets: one Open Targets GraphQL request that fetches target↔disease associations for a known Ensembl id. Pair with opentargets_find_target_id when starting from a gene symbol.",
    parameters: {
      type: "object",
      properties: {
        ensembl_id: { type: "string" },
        size: { type: "integer", minimum: 1, maximum: 25, default: 10 },
      },
      required: ["ensembl_id"],
    },
  },
  async invoke(args) {
    const id = String(args?.ensembl_id || "").trim();
    // Clamp first, then floor: every integer input maps to the same value as
    // before, while fractional inputs become a valid GraphQL Int.
    const size = Math.floor(
      Math.max(1, Math.min(25, Number(args?.size) || 10)),
    );
    if (!id) return { error: "ensembl_id is required" };
    return runAtomicStep(
      "opentargets_get_target_associations",
      { ensembl_id: id, size },
      async () => {
        try {
          const resp = await postJson<{
            data?: {
              target?: {
                id: string;
                approvedSymbol: string;
                approvedName: string;
                associatedDiseases?: {
                  count: number;
                  rows: Array<{
                    score: number;
                    datasourceScores?: Array<{ id: string; score: number }>;
                    disease: { id: string; name: string };
                  }>;
                };
              };
            };
          }>(OPENTARGETS_BASE, {
            query: OT_ASSOC_QUERY,
            variables: { id, size },
          });
          const t = resp.data?.target;
          if (!t) return { error: "Failed to fetch associations" };
          return {
            ensembl_id: t.id,
            approvedSymbol: t.approvedSymbol,
            approvedName: t.approvedName,
            diseases: (t.associatedDiseases?.rows || []).map((r) => ({
              id: r.disease.id,
              name: r.disease.name,
              score: r.score,
              // Count of per-datasource scores, not the scores themselves.
              datasources: r.datasourceScores?.length ?? 0,
              url: `https://platform.opentargets.org/disease/${r.disease.id}`,
            })),
          };
        } catch (err) {
          // Transport/HTTP failures are surfaced as a soft error payload.
          return { error: err instanceof Error ? err.message : String(err) };
        }
      },
    );
  },
};
// --- create_research_task atom 1/3 (Node-side input validator) -------------
// The task_mode values validate_task_inputs accepts; any other value is
// rejected with an error that lists these names.
const TASK_MODES = new Set([
"target_discovery",
"target_ranking",
"evidence_review",
"fulltext_upgrade",
"candidate_compare",
]);
/**
 * Allowed `evidence_preference` values. Shared by the tool schema's `enum`
 * and by the runtime check in `invoke`, so the two cannot drift apart.
 */
const EVIDENCE_PREFERENCES: string[] = ["any", "human_only", "prefer_human"];

/**
 * Tool `validate_task_inputs` (step 1/3 of create_research_task).
 *
 * Checks a task spec without performing I/O of its own:
 *  - `task_mode` must be one of TASK_MODES, otherwise `{ error }`.
 *  - `evidence_preference`, when given as a non-blank string, must be one of
 *    EVIDENCE_PREFERENCES, otherwise `{ error }`. Previously any string was
 *    copied into `validated`, so out-of-enum values passed pre-flight.
 *  - The optional string fields are trimmed and copied into `validated`;
 *    blank or non-string values are omitted.
 *  - Warnings are added when a mode is missing a field it usually needs
 *    (disease for target_discovery / evidence_review, drug for
 *    candidate_compare).
 *
 * Both `{ error }` results are returned before `runAtomicStep` is called.
 *
 * @returns `{ validated, warnings }` on success, `{ error }` on failure.
 */
const validateTaskInputs: ToolDef = {
  schema: {
    name: "validate_task_inputs",
    description:
      "Atomic step 1/3 of create_research_task: validate a ResearchTaskSpec against the canonical schema before any side effect. Pure function — no I/O. Returns {validated, warnings}; on failure returns {error}. Use this to pre-flight a task spec before persisting it.",
    parameters: {
      type: "object",
      properties: {
        task_mode: { type: "string" },
        disease: { type: "string" },
        drug: { type: "string" },
        target: { type: "string" },
        mechanism: { type: "string" },
        evidence_preference: {
          type: "string",
          enum: [...EVIDENCE_PREFERENCES],
        },
        notes: { type: "string" },
      },
      required: ["task_mode"],
    },
  },
  async invoke(args) {
    const task_mode = String(args?.task_mode || "").trim();
    if (!TASK_MODES.has(task_mode)) {
      return {
        error: `task_mode must be one of ${Array.from(TASK_MODES).join(", ")}`,
      };
    }

    // Enforce the schema enum. Blank or non-string values are still treated
    // as "not provided", matching how the other optional fields are handled.
    const rawPreference: unknown = args?.evidence_preference;
    if (typeof rawPreference === "string") {
      const preference = rawPreference.trim();
      if (preference && !EVIDENCE_PREFERENCES.includes(preference)) {
        return {
          error: `evidence_preference must be one of ${EVIDENCE_PREFERENCES.join(", ")}`,
        };
      }
    }

    return runAtomicStep("validate_task_inputs", { task_mode }, async () => {
      const validated: Record<string, unknown> = { task_mode };
      const warnings: string[] = [];
      for (const k of [
        "disease",
        "drug",
        "target",
        "mechanism",
        "evidence_preference",
        "notes",
      ]) {
        const v = args?.[k];
        if (typeof v === "string" && v.trim()) validated[k] = v.trim();
      }
      if (
        (task_mode === "target_discovery" || task_mode === "evidence_review") &&
        !validated.disease
      ) {
        warnings.push(
          "task_mode '" + task_mode + "' typically expects a disease label",
        );
      }
      if (task_mode === "candidate_compare" && !validated.drug) {
        warnings.push(
          "task_mode 'candidate_compare' typically expects a drug field",
        );
      }
      return { validated, warnings };
    });
  },
};
/**
 * All atomic tools defined in this module, exported as a single list:
 * the three summarize_literature_bucket steps, the two query_opentargets
 * steps, and the create_research_task input validator.
 * NOTE(review): presumably registered with the MCP tool registry by the
 * importer — confirm at the import site.
 */
export const ATOMIC_TOOLS: ToolDef[] = [
selectPapersForBucket,
extractPaperSummary,
aggregateBucketSummary,
opentargetsFindTargetId,
opentargetsGetAssociations,
validateTaskInputs,
];