# hchevva's picture
# Upload 4 files
# 02835d5 verified
import os
import re
import json
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import httpx
from core.config import settings
# Shape of a CAS Registry Number: 2-7 digits, 2 digits, 1 digit, separated by
# hyphens. Only the layout is checked here.
CAS_RE = re.compile(r"^\d{2,7}-\d{2}-\d$")
# "DTXSID" followed by at least seven digits. The pattern is a raw string, so
# the digit class must be written with a single backslash: a doubled backslash
# would ask for a literal "\" followed by the letter "d" and never match.
DTXSID_RE = re.compile(r"DTXSID\d{7,}")
def is_cas(s: str) -> bool:
    """Return True when *s* is laid out like a CAS Registry Number.

    Surrounding whitespace is ignored and falsy input (``None``, ``""``) is
    treated as an empty string. Only the layout required by ``CAS_RE`` is
    tested.
    """
    candidate = s.strip() if s else ""
    return CAS_RE.match(candidate) is not None
def _pick_dtxsid(rows: List[Any]) -> Optional[str]:
for r in rows or []:
if not isinstance(r, dict):
continue
id_ = (
r.get("dtxsid")
or r.get("DTXSID")
or r.get("dtxSid")
or (r.get("identifier") or {}).get("dtxsid")
or (r.get("chemical") or {}).get("dtxsid")
or r.get("DTXSIDv2")
or r.get("dtxsidv2")
)
if id_:
return str(id_).strip()
return None
def _as_rows(data: Any) -> List[Any]:
if isinstance(data, list):
return data
if isinstance(data, dict):
if "identifier" in data or "chemical" in data or "DTXSID" in data or "dtxsid" in data:
return [data]
for key in ("data", "results", "items"):
v = data.get(key)
if isinstance(v, list):
return v
if isinstance(v, dict):
return [v]
return []
def _extract_dtxsid_any(data: Any) -> Optional[str]:
    """Search an arbitrary payload for the first ``DTXSID_RE`` match.

    The payload is serialised with ``json.dumps``; when that fails its
    ``str()`` form is searched instead.

    Returns:
        The matched text, or ``None`` when the pattern does not occur.
    """
    try:
        haystack = json.dumps(data)
    except Exception:
        # Not JSON-serialisable: fall back to the plain string form.
        haystack = str(data)
    hit = DTXSID_RE.search(haystack)
    if hit is None:
        return None
    return hit.group(0)
def _ctx_headers() -> Dict[str, str]:
    """Build the request headers used for CTX API calls.

    The API key is the first non-blank value among ``settings.ctx_api_key``
    and the ``CTX_API_KEY``, ``COMPTOX_API_KEY`` and ``CTX_KEY`` environment
    variables, in that order. When none is set the ``x-api-key`` header is
    left out, so its absence signals a missing key.

    Returns:
        A new header dict on every call.
    """
    headers = {"accept": "application/json"}
    candidates = (
        settings.ctx_api_key,
        os.getenv("CTX_API_KEY"),
        os.getenv("COMPTOX_API_KEY"),
        os.getenv("CTX_KEY"),
    )
    for candidate in candidates:
        # Strip before testing so a whitespace-only value in one source does
        # not hide a real key configured in a later one.
        key = candidate.strip() if isinstance(candidate, str) else candidate
        if key:
            # HTTP field names are case-insensitive, so one spelling is
            # enough; adding "X-Api-Key" as well would only send the same
            # field twice.
            headers["x-api-key"] = key
            break
    headers["user-agent"] = "toxrai-hf-demo"
    return headers
async def _ctx_get(path: str, http: httpx.AsyncClient, params: Dict[str, Any] | None = None) -> Any:
    """GET ``path`` below ``settings.ctx_base_url`` and decode the response.

    Args:
        path: Endpoint path appended to the base URL (trailing slashes are
            removed from the base first).
        http: Async client that performs the request.
        params: Optional query parameters.

    Returns:
        The decoded JSON body, or ``{"raw": <response text>}`` when the body
        cannot be decoded as JSON.

    Raises:
        httpx.HTTPStatusError: Propagated from ``raise_for_status()``.
    """
    base = settings.ctx_base_url.rstrip("/")
    response = await http.get(
        base + path,
        params=params,
        headers=_ctx_headers(),
        timeout=25.0,
        follow_redirects=True,
    )
    response.raise_for_status()
    try:
        payload = response.json()
    except Exception:
        # Keep the undecodable body available to the caller as plain text.
        payload = {"raw": response.text}
    return payload
async def _resolve_from_cas(cas: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a CAS number to a DTXSID by probing CTX endpoints in order.

    Every candidate endpoint is requested through ``_ctx_get``. The first
    response that yields an identifier, via ``_pick_dtxsid`` on its rows or
    a pattern search over the whole payload, wins. A probe that raises is
    skipped.

    Returns:
        The DTXSID, or ``None`` for blank input or when no probe succeeds.
    """
    casrn = (cas or "").strip()
    if not casrn:
        return None

    encoded = quote(casrn)
    attempts = [
        # Chemical identifier / search endpoint variants.
        (f"/chemical/identifiers/by-cas/{encoded}", None),
        (f"/chemical/identifiers/search/by-cas/{encoded}", None),
        ("/chemical/identifiers", {"cas": casrn}),
        ("/chemical/identifiers", {"casrn": casrn}),
        ("/chemical/identifiers/search", {"casrn": casrn}),
        ("/chemical/search", {"query": casrn, "type": "equals"}),
        ("/chemical/search", {"query": casrn, "type": "contains"}),
        ("/chemical/search", {"searchType": "equals", "query": casrn}),
        ("/chemical/search/equal", {"word": casrn}),
        ("/chemical/search/contains", {"word": casrn}),
        ("/chemical/search", {"matchType": "equal", "word": casrn}),
        ("/chemical/search", {"matchType": "contains", "word": casrn}),
        ("/chemical/search", {"casrn": casrn}),
        # Hazard endpoints, tried only after every chemical variant failed.
        ("/hazard/genetox/summary/search", {"cas": casrn}),
        (f"/hazard/genetox/summary/search/by-cas/{encoded}", None),
    ]
    for endpoint, query in attempts:
        try:
            payload = await _ctx_get(endpoint, http, params=query)
            found = _pick_dtxsid(_as_rows(payload)) or _extract_dtxsid_any(payload)
        except Exception:
            # Best effort: a failing variant just moves on to the next one.
            continue
        if found:
            return found
    return None
async def _resolve_from_name(name: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a chemical name to a DTXSID by probing CTX endpoints in order.

    Every candidate endpoint is requested through ``_ctx_get``. The first
    response that yields an identifier, via ``_pick_dtxsid`` on its rows or
    a pattern search over the whole payload, wins. A probe that raises is
    skipped.

    Returns:
        The DTXSID, or ``None`` for blank input or when no probe succeeds.
    """
    label = (name or "").strip()
    if not label:
        return None

    encoded = quote(label)
    attempts = [
        # Chemical identifier / search endpoint variants.
        (f"/chemical/identifiers/by-name/{encoded}", None),
        (f"/chemical/identifiers/search/by-name/{encoded}", None),
        ("/chemical/identifiers", {"name": label}),
        ("/chemical/identifiers/search", {"name": label}),
        ("/chemical/search", {"query": label, "type": "equals"}),
        ("/chemical/search", {"query": label, "type": "contains"}),
        ("/chemical/search", {"searchType": "equals", "query": label}),
        ("/chemical/search/equal", {"word": label}),
        ("/chemical/search/contains", {"word": label}),
        ("/chemical/search", {"matchType": "equal", "word": label}),
        ("/chemical/search", {"matchType": "contains", "word": label}),
        ("/chemical/search", {"name": label}),
        # Hazard endpoints, tried only after every chemical variant failed.
        ("/hazard/genetox/summary/search", {"name": label}),
        (f"/hazard/genetox/summary/search/by-name/{encoded}", None),
    ]
    for endpoint, query in attempts:
        try:
            payload = await _ctx_get(endpoint, http, params=query)
            found = _pick_dtxsid(_as_rows(payload)) or _extract_dtxsid_any(payload)
        except Exception:
            # Best effort: a failing variant just moves on to the next one.
            continue
        if found:
            return found
    return None
async def _resolve_dtxsid_via_pubchem(term: str, http: httpx.AsyncClient) -> Optional[str]:
    """Look up a DTXSID through PubChem.

    The term is first resolved to PubChem CIDs by name; the PUG View record
    of the first CID is then fetched and its raw text searched with
    ``DTXSID_RE``.

    Returns:
        The matched identifier, or ``None`` on blank input, an HTTP status
        of 400 or above, an empty CID list, no match, or any exception.
    """
    name = (term or "").strip()
    if not name:
        return None
    try:
        cid_response = await http.get(
            f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/{quote(name)}/cids/JSON",
            timeout=20,
        )
        if cid_response.status_code >= 400:
            return None
        cids = (cid_response.json().get("IdentifierList") or {}).get("CID") or []
        if not cids:
            return None
        # Only the first CID returned for the name is inspected.
        record_response = await http.get(
            f"https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cids[0]}/JSON",
            timeout=25,
        )
        if record_response.status_code >= 400:
            return None
        hit = DTXSID_RE.search(record_response.text)
    except Exception:
        return None
    return hit.group(0) if hit else None
async def _resolve_dtxsid_via_dashboard(term: str, http: httpx.AsyncClient) -> Optional[str]:
    """Look up a DTXSID by searching CompTox Dashboard pages.

    Two dashboard URLs are requested in turn with browser-like headers. The
    first response with a status below 400 whose text matches ``DTXSID_RE``
    provides the result. Requests that raise are skipped.

    Returns:
        The matched identifier, or ``None`` when the term is blank or
        neither page yields a match.
    """
    needle = (term or "").strip()
    if not needle:
        return None

    encoded = quote(needle)
    pages = (
        f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={encoded}",
        f"https://comptox.epa.gov/dashboard/dsstoxdb/chemical/details?search={encoded}",
    )
    browser_headers = {
        "accept": "text/html,application/xhtml+xml",
        "user-agent": "Mozilla/5.0",
    }
    for page in pages:
        try:
            response = await http.get(page, timeout=20, headers=browser_headers)
            hit = DTXSID_RE.search(response.text) if response.status_code < 400 else None
        except Exception:
            # Best effort: move on to the next page.
            continue
        if hit:
            return hit.group(0)
    return None
async def resolve_dtxsid(query: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a CAS number or chemical name to a DTXSID.

    The CTX API is asked first, using the CAS resolver when the query looks
    like a CAS number and the name resolver otherwise. If that yields
    nothing, PubChem and then the CompTox Dashboard are tried.

    Returns:
        The first identifier found, or ``None`` for blank input or when
        every source comes back empty.
    """
    term = (query or "").strip()
    if not term:
        return None

    ctx_resolver = _resolve_from_cas if is_cas(term) else _resolve_from_name
    resolved = await ctx_resolver(term, http)
    if resolved:
        return resolved

    # Fallbacks: PubChem -> Dashboard
    resolved = await _resolve_dtxsid_via_pubchem(term, http)
    if resolved:
        return resolved
    return await _resolve_dtxsid_via_dashboard(term, http)
def dashboard_search_url(query: str) -> str:
    """Return the CompTox Dashboard search URL for *query*.

    The query is stripped and percent-encoded with ``quote``; falsy input
    produces a URL with an empty search term.
    """
    term = query.strip() if query else ""
    encoded = quote(term)
    return f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={encoded}"
def dashboard_details_url(dtxsid: str) -> str:
    """Return the CompTox Dashboard URL used to open *dtxsid*.

    The identifier is stripped and percent-encoded with ``quote`` and placed
    in the ``search`` parameter of the dashboard results page; falsy input
    produces a URL with an empty search term.
    """
    identifier = dtxsid.strip() if dtxsid else ""
    encoded = quote(identifier)
    return f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={encoded}"
async def fetch_ctx_genetox(cas_or_query: str, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Fetch the genetox summary from EPA CompTox (CTX), similar to the production Worker.

    Args:
        cas_or_query: A DTXSID, a CAS number or a chemical name.
        http: Async client used for every outbound request.

    Returns:
        A dict whose ``ok`` flag tells the two shapes apart:

        * success: ``{"ok": True, "dtxsid", "summary", "dashboard_url"}``.
          ``dtxsid`` can be ``None`` when the summary came from the direct
          hazard search and no identifier was found in it.
        * failure: ``{"ok": False, "error", ...}`` plus ``dashboard_search``
          when nothing could be resolved, or ``dtxsid`` and
          ``dashboard_url`` when the identifier was known but the summary
          request failed.

        Failures of the CTX requests made here are reported through
        ``error`` rather than raised.
    """
    q = (cas_or_query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query"}
    if not _ctx_headers().get("x-api-key"):
        return {
            "ok": False,
            "error": "CTX_API_KEY not configured. Please set it in HF Secrets.",
            "dashboard_search": dashboard_search_url(q),
        }

    if q.upper().startswith("DTXSID"):
        # The prefix test is case-insensitive, so normalise what is forwarded
        # as well: identifiers are written in upper case, and input such as
        # "dtxsid7020182" would otherwise be sent to the API as typed.
        dtxsid = q.upper()
    else:
        dtxsid = await resolve_dtxsid(q, http)

    if not dtxsid:
        # Attempt direct hazard search by CAS or name (some deployments return summary directly)
        data = None
        for param in ("cas", "name"):
            try:
                data = await _ctx_get("/hazard/genetox/summary/search", http, params={param: q})
            except Exception:
                data = None
            if data:
                break
        if data:
            found = _extract_dtxsid_any(data)
            return {
                "ok": True,
                "dtxsid": found,
                "summary": data,
                "dashboard_url": dashboard_details_url(found or q),
            }
        # Try one direct identifier call to surface CTX errors (auth, etc.)
        try:
            if is_cas(q):
                await _ctx_get(f"/chemical/identifiers/by-cas/{quote(q)}", http)
            else:
                await _ctx_get(f"/chemical/identifiers/by-name/{quote(q)}", http)
        except httpx.HTTPStatusError as e:
            return {
                "ok": False,
                "error": f"CTX API error {e.response.status_code}: {e.response.text[:200]}",
                "dashboard_search": dashboard_search_url(q),
            }
        except Exception as e:
            return {
                "ok": False,
                "error": f"CTX request failed: {e}",
                "dashboard_search": dashboard_search_url(q),
            }
        # The probe succeeded, so the API is reachable; the query simply did
        # not resolve to an identifier.
        return {
            "ok": False,
            "error": "No DTXSID found for this query.",
            "dashboard_search": dashboard_search_url(q),
        }

    try:
        summary = await _ctx_get(
            f"/hazard/genetox/summary/search/by-dtxsid/{quote(dtxsid)}", http
        )
    except Exception as e:
        return {
            "ok": False,
            "dtxsid": dtxsid,
            "error": f"CTX genetox summary fetch failed: {e}",
            "dashboard_url": dashboard_details_url(dtxsid),
        }
    return {
        "ok": True,
        "dtxsid": dtxsid,
        "summary": summary,
        "dashboard_url": dashboard_details_url(dtxsid),
    }