#!/usr/bin/env python3 """Compare a fresh snapshot against the committed baseline. Usage: python scripts/diff_golden.py \ [--baseline tests/golden/snapshot.json] \ [--base-url http://localhost:8001] \ [--allow-timing-pct 25] Exit code 0 if fingerprints match. Non-zero if any query's ordered result fingerprint changed or if elapsed_ms regressed by more than the allowed percentage. """ from __future__ import annotations import argparse import json import sys import time from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT / "scripts")) from golden_queries import GOLDEN_QUERIES # noqa: E402 from snapshot_golden import _fetch # noqa: E402 def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--baseline", default=str(REPO_ROOT / "tests" / "golden" / "snapshot.json")) parser.add_argument("--base-url", default="http://localhost:8001") parser.add_argument("--timeout", type=float, default=30.0) parser.add_argument("--allow-timing-pct", type=float, default=25.0) args = parser.parse_args() baseline_path = Path(args.baseline) if not baseline_path.exists(): print(f"Baseline {baseline_path} does not exist. Run snapshot_golden.py first.") return 2 baseline = json.loads(baseline_path.read_text()) baseline_by_id = {q["id"]: q for q in baseline["queries"]} drift = [] timing_regressions = [] missing_in_baseline = [] for query in GOLDEN_QUERIES: fresh = _fetch(args.base_url, query, args.timeout) bid = query["id"] prior = baseline_by_id.get(bid) if prior is None: missing_in_baseline.append(bid) continue if not fresh.get("ok"): drift.append((bid, "fresh fetch failed", fresh.get("error"))) continue if not prior.get("ok"): drift.append((bid, "baseline was a failure, now ok", None)) continue if fresh["fingerprint"] != prior["fingerprint"]: drift.append((bid, "fingerprint changed", None)) continue prior_ms = prior.get("elapsed_ms") or 1 fresh_ms = fresh.get("elapsed_ms") or 1 if prior_ms >= 50: # only flag timing on queries that take meaningful time growth_pct = (fresh_ms - prior_ms) / prior_ms * 100 if growth_pct > args.allow_timing_pct: timing_regressions.append((bid, prior_ms, fresh_ms, growth_pct)) print(f"\nBaseline captured: {baseline.get('captured_at')}") print(f"Fresh capture: {time.strftime('%Y-%m-%dT%H:%M:%S')}") print(f"Queries checked: {len(GOLDEN_QUERIES)}") if drift: print(f"\n{len(drift)} query(ies) drifted:") for bid, kind, extra in drift: print(f" - {bid}: {kind}" + (f" ({extra})" if extra else "")) if timing_regressions: print(f"\n{len(timing_regressions)} query(ies) regressed in timing:") for bid, prior_ms, fresh_ms, growth_pct in timing_regressions: print(f" - {bid}: {prior_ms}ms -> {fresh_ms}ms (+{growth_pct:.1f}%)") if missing_in_baseline: print(f"\n{len(missing_in_baseline)} query(ies) not in baseline (run snapshot_golden.py to add):") for bid in missing_in_baseline: print(f" - {bid}") if not drift and not timing_regressions: print("\nAll golden queries match baseline.") return 0 return 1 if __name__ == "__main__": raise SystemExit(main())