Spaces:
Running
Running
| """Audit reachability of every RSS source listed in ``sources.json``. | |
| For each source, perform a real HTTP GET and report: | |
| - HTTP status code | |
| - Round-trip latency (ms) | |
| - Number of feed entries parsed by feedparser | |
| - Bytes fetched | |
| - Final verdict: ``OK``, ``BAD_STATUS``, ``TIMEOUT``, ``CONN_ERR``, ``PARSE_ERR`` | |
| This script is the W13-C diagnostic counterpart to ``rss_aggregator.py``: it | |
| exists so an operator can reproduce, on demand, the per-source health view | |
| without spinning up the full API surface. Run it before claiming "live mode | |
| works" — if fewer than ``MIN_HEALTHY_SOURCES`` succeed, the live RSS path | |
| will fail-loud at runtime. | |
| Usage:: | |
| python scripts/audit_rss_sources.py # default 15s timeout | |
| python scripts/audit_rss_sources.py --timeout 30 | |
| python scripts/audit_rss_sources.py --json # machine-readable | |
| Exit code: | |
| 0 if at least ``--min-healthy`` sources succeed (default 2) | |
| 1 otherwise — surface to CI / operator | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import sys | |
| import time | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| import feedparser | |
| import httpx | |
| # Make ``polyglot_alpha`` importable when the script is run from the repo root | |
| # without an installed package. | |
| _REPO_ROOT = Path(__file__).resolve().parents[1] | |
| sys.path.insert(0, str(_REPO_ROOT)) | |
| from polyglot_alpha.ingestion.rss_aggregator import ( # noqa: E402 | |
| DEFAULT_USER_AGENT, | |
| load_sources, | |
| ) | |
| DEFAULT_TIMEOUT_S: float = 15.0 | |
| DEFAULT_MIN_HEALTHY: int = 2 | |
| class AuditResult: | |
| name: str | |
| url: str | |
| language: str | |
| status_code: int | None | |
| latency_ms: int | |
| bytes_fetched: int | |
| entries_parsed: int | |
| verdict: str | |
| error: str | None | |
| def healthy(self) -> bool: | |
| return self.verdict == "OK" and self.entries_parsed > 0 | |
| async def _audit_one( | |
| client: httpx.AsyncClient, src: dict[str, object] | |
| ) -> AuditResult: | |
| name = str(src.get("name", "<unnamed>")) | |
| url = str(src.get("url", "")) | |
| language = str(src.get("language", "unknown")) | |
| if not src.get("enabled", True): | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=None, | |
| latency_ms=0, | |
| bytes_fetched=0, | |
| entries_parsed=0, | |
| verdict="DISABLED", | |
| error=str(src.get("disabled_reason") or "marked enabled=false"), | |
| ) | |
| started = time.perf_counter() | |
| try: | |
| resp = await client.get(url) | |
| except httpx.TimeoutException as exc: | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=None, | |
| latency_ms=int((time.perf_counter() - started) * 1000), | |
| bytes_fetched=0, | |
| entries_parsed=0, | |
| verdict="TIMEOUT", | |
| error=str(exc) or "request timed out", | |
| ) | |
| except httpx.HTTPError as exc: | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=None, | |
| latency_ms=int((time.perf_counter() - started) * 1000), | |
| bytes_fetched=0, | |
| entries_parsed=0, | |
| verdict="CONN_ERR", | |
| error=str(exc), | |
| ) | |
| latency_ms = int((time.perf_counter() - started) * 1000) | |
| body = resp.content or b"" | |
| if resp.status_code >= 400: | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=resp.status_code, | |
| latency_ms=latency_ms, | |
| bytes_fetched=len(body), | |
| entries_parsed=0, | |
| verdict="BAD_STATUS", | |
| error=f"HTTP {resp.status_code}", | |
| ) | |
| # 2xx-3xx: try to parse to confirm it's really a feed. | |
| try: | |
| parsed = feedparser.parse(body) | |
| entries = len(getattr(parsed, "entries", []) or []) | |
| except Exception as exc: # pragma: no cover - feedparser is robust | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=resp.status_code, | |
| latency_ms=latency_ms, | |
| bytes_fetched=len(body), | |
| entries_parsed=0, | |
| verdict="PARSE_ERR", | |
| error=str(exc), | |
| ) | |
| return AuditResult( | |
| name=name, | |
| url=url, | |
| language=language, | |
| status_code=resp.status_code, | |
| latency_ms=latency_ms, | |
| bytes_fetched=len(body), | |
| entries_parsed=entries, | |
| verdict="OK" if entries > 0 else "PARSE_ERR", | |
| error=None if entries > 0 else "feed parsed to zero entries", | |
| ) | |
| async def audit_all(timeout: float) -> list[AuditResult]: | |
| sources = load_sources() | |
| async with httpx.AsyncClient( | |
| headers={"User-Agent": DEFAULT_USER_AGENT}, | |
| timeout=timeout, | |
| follow_redirects=True, | |
| ) as client: | |
| return await asyncio.gather(*(_audit_one(client, s) for s in sources)) | |
| def _print_table(results: list[AuditResult]) -> None: | |
| headers = ("Source", "URL", "Lang", "Status", "Latency", "Entries", "Verdict") | |
| rows: list[tuple[str, ...]] = [] | |
| for r in results: | |
| status_str = str(r.status_code) if r.status_code is not None else "-" | |
| url_display = r.url if len(r.url) <= 60 else r.url[:57] + "..." | |
| rows.append( | |
| ( | |
| r.name, | |
| url_display, | |
| r.language, | |
| status_str, | |
| f"{r.latency_ms}ms", | |
| str(r.entries_parsed), | |
| r.verdict, | |
| ) | |
| ) | |
| widths = [ | |
| max(len(h), *(len(row[i]) for row in rows)) for i, h in enumerate(headers) | |
| ] | |
| fmt = " | ".join(f"{{:<{w}}}" for w in widths) | |
| print(fmt.format(*headers)) | |
| print("-+-".join("-" * w for w in widths)) | |
| for row in rows: | |
| print(fmt.format(*row)) | |
| def main() -> int: | |
| parser = argparse.ArgumentParser(description=__doc__) | |
| parser.add_argument( | |
| "--timeout", | |
| type=float, | |
| default=DEFAULT_TIMEOUT_S, | |
| help="Per-request HTTP timeout in seconds (default: %(default)s).", | |
| ) | |
| parser.add_argument( | |
| "--min-healthy", | |
| type=int, | |
| default=DEFAULT_MIN_HEALTHY, | |
| help=( | |
| "Exit non-zero unless at least this many sources are healthy " | |
| "(default: %(default)s)." | |
| ), | |
| ) | |
| parser.add_argument( | |
| "--json", | |
| action="store_true", | |
| help="Emit JSON instead of a human-readable table.", | |
| ) | |
| args = parser.parse_args() | |
| results = asyncio.run(audit_all(timeout=args.timeout)) | |
| healthy = sum(1 for r in results if r.healthy()) | |
| disabled = sum(1 for r in results if r.verdict == "DISABLED") | |
| total = len(results) | |
| broken = total - healthy - disabled | |
| if args.json: | |
| payload = { | |
| "total_sources": total, | |
| "healthy": healthy, | |
| "broken": broken, | |
| "disabled": disabled, | |
| "results": [asdict(r) for r in results], | |
| } | |
| print(json.dumps(payload, indent=2, ensure_ascii=False)) | |
| else: | |
| _print_table(results) | |
| print() | |
| print( | |
| f"Summary: {healthy}/{total} healthy · " | |
| f"{broken} broken · {disabled} disabled · " | |
| f"min required = {args.min_healthy}" | |
| ) | |
| if broken: | |
| print("Broken sources:") | |
| for r in results: | |
| if not r.healthy() and r.verdict != "DISABLED": | |
| print(f" - {r.name}: {r.verdict} ({r.error or '?'})") | |
| return 0 if healthy >= args.min_healthy else 1 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |