"""Smoke tests for the data-loading and analytics layers.

The expected row counts asserted below are pinned to the dataset that
``load_bundle`` reads from ``ROOT / "data"`` and ``ROOT / "docs"``; they
need to be updated whenever that dataset is regenerated.
"""

from __future__ import annotations

import sys
from pathlib import Path

# Repository root (one level above this file's directory).  It is put on
# ``sys.path`` so the ``src`` package resolves without an installed
# distribution, which is why the imports below carry ``noqa: E402``.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from src.analytics import (  # noqa: E402
    MIN_CONFIG_N,
    config_leaderboard,
    make_decision_brief,
    overview_metrics,
    policy_curve,
    retrieval_outcomes,
    risk_slices,
)
from src.data import filter_eval, filter_retrieval_events, load_bundle, schema_report  # noqa: E402


def _load_bundle():
    """Load a fresh bundle from the repository's ``data`` and ``docs`` dirs.

    Deliberately not cached: every test gets its own bundle, exactly as when
    each test called ``load_bundle`` directly.
    """
    return load_bundle(ROOT / "data", ROOT / "docs")


def test_load_bundle_and_schema_report() -> None:
    """The bundle has the expected table sizes and every schema check is acceptable."""
    bundle = _load_bundle()

    assert len(bundle.eval_runs) == 3824
    assert len(bundle.retrieval_events) == 93375
    # Every row of the report must carry a "pass" or "review" status.
    assert schema_report(bundle)["status"].isin(["pass", "review"]).all()


def test_filtered_retrieval_events_align_with_eval_filters() -> None:
    """Retrieval events filtered by an eval subset only reference that subset."""
    bundle = _load_bundle()

    filtered_eval = filter_eval(bundle.eval_runs, domains=["financial_reports"])
    filtered_retrieval = filter_retrieval_events(bundle.retrieval_events, filtered_eval)

    assert len(filtered_eval) == 771
    assert len(filtered_retrieval) == 18947

    # Compare ids as strings so a dtype difference between the two tables
    # cannot produce a spurious mismatch.
    retrieval_ids = set(filtered_retrieval["example_id"].astype(str))
    eval_ids = set(filtered_eval["example_id"].astype(str))
    assert retrieval_ids.issubset(eval_ids)


def test_core_analytics_tables_are_available() -> None:
    """Each core analytics entry point yields a non-empty result on the full bundle."""
    bundle = _load_bundle()

    metrics = overview_metrics(bundle.eval_runs, bundle.documents, bundle.chunks, bundle.retrieval_events)
    assert metrics["evaluations"] == 3824

    assert not risk_slices(bundle.eval_runs).empty
    assert not retrieval_outcomes(bundle.eval_runs).empty
    assert not config_leaderboard(bundle.eval_runs, min_n=MIN_CONFIG_N).empty
    assert not policy_curve(bundle.eval_runs).empty

    brief = make_decision_brief(bundle.eval_runs, bundle.documents, bundle.chunks, bundle.retrieval_events)
    assert brief.posture in {"Stable", "Watch", "High Risk", "Review"}