polyglot-alpha / scripts /audit_rss_sources.py
licaomeng
deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
88d2f2a
"""Audit reachability of every RSS source listed in ``sources.json``.
For each source, perform a real HTTP GET and report:
- HTTP status code
- Round-trip latency (ms)
- Number of feed entries parsed by feedparser
- Bytes fetched
- Final verdict: ``OK``, ``BAD_STATUS``, ``TIMEOUT``, ``CONN_ERR``, ``PARSE_ERR``, or ``DISABLED`` (source marked ``enabled=false``; no request is made)
This script is the W13-C diagnostic counterpart to ``rss_aggregator.py``: it
exists so an operator can reproduce, on demand, the per-source health view
without spinning up the full API surface. Run it before claiming "live mode
works" — if fewer than ``MIN_HEALTHY_SOURCES`` succeed, the live RSS path
will fail-loud at runtime.
Usage::
python scripts/audit_rss_sources.py # default 15s timeout
python scripts/audit_rss_sources.py --timeout 30
python scripts/audit_rss_sources.py --json # machine-readable
Exit code:
0 if at least ``--min-healthy`` sources succeed (default 2)
1 otherwise — surface to CI / operator
"""
from __future__ import annotations
import argparse
import asyncio
import json
import sys
import time
from dataclasses import asdict, dataclass
from pathlib import Path
import feedparser
import httpx
# Make ``polyglot_alpha`` importable when the script is run from the repo root
# without an installed package.
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from polyglot_alpha.ingestion.rss_aggregator import ( # noqa: E402
DEFAULT_USER_AGENT,
load_sources,
)
# Per-request HTTP timeout in seconds; used as the default for ``--timeout``.
DEFAULT_TIMEOUT_S: float = 15.0
# Default for ``--min-healthy``: the script exits non-zero when fewer sources
# than this are healthy.
DEFAULT_MIN_HEALTHY: int = 2
@dataclass(frozen=True)
class AuditResult:
    """Immutable record describing the audit outcome for one RSS source."""

    # Identity of the source, as read from its configuration entry.
    name: str
    url: str
    language: str
    # HTTP status of the response, or None when no response was obtained
    # (disabled source, timeout, connection error).
    status_code: int | None
    # Wall-clock duration of the request in milliseconds (0 when skipped).
    latency_ms: int
    # Size of the response body in bytes.
    bytes_fetched: int
    # Number of feed entries found in the response body.
    entries_parsed: int
    # Outcome label, e.g. "OK", "BAD_STATUS", "TIMEOUT", "CONN_ERR",
    # "PARSE_ERR" or "DISABLED".
    verdict: str
    # Human-readable failure detail; None on success.
    error: str | None

    def healthy(self) -> bool:
        """Return True only for an ``OK`` verdict with at least one entry."""
        has_entries = self.entries_parsed > 0
        return has_entries and self.verdict == "OK"
async def _audit_one(
    client: httpx.AsyncClient, src: dict[str, object]
) -> AuditResult:
    """Probe a single RSS source and classify the outcome.

    Args:
        client: Shared HTTP client used to issue the GET request.
        src: One source entry. The keys read here are ``name``, ``url``,
            ``language``, ``enabled`` and ``disabled_reason``.

    Returns:
        An ``AuditResult`` whose verdict is one of ``DISABLED`` (no request
        made), ``TIMEOUT``, ``CONN_ERR``, ``BAD_STATUS`` (status >= 400),
        ``PARSE_ERR`` (parser raised or produced zero entries) or ``OK``.
    """
    name = str(src.get("name", "<unnamed>"))
    url = str(src.get("url", ""))
    language = str(src.get("language", "unknown"))

    def _result(
        verdict: str,
        *,
        error: str | None,
        status_code: int | None = None,
        latency_ms: int = 0,
        bytes_fetched: int = 0,
        entries_parsed: int = 0,
    ) -> AuditResult:
        # Single construction site so every branch fills the identity fields
        # (name/url/language) the same way.
        return AuditResult(
            name=name,
            url=url,
            language=language,
            status_code=status_code,
            latency_ms=latency_ms,
            bytes_fetched=bytes_fetched,
            entries_parsed=entries_parsed,
            verdict=verdict,
            error=error,
        )

    if not src.get("enabled", True):
        return _result(
            "DISABLED",
            error=str(src.get("disabled_reason") or "marked enabled=false"),
        )

    started = time.perf_counter()

    def _elapsed_ms() -> int:
        return int((time.perf_counter() - started) * 1000)

    try:
        resp = await client.get(url)
    except httpx.TimeoutException as exc:
        return _result(
            "TIMEOUT",
            error=str(exc) or "request timed out",
            latency_ms=_elapsed_ms(),
        )
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # httpx.InvalidURL is not part of the HTTPError hierarchy; without
        # catching it here a single malformed URL would propagate out of this
        # coroutine instead of being reported as a per-source failure.
        return _result(
            "CONN_ERR",
            # Some transport errors carry an empty message; fall back to the
            # exception class name so the report is never blank.
            error=str(exc) or type(exc).__name__,
            latency_ms=_elapsed_ms(),
        )

    latency_ms = _elapsed_ms()
    body = resp.content or b""

    if resp.status_code >= 400:
        return _result(
            "BAD_STATUS",
            error=f"HTTP {resp.status_code}",
            status_code=resp.status_code,
            latency_ms=latency_ms,
            bytes_fetched=len(body),
        )

    # 2xx-3xx: try to parse to confirm it's really a feed.
    try:
        parsed = feedparser.parse(body)
        entries = len(getattr(parsed, "entries", []) or [])
    except Exception as exc:  # pragma: no cover - feedparser is robust
        return _result(
            "PARSE_ERR",
            error=str(exc),
            status_code=resp.status_code,
            latency_ms=latency_ms,
            bytes_fetched=len(body),
        )

    # A successful response that yields no entries is still a failure for
    # the purposes of this audit.
    has_entries = entries > 0
    return _result(
        "OK" if has_entries else "PARSE_ERR",
        error=None if has_entries else "feed parsed to zero entries",
        status_code=resp.status_code,
        latency_ms=latency_ms,
        bytes_fetched=len(body),
        entries_parsed=entries,
    )
async def audit_all(timeout: float) -> list[AuditResult]:
    """Audit every source returned by ``load_sources`` concurrently.

    Args:
        timeout: Timeout value handed to the shared ``httpx.AsyncClient``.

    Returns:
        The ``AuditResult`` objects gathered from one ``_audit_one`` call per
        source.
    """
    feed_sources = load_sources()
    client_options = {
        "headers": {"User-Agent": DEFAULT_USER_AGENT},
        "timeout": timeout,
        "follow_redirects": True,
    }
    # One client is shared by all probes and closed when the block exits.
    async with httpx.AsyncClient(**client_options) as http:
        probes = [_audit_one(http, source) for source in feed_sources]
        return await asyncio.gather(*probes)
def _print_table(results: list[AuditResult]) -> None:
headers = ("Source", "URL", "Lang", "Status", "Latency", "Entries", "Verdict")
rows: list[tuple[str, ...]] = []
for r in results:
status_str = str(r.status_code) if r.status_code is not None else "-"
url_display = r.url if len(r.url) <= 60 else r.url[:57] + "..."
rows.append(
(
r.name,
url_display,
r.language,
status_str,
f"{r.latency_ms}ms",
str(r.entries_parsed),
r.verdict,
)
)
widths = [
max(len(h), *(len(row[i]) for row in rows)) for i, h in enumerate(headers)
]
fmt = " | ".join(f"{{:<{w}}}" for w in widths)
print(fmt.format(*headers))
print("-+-".join("-" * w for w in widths))
for row in rows:
print(fmt.format(*row))
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--timeout",
type=float,
default=DEFAULT_TIMEOUT_S,
help="Per-request HTTP timeout in seconds (default: %(default)s).",
)
parser.add_argument(
"--min-healthy",
type=int,
default=DEFAULT_MIN_HEALTHY,
help=(
"Exit non-zero unless at least this many sources are healthy "
"(default: %(default)s)."
),
)
parser.add_argument(
"--json",
action="store_true",
help="Emit JSON instead of a human-readable table.",
)
args = parser.parse_args()
results = asyncio.run(audit_all(timeout=args.timeout))
healthy = sum(1 for r in results if r.healthy())
disabled = sum(1 for r in results if r.verdict == "DISABLED")
total = len(results)
broken = total - healthy - disabled
if args.json:
payload = {
"total_sources": total,
"healthy": healthy,
"broken": broken,
"disabled": disabled,
"results": [asdict(r) for r in results],
}
print(json.dumps(payload, indent=2, ensure_ascii=False))
else:
_print_table(results)
print()
print(
f"Summary: {healthy}/{total} healthy · "
f"{broken} broken · {disabled} disabled · "
f"min required = {args.min_healthy}"
)
if broken:
print("Broken sources:")
for r in results:
if not r.healthy() and r.verdict != "DISABLED":
print(f" - {r.name}: {r.verdict} ({r.error or '?'})")
return 0 if healthy >= args.min_healthy else 1
if __name__ == "__main__":
sys.exit(main())