hchevva committed on
Commit
426090f
·
verified ·
1 Parent(s): e1f40f0

Upload 5 files

Browse files
core/sources/ctx.py CHANGED
@@ -1,114 +1,146 @@
1
- # core/sources/ctx.py
2
- from __future__ import annotations
3
-
4
  import re
5
- from typing import Any, Dict, Optional
6
  from urllib.parse import quote
7
 
8
- DTXSID_RE = re.compile(r"DTXSID\d+", re.IGNORECASE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
 
11
- async def resolve_dtxsid_from_dashboard(search: str, http) -> Optional[str]:
12
- search = (search or "").strip()
13
- if not search:
14
  return None
15
 
16
- url = f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={quote(search, safe='')}"
17
- r = await http.get(url, follow_redirects=True, timeout=30.0)
18
- if r.status_code != 200:
19
- return None
 
 
20
 
21
- m = DTXSID_RE.search(r.text)
22
- return m.group(0).upper() if m else None
 
 
 
 
 
 
 
 
 
 
 
 
23
 
 
 
 
 
 
 
 
 
 
24
 
25
- async def _try_json(http, url: str) -> Optional[Dict[str, Any]]:
26
- try:
27
- r = await http.get(url, follow_redirects=True, timeout=30.0)
28
- if r.status_code != 200:
29
- return None
30
- return r.json()
31
- except Exception:
32
- return None
33
 
34
 
35
- def _extract_counts(raw: Any) -> Dict[str, Any]:
36
- """
37
- Best-effort mapping to production-like fields:
38
- Reports: + / - / other ; Ames ; MN (Micronucleus)
 
 
 
 
 
 
 
 
 
 
39
  """
40
- out = {"pos": None, "neg": None, "other": None, "ames": None, "mn": None}
41
-
42
- if not isinstance(raw, (dict, list)):
43
- return out
44
-
45
- # Search for common keys in whatever JSON we get back.
46
- def scan(obj):
47
- if isinstance(obj, dict):
48
- for k, v in obj.items():
49
- lk = str(k).lower()
50
- if lk in ("positive", "pos", "positivecount", "reportpositive", "positivereports"):
51
- if isinstance(v, int):
52
- out["pos"] = v
53
- if lk in ("negative", "neg", "negativecount", "reportnegative", "negativereports"):
54
- if isinstance(v, int):
55
- out["neg"] = v
56
- if lk in ("other", "othercount", "unknown", "uncertain"):
57
- if isinstance(v, int):
58
- out["other"] = v
59
- if "ames" in lk and out["ames"] is None:
60
- if isinstance(v, (str, bool, int)):
61
- out["ames"] = v
62
- if ("micronucleus" in lk or lk == "mn") and out["mn"] is None:
63
- if isinstance(v, (str, bool, int)):
64
- out["mn"] = v
65
- scan(v)
66
- elif isinstance(obj, list):
67
- for it in obj:
68
- scan(it)
69
-
70
- scan(raw)
71
- return out
72
-
73
-
74
- async def fetch_ctx_genetox(cas: str, http) -> Dict[str, Any]:
75
- cas = (cas or "").strip()
76
- resolve_url = f"https://comptox.epa.gov/dashboard/dsstoxdb/results?search={quote(cas, safe='')}"
77
- dtxsid = await resolve_dtxsid_from_dashboard(cas, http)
78
 
 
79
  if not dtxsid:
80
  return {
81
- "ok": True,
82
- "dtxsid": None,
83
- "message": "No DTXSID found for this query.",
84
- "resolveUrl": resolve_url,
85
- "raw": None,
86
  }
87
 
88
- # Best-effort: CompTox APIs vary; try a few common patterns.
89
- candidates = [
90
- f"https://comptox.epa.gov/dashboard/api/genetox?dtxsid={quote(dtxsid, safe='')}",
91
- f"https://comptox.epa.gov/dashboard/api/genetox/{quote(dtxsid, safe='')}",
92
- f"https://comptox.epa.gov/dashboard/api/assay/genetox?dtxsid={quote(dtxsid, safe='')}",
93
- f"https://comptox.epa.gov/dashboard/api/chemical/{quote(dtxsid, safe='')}",
94
- ]
95
-
96
- raw = None
97
- used = None
98
- for u in candidates:
99
- j = await _try_json(http, u)
100
- if j is not None:
101
- raw = j
102
- used = u
103
- break
104
-
105
- counts = _extract_counts(raw) if raw is not None else {"pos": None, "neg": None, "other": None, "ames": None, "mn": None}
106
-
107
- return {
108
- "ok": True,
109
- "dtxsid": dtxsid,
110
- "resolveUrl": resolve_url,
111
- "apiUrl": used,
112
- "counts": counts,
113
- "raw": raw,
114
- }
 
1
+ import os
 
 
2
  import re
3
+ from typing import Any, Dict, List, Optional
4
  from urllib.parse import quote
5
 
6
+ import httpx
7
+
8
+ # Matches production worker default
9
+ CTX_BASE_URL = os.getenv("CTX_BASE_URL", "https://comptox.epa.gov/ctx-api")
10
+ CTX_API_KEY = os.getenv("CTX_API_KEY") or os.getenv("COMPTOX_API_KEY") or os.getenv("CTX_KEY")
11
+
12
# A CAS Registry Number: 2–7 digits, 2 digits, then a single check digit.
CAS_RE = re.compile(r"^\d{2,7}-\d{2}-\d$")


def is_cas(s: str) -> bool:
    """Return True when *s* (after stripping) looks like a CAS registry number."""
    candidate = (s or "").strip()
    return CAS_RE.match(candidate) is not None
17
+
18
+
19
+ def _pick_dtxsid(rows: List[Any]) -> Optional[str]:
20
+ for r in rows or []:
21
+ if not isinstance(r, dict):
22
+ continue
23
+ id_ = (
24
+ r.get("dtxsid")
25
+ or r.get("DTXSID")
26
+ or r.get("dtxSid")
27
+ or (r.get("identifier") or {}).get("dtxsid")
28
+ or (r.get("chemical") or {}).get("dtxsid")
29
+ or r.get("DTXSIDv2")
30
+ or r.get("dtxsidv2")
31
+ )
32
+ if id_:
33
+ return str(id_).strip()
34
+ return None
35
+
36
+
37
+ def _as_rows(data: Any) -> List[Any]:
38
+ if isinstance(data, list):
39
+ return data
40
+ if isinstance(data, dict):
41
+ for key in ("data", "results", "items"):
42
+ v = data.get(key)
43
+ if isinstance(v, list):
44
+ return v
45
+ return []
46
+
47
+
48
async def _ctx_get(path: str, http: httpx.AsyncClient, params: Dict[str, Any] | None = None) -> Any:
    """GET a CTX API endpoint and return the parsed body.

    Sends the API key header when configured; raises on HTTP error status.
    Some endpoints serve JSON with a text/plain content-type, so parsing is
    attempted regardless and the raw text is returned as {"raw": ...} when
    it is not JSON.
    """
    headers = {"accept": "application/json"}
    if CTX_API_KEY:
        headers["x-api-key"] = CTX_API_KEY

    full_url = CTX_BASE_URL.rstrip("/") + path
    resp = await http.get(full_url, params=params, headers=headers, timeout=25.0, follow_redirects=True)
    resp.raise_for_status()
    try:
        return resp.json()
    except Exception:
        return {"raw": resp.text}
61
 
62
 
63
async def resolve_dtxsid(query: str, http: httpx.AsyncClient) -> Optional[str]:
    """Resolve a CAS number or chemical name to a DTXSID via the CTX API.

    Tries the chemical-search endpoints first, then falls back to the
    genetox summary search endpoints; each attempt is best-effort and
    failures are swallowed. Returns None when nothing matches.
    """
    q = (query or "").strip()
    if not q:
        return None

    # Chemical-search attempts first, then hazard/genetox fallbacks —
    # same order as before.
    attempts = [
        ("/chemical/search", {"casrn": q}),
        ("/chemical/search", {"name": q}),
        (f"/chemical/search/by-cas/{quote(q)}", None),
        (f"/chemical/search/by-name/{quote(q)}", None),
        ("/hazard/genetox/summary/search", {"name": q}),
        (f"/hazard/genetox/summary/search/by-name/{quote(q)}", None),
    ]

    for path, params in attempts:
        try:
            data = await _ctx_get(path, http, params=params)
            dtxsid = _pick_dtxsid(_as_rows(data))
            if dtxsid:
                return dtxsid
        except Exception:
            continue

    return None
 
 
 
 
 
 
 
101
 
102
 
103
def dashboard_search_url(query: str) -> str:
    """CompTox Dashboard search-page URL for *query* (stripped, URL-quoted)."""
    return "https://comptox.epa.gov/dashboard/chemical/search?query=" + quote((query or "").strip())
106
+
107
+
108
def dashboard_details_url(dtxsid: str) -> str:
    """CompTox Dashboard chemical-details page URL for *dtxsid*."""
    sid = (dtxsid or "").strip()
    return f"https://comptox.epa.gov/dashboard/chemical/details/{quote(sid)}"
110
+
111
+
112
async def fetch_ctx_genetox(cas_or_query: str, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Fetch the Genetox summary from EPA CompTox (CTX), like the production Worker.

    Resolves a DTXSID for the query, then pulls the genetox summary for it.

    Returns:
        { ok, dtxsid, summary, dashboard_url } on success, otherwise an
        { ok: False, error, ... } dict describing what failed.
    """
    q = (cas_or_query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query"}

    dtxsid = await resolve_dtxsid(q, http)
    if not dtxsid:
        return {
            "ok": False,
            "error": "No DTXSID found for this query.",
            "dashboard_search": dashboard_search_url(q),
        }

    try:
        summary = await _ctx_get(
            f"/hazard/genetox/summary/search/by-dtxsid/{quote(dtxsid)}", http
        )
    except Exception as e:
        return {
            "ok": False,
            "dtxsid": dtxsid,
            "error": f"CTX genetox summary fetch failed: {e}",
            "dashboard_url": dashboard_details_url(dtxsid),
        }

    return {
        "ok": True,
        "dtxsid": dtxsid,
        "summary": summary,
        "dashboard_url": dashboard_details_url(dtxsid),
    }
 
 
 
 
 
 
 
 
 
 
core/sources/fema.py CHANGED
@@ -1,15 +1,17 @@
1
- # core/sources/fema.py
2
- from __future__ import annotations
3
 
4
- from urllib.parse import quote
5
 
 
 
6
 
7
- def fema_link(query: str):
8
- q = (query or "").strip()
9
- return {
10
- "ok": True,
11
- "url": (
12
- "https://fragrancematerialssafetyresource.elsevier.com/"
13
- f"?field_cas_tid_1={quote(q, safe='')}&field_chemical_synonym_tid="
14
- ),
15
- }
 
 
 
1
+ from urllib.parse import quote_plus
 
2
 
 
3
 
4
def fema_link(cas_or_query: str) -> dict:
    """Build the FEMA / Fragrance Materials Safety Resource search URL.

    Production uses Elsevier's Fragrance Materials Safety Resource with CAS query params.
    """
    q = (cas_or_query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query"}

    # NOTE: domain spelling matters; the older '...materialssafety...' variant often 404s.
    base = "https://fragrancematerialsafetyresource.elsevier.com/"
    query_string = f"field_cas_tid_1={quote_plus(q)}&field_chemical_synonym_tid="
    return {"ok": True, "url": f"{base}?{query_string}"}
core/sources/iarc.py CHANGED
@@ -1,13 +1,8 @@
1
- # core/sources/iarc.py
2
  from __future__ import annotations
3
-
4
  from urllib.parse import quote
 
5
 
6
-
7
- def bookshelf_link(query: str):
8
  q = (query or "").strip()
9
- return {
10
- "ok": True,
11
- "label": f'IARC Monographs — results for "{q}" (Bookshelf)',
12
- "url": f"https://www.ncbi.nlm.nih.gov/books/?term={quote(q, safe='')}",
13
- }
 
 
1
  from __future__ import annotations
 
2
  from urllib.parse import quote
3
+ import httpx
4
 
5
def bookshelf_link(query: str) -> dict:
    """Build an NCBI Bookshelf search result link scoped to the IARC Monographs."""
    q = (query or "").strip()
    # Fully percent-encode the term (safe="") so spaces and quotes survive.
    term = quote(f'{q} "IARC Monographs"', safe="")
    entry = {
        "title": f'IARC Monographs — results for “{q}” (Bookshelf)',
        "url": f"https://www.ncbi.nlm.nih.gov/books/?term={term}",
    }
    return {"ok": True, "results": [entry]}
 
 
 
core/sources/ntp.py CHANGED
@@ -1,83 +1,133 @@
1
- # core/sources/ntp.py
2
- from __future__ import annotations
3
-
4
  import re
5
- from typing import Any, Dict, List
6
- from urllib.parse import quote
7
 
8
- TR_RE = re.compile(r"\bTR[-\s]?(\d{2,4})\b", re.IGNORECASE)
9
- URL_RE = re.compile(r'href="([^"]+)"', re.IGNORECASE)
10
 
 
 
11
 
12
- def _abs(url: str) -> str:
13
- if url.startswith("http"):
14
- return url
15
- return "https://ntp.niehs.nih.gov" + (url if url.startswith("/") else "/" + url)
16
 
17
 
18
- async def _get_text(http, url: str) -> str:
19
- r = await http.get(url, follow_redirects=True, timeout=30.0)
20
- return r.text if r.status_code == 200 else ""
 
 
 
 
 
21
 
22
 
23
- def _extract_tr_hits(html: str, cas: str) -> List[Dict[str, Any]]:
24
- """
25
- Parse search result HTML and keep only entries where CAS appears near the TR listing.
26
- This is best-effort but enforces: ONLY TR hits for that CAS.
27
- """
28
- cas = (cas or "").strip()
29
- hits: List[Dict[str, Any]] = []
 
 
30
 
31
- # crude block split: many NTP pages separate results into <article> or <div class="search-result">
32
- blocks = re.split(r"(<article\b|<div[^>]+search[^>]*>)", html, flags=re.IGNORECASE)
33
- if len(blocks) <= 1:
34
- blocks = [html]
35
 
36
- for b in blocks:
37
- if cas and cas not in b:
 
 
 
38
  continue
39
-
40
- urls = URL_RE.findall(b)
41
- tr_nums = TR_RE.findall(b)
42
-
43
- if not tr_nums:
44
  continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
- # pick a likely report page URL
47
- report_url = None
48
- for u in urls:
49
- au = _abs(u)
50
- if "/publications/reports/tr/" in au or "/reports/tr/" in au:
51
- report_url = au
52
- break
53
 
54
- # title: best-effort from <a> text
55
- title = re.sub(r"\s+", " ", re.sub(r"<[^>]+>", " ", b)).strip()
56
- title = title[:220]
 
 
 
57
 
58
- tr_label = f"TR-{tr_nums[0]}"
59
- hits.append({"tr": tr_label, "title": title, "url": report_url})
60
 
61
- # de-dupe by TR
62
  seen = set()
63
- out = []
64
- for h in hits:
65
- if h["tr"] in seen:
66
- continue
67
- seen.add(h["tr"])
68
- out.append(h)
69
- return out
70
 
71
-
72
- async def search_technical_reports(cas: str, http, limit: int = 8) -> Dict[str, Any]:
73
- cas = (cas or "").strip()
74
- if not cas:
75
- return {"ok": False, "error": "Missing CAS.", "items": []}
76
-
77
- # Use NTP site search; we filter to TR + exact CAS presence in result blocks.
78
- search_url = f"https://ntp.niehs.nih.gov/search?query={quote(cas, safe='')}"
79
- html = await _get_text(http, search_url)
80
-
81
- items = _extract_tr_hits(html, cas)[: max(1, int(limit or 8))]
82
-
83
- return {"ok": True, "query": cas, "searchUrl": search_url, "items": items}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import html
 
 
2
  import re
3
+ from typing import Any, Dict, List, Optional
4
+ from urllib.parse import urljoin
5
 
6
+ import httpx
 
7
 
8
+ REPORTS_URL = "https://ntp.niehs.nih.gov/publications/reports"
9
+ BASE = "https://ntp.niehs.nih.gov"
10
 
11
+ TR_RE = re.compile(r"\bTR-?(\d{3,4})\b", re.IGNORECASE)
12
+ HREF_RE = re.compile(r"href=[\"']([^\"']+)[\"']", re.IGNORECASE)
13
+ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
 
14
 
15
 
16
+ def _strip_tags(html_text: str) -> str:
17
+ # crude but robust enough for the NTP index page
18
+ text = re.sub(r"<script[\s\S]*?</script>", " ", html_text, flags=re.IGNORECASE)
19
+ text = re.sub(r"<style[\s\S]*?</style>", " ", text, flags=re.IGNORECASE)
20
+ text = re.sub(r"<[^>]+>", " ", text)
21
+ text = html.unescape(text)
22
+ text = re.sub(r"\s+", " ", text).strip()
23
+ return text
24
 
25
 
26
def _extract_title(page_html: str) -> str:
    """Pull the <title> text from a page, minus the trailing '| NTP ...' suffix."""
    match = TITLE_RE.search(page_html or "")
    if match is None:
        return ""
    title = re.sub(r"\s+", " ", html.unescape(match.group(1))).strip()
    # NTP appends a site-name suffix ("| NTP ...") to every page title; drop it.
    return re.sub(r"\s*\|\s*NTP.*$", "", title, flags=re.IGNORECASE).strip()
35
 
 
 
 
 
36
 
37
def _extract_pdf_url(page_html: str, page_url: str) -> Optional[str]:
    """Return the first PDF link found on the page, resolved against *page_url*.

    Skips in-page anchors; returns None when no PDF href is present.
    """
    for href in HREF_RE.findall(page_html or ""):
        if ".pdf" in href.lower() and not href.startswith("#"):
            return urljoin(page_url, href)
    return None
47
+
48
+
49
async def _fetch_tr_page(num: str, http: httpx.AsyncClient) -> Optional[Dict[str, Any]]:
    """Fetch the TR-<num> report page and summarize it.

    Returns a dict with {num, tr, report_page, title, year, pdf}, or None
    when the page cannot be fetched (HTTP >= 400 or transport error).
    """
    page_url = f"{BASE}/publications/reports/tr{num}"
    try:
        resp = await http.get(page_url, timeout=25, follow_redirects=True)
        if resp.status_code >= 400:
            return None
        page_html = resp.text
    except Exception:
        return None

    title = _extract_title(page_html)

    # Heuristic: the last 19xx/20xx number in the title is usually the report year.
    year = None
    if title:
        year_matches = re.findall(r"\b(19\d{2}|20\d{2})\b", title)
        if year_matches:
            year = year_matches[-1]

    return {
        "num": num,
        "tr": f"TR-{num}",
        "report_page": str(resp.url),
        "title": title,
        "year": year,
        "pdf": _extract_pdf_url(page_html, str(resp.url)),
    }
78
+
79
+
80
async def search_technical_reports(query: str, http: httpx.AsyncClient, limit: int = 8) -> Dict[str, Any]:
    """Search NTP Technical Reports and return ONLY TR hits relevant to the query.

    Implementation mirrors production (Cloudflare worker):
    - download the NTP reports index HTML
    - locate TR-### occurrences
    - keep a TR only if the query appears within ±250 chars of the match
    - fetch each TR page to obtain report page + PDF
    """
    q = (query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query", "items": []}

    try:
        resp = await http.get(REPORTS_URL, timeout=25, follow_redirects=True)
        resp.raise_for_status()
        index_html = resp.text
    except Exception as e:
        return {"ok": False, "error": f"Failed to fetch NTP index: {e}", "items": []}

    plain = _strip_tags(index_html)
    q_low = q.lower()
    cap = max(1, int(limit))

    matched_nums: List[str] = []
    seen = set()
    for m in TR_RE.finditer(plain):
        num = m.group(1)
        # Neighborhood window similar to production.
        window = plain[max(0, m.start() - 250):min(len(plain), m.end() + 250)].lower()
        if q_low not in window or num in seen:
            continue
        seen.add(num)
        matched_nums.append(num)
        if len(matched_nums) >= cap:
            break

    if not matched_nums:
        return {"ok": True, "query": q, "items": []}

    items: List[Dict[str, Any]] = []
    for num in matched_nums:
        item = await _fetch_tr_page(num, http)
        if item:
            items.append(item)
        if len(items) >= cap:
            break

    return {"ok": True, "query": q, "items": items}
core/sources/pubchem.py CHANGED
@@ -1,13 +1,12 @@
1
- # core/sources/pubchem.py
2
- from __future__ import annotations
3
-
4
  import re
5
  from typing import Any, Dict, List, Optional
6
  from urllib.parse import quote
7
 
8
- PUBCHEM = "https://pubchem.ncbi.nlm.nih.gov"
9
- PUG = f"{PUBCHEM}/rest/pug"
10
- PUG_VIEW = f"{PUBCHEM}/rest/pug_view"
 
11
 
12
  CAS_RE = re.compile(r"^\d{2,7}-\d{2}-\d$")
13
 
@@ -16,199 +15,195 @@ def is_cas(s: str) -> bool:
16
  return bool(CAS_RE.match((s or "").strip()))
17
 
18
 
19
- async def _get_json(http, url: str) -> Dict[str, Any]:
20
- try:
21
- r = await http.get(url, follow_redirects=True, timeout=30.0)
22
- if r.status_code != 200:
23
- return {"ok": False, "status": r.status_code, "url": url, "error": r.text[:500]}
24
- return {"ok": True, "url": url, "data": r.json()}
25
- except Exception as e:
26
- return {"ok": False, "url": url, "error": str(e)}
27
 
28
 
29
- async def _get_text(http, url: str) -> Dict[str, Any]:
30
- try:
31
- r = await http.get(url, follow_redirects=True, timeout=30.0)
32
- if r.status_code != 200:
33
- return {"ok": False, "status": r.status_code, "url": url, "error": r.text[:500]}
34
- return {"ok": True, "url": url, "text": r.text}
35
- except Exception as e:
36
- return {"ok": False, "url": url, "error": str(e)}
37
-
38
-
39
- async def cid_from_cas(cas: str, http) -> Optional[int]:
40
- cas = (cas or "").strip()
41
- url = f"{PUG}/compound/xref/RN/{quote(cas, safe='')}/cids/JSON"
42
- j = await _get_json(http, url)
43
- if not j.get("ok"):
44
- return None
45
- data = j["data"]
46
- cids = (data.get("IdentifierList") or {}).get("CID") or []
47
- return int(cids[0]) if cids else None
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
- async def cid_from_name(name: str, http) -> Optional[int]:
51
- name = (name or "").strip()
52
- url = f"{PUG}/compound/name/{quote(name, safe='')}/cids/JSON"
53
- j = await _get_json(http, url)
54
- if not j.get("ok"):
55
- return None
56
- data = j["data"]
57
- cids = (data.get("IdentifierList") or {}).get("CID") or []
58
- return int(cids[0]) if cids else None
 
 
59
 
 
 
 
60
 
61
- async def fetch_properties(cid: int, http) -> Dict[str, Any]:
62
- props = "MolecularFormula,MolecularWeight,CanonicalSMILES,IUPACName"
63
- url = f"{PUG}/compound/cid/{cid}/property/{props}/JSON"
64
- j = await _get_json(http, url)
65
- if not j.get("ok"):
66
- return {}
67
- arr = ((j["data"].get("PropertyTable") or {}).get("Properties") or [])
68
- return arr[0] if arr else {}
 
 
 
 
69
 
70
 
71
- async def fetch_synonyms(cid: int, http) -> List[str]:
72
- url = f"{PUG}/compound/cid/{cid}/synonyms/JSON"
73
- j = await _get_json(http, url)
74
- if not j.get("ok"):
75
- return []
76
- info = (((j["data"].get("InformationList") or {}).get("Information")) or [])
77
- syns = (info[0].get("Synonym") if info else []) or []
78
- return [str(s) for s in syns]
79
 
80
 
81
- def _pick_resolved_cas(query: str, synonyms: List[str]) -> Optional[str]:
82
- q = (query or "").strip()
83
- if is_cas(q):
84
- return q
85
- for s in synonyms:
86
- if is_cas(s):
87
- return s
88
  return None
89
 
90
 
91
- def _extract_strings_from_value(value_obj: Any) -> List[str]:
92
- """PUG_VIEW uses Value.StringWithMarkup[].String often."""
93
- out: List[str] = []
94
- if isinstance(value_obj, dict):
95
- swm = value_obj.get("StringWithMarkup")
96
- if isinstance(swm, list):
97
- for item in swm:
98
- if isinstance(item, dict) and item.get("String"):
99
- out.append(str(item["String"]).strip())
100
- # Sometimes direct String
101
- if value_obj.get("String"):
102
- out.append(str(value_obj["String"]).strip())
103
- elif isinstance(value_obj, str):
104
- out.append(value_obj.strip())
105
- return [x for x in out if x]
106
-
107
-
108
- def _walk_sections(section: Any) -> List[dict]:
109
- """Flatten Record.Section tree."""
110
- acc: List[dict] = []
111
- if isinstance(section, dict):
112
- acc.append(section)
113
- kids = section.get("Section")
114
- if isinstance(kids, list):
115
- for k in kids:
116
- acc.extend(_walk_sections(k))
117
- elif isinstance(section, list):
118
- for s in section:
119
- acc.extend(_walk_sections(s))
120
- return acc
121
-
122
-
123
- def _section_heading(sec: dict) -> str:
124
- return str(sec.get("TOCHeading") or sec.get("Heading") or "")
125
-
126
-
127
- def _collect_hazard_paragraphs(pug_view_json: Dict[str, Any]) -> List[str]:
128
- record = pug_view_json.get("Record") or {}
129
- sections = _walk_sections(record.get("Section") or [])
130
-
131
- # Production-like: show all hazard paragraphs under Safety & Hazards / GHS / ECHA
132
- wanted = []
133
- keys = (
134
- "Safety and Hazards",
135
- "Hazards Identification",
136
- "GHS Classification",
137
- "Hazard Statements",
138
- "Precautionary Statement",
139
- "ECHA",
140
- "C&L",
141
- "Classification",
142
- "Label",
143
- "Hazard",
144
  )
 
 
 
 
 
145
 
146
- for sec in sections:
147
- h = _section_heading(sec)
148
- if not h:
149
- continue
150
- if any(k.lower() in h.lower() for k in keys):
151
- info_list = sec.get("Information") or []
152
- if isinstance(info_list, list):
153
- for info in info_list:
154
- if not isinstance(info, dict):
155
- continue
156
- v = info.get("Value")
157
- for s in _extract_strings_from_value(v):
158
- wanted.append(s)
159
-
160
- # De-dup while preserving order
161
- seen = set()
162
- out = []
163
- for p in wanted:
164
- p2 = " ".join(p.split())
165
- if not p2:
166
- continue
167
- if p2 in seen:
168
- continue
169
- seen.add(p2)
170
- out.append(p2)
171
- return out
172
 
 
 
 
 
 
173
 
174
- async def fetch_hazards(cid: int, http) -> Dict[str, Any]:
175
- url = f"{PUG_VIEW}/data/compound/{cid}/JSON"
176
- j = await _get_json(http, url)
177
- if not j.get("ok"):
178
- return {"ok": False, "error": j.get("error"), "url": url, "hazard_paragraphs": []}
179
- data = j["data"]
180
- paragraphs = _collect_hazard_paragraphs(data)
181
- return {"ok": True, "url": url, "hazard_paragraphs": paragraphs, "raw": data}
182
 
 
 
183
 
184
- async def pubchem_by_query(q: str, http) -> Dict[str, Any]:
185
- q = (q or "").strip()
 
186
  if not q:
187
- return {"ok": False, "error": "Empty query."}
188
 
189
- cid = await (cid_from_cas(q, http) if is_cas(q) else cid_from_name(q, http))
190
  if not cid:
191
- return {
192
- "ok": False,
193
- "error": "No PubChem CID found.",
194
- "query": q,
195
- "resolve_url": f"{PUBCHEM}/#query={quote(q)}",
196
- }
197
 
198
- props = await fetch_properties(cid, http)
199
- synonyms = await fetch_synonyms(cid, http)
200
- resolved_cas = _pick_resolved_cas(q, synonyms)
 
 
 
 
 
 
 
201
 
202
- hazards = await fetch_hazards(cid, http)
 
 
 
 
 
 
 
 
203
 
204
  return {
205
  "ok": True,
206
  "query": q,
207
  "cid": cid,
208
  "resolved_cas": resolved_cas,
209
- "url": f"{PUBCHEM}/compound/{cid}",
210
- "structure_png": f"{PUG}/compound/cid/{cid}/PNG?record_type=2d",
211
- "properties": props,
212
- "synonyms": synonyms,
213
- "hazards": hazards,
214
- }
 
1
+ import html
 
 
2
  import re
3
  from typing import Any, Dict, List, Optional
4
  from urllib.parse import quote
5
 
6
+ import httpx
7
+
8
+ PUBCHEM_REST = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
9
+ PUBCHEM_VIEW = "https://pubchem.ncbi.nlm.nih.gov/rest/pug_view"
10
 
11
  CAS_RE = re.compile(r"^\d{2,7}-\d{2}-\d$")
12
 
 
15
  return bool(CAS_RE.match((s or "").strip()))
16
 
17
 
18
+ def _first_cas_in_text(text: str) -> Optional[str]:
19
+ if not text:
20
+ return None
21
+ m = re.search(r"\b\d{2,7}-\d{2}-\d\b", text)
22
+ return m.group(0) if m else None
 
 
 
23
 
24
 
25
+ def _fmt_value(value: Any) -> str:
26
+ """Port of production `fmtInfoValue()` for PubChem PUG-View values."""
27
+ if value is None:
28
+ return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
+ # PUG-View Value is usually a dict
31
+ if isinstance(value, dict):
32
+ if "StringWithMarkup" in value and isinstance(value["StringWithMarkup"], list):
33
+ parts: List[str] = []
34
+ for item in value["StringWithMarkup"]:
35
+ if isinstance(item, dict) and item.get("String"):
36
+ parts.append(str(item["String"]))
37
+ elif isinstance(item, str):
38
+ parts.append(item)
39
+ return html.unescape("".join(parts)).strip()
40
+ if "String" in value:
41
+ return html.unescape(str(value["String"])).strip()
42
+ if "Number" in value:
43
+ return str(value["Number"]) # already numeric
44
+ if "Boolean" in value:
45
+ return str(value["Boolean"])
46
+ if "Date" in value:
47
+ return str(value["Date"])
48
+
49
+ # Fallback
50
+ return html.unescape(str(value)).strip()
51
+
52
+
53
def _scan_hazards(section: Dict[str, Any], out: List[Dict[str, str]]):
    """Recursively collect hazard-related entries from a PUG-View section.

    Mirrors production `scanHazards()` semantics: an Information entry is
    kept when its Name contains one of the hazard markers below, and the
    formatted Value is non-empty. Results are appended to *out* as
    {"name", "text"} dicts; child sections are scanned recursively.
    """
    hazard_markers = (
        "ghs hazard statements",
        "echa c&l notifications summary",
        "carcinogenicity",
        "mutagenicity",
        "genotoxicity",
        "toxic",
        "hazard",
    )
    for info in section.get("Information") or []:
        name = (info.get("Name") or "").strip()
        lowered = name.lower()
        if any(marker in lowered for marker in hazard_markers):
            text = _fmt_value(info.get("Value"))
            if text:
                out.append({"name": name or "Hazard information", "text": text})

    for child in section.get("Section") or []:
        _scan_hazards(child, out)
77
+
78
+
79
+ def _extract_synonyms(record: Dict[str, Any]) -> List[str]:
80
+ """Best-effort extraction of synonyms list from PubChem PUG-View record."""
81
+ if not record:
82
+ return []
83
 
84
+ def walk(sec: Dict[str, Any], acc: List[str]):
85
+ # Synonyms often appear under Names and Identifiers
86
+ if (sec.get("TOCHeading") or "").lower() == "synonyms":
87
+ for info in sec.get("Information") or []:
88
+ val = info.get("Value")
89
+ if isinstance(val, dict) and isinstance(val.get("StringWithMarkup"), list):
90
+ for item in val["StringWithMarkup"]:
91
+ if isinstance(item, dict) and item.get("String"):
92
+ acc.append(str(item["String"]))
93
+ for sub in sec.get("Section") or []:
94
+ walk(sub, acc)
95
 
96
+ out: List[str] = []
97
+ for top in record.get("Section") or []:
98
+ walk(top, out)
99
 
100
+ # De-dupe preserve order
101
+ seen = set()
102
+ uniq: List[str] = []
103
+ for s in out:
104
+ s = s.strip()
105
+ if not s:
106
+ continue
107
+ if s.lower() in seen:
108
+ continue
109
+ seen.add(s.lower())
110
+ uniq.append(s)
111
+ return uniq
112
 
113
 
114
def _structure_png_url(cid: int) -> str:
    """2-D structure image URL for a PubChem CID."""
    base = f"{PUBCHEM_REST}/compound/cid/{cid}/PNG"
    return base + "?record_type=2d"
 
 
 
 
 
 
116
 
117
 
118
+ def _compound_url(cid: int) -> str:
119
+ return f"https://pubchem.ncbi.nlm.nih.gov/compound/{cid}"
120
+
121
+
122
+ def _safe_first(items: Any) -> Optional[Any]:
123
+ if isinstance(items, list) and items:
124
+ return items[0]
125
  return None
126
 
127
 
128
async def _cid_from_query(q: str, http: httpx.AsyncClient) -> Optional[int]:
    """Resolve a name/CAS query to a PubChem CID via PUG REST.

    Returns None on any failure (HTTP error, bad JSON, missing CID).
    """
    url = f"{PUBCHEM_REST}/compound/name/{quote(q)}/cids/JSON"
    try:
        resp = await http.get(url, timeout=20)
        resp.raise_for_status()
        cid = _safe_first(resp.json().get("IdentifierList", {}).get("CID"))
        return int(cid) if cid is not None else None
    except Exception:
        return None
138
+
139
+
140
async def _props_from_cid(cid: int, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Fetch the property set production needs for *cid*.

    Raises on HTTP errors; returns {} when the property table is empty.
    """
    props = "MolecularFormula,MolecularWeight,CanonicalSMILES,IUPACName"
    url = f"{PUBCHEM_REST}/compound/cid/{cid}/property/{props}/JSON"
    resp = await http.get(url, timeout=20)
    resp.raise_for_status()
    first = _safe_first(resp.json().get("PropertyTable", {}).get("Properties"))
    return first or {}
151
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
152
 
153
async def _view_record(cid: int, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Fetch the full PUG-View record JSON for *cid* (raises on HTTP errors)."""
    resp = await http.get(f"{PUBCHEM_VIEW}/data/compound/{cid}/JSON", timeout=25)
    resp.raise_for_status()
    return resp.json()
158
 
 
 
 
 
 
 
 
 
159
 
160
async def pubchem_by_query(query: str, http: httpx.AsyncClient) -> Dict[str, Any]:
    """Query PubChem by CAS or name.

    Returns a dict compatible with app.py renderers:
    { ok, query, cid, resolved_cas, props, structure_png, url, synonyms, hazards }
    on success, or { ok: False, error } when the query is empty or no CID
    resolves.
    """
    q = (query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query"}

    cid = await _cid_from_query(q, http)
    if not cid:
        return {"ok": False, "error": "No PubChem CID found"}

    props = await _props_from_cid(cid, http)
    record = (await _view_record(cid, http)).get("Record") or {}

    synonyms = _extract_synonyms(record)
    # A CAS query resolves to itself; otherwise look for a CAS among the synonyms.
    resolved_cas = q if is_cas(q) else _first_cas_in_text("\n".join(synonyms))

    hazards: List[Dict[str, str]] = []
    for top_section in record.get("Section") or []:
        _scan_hazards(top_section, hazards)

    # De-dupe hazards by (name, text) while preserving order.
    seen = set()
    deduped: List[Dict[str, str]] = []
    for h in hazards:
        key = (h.get("name", "").lower(), h.get("text", "").strip())
        if key not in seen:
            seen.add(key)
            deduped.append(h)

    return {
        "ok": True,
        "query": q,
        "cid": cid,
        "resolved_cas": resolved_cas,
        "props": props,
        "structure_png": _structure_png_url(cid),
        "url": _compound_url(cid),
        "synonyms": synonyms[:50],
        "hazards": deduped,
    }