"""
URL liveness checker for bibliography entries.

Many @misc / blog / repo references rot over time. This checker does a HEAD
on entry.url (falling back to a small streamed GET when the HEAD is answered
with 403, 405 or 501) and flags anything that returns 4xx/5xx or fails to
connect. arXiv links are verified through the arXiv export API rather than by
probing arxiv.org directly.

Operates on BibEntry objects, not on tex_content. Invoked from main.py / app.py
when `submission_extra.url_liveness` is true.
"""
| from __future__ import annotations |
|
|
| import concurrent.futures |
| import logging |
| import re |
| from dataclasses import dataclass |
| from typing import Iterable, List, Optional |
|
|
| import requests |
|
|
| from src.utils.http import get_session, is_open, record_failure, record_success |
| from src.parsers.bib_parser import BibEntry |
|
|
logger = logging.getLogger(__name__)


# Recognises links to arxiv.org abstract / PDF / HTML pages and captures the
# paper ID in group 1. The pattern is written as adjacent raw-string pieces
# (joined by the compiler into one pattern) so each part can be annotated.
_ARXIV_URL_RE = re.compile(
    r"^https?://(?:www\.)?arxiv\.org/"  # scheme and host, optional "www."
    r"(?:abs|pdf|html)/"                # page kind
    r"([\w.\-/]+?)"                     # group 1: paper ID, matched lazily
    r"(?:\.pdf|\.html)?"                # optional file suffix, kept out of the ID
    r"(?:[?#]|$)",                      # ID must end at a query, fragment or EOL
    flags=re.IGNORECASE,
)

# Endpoint queried (with an ``id_list`` parameter) to verify arXiv IDs.
_ARXIV_EXPORT_API = "http://export.arxiv.org/api/query"

# Source name passed to the circuit-breaker helpers imported from
# src.utils.http (is_open / record_success / record_failure).
_ARXIV_SOURCE = "arxiv"
|
|
|
|
@dataclass
class URLFinding:
    """Result of checking the URL of one bibliography entry.

    Fields are positional in declaration order; only ``status_code`` and
    ``detail`` have defaults.
    """

    # Key of the bibliography entry the URL belongs to.
    entry_key: str
    # The URL that was examined (as passed in by the checker).
    url: str
    # Outcome label. The checker in this module emits "ok", "broken",
    # "unreachable" and "skipped".
    status: str
    # HTTP status code of the deciding response, or None when no code applies.
    status_code: Optional[int] = None
    # Free-form explanation (error text, "HTTP <code>", skip reason, ...).
    detail: str = ""
|
|
|
|
class URLChecker:
    """Concurrent HEAD-then-GET liveness check for bibliography URLs.

    Every entry with a non-blank ``url`` produces one ``URLFinding`` whose
    ``status`` is one of:

    * ``"ok"`` -- the final response status was 2xx/3xx, or the arXiv export
      API returned an entry for the ID;
    * ``"broken"`` -- the server answered with any other status, or the arXiv
      export API has no entry for (or rejected) the ID;
    * ``"unreachable"`` -- the request raised ``requests.RequestException``;
    * ``"skipped"`` -- the URL uses a non-HTTP scheme, or the shared arXiv
      circuit breaker is open.
    """

    # URL prefixes that are never probed; compared case-insensitively.
    SKIP_PREFIXES = ("mailto:", "ftp://", "tel:", "javascript:")

    # HEAD statuses (Forbidden, Method Not Allowed, Not Implemented) after
    # which the URL is probed again with GET before a verdict is given.
    _HEAD_FALLBACK_STATUSES = (403, 405, 501)

    def __init__(self, max_workers: int = 8, timeout: float = 15.0):
        """Store the checker's settings.

        Args:
            max_workers: Size of the thread pool used by ``check_entries``.
            timeout: Timeout in seconds handed to every HTTP request.
        """
        self.max_workers = max_workers
        self.timeout = timeout

    @staticmethod
    def _clean_arxiv_id(raw_id: str) -> str:
        """Return the ID captured from an arXiv URL without trailing slashes.

        The URL pattern allows ``/`` inside IDs (old-style ``hep-th/9901001``),
        so a URL ending in ``/`` yields an ID ending in ``/``; that is not a
        valid ID for the export API. The result may be empty.
        """
        return raw_id.rstrip("/")

    def _check_arxiv_via_api(self, entry_key: str, url: str, arxiv_id: str) -> URLFinding:
        """Verify an arXiv URL by querying export.arxiv.org instead of arxiv.org.

        Honors the shared ``arxiv`` circuit breaker: when it is open no
        request is made and the finding is "skipped", rather than hitting the
        host again and reporting a bogus "unreachable".

        Args:
            entry_key: Key of the bibliography entry being checked.
            url: The original URL, echoed into the finding.
            arxiv_id: The arXiv ID extracted from ``url``.

        Returns:
            A finding with status "ok" if the response contains an Atom
            entry, "broken" if the feed has no entry or the API answered
            HTTP 400 for the ID, "skipped" if the breaker is open, or
            "unreachable" if the request itself failed.
        """
        if is_open(_ARXIV_SOURCE):
            return URLFinding(
                entry_key, url, "skipped",
                detail="arxiv source rate-limited (circuit breaker open)",
            )

        session = get_session()
        try:
            r = session.get(
                _ARXIV_EXPORT_API,
                params={"id_list": arxiv_id, "max_results": 1},
                timeout=self.timeout,
            )
            # HTTP 400 is how the export API answers a request it considers
            # malformed (e.g. a badly formatted ID). The host is healthy, so
            # this must neither raise nor count against the circuit breaker.
            if r.status_code != 400:
                r.raise_for_status()
            record_success(_ARXIV_SOURCE)
        except requests.RequestException as e:
            logger.debug("arXiv API check failed for %s: %s", url, e, exc_info=True)
            record_failure(_ARXIV_SOURCE)
            return URLFinding(entry_key, url, "unreachable", detail=f"arxiv-api: {str(e)[:100]}")

        if r.status_code == 400:
            # Checked before looking for "<entry>": the API's error feed
            # itself carries an entry describing the error.
            return URLFinding(
                entry_key, url, "broken",
                status_code=400,
                detail=f"arxiv export API rejected id {arxiv_id!r} (HTTP 400)",
            )

        body = r.text or ""
        if "<entry>" in body or "<entry " in body:
            return URLFinding(entry_key, url, "ok", status_code=200)
        return URLFinding(
            entry_key, url, "broken",
            status_code=200,
            detail=f"arxiv id {arxiv_id!r} not found in export API",
        )

    def _check_one(self, entry: BibEntry) -> Optional[URLFinding]:
        """Check a single entry's URL.

        Args:
            entry: Object exposing ``url`` and ``key`` attributes.

        Returns:
            ``None`` when the entry has no (or only a blank) URL, otherwise a
            ``URLFinding`` describing the outcome.
        """
        url = (entry.url or "").strip()
        if not url:
            return None
        if url.lower().startswith(self.SKIP_PREFIXES):
            return URLFinding(entry.key, url, "skipped", detail="non-http scheme")

        # arXiv links are verified through the export API, not by probing
        # arxiv.org itself.
        m = _ARXIV_URL_RE.match(url)
        if m:
            arxiv_id = self._clean_arxiv_id(m.group(1))
            # An ID that was nothing but slashes cannot be looked up; such a
            # URL falls through to the generic probe below.
            if arxiv_id:
                return self._check_arxiv_via_api(entry.key, url, arxiv_id)

        session = get_session()
        try:
            r = session.head(url, allow_redirects=True, timeout=self.timeout)
            if r.status_code in self._HEAD_FALLBACK_STATUSES:
                # Release the HEAD response before replacing it.
                r.close()
                # stream=True: only status and headers are needed, so the
                # body is never downloaded.
                r = session.get(url, allow_redirects=True, timeout=self.timeout, stream=True)
            r.close()
        except requests.RequestException as e:
            logger.debug("URL check failed for %s: %s", url, e, exc_info=True)
            return URLFinding(entry.key, url, "unreachable", detail=str(e)[:120])

        if 200 <= r.status_code < 400:
            return URLFinding(entry.key, url, "ok", status_code=r.status_code)
        return URLFinding(
            entry.key, url, "broken",
            status_code=r.status_code,
            detail=f"HTTP {r.status_code}",
        )

    def check_entries(self, entries: Iterable[BibEntry]) -> List[URLFinding]:
        """Check all entries that carry a URL, using a thread pool.

        Args:
            entries: Entries to inspect; those without a truthy ``url``
                attribute are ignored.

        Returns:
            One finding per checked entry, in the order of the input entries.
            Entries whose URL is blank after stripping produce no finding.
            Exceptions other than ``requests.RequestException`` raised while
            checking an entry propagate to the caller.
        """
        targets = [e for e in entries if getattr(e, "url", "")]
        if not targets:
            return []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            return [f for f in ex.map(self._check_one, targets) if f is not None]
|
|