Spaces:
Sleeping
Sleeping
| """Daily link-rot check + auto-fix. | |
| Three-phase pipeline run unattended by launchd every night: | |
| 1. DETECT β HEAD every external URL referenced in the KB | |
| 2. AUTO-FIX β for each dead URL, try repair strategies in order | |
| (a) Wayback Machine snapshot lookup | |
| (b) URL canonicalisation (strip query strings, swap http/https) | |
| (c) Insurer-site root-path retry (PDFs only) | |
| If any fix succeeds, the source file is patched in place. | |
| 3. REPORT β anything still dead is written to MUST_FIX.md and a macOS | |
| notification is posted so the user knows a manual fix is needed. | |
| The cron job is idempotent. Re-running after a successful auto-fix is a no-op. | |
| URLs are pulled from three places: | |
| - 40-data/corpus_urls.md β policy PDF index (markdown table) | |
| - 40-data/premiums/illustrative_premiums.json β premium anchors | |
| - 40-data/reviews/*.json β aggregator + news + IRDAI + Reddit + YouTube | |
| Exit codes: | |
| 0 β all URLs reachable, OR all dead URLs were auto-fixed | |
| 1 β at least one URL is still dead after auto-fix (manual action required) | |
| 2 β script-level error | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import httpx | |
| PROJECT_ROOT = Path(__file__).resolve().parent.parent | |
| LOG_DIR = Path.home() / "Library" / "Logs" / "insurance-bot" | |
| LOG_DIR.mkdir(parents=True, exist_ok=True) | |
| LOG_FILE = LOG_DIR / "link_rot.log" | |
| MUST_FIX = PROJECT_ROOT / "MUST_FIX.md" | |
| BROWSER_ALLOWLIST = PROJECT_ROOT / "tools" / "browser_verified.json" | |
| ALLOWLIST_TTL_DAYS = 30 # re-verify via browser after 30 days | |
| UA = ( | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 " | |
| "(KHTML, like Gecko) Chrome/124.0 Safari/537.36" | |
| ) | |
| HEADERS = { | |
| "User-Agent": UA, | |
| "Accept": "text/html,application/xhtml+xml,application/pdf,application/xml;q=0.9,*/*;q=0.8", | |
| "Accept-Language": "en-IN,en;q=0.9", | |
| "Sec-Fetch-Dest": "document", | |
| "Sec-Fetch-Mode": "navigate", | |
| "Sec-Fetch-Site": "none", | |
| } | |
| TIMEOUT = httpx.Timeout(20.0, connect=8.0) | |
| def notify(title: str, body: str) -> None: | |
| try: | |
| subprocess.run( | |
| ["osascript", "-e", f'display notification "{body}" with title "{title}"'], | |
| check=False, | |
| timeout=5, | |
| ) | |
| except Exception: # noqa: BLE001 | |
| pass | |
| def load_browser_allowlist() -> dict[str, dict]: | |
| """URLs that a real browser has verified work. The cron skips these | |
| (real users would succeed) until the entry ages past ALLOWLIST_TTL_DAYS, | |
| at which point browser_verify.py should be re-run.""" | |
| if not BROWSER_ALLOWLIST.exists(): | |
| return {} | |
| try: | |
| return json.loads(BROWSER_ALLOWLIST.read_text()) | |
| except json.JSONDecodeError: | |
| return {} | |
| def is_allowlisted(url: str, allowlist: dict[str, dict]) -> bool: | |
| entry = allowlist.get(url) | |
| if not entry: | |
| return False | |
| ts_str = entry.get("ts", "") | |
| if not ts_str: | |
| return False | |
| try: | |
| # ISO 8601 with TZ offset; chop fractional seconds if present | |
| ts_clean = ts_str.split(".")[0] | |
| # python <3.11 doesn't parse +0530 without colon β normalise | |
| if len(ts_clean) >= 5 and ts_clean[-5] in ("+", "-") and ts_clean[-3] != ":": | |
| ts_clean = ts_clean[:-2] + ":" + ts_clean[-2:] | |
| from datetime import datetime, timezone | |
| verified = datetime.fromisoformat(ts_clean) | |
| if verified.tzinfo is None: | |
| verified = verified.replace(tzinfo=timezone.utc) | |
| age_days = (datetime.now(timezone.utc) - verified).days | |
| return age_days <= ALLOWLIST_TTL_DAYS | |
| except (ValueError, IndexError): | |
| return False | |
| def collect_urls() -> dict[str, list[tuple[str, Path]]]: | |
| """Return {url: [(label, source_file), ...]}.""" | |
| urls: dict[str, list[tuple[str, Path]]] = {} | |
| def add(url: str, label: str, source: Path) -> None: | |
| u = url.strip().rstrip(",.)") | |
| if not u.startswith("http"): | |
| return | |
| urls.setdefault(u, []).append((label, source)) | |
| corpus_md = PROJECT_ROOT / "40-data" / "corpus_urls.md" | |
| if corpus_md.exists(): | |
| # Markdown tables use `|` as column separator. URLs themselves may | |
| # contain parens (e.g. care-advantage-(health-insurance-product...) so | |
| # we only break on whitespace and the column separator. | |
| for line in corpus_md.read_text().splitlines(): | |
| if not line.startswith("|"): | |
| continue | |
| parts = [p.strip() for p in line.strip("|").split("|")] | |
| for p in parts: | |
| m = re.search(r"https?://\S+", p) | |
| if m: | |
| add(m.group(0), "corpus_urls.md", corpus_md) | |
| prem_json = PROJECT_ROOT / "40-data" / "premiums" / "illustrative_premiums.json" | |
| if prem_json.exists(): | |
| d = json.loads(prem_json.read_text()) | |
| for pid, entry in d.get("base_premiums", {}).items(): | |
| for s in entry.get("samples", []): | |
| add(s.get("source_url", ""), f"premiums:{pid}", prem_json) | |
| reviews_dir = PROJECT_ROOT / "40-data" / "reviews" | |
| if reviews_dir.exists(): | |
| for f in reviews_dir.glob("*.json"): | |
| text = f.read_text() | |
| for m in re.finditer(r"https?://[^\s\"',\]\}]+", text): | |
| add(m.group(0), f"reviews:{f.name}", f) | |
| return urls | |
| def head_check(url: str, client: httpx.Client) -> tuple[int, str]: | |
| try: | |
| r = client.head(url, headers=HEADERS, follow_redirects=True) | |
| if r.status_code in (403, 405, 501): | |
| r = client.get( | |
| url, | |
| headers={**HEADERS, "Range": "bytes=0-1023"}, | |
| follow_redirects=True, | |
| ) | |
| return r.status_code, f"final_url={r.url}" | |
| except httpx.TimeoutException: | |
| return 0, "timeout" | |
| except httpx.HTTPError as e: | |
| return 0, f"transport_error:{type(e).__name__}" | |
| # ---------- auto-fix strategies ---------- | |
| def _looks_like_pdf_url(url: str) -> bool: | |
| """Heuristic: corpus expects a PDF at this URL.""" | |
| # Direct .pdf paths | |
| if ".pdf" in url.lower().split("?", 1)[0]: | |
| return True | |
| # Wayback-wrapped originals: https://web.archive.org/web/<ts>/<original> | |
| if "web.archive.org/web/" in url and ".pdf" in url.lower(): | |
| return True | |
| return False | |
| def _validate_pdf_candidate(url: str, client: httpx.Client) -> bool: | |
| """Confirm a candidate URL ACTUALLY serves a PDF (Bug #X 2026-05-28). | |
| Background: HEAD on the same path can return 200 with text/html (server | |
| interstitial) while GET returns 200 with %PDF β and vice versa on | |
| transient failures. The previous auto-fix accepted any candidate that | |
| returned 200 to a HEAD, which silently downgraded httpsβhttp and | |
| swapped live URLs for HTML-only Wayback wrappers. Validate by | |
| GET-Range and check the file magic. | |
| """ | |
| try: | |
| r = client.get( | |
| url, | |
| headers={**HEADERS, "Range": "bytes=0-31"}, | |
| follow_redirects=True, | |
| timeout=15, | |
| ) | |
| if r.status_code not in (200, 206): | |
| return False | |
| ct = (r.headers.get("Content-Type") or "").lower() | |
| body = r.content[:4] | |
| # Accept either explicit PDF content-type OR %PDF magic β some | |
| # CDNs send octet-stream / generic types for PDF assets. | |
| return body == b"%PDF" or "pdf" in ct | |
| except (httpx.HTTPError, ValueError): | |
| return False | |
| def _wayback_raw_variant(url: str) -> str: | |
| """Rewrite a Wayback snapshot URL to its `id_` raw variant so the | |
| archive serves the original PDF bytes instead of the HTML wrapper. | |
| https://web.archive.org/web/<ts>/<orig> --> | |
| https://web.archive.org/web/<ts>id_/<orig>""" | |
| m = re.match(r"^(https?://web\.archive\.org/web/\d+)(/.+)$", url) | |
| if not m: | |
| return url | |
| return f"{m.group(1)}id_{m.group(2)}" | |
| def try_wayback(url: str, client: httpx.Client) -> str | None: | |
| """Return a working Wayback Machine snapshot URL, or None. | |
| For PDF URLs, rewrite to the `id_` raw variant and verify the | |
| snapshot actually serves the PDF bytes (some snapshots are HTML | |
| error pages with status 200). | |
| """ | |
| try: | |
| r = client.get( | |
| "https://archive.org/wayback/available", | |
| params={"url": url}, | |
| headers=HEADERS, | |
| timeout=15, | |
| ) | |
| if r.status_code != 200: | |
| return None | |
| snap = r.json().get("archived_snapshots", {}).get("closest", {}) | |
| if not (snap.get("available") and snap.get("status", "").startswith("2")): | |
| return None | |
| candidate = snap.get("url") | |
| if not candidate: | |
| return None | |
| if _looks_like_pdf_url(url): | |
| candidate = _wayback_raw_variant(candidate) | |
| if not _validate_pdf_candidate(candidate, client): | |
| return None | |
| return candidate | |
| except (httpx.HTTPError, ValueError): | |
| return None | |
| def try_canonicalise(url: str, client: httpx.Client) -> str | None: | |
| """Strip query strings, flip http<->https. | |
| For PDF URLs, the candidate MUST serve actual PDF bytes (not just | |
| return HTTP 200) β HEAD alone is too credulous on servers that | |
| answer interstitial HTML for HEAD but real PDF for GET. | |
| """ | |
| candidates = [] | |
| if "?" in url: | |
| candidates.append(url.split("?", 1)[0]) | |
| if url.startswith("http://"): | |
| candidates.append("https://" + url[len("http://") :]) | |
| elif url.startswith("https://"): | |
| candidates.append("http://" + url[len("https://") :]) | |
| expect_pdf = _looks_like_pdf_url(url) | |
| for c in candidates: | |
| s, _ = head_check(c, client) | |
| if not (200 <= s < 400): | |
| continue | |
| if expect_pdf and not _validate_pdf_candidate(c, client): | |
| continue | |
| return c | |
| return None | |
| def auto_fix(url: str, client: httpx.Client) -> tuple[str | None, str]: | |
| """Return (replacement_url, strategy) or (None, "").""" | |
| for strat_name, strat in ( | |
| ("canonicalise", try_canonicalise), | |
| ("wayback", try_wayback), | |
| ): | |
| fix = strat(url, client) | |
| if fix: | |
| return fix, strat_name | |
| return None, "" | |
| def apply_patch(old: str, new: str, files: list[Path]) -> int: | |
| """Replace `old` with `new` in every distinct file; return file-count patched.""" | |
| count = 0 | |
| for f in set(files): | |
| try: | |
| content = f.read_text() | |
| if old in content: | |
| f.write_text(content.replace(old, new)) | |
| count += 1 | |
| except OSError: | |
| continue | |
| return count | |
| # ---------- main ---------- | |
| def main() -> int: | |
| urls = collect_urls() | |
| if not urls: | |
| print("[link-rot] no URLs found β KB layout changed?", file=sys.stderr) | |
| return 2 | |
| started = time.strftime("%Y-%m-%dT%H:%M:%S%z") | |
| fixed: list[tuple[str, str, str]] = [] # (old, new, strategy) | |
| still_dead: list[tuple[str, int, str, list[str]]] = [] | |
| allowlist = load_browser_allowlist() | |
| allowlisted_count = 0 | |
| with LOG_FILE.open("a") as fp, httpx.Client(timeout=TIMEOUT) as client: | |
| fp.write(f"\n=== run start {started} | {len(urls)} URLs | allowlist={len(allowlist)} ===\n") | |
| for url, refs in urls.items(): | |
| # Skip URLs a real browser has already verified work β until TTL expires. | |
| # These are typically bot-protected hosts (Akamai/Cloudflare/DataDome) | |
| # that httpx cannot HEAD but render fine for end users. | |
| if is_allowlisted(url, allowlist): | |
| fp.write(json.dumps({"url": url, "browser_allowlisted": allowlist[url].get("ts")}) + "\n") | |
| allowlisted_count += 1 | |
| continue | |
| status, note = head_check(url, client) | |
| ok = 200 <= status < 400 | |
| entry = { | |
| "ts": time.strftime("%Y-%m-%dT%H:%M:%S%z"), | |
| "url": url, | |
| "status": status, | |
| "ok": ok, | |
| "note": note, | |
| "sources": [lbl for lbl, _ in refs], | |
| } | |
| if ok: | |
| fp.write(json.dumps(entry) + "\n") | |
| continue | |
| # auto-fix attempt | |
| new_url, strat = auto_fix(url, client) | |
| if new_url: | |
| files = [src for _, src in refs] | |
| patched = apply_patch(url, new_url, files) | |
| entry["auto_fix"] = {"strategy": strat, "new_url": new_url, "files_patched": patched} | |
| fixed.append((url, new_url, strat)) | |
| else: | |
| still_dead.append((url, status, note, [lbl for lbl, _ in refs])) | |
| fp.write(json.dumps(entry) + "\n") | |
| # MUST_FIX.md report β overwritten every run so it always reflects current state | |
| if still_dead: | |
| lines = [ | |
| "# Link-rot β manual fix required", | |
| "", | |
| f"Run: {started}", | |
| f"Auto-fixed: {len(fixed)} Still dead: {len(still_dead)}", | |
| "", | |
| "| status | url | sources |", | |
| "|---|---|---|", | |
| ] | |
| for url, status, _note, sources in still_dead: | |
| lines.append(f"| {status} | {url} | {', '.join(sources)} |") | |
| MUST_FIX.write_text("\n".join(lines) + "\n") | |
| notify( | |
| "Insurance Bot β link rot", | |
| f"{len(still_dead)} dead URLs need manual fix. See MUST_FIX.md", | |
| ) | |
| elif MUST_FIX.exists(): | |
| MUST_FIX.unlink() # clean up stale report | |
| print( | |
| f"[link-rot] total {len(urls)} | browser-allowlisted {allowlisted_count} | " | |
| f"http-checked {len(urls) - allowlisted_count} | auto-fixed {len(fixed)} | " | |
| f"still dead {len(still_dead)}" | |
| ) | |
| for old, new, strat in fixed[:10]: | |
| print(f" FIXED ({strat}): {old}\n -> {new}") | |
| for url, status, note, sources in still_dead[:10]: | |
| print(f" DEAD [{status}]: {url} ({note}) β {','.join(sources)}") | |
| return 1 if still_dead else 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |