"""Resolve chemical identifiers to DTXSIDs and fetch CTX genetox summaries.

The module talks to the CTX API rooted at ``settings.ctx_base_url`` and falls
back to PubChem and the CompTox dashboard HTML when CTX yields no DTXSID.
"""

import json
import logging
import os
import re
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import quote

import httpx

from core.config import settings

logger = logging.getLogger(__name__)

CAS_RE = re.compile(r"^\d{2,7}-\d{2}-\d$")
# Single backslash inside the raw string: ``\\d`` would match a literal
# backslash followed by 'd' characters and could never match a real id.
DTXSID_RE = re.compile(r"DTXSID\d{7,}")

# (path, query params) pair describing one CTX endpoint attempt.
_Endpoint = Tuple[str, Optional[Dict[str, Any]]]


def is_cas(s: str) -> bool:
    """Return True when *s* (stripped) matches the CAS pattern ``CAS_RE``.

    Only the textual shape is checked; no check-digit validation is done.
    """
    return bool(CAS_RE.match((s or "").strip()))


def _nested_dtxsid(row: Dict[str, Any], key: str) -> Any:
    """Return ``row[key]["dtxsid"]`` when ``row[key]`` is a dict, else None."""
    inner = row.get(key)
    return inner.get("dtxsid") if isinstance(inner, dict) else None


def _pick_dtxsid(rows: List[Any]) -> Optional[str]:
    """Return the first non-blank DTXSID found under a known key in *rows*.

    Non-dict rows are skipped. Keys are probed in a fixed order: the flat
    spellings ``dtxsid`` / ``DTXSID`` / ``dtxSid``, then the nested
    ``identifier`` and ``chemical`` objects, then the ``v2`` spellings.
    """
    for row in rows or []:
        if not isinstance(row, dict):
            continue
        candidate = (
            row.get("dtxsid")
            or row.get("DTXSID")
            or row.get("dtxSid")
            or _nested_dtxsid(row, "identifier")
            or _nested_dtxsid(row, "chemical")
            or row.get("DTXSIDv2")
            or row.get("dtxsidv2")
        )
        if not candidate:
            continue
        value = str(candidate).strip()
        if value:
            return value
    return None


def _as_rows(data: Any) -> List[Any]:
    """Normalise a decoded response body into a list of candidate rows.

    A list is returned as-is. A dict that itself looks like a record is
    wrapped in a one-element list; otherwise the first of the ``data`` /
    ``results`` / ``items`` keys holding a list or dict is used. Anything
    else yields an empty list.
    """
    if isinstance(data, list):
        return data
    if not isinstance(data, dict):
        return []
    if any(k in data for k in ("identifier", "chemical", "DTXSID", "dtxsid")):
        return [data]
    for key in ("data", "results", "items"):
        value = data.get(key)
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            return [value]
    return []


def _extract_dtxsid_any(data: Any) -> Optional[str]:
    """Return the first ``DTXSID_RE`` match anywhere in *data*, or None.

    *data* is serialised with ``json.dumps``; when that fails its ``str()``
    form is searched instead.
    """
    try:
        text = json.dumps(data)
    except (TypeError, ValueError, RecursionError):
        text = str(data)
    match = DTXSID_RE.search(text)
    return match.group(0) if match else None


def _ctx_headers() -> Dict[str, str]:
    """Build the request headers for CTX calls.

    The API key is taken from ``settings.ctx_api_key`` first, then from the
    ``CTX_API_KEY``, ``COMPTOX_API_KEY`` and ``CTX_KEY`` environment
    variables. When no key is found, no key header is added.
    """
    headers = {"accept": "application/json"}
    key = (
        settings.ctx_api_key
        or os.getenv("CTX_API_KEY")
        or os.getenv("COMPTOX_API_KEY")
        or os.getenv("CTX_KEY")
    )
    if isinstance(key, str):
        key = key.strip()
    if key:
        # NOTE(review): both spellings are set on purpose in the original;
        # header names are case-insensitive, so this may send the key twice.
        # Confirm before dropping one of them.
        headers["x-api-key"] = key
        headers["X-Api-Key"] = key
    headers["user-agent"] = "toxrai-hf-demo"
    return headers


async def _ctx_get(
    path: str,
    http: httpx.AsyncClient,
    params: Optional[Dict[str, Any]] = None,
) -> Any:
    """GET ``settings.ctx_base_url + path`` and return the decoded body.

    Args:
        path: Path appended to the base URL (trailing slash of the base is
            stripped first).
        http: Client used to send the request.
        params: Optional query parameters.

    Returns:
        The parsed JSON body, or ``{"raw": <response text>}`` when the body
        is not valid JSON.

    Raises:
        httpx.HTTPStatusError: For 4xx/5xx responses (``raise_for_status``).
    """
    url = settings.ctx_base_url.rstrip("/") + path
    response = await http.get(
        url,
        params=params,
        headers=_ctx_headers(),
        timeout=25.0,
        follow_redirects=True,
    )
    response.raise_for_status()
    # Some endpoints return JSON but with text/plain content-type, so always
    # attempt to decode and fall back to the raw text.
    try:
        return response.json()
    except ValueError:
        return {"raw": response.text}


async def _first_dtxsid(
    tries: Sequence[_Endpoint], http: httpx.AsyncClient
) -> Optional[str]:
    """Try each endpoint in order and return the first DTXSID found.

    A failing endpoint (HTTP error, transport error, unexpected payload) is
    logged at debug level and skipped; the lookup is best-effort.
    """
    for path, params in tries:
        try:
            data = await _ctx_get(path, http, params=params)
            dtxsid = _pick_dtxsid(_as_rows(data)) or _extract_dtxsid_any(data)
        except Exception as exc:  # best-effort: move on to the next endpoint
            logger.debug("CTX lookup %s %s failed: %s", path, params, exc)
            continue
        if dtxsid:
            return dtxsid
    return None


async def _resolve_from_cas(cas: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a CAS number to a DTXSID via CTX; None when nothing matches.

    Several chemical-identifier/search endpoint shapes are attempted first,
    followed by the genetox hazard search endpoints.
    """
    clean = (cas or "").strip()
    if not clean:
        return None
    tries: List[_Endpoint] = [
        (f"/chemical/identifiers/by-cas/{quote(clean)}", None),
        (f"/chemical/identifiers/search/by-cas/{quote(clean)}", None),
        ("/chemical/identifiers", {"cas": clean}),
        ("/chemical/identifiers", {"casrn": clean}),
        ("/chemical/identifiers/search", {"casrn": clean}),
        ("/chemical/search", {"query": clean, "type": "equals"}),
        ("/chemical/search", {"query": clean, "type": "contains"}),
        ("/chemical/search", {"searchType": "equals", "query": clean}),
        ("/chemical/search/equal", {"word": clean}),
        ("/chemical/search/contains", {"word": clean}),
        ("/chemical/search", {"matchType": "equal", "word": clean}),
        ("/chemical/search", {"matchType": "contains", "word": clean}),
        ("/chemical/search", {"casrn": clean}),
        # Hazard endpoints are tried last.
        ("/hazard/genetox/summary/search", {"cas": clean}),
        (f"/hazard/genetox/summary/search/by-cas/{quote(clean)}", None),
    ]
    return await _first_dtxsid(tries, http)


async def _resolve_from_name(name: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a chemical name to a DTXSID via CTX; None when nothing matches.

    Mirrors ``_resolve_from_cas`` with name-based endpoints and parameters.
    """
    q = (name or "").strip()
    if not q:
        return None
    tries: List[_Endpoint] = [
        (f"/chemical/identifiers/by-name/{quote(q)}", None),
        (f"/chemical/identifiers/search/by-name/{quote(q)}", None),
        ("/chemical/identifiers", {"name": q}),
        ("/chemical/identifiers/search", {"name": q}),
        ("/chemical/search", {"query": q, "type": "equals"}),
        ("/chemical/search", {"query": q, "type": "contains"}),
        ("/chemical/search", {"searchType": "equals", "query": q}),
        ("/chemical/search/equal", {"word": q}),
        ("/chemical/search/contains", {"word": q}),
        ("/chemical/search", {"matchType": "equal", "word": q}),
        ("/chemical/search", {"matchType": "contains", "word": q}),
        ("/chemical/search", {"name": q}),
        # Hazard endpoints are tried last.
        ("/hazard/genetox/summary/search", {"name": q}),
        (f"/hazard/genetox/summary/search/by-name/{quote(q)}", None),
    ]
    return await _first_dtxsid(tries, http)


async def _resolve_dtxsid_via_pubchem(
    term: str, http: httpx.AsyncClient
) -> Optional[str]:
    """Look *term* up in PubChem and scan its compound view for a DTXSID.

    The first CID returned by the PUG REST name lookup is used. Any HTTP
    status >= 400 or any exception results in None.
    """
    q = (term or "").strip()
    if not q:
        return None
    try:
        cid_url = (
            "https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/"
            f"{quote(q)}/cids/JSON"
        )
        cid_resp = await http.get(cid_url, timeout=20)
        if cid_resp.status_code >= 400:
            return None
        payload = cid_resp.json()
        cid_list = (payload.get("IdentifierList") or {}).get("CID") or []
        if not cid_list:
            return None
        cid = cid_list[0]
        view_url = (
            "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/"
            f"{cid}/JSON"
        )
        view_resp = await http.get(view_url, timeout=25)
        if view_resp.status_code >= 400:
            return None
        match = DTXSID_RE.search(view_resp.text)
        return match.group(0) if match else None
    except Exception as exc:  # best-effort fallback, never propagate
        logger.debug("PubChem DTXSID lookup for %r failed: %s", q, exc)
        return None


async def _resolve_dtxsid_via_dashboard(
    term: str, http: httpx.AsyncClient
) -> Optional[str]:
    """Scan CompTox dashboard HTML pages for a DTXSID matching *term*.

    Each target URL is fetched with browser-like headers; pages that fail or
    return a status >= 400 are skipped.
    """
    q = (term or "").strip()
    if not q:
        return None
    targets = [
        f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={quote(q)}",
        f"https://comptox.epa.gov/dashboard/dsstoxdb/chemical/details?search={quote(q)}",
    ]
    browser_headers = {
        "accept": "text/html,application/xhtml+xml",
        "user-agent": "Mozilla/5.0",
    }
    for url in targets:
        try:
            page = await http.get(url, timeout=20, headers=browser_headers)
            if page.status_code >= 400:
                continue
            match = DTXSID_RE.search(page.text)
            if match:
                return match.group(0)
        except Exception as exc:  # best-effort: try the next page
            logger.debug("Dashboard DTXSID lookup %s failed: %s", url, exc)
    return None


async def resolve_dtxsid(query: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a CAS number or chemical name to a DTXSID.

    CTX is queried first (by CAS when *query* looks like one, otherwise by
    name); PubChem and then the dashboard are used as fallbacks.

    Returns:
        The DTXSID string, or None when every source comes up empty.
    """
    q = (query or "").strip()
    if not q:
        return None

    resolver = _resolve_from_cas if is_cas(q) else _resolve_from_name
    dtxsid = await resolver(q, http)
    if dtxsid:
        return dtxsid

    # Fallbacks: PubChem -> Dashboard
    via_pubchem = await _resolve_dtxsid_via_pubchem(q, http)
    if via_pubchem:
        return via_pubchem
    return await _resolve_dtxsid_via_dashboard(q, http)


def dashboard_search_url(query: str) -> str:
    """Return the dashboard search-results URL for the stripped, quoted *query*."""
    q = quote((query or "").strip())
    return f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={q}"


def dashboard_details_url(dtxsid: str) -> str:
    """Return the dashboard URL used as the "details" link for *dtxsid*.

    NOTE(review): this is the same search-results URL produced by
    ``dashboard_search_url``; confirm whether a dedicated details page was
    intended before changing it.
    """
    q = quote((dtxsid or "").strip())
    return f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={q}"


async def _genetox_without_dtxsid(q: str, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Build the ``fetch_ctx_genetox`` result when no DTXSID was resolved."""
    # Attempt direct hazard search by CAS, then by name (some deployments
    # return the summary directly).
    data: Any = None
    for params in ({"cas": q}, {"name": q}):
        try:
            data = await _ctx_get(
                "/hazard/genetox/summary/search", http, params=params
            )
        except Exception as exc:
            logger.debug("Direct genetox search %s failed: %s", params, exc)
            data = None
        if data:
            break

    if data:
        found = _extract_dtxsid_any(data)
        return {
            "ok": True,
            "dtxsid": found,
            "summary": data,
            "dashboard_url": dashboard_details_url(found or q),
        }

    # Try one direct identifier call to surface CTX errors (auth, etc.)
    lookup = "by-cas" if is_cas(q) else "by-name"
    try:
        await _ctx_get(f"/chemical/identifiers/{lookup}/{quote(q)}", http)
    except httpx.HTTPStatusError as e:
        return {
            "ok": False,
            "error": f"CTX API error {e.response.status_code}: {e.response.text[:200]}",
            "dashboard_search": dashboard_search_url(q),
        }
    except Exception as e:
        return {
            "ok": False,
            "error": f"CTX request failed: {e}",
            "dashboard_search": dashboard_search_url(q),
        }

    return {
        "ok": False,
        "error": "No DTXSID found for this query.",
        "dashboard_search": dashboard_search_url(q),
    }


async def fetch_ctx_genetox(cas_or_query: str, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Fetch Genetox summary from EPA CompTox (CTX) similar to production Worker.

    Args:
        cas_or_query: A DTXSID (any string starting with ``DTXSID``,
            case-insensitive, is used as-is), a CAS number, or a name.
        http: Client used for every outgoing request.

    Returns:
        On success: ``{ ok, dtxsid, summary, dashboard_url }``.
        On failure: ``{ ok: False, error, ... }`` with either a
        ``dashboard_search`` or a ``dashboard_url`` link. This function does
        not raise for request failures; they are reported in ``error``.
    """
    q = (cas_or_query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query"}

    if not _ctx_headers().get("x-api-key"):
        return {
            "ok": False,
            "error": "CTX_API_KEY not configured. Please set it in HF Secrets.",
            "dashboard_search": dashboard_search_url(q),
        }

    if q.upper().startswith("DTXSID"):
        dtxsid: Optional[str] = q
    else:
        dtxsid = await resolve_dtxsid(q, http)

    if not dtxsid:
        return await _genetox_without_dtxsid(q, http)

    try:
        summary = await _ctx_get(
            f"/hazard/genetox/summary/search/by-dtxsid/{quote(dtxsid)}", http
        )
    except Exception as e:
        return {
            "ok": False,
            "dtxsid": dtxsid,
            "error": f"CTX genetox summary fetch failed: {e}",
            "dashboard_url": dashboard_details_url(dtxsid),
        }
    return {
        "ok": True,
        "dtxsid": dtxsid,
        "summary": summary,
        "dashboard_url": dashboard_details_url(dtxsid),
    }