File size: 6,022 Bytes
fcffa22
 
 
 
 
 
 
 
 
 
 
 
 
 
dc4b1cd
fcffa22
 
 
 
 
58f0f1d
fcffa22
 
 
 
dc4b1cd
 
 
 
 
 
 
 
 
 
58f0f1d
 
 
 
 
 
 
 
dc4b1cd
fcffa22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dc4b1cd
 
 
58f0f1d
 
 
 
dc4b1cd
58f0f1d
 
dc4b1cd
58f0f1d
 
 
 
 
dc4b1cd
 
 
 
 
 
 
 
58f0f1d
dc4b1cd
 
58f0f1d
dc4b1cd
 
 
 
 
 
 
 
 
 
 
 
fcffa22
 
 
 
 
 
 
dc4b1cd
 
 
 
 
 
fcffa22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
URL liveness checker for bibliography entries.

Many @misc / blog / repo references rot over time. This checker does a HEAD
(falling back to a small GET) on entry.url and flags anything that returns
4xx/5xx or fails to connect.

Operates on BibEntry objects, not on tex_content. Invoked from main.py / app.py
when `submission_extra.url_liveness` is true.
"""
from __future__ import annotations

import concurrent.futures
import logging
import re
from dataclasses import dataclass
from typing import Iterable, List, Optional

import requests

from src.utils.http import get_session, is_open, record_failure, record_success
from src.parsers.bib_parser import BibEntry

# Module-level logger; the checker logs failed requests at DEBUG level.
logger = logging.getLogger(__name__)

# arxiv.org HTML endpoints (`/abs/...`, `/pdf/...`) routinely reset
# connections from cloud egress IPs (HF Spaces, AWS, GCP). The arXiv
# *export API* — same paper IDs, official endpoint — is far more stable.
# When we see an arxiv URL, we verify it by querying export.arxiv.org
# instead of HEAD'ing arxiv.org directly.
#
# The pattern matches http(s) URLs on arxiv.org (optionally `www.`) under
# `/abs/`, `/pdf/` or `/html/`, case-insensitively. Group 1 captures the
# identifier part of the path; its character class admits word characters,
# dots, hyphens and slashes. An optional trailing `.pdf` / `.html` and any
# query string or fragment are left outside the capture.
_ARXIV_URL_RE = re.compile(
    r"^https?://(?:www\.)?arxiv\.org/(?:abs|pdf|html)/([\w.\-/]+?)(?:\.pdf|\.html)?(?:[?#]|$)",
    re.IGNORECASE,
)
# Endpoint queried (with `id_list=<captured id>`) to verify an arXiv URL.
_ARXIV_EXPORT_API = "http://export.arxiv.org/api/query"
# Share the arxiv metadata fetcher's circuit breaker. Both hit
# export.arxiv.org; once the breaker is tripped (typically after 2 quick
# 429s from the metadata fetcher), it makes no sense for the URL checker
# to keep firing requests at the same dead host — that was producing
# 18+ false "unreachable" findings for arxiv URLs that are actually fine.
# When the breaker is open, mark the URL as "skipped" so the report
# doesn't falsely claim it's broken.
#
# This is the source key handed to is_open / record_success /
# record_failure from src.utils.http.
_ARXIV_SOURCE = "arxiv"


@dataclass
class URLFinding:
    """Outcome of the liveness check for one bibliography entry's URL."""

    # Key of the bibliography entry the URL was taken from.
    entry_key: str
    # The URL that was checked (whitespace-stripped by the checker).
    url: str
    # One of "ok" | "broken" | "unreachable" | "skipped".
    status: str            # "ok" | "broken" | "unreachable" | "skipped"
    # HTTP status code when a response was obtained; the checker leaves this
    # as None for "skipped" and "unreachable" findings.
    status_code: Optional[int] = None
    # Short human-readable explanation (e.g. "HTTP 404" or a truncated
    # exception message); empty for plain "ok" results.
    detail: str = ""


class URLChecker:
    """Concurrent HEAD-then-GET liveness check for bibliography entry URLs.

    Each entry's ``url`` is probed on a thread pool and classified as
    "ok", "broken", "unreachable" or "skipped". arxiv.org URLs are verified
    through the arXiv export API instead of being requested directly.
    """

    # URL prefixes that cannot be probed over HTTP; reported as "skipped".
    SKIP_PREFIXES = ("mailto:", "ftp://", "tel:", "javascript:")

    def __init__(self, max_workers: int = 8, timeout: float = 15.0):
        """Store the thread-pool size and the timeout passed to every request."""
        self.max_workers = max_workers
        self.timeout = timeout

    def _check_arxiv_via_api(self, entry_key: str, url: str, arxiv_id: str) -> URLFinding:
        """Verify an arxiv URL by hitting export.arxiv.org instead of arxiv.org.

        Honors the shared `arxiv` circuit breaker: if the metadata fetcher
        already proved the host is rate-limiting us, we report "skipped"
        rather than spamming the host and reporting bogus "unreachable".

        Returns "ok" if the export API returns an Atom entry for the ID,
        "broken" if the feed is empty (ID doesn't exist), "skipped" if the
        breaker is open, or "unreachable" if the API itself fails.
        """
        if is_open(_ARXIV_SOURCE):
            return URLFinding(
                entry_key, url, "skipped",
                detail="arxiv source rate-limited (circuit breaker open)",
            )
        session = get_session()
        try:
            r = session.get(
                _ARXIV_EXPORT_API,
                params={"id_list": arxiv_id, "max_results": 1},
                timeout=self.timeout,
            )
            r.raise_for_status()
            record_success(_ARXIV_SOURCE)
        except requests.RequestException as e:
            logger.debug("arXiv API check failed for %s: %s", url, e, exc_info=True)
            # Feed the shared breaker so repeated failures stop further calls.
            record_failure(_ARXIV_SOURCE)
            return URLFinding(entry_key, url, "unreachable", detail=f"arxiv-api: {str(e)[:100]}")
        # The Atom feed contains `<entry>` only when the ID resolves. An
        # empty feed (totalResults=0) means the ID is bogus.
        body = r.text or ""
        if "<entry>" in body or "<entry " in body:
            return URLFinding(entry_key, url, "ok", status_code=200)
        return URLFinding(
            entry_key, url, "broken",
            status_code=200,
            detail=f"arxiv id {arxiv_id!r} not found in export API",
        )

    def _check_one(self, entry: BibEntry) -> Optional[URLFinding]:
        """Check a single entry's URL and classify the outcome.

        Returns None when the entry has no (non-blank) URL. Otherwise returns
        a URLFinding: "skipped" for non-HTTP schemes, the arXiv export-API
        verdict for arxiv.org URLs, "unreachable" when the request raises a
        requests.RequestException, "ok" for a status in [200, 400), and
        "broken" for any other status.
        """
        url = (entry.url or "").strip()
        if not url:
            return None
        # str.startswith accepts a tuple of prefixes; tuple() keeps this
        # working if a subclass overrides SKIP_PREFIXES with a list.
        if url.lower().startswith(tuple(self.SKIP_PREFIXES)):
            return URLFinding(entry.key, url, "skipped", detail="non-http scheme")

        # arxiv.org HEAD requests get connection-reset on shared egress IPs.
        # Re-route to the export API, which is the official liveness signal.
        m = _ARXIV_URL_RE.match(url)
        if m:
            return self._check_arxiv_via_api(entry.key, url, m.group(1))

        session = get_session()
        try:
            r = session.head(url, allow_redirects=True, timeout=self.timeout)
            # Many servers return 405/403 for HEAD but are fine with GET; double-check with a tiny GET.
            if r.status_code in (403, 405, 501):
                r = session.get(url, allow_redirects=True, timeout=self.timeout, stream=True)
                # stream=True defers the download; close without reading the body.
                r.close()
        except requests.RequestException as e:
            logger.debug("URL check failed for %s: %s", url, e, exc_info=True)
            return URLFinding(entry.key, url, "unreachable", detail=str(e)[:120])

        if 200 <= r.status_code < 400:
            return URLFinding(entry.key, url, "ok", status_code=r.status_code)
        return URLFinding(
            entry.key, url, "broken",
            status_code=r.status_code,
            detail=f"HTTP {r.status_code}",
        )

    def _check_one_safely(self, entry: BibEntry) -> Optional[URLFinding]:
        """Run `_check_one`, converting any unexpected exception into a finding.

        `_check_one` only handles requests.RequestException. Anything else
        raised for one entry would otherwise propagate out of the executor's
        `map` and discard the findings of every other entry, so it is logged
        and reported as "unreachable" for that entry alone.
        """
        try:
            return self._check_one(entry)
        except Exception as exc:  # worker boundary: one bad entry must not sink the batch
            key = str(getattr(entry, "key", ""))
            url = str(getattr(entry, "url", "") or "")
            logger.warning(
                "Unexpected error while checking URL for entry %r: %s",
                key, exc, exc_info=True,
            )
            return URLFinding(
                key, url, "unreachable",
                detail=f"unexpected {type(exc).__name__}: {exc}"[:120],
            )

    def check_entries(self, entries: Iterable[BibEntry]) -> List[URLFinding]:
        """Check every entry that has a URL and return the findings.

        Entries without a truthy `url` attribute are ignored. The remaining
        entries are checked concurrently on a thread pool of `max_workers`
        threads; findings are returned in the same order as the input
        entries. A failure while checking one entry never aborts the others.
        """
        targets = [e for e in entries if getattr(e, "url", "")]
        if not targets:
            return []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            # Executor.map yields results in input order; consume it inside
            # the `with` so all results are gathered before shutdown.
            return [
                finding
                for finding in ex.map(self._check_one_safely, targets)
                if finding is not None
            ]