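# NOTE: the next three lines are web-page residue from the file's upload page,
# not code; they are kept verbatim as comments so the module parses.
# hchevva's picture
# Upload 4 files
# 02835d5 verified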
import html
import re
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
import httpx
from core.config import settings
# NTP publications/reports listing; the primary page scraped by
# search_technical_reports and the base against which its links are resolved.
REPORTS_URL = "https://ntp.niehs.nih.gov/publications/reports"
# Site root, used by _fetch_tr_page to build per-report page URLs.
BASE = "https://ntp.niehs.nih.gov"
# TR index page; only scraped as a fallback when the listing yields no hits.
INDEX_URL = "https://ntp.niehs.nih.gov/data/tr"
# Matches "TR123" / "TR-123" (3-4 digits, any case) as a whole word; group 1 is
# the number. NOTE(review): not referenced in the visible part of this file.
TR_RE = re.compile(r"\bTR-?(\d{3,4})\b", re.IGNORECASE)
# Captures the value of any single- or double-quoted href attribute.
HREF_RE = re.compile(r"href=[\"']([^\"']+)[\"']", re.IGNORECASE)
# Captures the inner text of a <title> element (DOTALL: it may span lines).
TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
# Matches a TR identifier that sits directly between ">" and "<", i.e. it is the
# whole text of an element; group 1 is the identifier (e.g. "TR-598").
TR_MARKER_RE = re.compile(r">(TR-\d{3,})<", re.IGNORECASE)
def _strip_tags(html_text: str) -> str:
# crude but robust enough for the NTP index page
text = re.sub(r"<script[\s\S]*?</script>", " ", html_text, flags=re.IGNORECASE)
text = re.sub(r"<style[\s\S]*?</style>", " ", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", " ", text)
text = html.unescape(text)
text = re.sub(r"\s+", " ", text).strip()
return text
def _extract_title(page_html: str) -> str:
    """Return the cleaned ``<title>`` text of a page, or ``""`` if there is none.

    Character references are decoded, whitespace is collapsed to single spaces,
    and a trailing ``| NTP ...`` site suffix is removed.

    Args:
        page_html: Full page markup; a falsy value is treated as empty.

    Returns:
        The normalised title string (possibly empty).
    """
    found = TITLE_RE.search(page_html or "")
    if found is None:
        return ""

    decoded = html.unescape(found.group(1))
    single_line = re.sub(r"\s+", " ", decoded).strip()
    # Drop the site-name suffix that follows the last meaningful part.
    return re.sub(r"\s*\|\s*NTP.*$", "", single_line, flags=re.IGNORECASE).strip()
def _extract_pdf_url(page_html: str, page_url: str) -> Optional[str]:
    """Return the absolute URL of the first PDF-looking link on a page.

    A link qualifies when its href contains ``.pdf`` (case-insensitive, anywhere
    in the value) and is not a same-page fragment starting with ``#``.

    Args:
        page_html: Page markup; a falsy value is treated as empty.
        page_url: URL the page was served from, used to resolve relative hrefs.

    Returns:
        The resolved URL, or ``None`` when no qualifying link exists.
    """
    pdf_links = (
        link
        for link in HREF_RE.findall(page_html or "")
        if ".pdf" in link.lower() and not link.startswith("#")
    )
    first = next(pdf_links, None)
    if first is None:
        return None
    return urljoin(page_url, first)
async def _fetch_tr_page(num: str, http: httpx.AsyncClient) -> Optional[Dict[str, Any]]:
    """Fetch the report page for one Technical Report and summarise it.

    Args:
        num: Report number without the ``TR-`` prefix.
        http: Async HTTP client used for the request.

    Returns:
        A dict with keys ``num``, ``tr``, ``report_page`` (final URL after
        redirects), ``title``, ``year`` (last 19xx/20xx year found in the
        title, else ``None``) and ``pdf`` (first PDF link, else ``None``).
        ``None`` is returned when the request raises or the status is >= 400.
    """
    target = f"{BASE}/publications/reports/tr{num}"
    try:
        response = await http.get(
            target,
            timeout=25,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        if response.status_code >= 400:
            return None
        body = response.text
    except Exception:
        # Best-effort lookup: any transport or decoding failure means "no page".
        return None

    final_url = str(response.url)
    title = _extract_title(body)
    # The last year mentioned in the title is taken as the report year.
    year_hits = re.findall(r"\b(19\d{2}|20\d{2})\b", title) if title else []

    return {
        "num": num,
        "tr": f"TR-{num}",
        "report_page": final_url,
        "title": title,
        "year": year_hits[-1] if year_hits else None,
        "pdf": _extract_pdf_url(body, final_url),
    }
def _ntp_tr_number(tr_id: str) -> str:
    """Return the bare report number of a ``TR-###`` identifier (``""`` if empty)."""
    return re.sub(r"^TR-", "", tr_id or "", flags=re.IGNORECASE)


def _ntp_score_report_link(url: str, tr_id: str) -> int:
    """Rank a candidate report-page URL; a higher score is a better match."""
    points = 0
    if "/publications/" in url:
        points += 3
    if "/go/" in url:
        points += 3
    if tr_id and tr_id.lower() in url.lower():
        points += 2
    if re.search(r"/reports?", url):
        points += 1
    return points


async def _ntp_search_via_worker(q: str, limit: int, http: httpx.AsyncClient) -> List[Dict[str, Any]]:
    """Query the configured worker proxy and normalise its rows (``[]`` on HTTP >= 400)."""
    worker_url = settings.worker_base_url.rstrip("/") + "/ntp-tr"
    r = await http.post(worker_url, json={"query": q, "limit": limit}, timeout=25.0)
    if r.status_code >= 400:
        return []
    rows = r.json().get("results") or []
    return [
        {
            "num": (row.get("tr") or "").replace("TR-", ""),
            "tr": row.get("tr"),
            "report_page": row.get("page") or row.get("url"),
            "title": row.get("title") or "NTP Technical Report",
            "year": row.get("year"),
            "pdf": row.get("pdf"),
        }
        for row in rows
        if isinstance(row, dict)
    ]


def _ntp_scan_reports_index(index_html: str, q: str, limit: int) -> List[Dict[str, Any]]:
    """Scan the reports listing HTML for TR entries whose nearby text matches *q*."""
    lines = index_html.splitlines()
    q_low = q.lower()
    is_cas = bool(re.match(r"^\d{2,7}-\d{2}-\d$", q))
    # CAS numbers are compared digits-only, so the hyphens need not appear
    # in the page text in the same form as in the query.
    q_digits = re.sub(r"\D", "", q) if is_cas else ""

    results: List[Dict[str, Any]] = []
    for i, line in enumerate(lines):
        m = TR_MARKER_RE.search(line)
        if not m:
            continue

        # "Neighbourhood" of the marker: this line plus the following ones.
        snippet = " ".join(lines[i : i + 12])
        text_block = _strip_tags(snippet)
        if is_cas:
            # A CAS query must hit both the tight 3-line window and the full
            # 12-line window (the two are stripped independently).
            mini_text = _strip_tags(" ".join(lines[i : i + 3]))
            if q_digits not in re.sub(r"\D", "", mini_text):
                continue
            if q_digits not in re.sub(r"\D", "", text_block):
                continue
        elif q_low not in text_block.lower():
            continue

        tr_id = m.group(1)
        # Prefer a non-PDF link under /publications/ or /go/.
        candidates = [
            urljoin(REPORTS_URL, h)
            for h in HREF_RE.findall(snippet)
            if not re.search(r"\.pdf(\?|$)", h, re.I)
        ]
        # max() returns the first best-scoring link, i.e. the same element a
        # stable descending sort would put first.
        page_href = max(
            candidates,
            key=lambda u: _ntp_score_report_link(u, tr_id),
            default=None,
        )

        pdf_match = re.search(r'href="([^"]+\.pdf)"', snippet, re.I)
        pdf_url = urljoin(REPORTS_URL, pdf_match.group(1)) if pdf_match else None

        title_match = re.search(
            r"TR-\d{3,}\s+PDF\s+(.*?)\s+\b(20\d{2}|19\d{2})\b\s+Technical Report",
            text_block,
            re.I,
        )
        year_match = re.search(r"\b(20\d{2}|19\d{2})\b", text_block)

        results.append(
            {
                "num": _ntp_tr_number(tr_id),
                "tr": tr_id,
                "title": title_match.group(1) if title_match else "",
                "year": year_match.group(1) if year_match else "",
                "pdf": pdf_url,
                "report_page": page_href or REPORTS_URL,
            }
        )
        if len(results) >= int(limit):
            break
    return results


def _ntp_scan_tr_index(idx_html: str, q: str, limit: int) -> List[Dict[str, Any]]:
    """Scan the fallback TR index HTML for entries whose nearby text contains *q*."""
    q_low = q.lower()
    idx_lines = idx_html.splitlines()

    results: List[Dict[str, Any]] = []
    for i, row in enumerate(idx_lines):
        if not re.search(r"TR-\d{3,}", row, re.I):
            continue
        block_text = _strip_tags(" ".join(idx_lines[i : i + 6]))
        if q_low not in block_text.lower():
            continue

        # The TR id may be absent from the stripped text when the raw row only
        # carried it inside markup; "tr"/"num" are then left empty.
        tr = re.search(r"TR-\d{3,}", block_text, re.I)
        # What remains after removing TR ids and CAS numbers is used as title.
        name = re.sub(r"TR-\d{3,}", "", block_text, flags=re.I)
        name = re.sub(r"\b\d{2,7}-\d{2}-\d\b", "", name).strip()

        tr_id = tr.group(0) if tr else ""
        results.append(
            {
                "num": _ntp_tr_number(tr_id),
                "tr": tr_id,
                "title": name or "",
                "year": "",
                "pdf": None,
                "report_page": INDEX_URL,
            }
        )
        if len(results) >= int(limit):
            break
    return results


async def search_technical_reports(query: str, http: httpx.AsyncClient, limit: int = 8) -> Dict[str, Any]:
    """Search NTP Technical Reports and return only TR hits relevant to the query.

    Sources are tried in order; the first one that yields items wins:

    1. The worker proxy at ``settings.worker_base_url`` (if configured). Any
       failure or empty answer falls through to scraping.
    2. The NTP reports listing (``REPORTS_URL``): every line containing a
       ``>TR-###<`` marker is kept when the query appears in the text of the
       following lines. A CAS-number query (``NN...-NN-N``) is matched on
       digits only.
    3. The TR index page (``INDEX_URL``), used only when step 2 found nothing.

    Individual report pages are NOT fetched here; links and PDF URLs come from
    the markup near each marker on the listing page.

    Args:
        query: Free text or CAS number; surrounding whitespace is ignored.
        http: Async HTTP client used for all requests.
        limit: Scraping stops once this many items are collected. The worker
            receives the same value but its answer is not truncated here.

    Returns:
        On success ``{"ok": True, "query": <stripped query>, "items": [...]}``,
        where each item has the keys ``num``, ``tr``, ``title``, ``year``,
        ``pdf`` and ``report_page``. ``items`` may be empty.
        On an empty query or when the reports listing cannot be fetched,
        ``{"ok": False, "error": <message>, "items": []}``.
    """
    q = (query or "").strip()
    if not q:
        return {"ok": False, "error": "Empty query", "items": []}

    # Prefer worker proxy if configured (matches production behavior).
    if settings.worker_base_url:
        try:
            items = await _ntp_search_via_worker(q, limit, http)
        except Exception:
            # Deliberate best-effort: a broken or unreachable proxy must not
            # prevent the direct-scrape path below from running.
            items = []
        if items:
            return {"ok": True, "query": q, "items": items}

    try:
        r = await http.get(
            REPORTS_URL,
            timeout=25,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        r.raise_for_status()
        index_html = r.text
    except Exception as e:
        return {"ok": False, "error": f"Failed to fetch NTP index: {e}", "items": []}

    results = _ntp_scan_reports_index(index_html, q, limit)

    # Fallback: scan the TR index page (data/tr).
    if not results:
        try:
            r2 = await http.get(
                INDEX_URL,
                timeout=25,
                follow_redirects=True,
                headers={"User-Agent": "Mozilla/5.0"},
            )
            idx_html = r2.text if r2.status_code < 400 else ""
        except Exception:
            # The fallback is optional; failing here just means "no hits".
            idx_html = ""
        if idx_html:
            results = _ntp_scan_tr_index(idx_html, q, limit)

    return {"ok": True, "query": q, "items": results}