InsuranceBot / tools /check_link_rot.py
rohitsar567's picture
fix(link-rot): validate PDF candidates + untrack llm_health.json
f9437bb
Raw
History Blame Contribute Delete
14.1 kB
"""Daily link-rot check + auto-fix.
Three-phase pipeline run unattended by launchd every night:
1. DETECT — HEAD every external URL referenced in the KB
2. AUTO-FIX — for each dead URL, try repair strategies in order
(a) URL canonicalisation (strip query strings, swap http/https)
(b) Wayback Machine snapshot lookup
(c) Insurer-site root-path retry (PDFs only) — not currently wired into auto_fix()
If any fix succeeds, the source file is patched in place.
3. REPORT — anything still dead is written to MUST_FIX.md and a macOS
notification is posted so the user knows a manual fix is needed.
The cron job is idempotent. Re-running after a successful auto-fix is a no-op.
URLs are pulled from three places:
- 40-data/corpus_urls.md — policy PDF index (markdown table)
- 40-data/premiums/illustrative_premiums.json — premium anchors
- 40-data/reviews/*.json — aggregator + news + IRDAI + Reddit + YouTube
Exit codes:
0 — all URLs reachable, OR all dead URLs were auto-fixed
1 — at least one URL is still dead after auto-fix (manual action required)
2 — script-level error
"""
from __future__ import annotations
import json
import re
import subprocess
import sys
import time
from pathlib import Path
import httpx
# Repository root: two directory levels above this file.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Per-user log directory under ~/Library/Logs; created at import time if missing.
LOG_DIR = Path.home() / "Library" / "Logs" / "insurance-bot"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Append-only run log (one JSON object per checked URL, see main()).
LOG_FILE = LOG_DIR / "link_rot.log"
# Report listing URLs that still need a manual fix; rewritten or removed by main().
MUST_FIX = PROJECT_ROOT / "MUST_FIX.md"
# JSON map of URLs verified in a real browser (read by load_browser_allowlist()).
# NOTE(review): presumably maintained by browser_verify.py - confirm.
BROWSER_ALLOWLIST = PROJECT_ROOT / "tools" / "browser_verified.json"
ALLOWLIST_TTL_DAYS = 30 # re-verify via browser after 30 days
# Desktop-Chrome-style User-Agent sent with every request.
UA = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36"
)
# Browser-like request headers shared by every HTTP probe in this module.
HEADERS = {
    "User-Agent": UA,
    "Accept": "text/html,application/xhtml+xml,application/pdf,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-IN,en;q=0.9",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
}
# Default client timeout: 20 s overall, with a shorter 8 s connect timeout.
TIMEOUT = httpx.Timeout(20.0, connect=8.0)
def notify(title: str, body: str) -> None:
    """Post a macOS notification through ``osascript`` (best-effort).

    Both strings end up inside an AppleScript string literal, so backslashes
    and double quotes are escaped first; an unescaped quote in either argument
    would terminate the literal and break (or alter) the script.

    Args:
        title: Notification title.
        body: Notification body text.

    Any failure (missing ``osascript`` binary, timeout, ...) is swallowed on
    purpose: a lost notification must never abort the link-rot run.
    """

    def _quote(text: str) -> str:
        # Escape the backslash first so the escapes added for quotes survive.
        return text.replace("\\", "\\\\").replace('"', '\\"')

    script = f'display notification "{_quote(body)}" with title "{_quote(title)}"'
    try:
        subprocess.run(["osascript", "-e", script], check=False, timeout=5)
    except Exception:  # noqa: BLE001 - notification is best-effort
        pass
def load_browser_allowlist() -> dict[str, dict]:
    """Load the map of URLs that a real browser has verified to work.

    The cron skips these URLs (real users would succeed) until the entry ages
    past ALLOWLIST_TTL_DAYS, at which point browser_verify.py should be re-run.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, unreadable,
        not valid UTF-8 JSON, or does not contain a JSON object at top level.
    """
    try:
        data = json.loads(BROWSER_ALLOWLIST.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use dict methods on the result; anything else (a list, a string,
    # null) is treated the same as an empty allowlist.
    return data if isinstance(data, dict) else {}
def is_allowlisted(url: str, allowlist: dict[str, dict]) -> bool:
    """Return True when *url* has a browser-verified entry that is still fresh.

    Args:
        url: URL to look up.
        allowlist: Mapping of URL -> entry dict; the entry's ``"ts"`` value is
            an ISO 8601 timestamp of the last browser verification.

    Returns:
        True if the entry exists, its timestamp parses, and it is at most
        ALLOWLIST_TTL_DAYS whole days old. Missing or malformed entries
        yield False. Timestamps without an offset are treated as UTC.
    """
    from datetime import datetime, timezone

    entry = allowlist.get(url)
    if not isinstance(entry, dict):
        return False
    ts_str = entry.get("ts")
    if not isinstance(ts_str, str) or not ts_str.strip():
        return False

    ts_clean = ts_str.strip()
    # python <3.11 doesn't parse a trailing "Z" - spell it as an offset
    if ts_clean[-1] in "Zz":
        ts_clean = ts_clean[:-1] + "+00:00"
    # Drop fractional seconds only; keep any UTC offset that follows them
    # (splitting on "." would silently discard the offset as well).
    ts_clean = re.sub(r"\.\d+", "", ts_clean, count=1)
    # python <3.11 doesn't parse +0530 without colon - normalise to +05:30
    ts_clean = re.sub(r"([+-]\d{2})(\d{2})$", r"\1:\2", ts_clean)

    try:
        verified = datetime.fromisoformat(ts_clean)
    except ValueError:
        return False
    if verified.tzinfo is None:
        verified = verified.replace(tzinfo=timezone.utc)
    age_days = (datetime.now(timezone.utc) - verified).days
    return age_days <= ALLOWLIST_TTL_DAYS
def collect_urls() -> dict[str, list[tuple[str, Path]]]:
    """Gather every external URL referenced by the knowledge base.

    Sources scanned (each is skipped when it does not exist):
      - 40-data/corpus_urls.md (markdown table rows)
      - 40-data/premiums/illustrative_premiums.json (samples[].source_url)
      - 40-data/reviews/*.json (any http(s) URL found in the raw text)

    Returns:
        ``{url: [(label, source_file), ...]}`` - one tuple per occurrence, so
        a URL referenced from several places lists every file to patch.
    """
    urls: dict[str, list[tuple[str, Path]]] = {}

    def add(url: object, label: str, source: Path) -> None:
        # JSON values may be null or non-string; ignore them instead of crashing.
        if not isinstance(url, str):
            return
        u = url.strip()
        # Trim trailing punctuation picked up from surrounding prose/markdown.
        # A closing paren is dropped only when it is unbalanced (for example
        # the tail of a markdown link); URLs that legitimately end in
        # "...(product)" keep their final paren.
        while u and (
            u[-1] in ",." or (u[-1] == ")" and u.count(")") > u.count("("))
        ):
            u = u[:-1]
        if not u.startswith("http"):
            return
        urls.setdefault(u, []).append((label, source))

    corpus_md = PROJECT_ROOT / "40-data" / "corpus_urls.md"
    if corpus_md.exists():
        # Markdown tables use `|` as column separator. URLs themselves may
        # contain parens (e.g. care-advantage-(health-insurance-product...) so
        # we only break on whitespace and the column separator.
        for line in corpus_md.read_text(encoding="utf-8").splitlines():
            if not line.startswith("|"):
                continue
            for cell in line.strip("|").split("|"):
                m = re.search(r"https?://\S+", cell.strip())
                if m:
                    add(m.group(0), "corpus_urls.md", corpus_md)

    prem_json = PROJECT_ROOT / "40-data" / "premiums" / "illustrative_premiums.json"
    if prem_json.exists():
        d = json.loads(prem_json.read_text(encoding="utf-8"))
        for pid, entry in d.get("base_premiums", {}).items():
            for s in entry.get("samples", []):
                add(s.get("source_url", ""), f"premiums:{pid}", prem_json)

    reviews_dir = PROJECT_ROOT / "40-data" / "reviews"
    if reviews_dir.exists():
        # Sorted so the scan (and therefore the log) order is deterministic.
        for f in sorted(reviews_dir.glob("*.json")):
            text = f.read_text(encoding="utf-8")
            for m in re.finditer(r"https?://[^\s\"',\]\}]+", text):
                add(m.group(0), f"reviews:{f.name}", f)
    return urls
def head_check(url: str, client: httpx.Client) -> tuple[int, str]:
    """Probe *url* and report ``(status_code, note)``.

    A HEAD request is tried first. When the server answers 403, 405 or 501
    the probe is repeated as a GET asking only for the first 1024 bytes.

    Args:
        url: URL to probe.
        client: Shared httpx client used for the request(s).

    Returns:
        ``(status, "final_url=<url after redirects>")`` when a response was
        received; otherwise status 0 with a note of ``"timeout"``,
        ``"invalid_url"`` or ``"transport_error:<ExceptionName>"``.
    """
    try:
        r = client.head(url, headers=HEADERS, follow_redirects=True)
        if r.status_code in (403, 405, 501):
            r = client.get(
                url,
                headers={**HEADERS, "Range": "bytes=0-1023"},
                follow_redirects=True,
            )
        return r.status_code, f"final_url={r.url}"
    except httpx.TimeoutException:
        return 0, "timeout"
    except httpx.InvalidURL:
        # InvalidURL does not derive from httpx.HTTPError, so without this
        # clause a single malformed URL scraped from the KB would abort the
        # whole run instead of being reported as dead.
        return 0, "invalid_url"
    except httpx.HTTPError as e:
        return 0, f"transport_error:{type(e).__name__}"
# ---------- auto-fix strategies ----------
def _looks_like_pdf_url(url: str) -> bool:
"""Heuristic: corpus expects a PDF at this URL."""
# Direct .pdf paths
if ".pdf" in url.lower().split("?", 1)[0]:
return True
# Wayback-wrapped originals: https://web.archive.org/web/<ts>/<original>
if "web.archive.org/web/" in url and ".pdf" in url.lower():
return True
return False
def _validate_pdf_candidate(url: str, client: httpx.Client) -> bool:
    """Check that a candidate URL really serves a PDF (Bug #X 2026-05-28).

    Why: a HEAD on a path can return 200 with text/html (server interstitial)
    while a GET returns 200 with %PDF, and vice versa on transient failures.
    Accepting any candidate that answered 200 to a HEAD silently downgraded
    https->http and swapped live URLs for HTML-only Wayback wrappers. So the
    candidate is fetched with a ranged GET and its file magic is inspected.

    Returns:
        True when the response status is 200 or 206 and either the body
        starts with ``%PDF`` or the Content-Type mentions "pdf"; False
        otherwise, including on any httpx.HTTPError / ValueError.
    """
    probe_headers = dict(HEADERS)
    probe_headers["Range"] = "bytes=0-31"
    try:
        resp = client.get(
            url,
            headers=probe_headers,
            follow_redirects=True,
            timeout=15,
        )
        if resp.status_code in (200, 206):
            content_type = (resp.headers.get("Content-Type") or "").lower()
            magic = resp.content[:4]
            # Either signal is enough: some CDNs label PDF assets as
            # octet-stream or another generic type.
            return magic == b"%PDF" or "pdf" in content_type
        return False
    except (httpx.HTTPError, ValueError):
        return False
def _wayback_raw_variant(url: str) -> str:
"""Rewrite a Wayback snapshot URL to its `id_` raw variant so the
archive serves the original PDF bytes instead of the HTML wrapper.
https://web.archive.org/web/<ts>/<orig> -->
https://web.archive.org/web/<ts>id_/<orig>"""
m = re.match(r"^(https?://web\.archive\.org/web/\d+)(/.+)$", url)
if not m:
return url
return f"{m.group(1)}id_{m.group(2)}"
def try_wayback(url: str, client: httpx.Client) -> str | None:
    """Look up a usable Wayback Machine snapshot for *url*.

    The availability API is queried for the closest snapshot; it is used only
    when flagged available with a 2xx archived status. For PDF URLs the
    snapshot is rewritten to its `id_` raw variant and must actually serve
    PDF bytes (some snapshots are HTML error pages with status 200).

    Returns:
        The snapshot URL to use, or None when there is no usable snapshot or
        the lookup fails.
    """
    try:
        resp = client.get(
            "https://archive.org/wayback/available",
            params={"url": url},
            headers=HEADERS,
            timeout=15,
        )
        if resp.status_code != 200:
            return None
        closest = resp.json().get("archived_snapshots", {}).get("closest", {})
        usable = closest.get("available") and closest.get("status", "").startswith("2")
        snapshot_url = closest.get("url") if usable else None
        if not snapshot_url:
            return None
        if not _looks_like_pdf_url(url):
            # Non-PDF targets: the availability API answer is taken as-is.
            return snapshot_url
        raw_url = _wayback_raw_variant(snapshot_url)
        return raw_url if _validate_pdf_candidate(raw_url, client) else None
    except (httpx.HTTPError, ValueError):
        return None
def try_canonicalise(url: str, client: httpx.Client) -> str | None:
    """Try simple rewrites of *url*: drop the query string, flip http<->https.

    Variants are probed in that order and the first reachable one wins. For
    PDF URLs a variant must also serve real PDF bytes, because a bare 2xx/3xx
    is too credulous on servers that answer HEAD with interstitial HTML but
    GET with the actual PDF.

    Returns:
        The first acceptable variant, or None when none qualifies.
    """
    variants: list[str] = []
    without_query, question_mark, _query = url.partition("?")
    if question_mark:
        variants.append(without_query)
    for scheme, flipped in (("http://", "https://"), ("https://", "http://")):
        if url.startswith(scheme):
            variants.append(flipped + url[len(scheme):])
            break

    must_be_pdf = _looks_like_pdf_url(url)
    for variant in variants:
        status, _note = head_check(variant, client)
        reachable = 200 <= status < 400
        if reachable and (not must_be_pdf or _validate_pdf_candidate(variant, client)):
            return variant
    return None
def auto_fix(url: str, client: httpx.Client) -> tuple[str | None, str]:
    """Run the repair strategies in order and stop at the first success.

    Order: "canonicalise", then "wayback".

    Returns:
        ``(replacement_url, strategy_name)`` for the first strategy that
        produced a replacement, or ``(None, "")`` when none did.
    """
    strategies = {
        "canonicalise": try_canonicalise,
        "wayback": try_wayback,
    }
    for name, attempt in strategies.items():
        replacement = attempt(url, client)
        if replacement:
            return replacement, name
    return None, ""
def apply_patch(old: str, new: str, files: list[Path]) -> int:
    """Replace the URL `old` with `new` in every distinct file.

    Only whole-URL occurrences are rewritten. A plain substring replace would
    also corrupt longer URLs that merely contain `old` - e.g. ``<old>/more``,
    ``<old>.pdf`` or a Wayback-wrapped ``.../web/<ts>/<old>`` - so a match is
    skipped when it is preceded by a word character, "/" or "=", or followed
    by characters that continue the URL. Trailing sentence punctuation
    (``.,:;)``) after the URL is still allowed.

    Args:
        old: URL text to replace; an empty string patches nothing.
        new: Replacement URL, inserted literally.
        files: Files to patch; duplicates are processed once.

    Returns:
        Number of files whose content was changed. Files that cannot be read,
        decoded as UTF-8 or written are skipped.
    """
    if not old:
        # "" matches between every character; never treat it as a URL.
        return 0
    pattern = re.compile(
        r"(?<![\w/=])" + re.escape(old) + r"(?![\w/?#&=%~+@-]|[.,:;)]+[\w/])"
    )
    count = 0
    for f in set(files):
        try:
            content = f.read_text(encoding="utf-8")
            # A callable replacement keeps backslashes in `new` literal.
            updated, hits = pattern.subn(lambda _m: new, content)
            if hits:
                f.write_text(updated, encoding="utf-8")
                count += 1
        except (OSError, UnicodeError):
            continue
    return count
# ---------- main ----------
def main() -> int:
    """Run one detect -> auto-fix -> report pass and return the exit code.

    Every collected URL is either skipped (fresh browser-allowlist entry) or
    probed with head_check(). Dead URLs go through auto_fix(); a URL counts as
    fixed only when a replacement was found AND at least one source file was
    actually patched - otherwise the KB still holds the dead link and it is
    reported for manual action. One JSON line per URL is appended to LOG_FILE.

    Returns:
        0 when nothing is left dead, 1 when at least one URL still needs a
        manual fix (MUST_FIX.md is written and a notification posted), 2 when
        no URLs could be collected at all.
    """
    urls = collect_urls()
    if not urls:
        print("[link-rot] no URLs found — KB layout changed?", file=sys.stderr)
        return 2

    started = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    fixed: list[tuple[str, str, str]] = []  # (old, new, strategy)
    still_dead: list[tuple[str, int, str, list[str]]] = []
    allowlist = load_browser_allowlist()
    allowlisted_count = 0

    with LOG_FILE.open("a") as fp, httpx.Client(timeout=TIMEOUT) as client:
        fp.write(f"\n=== run start {started} | {len(urls)} URLs | allowlist={len(allowlist)} ===\n")
        for url, refs in urls.items():
            # Skip URLs a real browser has already verified work - until TTL expires.
            # These are typically bot-protected hosts (Akamai/Cloudflare/DataDome)
            # that httpx cannot HEAD but render fine for end users.
            if is_allowlisted(url, allowlist):
                fp.write(json.dumps({"url": url, "browser_allowlisted": allowlist[url].get("ts")}) + "\n")
                allowlisted_count += 1
                continue

            status, note = head_check(url, client)
            ok = 200 <= status < 400
            sources = [lbl for lbl, _ in refs]
            entry = {
                "ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
                "url": url,
                "status": status,
                "ok": ok,
                "note": note,
                "sources": sources,
            }
            if ok:
                fp.write(json.dumps(entry) + "\n")
                continue

            # auto-fix attempt
            new_url, strat = auto_fix(url, client)
            patched = 0
            if new_url:
                files = [src for _, src in refs]
                patched = apply_patch(url, new_url, files)
                entry["auto_fix"] = {"strategy": strat, "new_url": new_url, "files_patched": patched}
            if patched:
                fixed.append((url, new_url, strat))
            else:
                # No replacement found, or nothing could be written: the dead
                # URL is still in the KB, so it must not be reported as fixed.
                still_dead.append((url, status, note, sources))
            fp.write(json.dumps(entry) + "\n")

    # MUST_FIX.md report - overwritten every run so it always reflects current state
    if still_dead:
        lines = [
            "# Link-rot — manual fix required",
            "",
            f"Run: {started}",
            f"Auto-fixed: {len(fixed)} Still dead: {len(still_dead)}",
            "",
            "| status | url | sources |",
            "|---|---|---|",
        ]
        for url, status, _note, sources in still_dead:
            lines.append(f"| {status} | {url} | {', '.join(sources)} |")
        MUST_FIX.write_text("\n".join(lines) + "\n")
        notify(
            "Insurance Bot — link rot",
            f"{len(still_dead)} dead URLs need manual fix. See MUST_FIX.md",
        )
    elif MUST_FIX.exists():
        MUST_FIX.unlink()  # clean up stale report

    print(
        f"[link-rot] total {len(urls)} | browser-allowlisted {allowlisted_count} | "
        f"http-checked {len(urls) - allowlisted_count} | auto-fixed {len(fixed)} | "
        f"still dead {len(still_dead)}"
    )
    for old, new, strat in fixed[:10]:
        print(f" FIXED ({strat}): {old}\n -> {new}")
    for url, status, note, sources in still_dead[:10]:
        print(f" DEAD [{status}]: {url} ({note}) ← {','.join(sources)}")
    return 1 if still_dead else 0
if __name__ == "__main__":
    # An uncaught exception makes the interpreter exit with status 1, which
    # this script reserves for "dead URLs need a manual fix". Map crashes to
    # the documented script-level error code 2 instead, keeping the traceback.
    try:
        exit_code = main()
    except Exception:  # noqa: BLE001 - top-level boundary
        import traceback

        traceback.print_exc()
        exit_code = 2
    sys.exit(exit_code)