| """PubMed search tool using NCBI E-utilities.""" |
|
|
| import json |
| from typing import Any |
|
|
| import httpx |
| import structlog |
| import xmltodict |
| from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
| from src.tools.query_utils import preprocess_query |
| from src.tools.rate_limiter import get_pubmed_limiter |
| from src.utils.config import settings |
| from src.utils.exceptions import RateLimitError, SearchError |
| from src.utils.models import Citation, Evidence |
|
|
| logger = structlog.get_logger() |
|
|
|
|
class PubMedTool:
    """Search tool for PubMed/NCBI via the E-utilities HTTP API.

    Performs an ESearch (query -> PMIDs) followed by an EFetch
    (PMIDs -> abstracts as XML) and converts each article into an
    ``Evidence`` object. All outbound requests pass through a shared
    rate limiter to respect NCBI usage policy.
    """

    BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    HTTP_TOO_MANY_REQUESTS = 429

    def __init__(self, api_key: str | None = None) -> None:
        """Initialize the tool.

        Args:
            api_key: Optional NCBI API key; falls back to
                ``settings.ncbi_api_key`` when not provided.
        """
        self.api_key = api_key or settings.ncbi_api_key

        # Treat the config-template placeholder as "no key configured".
        if self.api_key == "your-ncbi-key-here":
            self.api_key = None

        # Limiter configuration depends on key presence
        # (see get_pubmed_limiter for the keyed/unkeyed rates).
        self._limiter = get_pubmed_limiter(self.api_key)

    @property
    def name(self) -> str:
        """Tool identifier used in citations and registries."""
        return "pubmed"

    async def _rate_limit(self) -> None:
        """Enforce NCBI rate limiting before each request."""
        await self._limiter.acquire()

    def _build_params(self, **kwargs: Any) -> dict[str, Any]:
        """Build request params with JSON retmode and optional API key."""
        params = {**kwargs, "retmode": "json"}
        if self.api_key:
            params["api_key"] = self.api_key
        return params

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        reraise=True,
    )
    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        """
        Search PubMed and return evidence.

        1. ESearch: Get PMIDs matching query
        2. EFetch: Get abstracts for those PMIDs
        3. Parse and return Evidence objects

        Args:
            query: Free-text query; preprocessed before submission.
            max_results: Maximum number of PMIDs to request.

        Returns:
            A list of Evidence objects (empty when nothing matched or
            NCBI returned an unparseable ESearch response).

        Raises:
            RateLimitError: If NCBI responds with HTTP 429.
            SearchError: For any other HTTP failure or unparseable XML.
        """
        await self._rate_limit()

        clean_query = preprocess_query(query)
        # Fall back to the raw query if preprocessing strips everything.
        final_query = clean_query if clean_query else query

        async with httpx.AsyncClient(timeout=30.0) as client:
            search_params = self._build_params(
                db="pubmed",
                term=final_query,
                retmax=max_results,
                sort="relevance",
            )

            try:
                search_resp = await client.get(
                    f"{self.BASE_URL}/esearch.fcgi",
                    params=search_params,
                )
                search_resp.raise_for_status()
                search_data = search_resp.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == self.HTTP_TOO_MANY_REQUESTS:
                    raise RateLimitError("PubMed rate limit exceeded") from e
                raise SearchError(f"PubMed search failed: {e}") from e
            except json.JSONDecodeError as e:
                # NCBI occasionally serves an HTML maintenance page with a
                # 200 status; treat it as "no results" rather than failing.
                # (search_resp is always bound here: json() only runs after
                # the GET succeeded.)
                logger.warning(
                    "PubMed returned invalid JSON (possible maintenance page)",
                    error=str(e),
                    response_preview=search_resp.text[:200],
                )
                return []

            pmids = search_data.get("esearchresult", {}).get("idlist", [])

            if not pmids:
                return []

            # Second request: fetch the abstracts for the matched PMIDs.
            await self._rate_limit()
            fetch_params = self._build_params(
                db="pubmed",
                id=",".join(pmids),
                rettype="abstract",
            )
            # EFetch abstracts must be requested as XML; override the
            # JSON default set by _build_params.
            fetch_params["retmode"] = "xml"

            # Mirror the ESearch error mapping so callers see the same
            # exception types regardless of which request failed.
            try:
                fetch_resp = await client.get(
                    f"{self.BASE_URL}/efetch.fcgi",
                    params=fetch_params,
                )
                fetch_resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == self.HTTP_TOO_MANY_REQUESTS:
                    raise RateLimitError("PubMed rate limit exceeded") from e
                raise SearchError(f"PubMed fetch failed: {e}") from e

            return self._parse_pubmed_xml(fetch_resp.text)

    def _parse_pubmed_xml(self, xml_text: str) -> list[Evidence]:
        """Parse PubMed EFetch XML into Evidence objects.

        Malformed individual articles are skipped (logged at debug);
        a wholly unparseable document raises SearchError.
        """
        try:
            data = xmltodict.parse(xml_text)
        except Exception as e:
            raise SearchError(f"Failed to parse PubMed XML: {e}") from e

        articles = data.get("PubmedArticleSet", {}).get("PubmedArticle", [])

        # xmltodict collapses a single-element list to a bare dict.
        if isinstance(articles, dict):
            articles = [articles]

        evidence_list = []
        for article in articles:
            try:
                evidence = self._article_to_evidence(article)
                if evidence:
                    evidence_list.append(evidence)
            except (KeyError, AttributeError, TypeError) as e:
                logger.debug("Skipping malformed article", error=str(e))
                continue

        return evidence_list

    def _article_to_evidence(self, article: dict[str, Any]) -> Evidence | None:
        """Convert a single PubMed article dict to Evidence.

        Returns None when the article lacks a title or abstract.
        """
        medline = article.get("MedlineCitation", {})
        article_data = medline.get("Article", {})

        # PMID may be a plain string or a dict with attributes (#text).
        pmid = medline.get("PMID", {})
        if isinstance(pmid, dict):
            pmid = pmid.get("#text", "")

        # Titles with inline markup arrive as dicts with a #text key.
        title = article_data.get("ArticleTitle", "")
        if isinstance(title, dict):
            title = title.get("#text", str(title))

        # AbstractText may be a string, a dict (labeled section), or a
        # list of labeled sections; flatten all forms to one string.
        abstract_data = article_data.get("Abstract", {}).get("AbstractText", "")
        if isinstance(abstract_data, list):
            abstract = " ".join(
                item.get("#text", str(item)) if isinstance(item, dict) else str(item)
                for item in abstract_data
            )
        elif isinstance(abstract_data, dict):
            abstract = abstract_data.get("#text", str(abstract_data))
        else:
            abstract = str(abstract_data)

        if not abstract or not title:
            return None

        # Publication date: prefer the structured Year/Month/Day fields;
        # some records only carry a free-form MedlineDate (e.g.
        # "2000 Jan-Feb"), from which we take a leading 4-digit year.
        pub_date = article_data.get("Journal", {}).get("JournalIssue", {}).get("PubDate", {})
        year = pub_date.get("Year")
        if year is None:
            medline_date = str(pub_date.get("MedlineDate", ""))
            year = medline_date[:4] if medline_date[:4].isdigit() else "Unknown"
        month = pub_date.get("Month", "01")
        day = pub_date.get("Day", "01")
        date_str = f"{year}-{month}-{day}" if year != "Unknown" else "Unknown"

        # Authors: xmltodict collapses a single author to a bare dict.
        # Keep at most five, formatted "LastName ForeName".
        author_list = article_data.get("AuthorList", {}).get("Author", [])
        if isinstance(author_list, dict):
            author_list = [author_list]
        authors = []
        for author in author_list[:5]:
            last = author.get("LastName", "")
            first = author.get("ForeName", "")
            if last:
                authors.append(f"{last} {first}".strip())

        return Evidence(
            content=abstract[:2000],
            citation=Citation(
                source="pubmed",
                title=title[:500],
                url=f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
                date=date_str,
                authors=authors,
            ),
        )
|
|