# BibGuard / src/checkers/url_checker.py
# thinkwee
# improve api stability
# 58f0f1d
"""
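URL liveness checker for bibliography entries.

Many @misc / blog / repo references rot over time. This checker issues a HEAD
request (falling back to a small streamed GET when the server rejects HEAD)
against entry.url and flags anything that returns 4xx/5xx or fails to connect.
arXiv URLs are verified through the arXiv export API instead of being
requested directly, and non-HTTP schemes such as mailto: are reported as
skipped.

Operates on BibEntry objects, not on tex_content. Invoked from main.py / app.py
when `submission_extra.url_liveness` is true.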
"""
from __future__ import annotations
import concurrent.futures
import logging
import re
from dataclasses import dataclass
from typing import Iterable, List, Optional
import requests
from src.utils.http import get_session, is_open, record_failure, record_success
from src.parsers.bib_parser import BibEntry
# Module-level logger; request failures are logged at DEBUG with tracebacks.
logger = logging.getLogger(__name__)

# The arxiv.org HTML endpoints (`/abs/...`, `/pdf/...`) routinely reset
# connections coming from cloud egress IPs (HF Spaces, AWS, GCP). The arXiv
# *export API* serves the same paper IDs from an official endpoint and is far
# more stable, so arxiv URLs are verified by querying export.arxiv.org rather
# than by HEAD'ing arxiv.org directly.
#
# Group 1 of the pattern is the paper ID, with any trailing `.pdf` / `.html`
# suffix and any query string or fragment left out.
_ARXIV_URL_RE = re.compile(
    r"^https?://(?:www\.)?arxiv\.org/(?:abs|pdf|html)/([\w.\-/]+?)(?:\.pdf|\.html)?(?:[?#]|$)",
    flags=re.IGNORECASE,
)
_ARXIV_EXPORT_API = "http://export.arxiv.org/api/query"

# Name of the circuit breaker shared with the arxiv metadata fetcher. Both
# components talk to export.arxiv.org; once the breaker has tripped (typically
# after 2 quick 429s from the metadata fetcher) there is no point in the URL
# checker firing more requests at the same dead host — doing so used to
# produce 18+ false "unreachable" findings for arxiv URLs that were actually
# fine. While the breaker is open the URL is marked "skipped", so the report
# does not falsely claim it is broken.
_ARXIV_SOURCE = "arxiv"
@dataclass
class URLFinding:
    """Outcome of the liveness check for one bibliography entry's URL."""

    # Key of the bibliography entry the URL belongs to.
    entry_key: str
    # The URL that was checked.
    url: str
    # One of: "ok" | "broken" | "unreachable" | "skipped".
    status: str
    # HTTP status code associated with the finding, when there is one.
    status_code: Optional[int] = None
    # Short human-readable explanation (error text, "HTTP 404", skip reason).
    detail: str = ""
class URLChecker:
    """Concurrent HEAD-then-GET liveness check for bibliography entry URLs.

    Plain HTTP(S) URLs are probed with a HEAD request, retried as a streamed
    GET when the server answers HEAD with 403/405/501. URLs matching
    ``_ARXIV_URL_RE`` are verified through the arXiv export API instead.
    """

    # Schemes that are not fetched over HTTP; such URLs are reported as
    # "skipped" instead of being requested.
    SKIP_PREFIXES = ("mailto:", "ftp://", "tel:", "javascript:")

    # The arXiv API reports problems (e.g. an ID it cannot parse) as an Atom
    # feed holding a single error entry whose <id> contains this substring.
    _ARXIV_ERROR_MARKER = "arxiv.org/api/errors"

    def __init__(self, max_workers: int = 8, timeout: float = 15.0):
        """Configure the checker.

        Args:
            max_workers: Size of the thread pool used by ``check_entries``.
            timeout: Timeout passed to every HTTP request.
        """
        self.max_workers = max_workers
        self.timeout = timeout

    def _check_arxiv_via_api(self, entry_key: str, url: str, arxiv_id: str) -> URLFinding:
        """Verify an arxiv URL by hitting export.arxiv.org instead of arxiv.org.

        Honors the shared ``arxiv`` circuit breaker: if the breaker is open,
        the URL is reported as "skipped" rather than spamming the host and
        reporting a bogus "unreachable".

        Args:
            entry_key: Key of the bibliography entry being checked.
            url: The original arxiv URL (reported back in the finding).
            arxiv_id: Paper ID extracted from ``url``.

        Returns:
            A finding whose status is "ok" if the export API returns an Atom
            entry for the ID, "broken" if the API rejects the ID (HTTP 400 or
            an error feed) or the feed is empty, "skipped" if the breaker is
            open, or "unreachable" if the API itself fails.
        """
        if is_open(_ARXIV_SOURCE):
            return URLFinding(
                entry_key, url, "skipped",
                detail="arxiv source rate-limited (circuit breaker open)",
            )

        session = get_session()
        try:
            r = session.get(
                _ARXIV_EXPORT_API,
                params={"id_list": arxiv_id, "max_results": 1},
                timeout=self.timeout,
            )
            # Every request parameter except the ID is fixed, so a 400 means
            # the API rejected this particular ID. The host answered, so this
            # must not be treated as an outage or count against the breaker.
            id_rejected = r.status_code == 400
            if not id_rejected:
                r.raise_for_status()
            record_success(_ARXIV_SOURCE)
        except requests.RequestException as e:
            logger.debug("arXiv API check failed for %s: %s", url, e, exc_info=True)
            record_failure(_ARXIV_SOURCE)
            return URLFinding(entry_key, url, "unreachable", detail=f"arxiv-api: {str(e)[:100]}")

        if id_rejected:
            return URLFinding(
                entry_key, url, "broken",
                status_code=400,
                detail=f"arxiv id {arxiv_id!r} rejected by export API (HTTP 400)",
            )

        # The Atom feed contains `<entry>` only when the ID resolves. An
        # empty feed (totalResults=0) means the ID is bogus. An error feed
        # also carries an `<entry>`, so it is excluded explicitly.
        body = r.text or ""
        has_entry = "<entry>" in body or "<entry " in body
        if has_entry and self._ARXIV_ERROR_MARKER not in body:
            return URLFinding(entry_key, url, "ok", status_code=200)
        return URLFinding(
            entry_key, url, "broken",
            status_code=200,
            detail=f"arxiv id {arxiv_id!r} not found in export API",
        )

    def _check_one(self, entry: BibEntry) -> Optional[URLFinding]:
        """Check the URL of a single entry.

        Args:
            entry: Bibliography entry; only ``entry.url`` and ``entry.key``
                are read.

        Returns:
            ``None`` when the entry has no URL (empty or whitespace only),
            otherwise a finding with status "ok" (2xx/3xx), "broken" (any
            other status code), "unreachable" (the request raised) or
            "skipped" (non-HTTP scheme, or arXiv breaker open).
        """
        url = (entry.url or "").strip()
        if not url:
            return None

        # str.startswith accepts a tuple of prefixes; tuple() keeps this
        # working if a subclass overrides SKIP_PREFIXES with a list.
        if url.lower().startswith(tuple(self.SKIP_PREFIXES)):
            return URLFinding(entry.key, url, "skipped", detail="non-http scheme")

        # arxiv.org HEAD requests get connection-reset on shared egress IPs.
        # Re-route to the export API, which is the official liveness signal.
        m = _ARXIV_URL_RE.match(url)
        if m:
            return self._check_arxiv_via_api(entry.key, url, m.group(1))

        session = get_session()
        try:
            r = session.head(url, allow_redirects=True, timeout=self.timeout)
            # Many servers return 405/403 for HEAD but are fine with GET;
            # double-check with a tiny GET.
            if r.status_code in (403, 405, 501):
                r = session.get(url, allow_redirects=True, timeout=self.timeout, stream=True)
                # stream=True defers the download; close without reading
                # the body since only the status code is needed.
                r.close()
        except requests.RequestException as e:
            logger.debug("URL check failed for %s: %s", url, e, exc_info=True)
            return URLFinding(entry.key, url, "unreachable", detail=str(e)[:120])

        if 200 <= r.status_code < 400:
            return URLFinding(entry.key, url, "ok", status_code=r.status_code)
        return URLFinding(
            entry.key, url, "broken",
            status_code=r.status_code,
            detail=f"HTTP {r.status_code}",
        )

    def check_entries(self, entries: Iterable[BibEntry]) -> List[URLFinding]:
        """Check every entry that has a URL, using a thread pool.

        Args:
            entries: Bibliography entries; those without a truthy ``url``
                attribute are ignored.

        Returns:
            One finding per checked entry, in the same order as the input
            (``Executor.map`` preserves input order). Entries whose URL is
            empty after stripping produce no finding.
        """
        targets = [e for e in entries if getattr(e, "url", "")]
        if not targets:
            return []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            return [f for f in ex.map(self._check_one, targets) if f is not None]