| |
| |
| """MedGenesis – saved‑query alert helper (async). |
| |
| Monitors previously saved queries (PubMed/arXiv) and notifies when new |
| papers appear. Stores a lightweight JSON DB on disk (**locally inside |
| Space**), keeping the latest *N* paper links per query. |
| |
| Changes vs. legacy version |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| * **Thread‑safe JSON IO** with `asyncio.Lock` (handles simultaneous |
| Streamlit sessions). |
| * Exponential‑back‑off retry around `orchestrate_search` so one bad |
| request doesn’t kill the whole loop. |
| * Maximum DB size of 100 queries; oldest evicted automatically. |
| * Ensures atomic file write via `Path.write_text` to a temp file. |
| |
DB location: `<module_dir>/data/medgen_alerts.json` (created on first run).
| """ |
| from __future__ import annotations |
|
|
| import asyncio, json, tempfile |
| from pathlib import Path |
| from typing import Dict, List |
|
|
| from mcp.orchestrator import orchestrate_search |
|
|
| _DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve() |
| _DB_PATH.parent.mkdir(exist_ok=True) |
|
|
| _MAX_IDS = 30 |
| _MAX_QUERIES = 100 |
|
|
| _lock = asyncio.Lock() |
|
|
| |
| |
| |
|
|
def _read_db() -> Dict[str, List[str]]:
    """Load the alerts DB from disk; return ``{}`` if missing or unreadable.

    EAFP: reading directly (instead of ``exists()`` then read) avoids the
    race where the file vanishes between the check and the read, and also
    covers permission/IO errors the legacy version let propagate.
    """
    try:
        return json.loads(_DB_PATH.read_text())
    except FileNotFoundError:
        return {}  # first run — DB not created yet
    except (OSError, json.JSONDecodeError):
        # Unreadable or corrupt DB: start fresh rather than crash the loop.
        return {}
|
|
|
|
def _write_db(data: Dict[str, List[str]]) -> None:
    """Atomically persist *data*: write a temp file, then rename over the DB.

    The temp file is created in the DB's own directory so the rename stays
    on one filesystem (atomic). The legacy version never closed the temp
    handle — an fd leak, and ``Path.replace`` over an open file fails on
    Windows — and left the temp file behind if ``json.dump`` raised.
    """
    tmp = tempfile.NamedTemporaryFile("w", delete=False, dir=_DB_PATH.parent)
    try:
        with tmp:  # ensure the handle is closed before the rename
            json.dump(data, tmp, indent=2)
            tmp.flush()
        Path(tmp.name).replace(_DB_PATH)
    except BaseException:
        # Don't leave orphaned temp files behind on failure.
        Path(tmp.name).unlink(missing_ok=True)
        raise
|
|
| |
| |
| |
async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
    """Return ``{query: [fresh_links]}`` for queries with new papers.

    All queries are searched concurrently. A query whose search fails three
    times in a row (exponential back-off: 2s, 4s between attempts) is
    silently skipped for this round rather than killing the whole loop.

    NOTE(review): the DB lock is released while the searches run, so two
    overlapping sessions can still race on read-modify-write; holding the
    lock across the searches would serialise all sessions — kept as-is.
    """
    async with _lock:
        db = _read_db()

    new_links_map: Dict[str, List[str]] = {}

    async def _process(q: str) -> None:
        delay = 2
        for attempt in range(3):
            try:
                res = await orchestrate_search(q)
                break
            except Exception:
                if attempt == 2:
                    return  # give up — no pointless sleep after the last try
                await asyncio.sleep(delay)
                delay *= 2

        links = [p["link"] for p in res["papers"]]
        prev = set(db.get(q, []))
        fresh = [l for l in links if l not in prev]
        if fresh:
            new_links_map[q] = fresh
        db[q] = links[:_MAX_IDS]  # keep only the newest N links per query

    await asyncio.gather(*[_process(q) for q in queries])

    # Evict the OLDEST queries (dicts preserve insertion order, so the first
    # keys are the oldest). The legacy slice `[_MAX_QUERIES:]` wrongly
    # dropped the newest entries instead.
    if len(db) > _MAX_QUERIES:
        for key in list(db.keys())[: len(db) - _MAX_QUERIES]:
            db.pop(key, None)

    async with _lock:
        _write_db(db)

    return new_links_map
|
|
|
|
| |
| |
| |
if __name__ == "__main__":
    # Manual smoke test: run one saved query and show any fresh links.
    async def _demo() -> None:
        fresh = await check_alerts(["glioblastoma CRISPR"])
        print("Fresh:", fresh)

    asyncio.run(_demo())
|
|