""" URL liveness checker for bibliography entries. Many @misc / blog / repo references rot over time. This checker does a HEAD (falling back to a small GET) on entry.url and flags anything that returns 4xx/5xx or fails to connect. Operates on BibEntry objects, not on tex_content. Invoked from main.py / app.py when `submission_extra.url_liveness` is true. """ from __future__ import annotations import concurrent.futures import logging import re from dataclasses import dataclass from typing import Iterable, List, Optional import requests from src.utils.http import get_session, is_open, record_failure, record_success from src.parsers.bib_parser import BibEntry logger = logging.getLogger(__name__) # arxiv.org HTML endpoints (`/abs/...`, `/pdf/...`) routinely reset # connections from cloud egress IPs (HF Spaces, AWS, GCP). The arXiv # *export API* — same paper IDs, official endpoint — is far more stable. # When we see an arxiv URL, we verify it by querying export.arxiv.org # instead of HEAD'ing arxiv.org directly. _ARXIV_URL_RE = re.compile( r"^https?://(?:www\.)?arxiv\.org/(?:abs|pdf|html)/([\w.\-/]+?)(?:\.pdf|\.html)?(?:[?#]|$)", re.IGNORECASE, ) _ARXIV_EXPORT_API = "http://export.arxiv.org/api/query" # Share the arxiv metadata fetcher's circuit breaker. Both hit # export.arxiv.org; once the breaker is tripped (typically after 2 quick # 429s from the metadata fetcher), it makes no sense for the URL checker # to keep firing requests at the same dead host — that was producing # 18+ false "unreachable" findings for arxiv URLs that are actually fine. # When the breaker is open, mark the URL as "skipped" so the report # doesn't falsely claim it's broken. 
# Circuit-breaker key handed to is_open / record_success / record_failure
# for every request this module sends to export.arxiv.org.
_ARXIV_SOURCE = "arxiv"


@dataclass
class URLFinding:
    """Outcome of the liveness check for one bibliography entry's URL."""

    entry_key: str  # key of the entry the URL was taken from
    url: str  # the URL that was checked
    status: str  # "ok" | "broken" | "unreachable" | "skipped"
    status_code: Optional[int] = None  # HTTP status of the response, when recorded
    detail: str = ""  # short human-readable explanation


class URLChecker:
    """Concurrent HEAD-then-GET liveness check."""

    # URL schemes that cannot be probed over HTTP; reported as "skipped".
    SKIP_PREFIXES = ("mailto:", "ftp://", "tel:", "javascript:")

    # HEAD statuses that many servers return even though GET works.
    _GET_FALLBACK_STATUSES = (403, 405, 501)

    def __init__(self, max_workers: int = 8, timeout: float = 15.0):
        """Store the thread-pool size and the per-request timeout (seconds)."""
        self.max_workers = max_workers
        self.timeout = timeout

    def _check_arxiv_via_api(self, entry_key: str, url: str, arxiv_id: str) -> URLFinding:
        """Verify an arxiv URL by hitting export.arxiv.org instead of arxiv.org.

        Honors the shared `arxiv` circuit breaker: if the metadata fetcher
        already proved the host is rate-limiting us, we report "skipped"
        rather than spamming the host and reporting bogus "unreachable".

        Returns "ok" if the export API returns an Atom entry for the ID,
        "broken" if the feed is empty (ID doesn't exist), "skipped" if the
        breaker is open, or "unreachable" if the API itself fails.
        """
        if is_open(_ARXIV_SOURCE):
            return URLFinding(
                entry_key,
                url,
                "skipped",
                detail="arxiv source rate-limited (circuit breaker open)",
            )

        session = get_session()
        try:
            r = session.get(
                _ARXIV_EXPORT_API,
                params={"id_list": arxiv_id, "max_results": 1},
                timeout=self.timeout,
            )
            # NOTE(review): every 4xx/5xx is turned into an exception here and
            # therefore counted via record_failure — confirm that is intended
            # for statuses other than rate limiting.
            r.raise_for_status()
            record_success(_ARXIV_SOURCE)
        except requests.RequestException as e:
            logger.debug("arXiv API check failed for %s: %s", url, e, exc_info=True)
            record_failure(_ARXIV_SOURCE)
            return URLFinding(entry_key, url, "unreachable", detail=f"arxiv-api: {str(e)[:100]}")

        # The Atom feed contains `<entry>` only when the ID resolves. An
        # empty feed (totalResults=0) means the ID is bogus.
        # NOTE(review): the `<entry>` markers and the two returns below were
        # rebuilt from this method's docstring because the original text was
        # lost; confirm the exact status_code / detail values.
        body = r.text or ""
        if "<entry>" in body or "<entry " in body:
            return URLFinding(entry_key, url, "ok", status_code=r.status_code)
        return URLFinding(
            entry_key,
            url,
            "broken",
            detail=f"arxiv-api: no entry found for id {arxiv_id}",
        )

    def _check_one(self, entry: BibEntry) -> Optional[URLFinding]:
        """Check a single entry's URL.

        Returns None when the entry has no URL, otherwise a URLFinding whose
        status is "skipped" (non-HTTP scheme), "ok" (2xx/3xx), "broken"
        (any other status) or "unreachable" (the request raised).
        arxiv.org URLs are delegated to `_check_arxiv_via_api`.
        """
        url = (entry.url or "").strip()
        if not url:
            return None
        # str.startswith accepts a tuple, so one call covers every prefix.
        if url.lower().startswith(self.SKIP_PREFIXES):
            return URLFinding(entry.key, url, "skipped", detail="non-http scheme")

        # arxiv.org HEAD requests get connection-reset on shared egress IPs.
        # Re-route to the export API, which is the official liveness signal.
        m = _ARXIV_URL_RE.match(url)
        if m:
            return self._check_arxiv_via_api(entry.key, url, m.group(1))

        session = get_session()
        try:
            r = session.head(url, allow_redirects=True, timeout=self.timeout)
            # Many servers return 405/403 for HEAD but are fine with GET;
            # double-check with a tiny GET.
            if r.status_code in self._GET_FALLBACK_STATUSES:
                # Release the HEAD response before replacing it.
                r.close()
                r = session.get(url, allow_redirects=True, timeout=self.timeout, stream=True)
            # stream=True means the body was never downloaded; just close.
            r.close()
        except requests.RequestException as e:
            logger.debug("URL check failed for %s: %s", url, e, exc_info=True)
            return URLFinding(entry.key, url, "unreachable", detail=str(e)[:120])

        if 200 <= r.status_code < 400:
            return URLFinding(entry.key, url, "ok", status_code=r.status_code)
        return URLFinding(
            entry.key,
            url,
            "broken",
            status_code=r.status_code,
            detail=f"HTTP {r.status_code}",
        )

    def check_entries(self, entries: Iterable[BibEntry]) -> List[URLFinding]:
        """Check every entry that has a URL, using a thread pool.

        Entries without a truthy `url` attribute are ignored. Findings are
        returned in the same order as the entries that produced them
        (Executor.map preserves input order).
        """
        targets = [e for e in entries if getattr(e, "url", "")]
        if not targets:
            return []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            return [f for f in ex.map(self._check_one, targets) if f is not None]