| |
"""mcp/ctgov.py — ClinicalTrials helper (Modernized July-2025-ready)

Strategy
========
1. **Primary** — Modernized OAS v2 endpoint (beta-ut) announced by CT.gov for July 2025.
2. **Fallback-1** — Production v2 (`/api/v2/studies`).
3. **Fallback-2** — Legacy v1 (`/api/query/study_fields`).
4. If all fail → return empty list so UI never crashes.

Features
--------
* 12-second timeout, 3-step back-off (2 → 4 → 8 s) on `403/429/5xx`.
* Explicit `Accept: application/json` header (passes WAF).
* Realistic Chrome UA.
* LRU-cached for 24 h.
* Exports `search_trials` **and** `search_trials_v2` for back-compat.
"""
from __future__ import annotations

import asyncio
import time
from functools import lru_cache
from typing import Any, Dict, List

import httpx
|
|
| |
# Candidate endpoints; `search_trials` tries them in this order: beta, v2, v1.
_BETA = "https://beta-ut.clinicaltrials.gov/api/v2/studies"
_V2 = "https://clinicaltrials.gov/api/v2/studies"
_V1 = "https://clinicaltrials.gov/api/query/study_fields"

# Headers attached to every outgoing request made through `_get`.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/126.0 Safari/537.36"
    ),
    "Accept": "application/json",
}

# Timeout handed to `httpx.AsyncClient`; the module docstring states seconds.
_TIMEOUT = 12

# Upper bound that `search_trials` applies to the caller's `max_studies`.
_MAX = 100
|
|
| |
async def _get(url: str, params: Dict[str, Any]) -> httpx.Response:
    """Send a single GET request and return the raw response.

    A fresh ``httpx.AsyncClient`` configured with ``_TIMEOUT`` and
    ``_HEADERS`` is opened for the call and closed again on exit.

    Args:
        url: Endpoint to request.
        params: Query-string parameters for the request.

    Returns:
        The ``httpx.Response``. The status code is not inspected here;
        callers decide what counts as a failure.

    Raises:
        Nothing is caught in this function, so any exception raised by
        httpx while sending the request propagates to the caller.
    """
    async with httpx.AsyncClient(timeout=_TIMEOUT, headers=_HEADERS) as client:
        return await client.get(url, params=params)
|
|
# Piece names requested from the v2 API. Same list `_try_v1` asks the legacy
# endpoint for, so both generations are queried for the same data.
_V2_FIELDS = (
    "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
)


async def _fetch_v2_studies(
    url: str, label: str, term: str, limit: int
) -> List[Dict]:
    """Query one v2-style ``/studies`` endpoint and return its ``studies`` list.

    Args:
        url: Endpoint to query (``_BETA`` or ``_V2``).
        label: Short endpoint name used in the failure message.
        term: Free-text search expression.
        limit: Page size to request.

    Returns:
        The list under the ``studies`` key of the JSON payload, or an empty
        list when that key is absent or null.

    Raises:
        httpx.HTTPStatusError: The response status was not 200. The response
            is attached so the caller can inspect ``status_code``.
    """
    params = {
        # The v2 API names its free-text search parameter ``query.term``.
        "query.term": term,
        "pageSize": limit,
        "fields": _V2_FIELDS,
    }
    resp = await _get(url, params)
    if resp.status_code != 200:
        raise httpx.HTTPStatusError(
            f"{label} failed", request=resp.request, response=resp
        )
    return resp.json().get("studies") or []


async def _try_beta(term: str, limit: int) -> List[Dict]:
    """Search the beta (``_BETA``) v2 endpoint; raise ``HTTPStatusError`` on non-200."""
    return await _fetch_v2_studies(_BETA, "beta", term, limit)


async def _try_v2(term: str, limit: int) -> List[Dict]:
    """Search the production (``_V2``) v2 endpoint; raise ``HTTPStatusError`` on non-200."""
    return await _fetch_v2_studies(_V2, "v2", term, limit)
|
|
async def _try_v1(term: str, limit: int) -> List[Dict]:
    """Search the legacy v1 ``study_fields`` endpoint (``_V1``).

    Args:
        term: Search expression, sent as ``expr``.
        limit: Highest rank to return (``max_rnk``); ranks start at 1.

    Returns:
        The ``StudyFieldsResponse.StudyFields`` list from the JSON payload,
        or an empty list when either level is absent or null.

    Raises:
        httpx.HTTPStatusError: The response status was not 200. The response
            is attached so the caller can inspect ``status_code``.
    """
    # NOTE(review): the classic v1 API may no longer be served — confirm this
    # fallback is still reachable.
    params = {
        "expr": term,
        "fields": (
            "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
        ),
        "max_rnk": limit,
        "min_rnk": 1,
        "fmt": "json",
    }
    resp = await _get(_V1, params)
    if resp.status_code != 200:
        raise httpx.HTTPStatusError(
            "v1 failed", request=resp.request, response=resp
        )
    # `or` guards against explicit nulls, which `.get(key, default)` lets through.
    envelope = resp.json().get("StudyFieldsResponse") or {}
    return envelope.get("StudyFields") or []
|
|
| |
# Successful lookups: (term, clamped limit) -> (monotonic expiry, studies).
# Dict insertion order doubles as recency order, oldest entry first.
_CACHE: Dict[tuple, tuple] = {}
_CACHE_TTL = 24 * 60 * 60  # seconds
_CACHE_SIZE = 512  # same bound the previous lru_cache decorator used
_RETRYABLE = frozenset({403, 429, 500, 502, 503, 504})
_ATTEMPTS = 3


def _cache_get(key: tuple) -> List[Dict] | None:
    """Return a copy of the live cached result for *key*, or None if absent/expired."""
    entry = _CACHE.pop(key, None)
    if entry is None:
        return None
    expires_at, studies = entry
    if time.monotonic() >= expires_at:
        return None  # expired entry stays removed
    _CACHE[key] = entry  # re-insert at the end: most recently used
    return list(studies)


def _cache_put(key: tuple, studies: List[Dict]) -> None:
    """Store *studies* under *key*, evicting the oldest entries when full."""
    _CACHE.pop(key, None)
    while len(_CACHE) >= _CACHE_SIZE:
        del _CACHE[next(iter(_CACHE))]
    _CACHE[key] = (time.monotonic() + _CACHE_TTL, list(studies))


async def search_trials(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Return ≤ *max_studies* trials using the BETA → V2 → V1 cascade.

    Results are cached per ``(term, clamped limit)`` for 24 hours. The cache
    stores the awaited result; ``functools.lru_cache`` on a coroutine function
    would store the coroutine object, which cannot be awaited a second time.
    Only successful fetches are cached, so an outage is not remembered.

    Each endpoint gets up to three attempts, sleeping 2 s and then 4 s
    between them. A retry happens on a timeout or on status 403/429/5xx; any
    other httpx error or an undecodable payload moves on to the next endpoint.

    Args:
        term: Free-text search expression.
        max_studies: Requested number of studies, clamped to ``1.._MAX``.

    Returns:
        A new list of study dicts from the first endpoint that answers, or
        an empty list when every endpoint fails. httpx errors and
        ``ValueError`` are swallowed; task cancellation still propagates.
    """
    limit = max(1, min(max_studies, _MAX))
    key = (term, limit)

    cached = _cache_get(key)
    if cached is not None:
        return cached

    for fetch in (_try_beta, _try_v2, _try_v1):
        delay = 0
        for _ in range(_ATTEMPTS):
            if delay:
                await asyncio.sleep(delay)
            try:
                studies = (await fetch(term, limit)) or []
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code not in _RETRYABLE:
                    break  # e.g. 400/404: retrying this endpoint cannot help
            except httpx.TimeoutException:
                pass  # transient: fall through to the back-off below
            except (httpx.HTTPError, ValueError):
                # Connection/protocol failure or a body that is not JSON.
                break
            else:
                _cache_put(key, studies)
                return list(studies)
            delay = 2 if delay == 0 else delay * 2
    return []
|
|
| |
async def search_trials_v2(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Backward-compatible alias that delegates to ``search_trials``.

    Args:
        term: Free-text search expression.
        max_studies: Requested number of studies, forwarded unchanged.

    Returns:
        Whatever ``search_trials`` returns for the same arguments.
    """
    return await search_trials(term, max_studies=max_studies)
|
|