Update mcp/orchestrator.py
Browse files- mcp/orchestrator.py +41 -178
mcp/orchestrator.py
CHANGED
|
@@ -1,184 +1,47 @@
|
|
| 1 |
-
#
|
| 2 |
-
|
| 3 |
-
|
| 4 |
-
|
| 5 |
-
|
| 6 |
-
|
| 7 |
-
|
| 8 |
-
|
| 9 |
-
|
| 10 |
-
|
| 11 |
-
|
| 12 |
-
|
| 13 |
-
|
| 14 |
-
|
| 15 |
-
|
| 16 |
-
|
| 17 |
-
|
| 18 |
-
|
| 19 |
-
|
| 20 |
-
|
| 21 |
-
|
| 22 |
-
|
| 23 |
-
|
| 24 |
-
|
| 25 |
-
|
| 26 |
-
|
| 27 |
-
|
| 28 |
-
|
| 29 |
-
# Engine used whenever the caller does not ask for one explicitly.
_DEFAULT_LLM = "openai"


def _llm_router(engine: str = _DEFAULT_LLM) -> Tuple:
    """Return ``(summarize_fn, qa_fn, engine_name)`` for *engine*.

    Only ``"gemini"`` (case-insensitive) selects the Gemini pair; every
    other value — including the default — falls back to OpenAI.
    """
    normalized = engine.lower()
    if normalized == "gemini":
        return gemini_summarize, gemini_qa, "gemini"
    return ai_summarize, ai_qa, "openai"
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
async def _safe_gather(*tasks, return_exceptions: bool = False):
|
| 40 |
-
"""
|
| 41 |
-
Await multiple coroutines, log any exceptions, and optionally return them.
|
| 42 |
-
"""
|
| 43 |
-
results = await asyncio.gather(*tasks, return_exceptions=True)
|
| 44 |
-
cleaned: List[Any] = []
|
| 45 |
-
for r in results:
|
| 46 |
-
if isinstance(r, Exception):
|
| 47 |
-
log.warning("Task failed: %s", r)
|
| 48 |
-
if return_exceptions:
|
| 49 |
-
cleaned.append(r)
|
| 50 |
-
else:
|
| 51 |
-
cleaned.append(r)
|
| 52 |
-
return cleaned
|
| 53 |
-
|
| 54 |
-
|
| 55 |
-
async def _gene_enrichment(keys: List[str]) -> Dict[str, Any]:
    """Query five gene-centric services for every seed keyword.

    For each keyword the following are scheduled, in this fixed order:
    NCBI gene search, MeSH definition, MyGene.info, Ensembl cross-refs
    and OpenTargets associations. Results are regrouped per service,
    with failed calls filtered out.
    """
    # Order matters: it defines which modulo-5 slot each service owns below.
    per_key_calls = (search_gene, get_mesh_definition, fetch_gene_info,
                     fetch_ensembl, fetch_ot)
    jobs: List[asyncio.Task] = [
        asyncio.create_task(call(key))
        for key in keys
        for call in per_key_calls
    ]
    # return_exceptions=True keeps positions aligned so bucketing stays valid.
    results = await _safe_gather(*jobs, return_exceptions=True)

    def bucket(offset: int) -> List[Any]:
        # Service *offset* for every keyword sits at indices with
        # index % 5 == offset; drop the failures here.
        return [
            res
            for pos, res in enumerate(results)
            if pos % 5 == offset and not isinstance(res, Exception)
        ]

    return {
        "ncbi": bucket(0),
        "mesh": bucket(1),
        "mygene": bucket(2),
        "ensembl": bucket(3),
        "ot_assoc": bucket(4),
    }
|
| 86 |
-
|
| 87 |
-
|
| 88 |
-
async def orchestrate_search(query: str, llm: str = _DEFAULT_LLM) -> Dict[str, Any]:
|
| 89 |
-
"""
|
| 90 |
-
Main entry point. Performs:
|
| 91 |
-
1. Literature fetch (PubMed + arXiv)
|
| 92 |
-
2. Keyword seed extraction
|
| 93 |
-
3. Bio-enrichment (UMLS, OpenFDA, gene services)
|
| 94 |
-
4. Clinical trials lookup
|
| 95 |
-
5. cBioPortal variants
|
| 96 |
-
6. AI LLM summary
|
| 97 |
-
Returns a unified dict for the UI.
|
| 98 |
-
"""
|
| 99 |
-
# 1) Literature
|
| 100 |
-
pubmed_t = asyncio.create_task(fetch_pubmed(query, max_results=7))
|
| 101 |
-
arxiv_t = asyncio.create_task(fetch_arxiv(query, max_results=7))
|
| 102 |
-
papers_raw = await _safe_gather(pubmed_t, arxiv_t)
|
| 103 |
-
papers = list(itertools.chain.from_iterable(papers_raw))[:30]
|
| 104 |
-
|
| 105 |
-
# 2) Seed keywords
|
| 106 |
-
seeds = {
|
| 107 |
-
w.strip()
|
| 108 |
-
for p in papers
|
| 109 |
-
for w in p.get("summary", "")[:500].split()
|
| 110 |
-
if w.isalpha()
|
| 111 |
-
}
|
| 112 |
-
seeds = list(seeds)[:10]
|
| 113 |
-
|
| 114 |
-
# 3) Bio-enrichment fan-out
|
| 115 |
-
umls_tasks = [asyncio.create_task(lookup_umls(k)) for k in seeds]
|
| 116 |
-
fda_tasks = [asyncio.create_task(fetch_drug_safety(k)) for k in seeds]
|
| 117 |
-
gene_task = asyncio.create_task(_gene_enrichment(seeds))
|
| 118 |
-
trials_t = asyncio.create_task(fetch_clinical_trials(query, max_studies=10))
|
| 119 |
-
cbio_t = asyncio.create_task(
|
| 120 |
-
fetch_cbio(seeds[0]) if seeds else asyncio.sleep(0, result=[])
|
| 121 |
-
)
|
| 122 |
-
|
| 123 |
-
umls_list, fda_list, gene_data, trials, variants = await asyncio.gather(
|
| 124 |
-
_safe_gather(*umls_tasks, return_exceptions=True),
|
| 125 |
-
_safe_gather(*fda_tasks, return_exceptions=True),
|
| 126 |
-
gene_task,
|
| 127 |
-
trials_t,
|
| 128 |
-
cbio_t,
|
| 129 |
)
|
| 130 |
|
| 131 |
-
#
|
| 132 |
-
|
| 133 |
-
|
| 134 |
-
|
| 135 |
-
|
| 136 |
-
|
| 137 |
-
genes = list(genes)
|
| 138 |
-
|
| 139 |
-
# 5) Deduplicate variants by genomic coordinates
|
| 140 |
-
seen: set = set()
|
| 141 |
-
unique_vars: List[dict] = []
|
| 142 |
-
for v in variants or []:
|
| 143 |
-
key = (
|
| 144 |
-
v.get("chromosome"),
|
| 145 |
-
v.get("startPosition"),
|
| 146 |
-
v.get("referenceAllele"),
|
| 147 |
-
v.get("variantAllele"),
|
| 148 |
-
)
|
| 149 |
-
if key not in seen:
|
| 150 |
-
seen.add(key)
|
| 151 |
-
unique_vars.append(v)
|
| 152 |
-
|
| 153 |
-
# 6) LLM-driven summary
|
| 154 |
-
summarize_fn, _, engine_used = _llm_router(llm)
|
| 155 |
-
combined = " ".join(p.get("summary", "") for p in papers)
|
| 156 |
-
ai_summary = await summarize_fn(combined[:12000])
|
| 157 |
|
| 158 |
return {
|
| 159 |
"papers": papers,
|
| 160 |
-
"umls": [u for u in
|
| 161 |
-
"
|
| 162 |
-
|
| 163 |
-
),
|
| 164 |
-
"
|
| 165 |
-
"
|
| 166 |
-
"
|
| 167 |
-
"mesh_defs": gene_data["mesh"],
|
| 168 |
-
"gene_disease": gene_data["ot_assoc"],
|
| 169 |
-
"ai_summary": ai_summary,
|
| 170 |
-
"llm_used": engine_used,
|
| 171 |
}
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
async def answer_ai_question(question: str, context: str, llm: str = _DEFAULT_LLM) -> Dict[str, str]:
    """Answer a follow-up question using the selected LLM's QA function.

    Any failure from the LLM call is converted into a textual answer
    rather than propagated, so the caller always gets a response dict.
    """
    _, qa_fn, _ = _llm_router(llm)
    prompt = "\n".join((f"Q: {question}", f"Context: {context}", "A:"))
    try:
        answer = await qa_fn(prompt)
    except Exception as exc:  # deliberate best-effort: surface error as text
        answer = f"LLM error: {exc}"
    return {"answer": answer}
|
|
|
|
| 1 |
+
# mcp/orchestrator.py
|
| 2 |
+
import asyncio
|
| 3 |
+
from mcp.mygene import mygene
|
| 4 |
+
from mcp.opentargets import ot
|
| 5 |
+
from mcp.cbio import cbio
|
| 6 |
+
# … import pubmed, umls, clinicaltrials, etc …
|
| 7 |
+
from typing import Dict, Any
|
| 8 |
+
|
| 9 |
+
async def orchestrate_search(query: str, *, llm: str = "openai") -> Dict[str, Any]:
    """Run the full search pipeline for *query* and return a UI-ready dict.

    Steps:
      1. fetch papers + abstracts
      2. UMLS concepts for the top extracted keywords
      3. gene info (MyGene) + disease associations (OpenTargets)
      4. cBioPortal variants
      5. clinical trials
      6. LLM summary of the abstracts (OpenAI by default, Gemini otherwise)

    Enrichment failures are downgraded to empty defaults so one broken
    upstream service cannot sink the whole response.

    Returns keys: papers, umls, gene, associations, variants, trials,
    ai_summary, llm_used.
    """
    # 1) fetch papers + abstracts concurrently with the enrichment below
    papers_task = asyncio.create_task(fetch_papers(query))

    # 2) pull UMLS concepts for up to five query keywords
    from mcp.nlp import extract_keywords
    keywords = extract_keywords(query)[:5]
    umls_tasks = [lookup_umls(k) for k in keywords]

    # 3) gene info + associations, 4) variants, 5) clinical trials
    gene_task = asyncio.create_task(mygene.fetch(query))
    ot_task = asyncio.create_task(ot.fetch(query))
    cbio_task = asyncio.create_task(cbio.fetch_variants(query))
    trials_task = asyncio.create_task(search_trials(query))

    # wait for everything; return_exceptions keeps one failure from
    # cancelling the sibling lookups
    papers = await papers_task
    umls = await asyncio.gather(*umls_tasks, return_exceptions=True)
    gene, assoc, vars_, trials = await asyncio.gather(
        gene_task, ot_task, cbio_task, trials_task, return_exceptions=True
    )

    # 6) summarize the abstracts with the chosen LLM.
    from mcp.ai import ai_summarize, gemini_summarize
    # Bug fix: p["summary"] raised KeyError for papers lacking that field;
    # .get() tolerates them. The corpus is also built once instead of
    # duplicating the join in each branch.
    corpus = "\n\n".join(p.get("summary", "") for p in papers)
    summarize = ai_summarize if llm == "openai" else gemini_summarize
    summary = await summarize(corpus)

    return {
        "papers": papers,
        "umls": [u for u in umls if not isinstance(u, Exception)],
        "gene": gene if not isinstance(gene, Exception) else {},
        "associations": assoc if not isinstance(assoc, Exception) else [],
        "variants": vars_ if not isinstance(vars_, Exception) else [],
        "trials": trials if not isinstance(trials, Exception) else [],
        "ai_summary": summary,
        "llm_used": llm,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|