// Iostream-Li's picture
// Add files using upload-large-folder tool
// 6d1fe92 verified
import type { LlmTool } from "../llm/types";
import {
recordInvocation,
captureOutgoingSeamSamples,
resolveNodeForDirectReference,
} from "../lib/tool-graph";
import { logger } from "../lib/logger";
/**
* MCP-style biomedical tool host.
*
* The biomedical lookup tools are keyless: they call public open APIs
* (NCBI E-utilities, UniProt REST, Open Targets GraphQL). For those tools,
* network errors and non-2xx responses are converted into a structured
* payload (`error`, `error_code`, `retryable`, `hint`, ...) rather than
* thrown, so the model can recover gracefully on the next turn.
*/
/**
 * A tool the chat agent can call: the declaration advertised to the model
 * plus the async handler that executes one call.
 */
export interface ToolDef {
// Declaration sent to the LLM (the definitions in this file set `name`,
// `description` and a JSON-schema-style `parameters` object).
schema: LlmTool;
// Executes one call with the model-supplied arguments. Most handlers in this
// file report expected failures as an `{ error, ... }` object rather than
// throwing.
invoke(args: Record<string, unknown>): Promise<unknown>;
}
// Base URLs of the public APIs used by the keyless lookup tools.
const PUBMED_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils";
const UNIPROT_BASE = "https://rest.uniprot.org";
// Open Targets exposes a single GraphQL endpoint; queries are POSTed to it.
const OPENTARGETS_BASE = "https://api.platform.opentargets.org/api/v4/graphql";
/**
 * HTTP-aware error so callers (and the LLM, via the structured tool
 * result) can distinguish a transient upstream issue (rate-limit, 5xx,
 * network blip, timeout) from a terminal one (400/404). Retryable errors
 * have already been retried internally — the flag tells the model that
 * an alternative query or tool may succeed where a verbatim retry will
 * not.
 */
class FetchError extends Error {
  /** HTTP status of the last response, or `null` when no response arrived. */
  readonly status: number | null;
  /** Whether the failure class is transient (already retried internally). */
  readonly retryable: boolean;
  /** Number of attempts made before giving up. */
  readonly attempts: number;

  constructor(
    message: string,
    opts: { status: number | null; retryable: boolean; attempts: number },
  ) {
    super(message);
    this.name = "FetchError";
    this.status = opts.status;
    this.retryable = opts.retryable;
    this.attempts = opts.attempts;
  }
}

/** Per-attempt deadline; covers both the response headers and the body read. */
const FETCH_TIMEOUT_MS = 15_000;
/** Total attempts (the initial request plus retries). */
const FETCH_MAX_ATTEMPTS = 4;
/** Backoff before retry N (1-based); 3000 ms is used past the end of the list. */
const FETCH_BACKOFF_MS = [400, 1200, 3000];
/** Upper bound applied to a server-supplied numeric `Retry-After`. */
const FETCH_MAX_RETRY_AFTER_MS = 10_000;

/** Statuses worth retrying: 408 Request Timeout, 425 Too Early, 429, and any 5xx. */
function isTransientStatus(status: number): boolean {
  return status === 408 || status === 425 || status === 429 || status >= 500;
}

/**
 * Fetch `url` and parse the response body as JSON, retrying transient
 * failures.
 *
 * - 2xx: the parsed JSON body is returned.
 * - Transient HTTP statuses (see `isTransientStatus`) and network-level
 *   failures (DNS, connection reset, timeout, body-stream errors) are
 *   retried with backoff. A numeric `Retry-After` header is honoured, capped
 *   at `FETCH_MAX_RETRY_AFTER_MS`.
 * - Any other HTTP status fails immediately.
 *
 * @param url Absolute URL to request.
 * @param init Optional fetch options; its headers are merged over the
 *   defaults. If it carries a `signal`, aborting that signal cancels the
 *   in-flight attempt and is not retried.
 * @returns The parsed JSON body.
 * @throws FetchError when the request cannot be completed.
 * @throws SyntaxError when a 2xx body is not valid JSON (not retried).
 * @throws The underlying abort error when the caller's signal aborts.
 */
async function fetchJson(url: string, init?: RequestInit): Promise<unknown> {
  const callerSignal = init?.signal ?? undefined;
  const sleep = (ms: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));
  let lastErr: unknown = null;
  let lastStatus: number | null = null;
  for (let attempt = 1; attempt <= FETCH_MAX_ATTEMPTS; attempt++) {
    // One controller per attempt. It fires on our timeout or when the
    // caller's signal aborts, so either one cancels the request AND the body
    // read. Previously a caller signal silently disabled the timeout.
    const ctl = new AbortController();
    const timer = setTimeout(() => ctl.abort(), FETCH_TIMEOUT_MS);
    const forwardCallerAbort = () => ctl.abort();
    if (callerSignal?.aborted) ctl.abort();
    else callerSignal?.addEventListener("abort", forwardCallerAbort, { once: true });
    let waitMs = FETCH_BACKOFF_MS[attempt - 1] ?? 3000;
    try {
      const res = await fetch(url, {
        ...init,
        signal: ctl.signal,
        headers: {
          Accept: "application/json",
          "User-Agent": "DoAtlas/1.0 (research workbench)",
          ...(init?.headers || {}),
        },
      });
      // Await inside the try: the timeout then also covers the body read,
      // and body-stream failures go through the retry logic below.
      if (res.ok) return await res.json();
      lastStatus = res.status;
      // The error body is never read; release it so the connection can be reused.
      await res.body?.cancel().catch(() => undefined);
      if (!isTransientStatus(res.status) || attempt >= FETCH_MAX_ATTEMPTS) {
        throw new FetchError(`HTTP ${res.status} from ${url}`, {
          status: res.status,
          retryable: isTransientStatus(res.status),
          attempts: attempt,
        });
      }
      const retryAfter = Number(res.headers.get("retry-after"));
      if (Number.isFinite(retryAfter) && retryAfter > 0) {
        waitMs = Math.min(FETCH_MAX_RETRY_AFTER_MS, retryAfter * 1000);
      }
      lastErr = new Error(`HTTP ${res.status} from ${url}`);
    } catch (err) {
      // Terminal outcomes are not retried:
      // - already-classified HTTP failures;
      // - malformed JSON (a retry would return the same body);
      // - caller cancellation.
      if (err instanceof FetchError || err instanceof SyntaxError) throw err;
      if (callerSignal?.aborted) throw err;
      // Network-level failures (DNS, ECONNRESET, fetch failed, timeout) are
      // assumed transient; back off and try again unless out of budget.
      lastErr = err;
      if (attempt >= FETCH_MAX_ATTEMPTS) {
        const msg = err instanceof Error ? err.message : String(err);
        throw new FetchError(`network error after ${attempt} attempts: ${msg}`, {
          status: lastStatus,
          retryable: true,
          attempts: attempt,
        });
      }
    } finally {
      clearTimeout(timer);
      callerSignal?.removeEventListener("abort", forwardCallerAbort);
    }
    await sleep(waitMs);
  }
  // Every final attempt returns or throws above; this keeps the function
  // total for the type checker.
  throw new FetchError(
    `exhausted retries: ${lastErr instanceof Error ? lastErr.message : String(lastErr)}`,
    { status: lastStatus, retryable: true, attempts: FETCH_MAX_ATTEMPTS },
  );
}
/**
 * Convert an error from a tool's fetch pipeline into the structured payload
 * returned to the model, instead of throwing.
 *
 * For `FetchError`s, `error_code` is one of:
 * - `rate_limited` (429)
 * - `upstream_unavailable` (5xx)
 * - `network_error` (no HTTP status)
 * - `http_<status>` (any other status)
 *
 * Any other error maps to `unknown`. `hint` suggests a recovery strategy
 * that is consistent with the `retryable` flag.
 *
 * @param err The caught error (any value).
 * @returns A JSON-serialisable error payload.
 */
function fetchErrorPayload(err: unknown): {
  error: string;
  error_code: string;
  retryable: boolean;
  attempts: number;
  status: number | null;
  hint: string;
} {
  if (!(err instanceof FetchError)) {
    const msg = err instanceof Error ? err.message : String(err);
    return {
      error: msg,
      error_code: "unknown",
      retryable: false,
      attempts: 1,
      status: null,
      hint: "Inspect the error message; consider an alternative tool or query.",
    };
  }
  const { status } = err;
  let code: string;
  let hint: string;
  if (status === 429) {
    code = "rate_limited";
    hint =
      "Upstream rate limit hit even after backoff retries. Try a narrower or differently-worded query, or fall back to a different source tool.";
  } else if (status && status >= 500) {
    code = "upstream_unavailable";
    hint = "Upstream service is failing. Wait briefly or try an alternative tool.";
  } else if (!status) {
    code = "network_error";
    hint =
      "Network/timeout error. Retry with a smaller scope, or switch to an alternative tool.";
  } else if (err.retryable) {
    // Transient 4xx (e.g. 408 Request Timeout, 425 Too Early) that persisted
    // through the internal retries. The request itself was not rejected, so
    // "adjust the query" would be misleading here.
    code = `http_${status}`;
    hint =
      "Upstream kept returning a transient error after backoff retries. Wait briefly, retry with a smaller scope, or try an alternative tool.";
  } else {
    code = `http_${status}`;
    hint =
      "Upstream rejected the request. Adjust the query and try again, or use a different tool.";
  }
  return {
    error: err.message,
    error_code: code,
    retryable: err.retryable,
    attempts: err.attempts,
    status,
    hint,
  };
}
/**
 * Coerce raw tool-call arguments into an object so handlers can read fields
 * without null checks. Anything that is not a non-null object (including
 * missing arguments) becomes `{}`.
 *
 * The cast to `T` is unchecked: field values still need runtime validation.
 */
function safe<T extends Record<string, unknown>>(args: unknown): T {
  if (args === null || typeof args !== "object") {
    return {} as T;
  }
  return args as T;
}
/**
 * Start a best-effort telemetry task without letting it affect the caller.
 * Both a synchronous throw and an asynchronous rejection from `task` are
 * routed to `onError`; the task is not awaited.
 */
function fireTelemetry(
  task: () => Promise<unknown>,
  onError: (err: unknown) => void,
): void {
  try {
    void task().catch(onError);
  } catch (err) {
    onError(err);
  }
}

/**
 * Run one atomic step inside a composite handler and record an evidence
 * row against the matching atomic graph node.
 *
 * The result of `fn` is returned as-is, so the caller's flow is unchanged.
 * Recording is best-effort: a missing graph node, an offline DB or any
 * logging error never propagates into the tool's normal error path.
 *
 * Used by the decomposition introduced in Task #156. The LLM still calls the
 * composite alias, but each internal substep accumulates its own
 * success/failure counter so #157 can compute per-seam health.
 *
 * @param nodeName Atomic graph node the evidence row is recorded against.
 * @param argsSummary Compact argument summary stored with the row (may be null).
 * @param fn The step itself; its resolved value is returned unchanged.
 * @returns Whatever `fn` resolves to.
 * @throws Error with `code: "TOOL_DEPRECATED"`, `nodeName` and `replacedBy`
 *   when the resolver reports the node as deprecated (`fn` is not run).
 * @throws Whatever `fn` throws, after a failure row has been queued.
 */
async function runAtomicStep<T>(
  nodeName: string,
  argsSummary: Record<string, unknown> | null,
  fn: () => Promise<T>,
): Promise<T> {
  const start = Date.now();
  // Pre-dispatch resolution (#159): if a direct-reference name resolves
  // to a node whose status is `deprecated`, throw a structured error
  // with the canonical replacement. The agent / caller is expected to
  // re-dispatch against `replacedBy`. Resolver failure is best-effort
  // (debug log) and never blocks dispatch.
  let resolved: Awaited<ReturnType<typeof resolveNodeForDirectReference>> = null;
  try {
    resolved = await resolveNodeForDirectReference(nodeName);
  } catch (err) {
    logger.debug(
      { err, nodeName },
      "runAtomicStep deprecation pre-resolve failed",
    );
  }
  if (resolved && resolved.deprecated) {
    logger.warn(
      {
        nodeName: resolved.nodeName,
        replacedBy: resolved.replacedBy,
        kind: "tool_dispatch_deprecated",
      },
      "tool dispatch: deprecated node invoked by direct reference — refusing dispatch",
    );
    const err = new Error(
      `tool '${resolved.nodeName}' is deprecated; use '${resolved.replacedBy ?? "(no replacement set)"}' instead`,
    ) as Error & {
      code: string;
      nodeName: string;
      replacedBy: string | null;
    };
    err.code = "TOOL_DEPRECATED";
    err.nodeName = resolved.nodeName;
    err.replacedBy = resolved.replacedBy;
    throw err;
  }
  // Only `fn` sits inside the try. Telemetry is dispatched afterwards, so a
  // telemetry failure can never be recorded as (or turned into) a step
  // failure.
  let result: T;
  try {
    result = await fn();
  } catch (err) {
    fireTelemetry(
      () =>
        recordInvocation({
          nodeName,
          ok: false,
          argsSummary,
          resultPreview: null,
          errorCode: err instanceof Error ? err.name : "error",
          durationMs: Date.now() - start,
        }),
      (logErr) =>
        logger.debug({ err: logErr, nodeName }, "runAtomicStep err-record failed"),
    );
    throw err;
  }
  fireTelemetry(
    () =>
      recordInvocation({
        nodeName,
        ok: true,
        argsSummary,
        resultPreview: result,
        durationMs: Date.now() - start,
      }),
    (err) => logger.debug({ err, nodeName }, "runAtomicStep ok-record failed"),
  );
  // Seam-health capture (Task #157): fold the actual result against
  // every declared downstream consumer's contract. Best-effort —
  // capture failure never propagates into the tool flow.
  fireTelemetry(
    () => captureOutgoingSeamSamples(nodeName, result),
    (err) => logger.debug({ err, nodeName }, "runAtomicStep seam-capture failed"),
  );
  return result;
}
const searchPubmed: ToolDef = {
  schema: {
    name: "search_pubmed",
    description:
      "在 NCBI PubMed 检索生物医学文献,返回前 N 条命中(含 PMID、标题、作者、期刊、年份、DOI、摘要片段)。仅检索英文文献。",
    parameters: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description:
            "PubMed 检索语句,支持 MeSH、字段限定、布尔运算,例如 'HER2 AND trastuzumab AND 2024[dp]'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 20,
          default: 10,
        },
      },
      required: ["query"],
    },
  },
  /**
   * Two-step E-utilities lookup:
   * 1. `esearch` returns the matching PMIDs.
   * 2. `esummary` fetches citation metadata for those PMIDs in one call.
   *
   * Invalid input yields `{ error }`. HTTP/network failures yield the
   * structured payload from `fetchErrorPayload`.
   */
  async invoke(args) {
    const a = safe<{ query?: unknown; max_results?: unknown }>(args);
    // Model-supplied args are untyped at runtime. Guard before calling string
    // methods so a non-string `query` returns an error instead of throwing.
    const q = typeof a.query === "string" ? a.query.trim() : "";
    if (!q) return { error: "query is required" };
    // Missing, zero or non-numeric → default. Otherwise an integer in [1, 20],
    // so the URL never carries `retmax=NaN` or a fractional value.
    const requested = Number(a.max_results);
    const max =
      Number.isFinite(requested) && requested !== 0
        ? Math.max(1, Math.min(20, Math.floor(requested)))
        : 10;
    try {
      const search = (await fetchJson(
        `${PUBMED_BASE}/esearch.fcgi?db=pubmed&retmode=json&retmax=${max}&term=${encodeURIComponent(q)}`,
      )) as {
        esearchresult?: { idlist?: string[]; count?: string };
      };
      const ids = search.esearchresult?.idlist || [];
      if (ids.length === 0) {
        return { count: 0, hits: [], source: "pubmed" };
      }
      const summary = (await fetchJson(
        `${PUBMED_BASE}/esummary.fcgi?db=pubmed&retmode=json&id=${ids.join(",")}`,
      )) as { result?: Record<string, unknown> };
      const result = summary.result || {};
      const hits = ids.map((pmid) => {
        const it = (result[pmid] || {}) as {
          title?: string;
          source?: string;
          pubdate?: string;
          authors?: Array<{ name: string }>;
          articleids?: Array<{ idtype: string; value: string }>;
        };
        const doi = (it.articleids || []).find((id) => id.idtype === "doi")?.value;
        return {
          pmid,
          title: it.title || "",
          journal: it.source || "",
          // `pubdate` looks like "2024 Mar 5"; keep only the leading year token.
          year: (it.pubdate || "").split(" ")[0] || "",
          authors: (it.authors || []).slice(0, 6).map((au) => au.name),
          doi,
          url: `https://pubmed.ncbi.nlm.nih.gov/${pmid}/`,
        };
      });
      return { count: Number(search.esearchresult?.count || ids.length), hits, source: "pubmed" };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};
const lookupUniprot: ToolDef = {
  schema: {
    name: "lookup_uniprot",
    description:
      "在 UniProt 检索蛋白质条目,返回基础信息(accession、推荐名、基因名、物种、序列长度、功能描述)。",
    parameters: {
      type: "object",
      properties: {
        query: {
          type: "string",
          description:
            "UniProt 检索语句,例如 'gene:BRCA1 AND organism_id:9606' 或 'P38398'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 10,
          default: 5,
        },
      },
      required: ["query"],
    },
  },
  /**
   * Search UniProtKB and return a compact summary per entry.
   *
   * The request's `fields` list asks only for what the mapping below reads.
   * FUNCTION comment texts are concatenated and truncated to 800 characters.
   * Invalid input yields `{ error }`; HTTP/network failures yield the
   * structured payload from `fetchErrorPayload`.
   */
  async invoke(args) {
    const a = safe<{ query?: unknown; max_results?: unknown }>(args);
    // Guard the runtime type: a non-string `query` must not throw from `.trim()`.
    const q = typeof a.query === "string" ? a.query.trim() : "";
    if (!q) return { error: "query is required" };
    // Missing, zero or non-numeric → default. Otherwise an integer in [1, 10].
    const requested = Number(a.max_results);
    const size =
      Number.isFinite(requested) && requested !== 0
        ? Math.max(1, Math.min(10, Math.floor(requested)))
        : 5;
    try {
      const data = (await fetchJson(
        `${UNIPROT_BASE}/uniprotkb/search?query=${encodeURIComponent(q)}&format=json&size=${size}&fields=accession,id,protein_name,gene_names,organism_name,length,cc_function`,
      )) as {
        results?: Array<{
          primaryAccession?: string;
          uniProtkbId?: string;
          proteinDescription?: {
            recommendedName?: { fullName?: { value?: string } };
          };
          genes?: Array<{ geneName?: { value?: string } }>;
          organism?: { scientificName?: string };
          sequence?: { length?: number };
          comments?: Array<{
            commentType?: string;
            texts?: Array<{ value?: string }>;
          }>;
        }>;
      };
      const hits = (data.results || []).map((it) => ({
        accession: it.primaryAccession,
        entry_name: it.uniProtkbId,
        recommended_name:
          it.proteinDescription?.recommendedName?.fullName?.value,
        gene: (it.genes || []).map((g) => g.geneName?.value).filter(Boolean),
        organism: it.organism?.scientificName,
        length: it.sequence?.length,
        function: (it.comments || [])
          .filter((c) => c.commentType === "FUNCTION")
          .flatMap((c) => (c.texts || []).map((t) => t.value))
          .filter(Boolean)
          .join(" ")
          .slice(0, 800),
        url: it.primaryAccession
          ? `https://www.uniprot.org/uniprotkb/${it.primaryAccession}`
          : undefined,
      }));
      return { count: hits.length, hits, source: "uniprot" };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};
const queryOpenTargets: ToolDef = {
  schema: {
    name: "query_opentargets",
    description:
      "在 Open Targets Platform 查询「靶点 ↔ 疾病」证据:给定基因符号,返回与该靶点关联最强的疾病列表(含整体打分、数据来源数)。",
    parameters: {
      type: "object",
      properties: {
        gene_symbol: {
          type: "string",
          description: "HGNC 基因符号,例如 'BRCA1'、'EGFR'。",
        },
        max_results: {
          type: "integer",
          minimum: 1,
          maximum: 25,
          default: 10,
        },
      },
      required: ["gene_symbol"],
    },
  },
  /**
   * Composite handler with two instrumented atomic steps:
   * 1. Resolve the gene symbol to a target id.
   * 2. Fetch that target's top disease associations.
   *
   * Invalid input or a missing target yields `{ error }`. Thrown failures
   * from either step are converted with `fetchErrorPayload`.
   */
  async invoke(args) {
    const a = safe<{ gene_symbol?: unknown; max_results?: unknown }>(args);
    // Guard the runtime type: a non-string symbol must not throw from `.trim()`.
    const g = typeof a.gene_symbol === "string" ? a.gene_symbol.trim() : "";
    if (!g) return { error: "gene_symbol is required" };
    // Missing, zero or non-numeric → default. Otherwise an integer in [1, 25]
    // (GraphQL `Int!` rejects fractional or NaN values).
    const requested = Number(a.max_results);
    const size =
      Number.isFinite(requested) && requested !== 0
        ? Math.max(1, Math.min(25, Math.floor(requested)))
        : 10;
    try {
      // Atomic step 1: resolve gene symbol → Ensembl id (one GraphQL call).
      const target = await runAtomicStep(
        "opentargets_find_target_id",
        { gene_symbol: g },
        () => opentargetsFindTargetId(g),
      );
      if (!target) return { error: `No target found for gene ${g}` };
      // Atomic step 2: fetch associations by Ensembl id (one GraphQL call).
      const assoc = await runAtomicStep(
        "opentargets_get_target_associations",
        { ensembl_id: target.id, size },
        () => opentargetsGetAssociations(target.id, size),
      );
      const t = assoc.data?.target;
      if (!t) return { error: "Failed to fetch associations" };
      return {
        target: {
          ensembl_id: t.id,
          symbol: t.approvedSymbol,
          name: t.approvedName,
        },
        count: t.associatedDiseases?.count || 0,
        diseases: (t.associatedDiseases?.rows || []).map((r) => ({
          id: r.disease.id,
          name: r.disease.name,
          score: r.score,
          datasources: r.datasourceScores?.length ?? 0,
          url: `https://platform.opentargets.org/disease/${r.disease.id}`,
        })),
        source: "opentargets",
      };
    } catch (err) {
      return fetchErrorPayload(err);
    }
  },
};
// --- query_opentargets atomic constituents --------------------------------
// These are invoked inline by the composite handler above and instrumented
// via `runAtomicStep` so each one accumulates its own evidence row in the
// tool capability graph (Task #156). They are intentionally not exported
// as standalone ToolDefs in this task — the LLM keeps calling the alias.
// Target-only platform search used to resolve a gene symbol to an Open
// Targets target; the handler treats a hit's `id` as the Ensembl id.
const OT_FIND_QUERY = `query($q: String!) { search(queryString: $q, entityNames: ["target"]) { hits { id name entity } } }`;
// Target header plus the first page (index 0, `$size` rows) of associated
// diseases with per-datasource scores.
// NOTE(review): `therapeuticAreas` is requested but not read by the
// query_opentargets response mapping. The full response is, however, handed
// to seam-health capture via runAtomicStep, so confirm no consumer contract
// relies on it before trimming it.
const OT_ASSOC_QUERY = `query($id: String!, $size: Int!) {
target(ensemblId: $id) {
id
approvedSymbol
approvedName
associatedDiseases(page: { index: 0, size: $size }) {
count
rows {
score
datasourceScores { id score }
disease { id name therapeuticAreas { id name } }
}
}
}
}`;
/**
 * Resolve a gene symbol to an Open Targets target via the platform search.
 *
 * Prefers a hit whose `name` equals the symbol (case-insensitive), falling
 * back to the top-ranked hit as before.
 *
 * @param gene_symbol Trimmed gene symbol to search for.
 * @returns The chosen search hit, or `null` when the search has no hits.
 * @throws Error when the GraphQL response carries `errors` and no search
 *   data. The failure is then recorded against the atomic step instead of
 *   being reported as "no target found".
 * @throws Whatever `fetchJson` throws on transport failures.
 */
async function opentargetsFindTargetId(
  gene_symbol: string,
): Promise<{ id: string; name: string } | null> {
  const lookup = (await fetchJson(OPENTARGETS_BASE, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      query: OT_FIND_QUERY,
      variables: { q: gene_symbol },
    }),
  })) as {
    data?: { search?: { hits?: Array<{ id: string; name: string }> } } | null;
    errors?: Array<{ message?: string }>;
  };
  const search = lookup.data?.search;
  // GraphQL reports failures with HTTP 200 plus an `errors` array; surface
  // them instead of silently treating the gene as unknown.
  if (!search && lookup.errors?.length) {
    const detail = lookup.errors
      .map((e) => e.message ?? "unknown error")
      .join("; ");
    throw new Error(`Open Targets search failed: ${detail}`);
  }
  const hits = search?.hits ?? [];
  const wanted = gene_symbol.toUpperCase();
  // NOTE(review): assumes target hits expose the approved symbol as `name`.
  // If they do not, no exact match is found and the top hit is used.
  return hits.find((h) => h.name?.toUpperCase() === wanted) ?? hits[0] ?? null;
}
/**
 * Hand-written subset of the `OT_ASSOC_QUERY` GraphQL response, typing the
 * fields `query_opentargets` consumes. `therapeuticAreas` (requested by the
 * query) is not modelled here.
 */
interface OpentargetsAssociationsResponse {
  data?: {
    target?: {
      id: string;
      approvedSymbol: string;
      approvedName: string;
      associatedDiseases?: {
        /** Total number of associated diseases (not just this page). */
        count: number;
        rows: {
          score: number;
          datasourceScores?: { id: string; score: number }[];
          disease: { id: string; name: string };
        }[];
      };
    };
  };
}
/**
 * Fetch the target header and the top `size` disease associations for an
 * Ensembl gene id via `OT_ASSOC_QUERY`.
 *
 * @param ensembl_id Ensembl gene id returned by the target search.
 * @param size Page size for associated diseases.
 * @returns The raw GraphQL response.
 * @throws Error when the GraphQL response carries `errors` and no `target`
 *   data. The failure is then attributed to this atomic step instead of
 *   surfacing later as a generic "Failed to fetch associations".
 * @throws Whatever `fetchJson` throws on transport failures.
 */
async function opentargetsGetAssociations(
  ensembl_id: string,
  size: number,
): Promise<OpentargetsAssociationsResponse> {
  const response = (await fetchJson(OPENTARGETS_BASE, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      query: OT_ASSOC_QUERY,
      variables: { id: ensembl_id, size },
    }),
  })) as OpentargetsAssociationsResponse & {
    errors?: Array<{ message?: string }>;
  };
  // GraphQL reports failures with HTTP 200 plus an `errors` array. A null
  // target without errors (unknown id) is still returned for the caller.
  if (!response.data?.target && response.errors?.length) {
    const detail = response.errors
      .map((e) => e.message ?? "unknown error")
      .join("; ");
    throw new Error(`Open Targets associations query failed: ${detail}`);
  }
  return response;
}
// Bucket-summary tool: surfaces the meta-analytic pooled estimate
// (θ̂, CI, method, I²) for one literature bucket, so the chat agent can
// cite "the literature pools to X with CI [a,b], I²=Y%" in its answers.
// It reads the persisted bucket via `causalGetBucket` first. When that
// yields nothing, it falls back to the legacy JSONL bucket store.
const summarizeLiteratureBucket: ToolDef = {
  schema: {
    name: "summarize_literature_bucket",
    description:
      "Get the meta-analytic pooled literature estimate for a single bucket (θ̂, CI, method, I², supporting paper/edge counts). Use this to cite the pooled effect across the literature for a (X, Y, population, measure, time) bucket.",
    parameters: {
      type: "object",
      properties: {
        bucket_key: {
          type: "string",
          description:
            "Bucket id from PostgresCausalNetwork (e.g. 'B:...') or composite bucket_key from the legacy JSONL store.",
        },
      },
      required: ["bucket_key"],
    },
  },
  async invoke(args) {
    const key = String(args?.bucket_key || "").trim();
    if (!key) return { error: "bucket_key is required" };

    const { causalGetBucket } = await import("../lib/causal-store");
    const { getBucketByKey, ensureLoaded } = await import(
      "../lib/research-data"
    );
    try {
      await ensureLoaded();
    } catch {
      // The JSONL fixtures may be absent; the persisted store is still
      // consulted below, so this failure is deliberately ignored.
    }

    // Both lookups are attributed to the seeded atomic node, because
    // invocation node names must match seeded atomic node names (#156).
    // The args summary distinguishes the persisted read from the JSONL read.
    const NODE = "aggregate_bucket_summary";

    const persisted = await runAtomicStep(
      NODE,
      { bucket_key: key },
      () => causalGetBucket(key),
    );
    if (persisted) {
      return {
        bucket_id: persisted.bucket_id,
        bucket_key: persisted.bucket_key,
        supporting_edges: persisted.supporting_edges,
        paper_count: persisted.paper_ids.length,
        status: persisted.status,
        pooled_estimate: persisted.pooled_estimate,
      };
    }

    const legacy = await runAtomicStep(
      NODE,
      { bucket_key: key, source: "local_jsonl" },
      async () => getBucketByKey(key),
    );
    if (!legacy) return { error: "Bucket not found", bucket_key: key };

    // Legacy rows carry no pooled estimate, so the bucket is reported as
    // needing review.
    return {
      bucket_id: legacy.bucket_key,
      bucket_key: {
        kappa_pi: legacy.population_bucket,
        X: legacy.x_norm,
        Y: legacy.y_norm,
        tau_hat: legacy.time_index,
        ctype: legacy.equation_type,
        kappa_mu: `${legacy.measure_type}:${legacy.measure_scale}`,
      },
      supporting_edges: legacy.edge_count,
      paper_count: legacy.paper_count,
      status: "review_required",
      pooled_estimate: null,
      note: "No pooled estimate available — bucket has not been ingested into the persisted causal network yet.",
    };
  },
};
// Keyless public-API lookups plus the literature-bucket summary. They are
// spread into TOOLS next to the atomic and research-engine tools.
export const PUBLIC_API_TOOLS: ToolDef[] = [
  searchPubmed, lookupUniprot, queryOpenTargets, summarizeLiteratureBucket,
];
// Research-engine tool wrappers (HMAC-signed bridge invocations). Imported
// here so the chat ReAct loop sees the full unified catalog through
// TOOL_SCHEMAS / findTool without callers having to know about two sources.
import { RESEARCH_ENGINE_TOOLS } from "./research-engine-tools";
import { ATOMIC_TOOLS } from "./atomic-tools";
import { performWebSearch } from "./web-search";
// Unified web search — replaces the previous per-provider native search
// (e.g. MiniMax `{type:"web_search"}`) with a single function tool every
// adapter can call. Two schemas point at the same handler:
// - `web_search` — the canonical name used by every provider.
// - `plugin_web_search` — alias kept for MiniMax M2, which emits this
//   name back to us when its prompt mentions web grounding.
/**
 * Shared handler for `web_search` and `plugin_web_search`.
 *
 * Accepts the query in any of three forms:
 * - the canonical `query`;
 * - MiniMax's `query_key`;
 * - MiniMax's `query_tag_list`, joined with spaces.
 * The first non-blank form wins.
 *
 * `max_results` is clamped to the advertised 1..10 range, defaulting to 6.
 * A blank query returns `{ error }` instead of hitting the backend.
 */
const webSearchInvoker = async (
  args: Record<string, unknown>,
): Promise<unknown> => {
  // Tool-call args come from the model; tolerate a missing/non-object value.
  const a: Record<string, unknown> = args && typeof args === "object" ? args : {};
  const rawQuery = a["query"];
  const rawKey = a["query_key"];
  const rawTags = a["query_tag_list"];
  const candidates = [
    typeof rawQuery === "string" ? rawQuery : "",
    typeof rawKey === "string" ? rawKey : "",
    Array.isArray(rawTags) ? rawTags.map(String).join(" ") : "",
  ];
  // A blank `query` must not shadow a usable `query_key` / tag list.
  const query = candidates.map((c) => c.trim()).find((c) => c.length > 0) ?? "";
  if (!query) return { error: "query is required" };
  const rawMax = a["max_results"];
  const max =
    typeof rawMax === "number" && Number.isFinite(rawMax)
      ? Math.max(1, Math.min(10, Math.floor(rawMax)))
      : 6;
  return performWebSearch({ query, max_results: max });
};
// Canonical `web_search` tool. Its handler (`webSearchInvoker`) is shared
// with the MiniMax alias `plugin_web_search` defined below.
const webSearch: ToolDef = {
schema: {
name: "web_search",
description:
"Search the live web and return a list of {title, url, snippet, source} results. Use for current events, recent papers, drug news, regulatory filings, or anything where the in-house biomedical APIs do not have an answer.",
parameters: {
type: "object",
properties: {
query: { type: "string", description: "Search query in natural language." },
max_results: {
type: "integer",
minimum: 1,
maximum: 10,
description: "Number of results to return; default 6.",
},
},
required: ["query"],
// Strict schema: only `query` and `max_results` are advertised.
additionalProperties: false,
},
},
invoke: webSearchInvoker,
};
// MiniMax-compatible alias of `web_search`, sharing `webSearchInvoker`.
// It has no `required` list and allows additional properties, so the model
// may send `query_key` and/or `query_tag_list`; the shared handler reads
// whichever is present.
const pluginWebSearch: ToolDef = {
schema: {
name: "plugin_web_search",
description:
"MiniMax-compatible alias of `web_search`. Same semantics; provided so MiniMax M2 can call its expected tool name without a separate handler.",
parameters: {
type: "object",
properties: {
query_key: { type: "string", description: "Search query string." },
query_tag_list: {
type: "array",
items: { type: "string" },
description: "Optional list of keywords; joined into one query.",
},
max_results: { type: "integer", minimum: 1, maximum: 10 },
},
additionalProperties: true,
},
},
invoke: webSearchInvoker,
};
// Full catalog visible to the chat loop. Order matters: findTool returns the
// first tool whose schema name matches.
export const TOOLS: ToolDef[] = [
  ...PUBLIC_API_TOOLS, ...ATOMIC_TOOLS, ...RESEARCH_ENGINE_TOOLS,
  webSearch, pluginWebSearch,
];

// Schemas advertised to the LLM, in the same order as TOOLS.
export const TOOL_SCHEMAS: LlmTool[] = Array.from(TOOLS, (tool) => tool.schema);

/**
 * Look up a tool by its schema name.
 * @returns The first matching tool in TOOLS order, or `undefined`.
 */
export function findTool(name: string): ToolDef | undefined {
  for (const tool of TOOLS) {
    if (tool.schema.name === name) return tool;
  }
  return undefined;
}