from __future__ import annotations

import pandas as pd
import streamlit as st

from src.analytics import (
    MIN_CONFIG_N,
    RETRIEVAL_OK_THRESHOLD,
    demand_coverage,
    make_decision_brief,
    overview_metrics,
    retrieval_outcomes,
    risk_slices,
)
from src.app_state import DashboardContext, FilterState
from src.data import DataBundle, filter_eval, filter_retrieval_events, load_bundle, option_values
from src.formatting import fmt_int, fmt_latency_ms, fmt_money, fmt_pct
from src.models import APP_TITLE, AppSettings
from src.ui import badge, decision_strip, hero, inject_css, metric_card
from src.views import (
    ConfigComparisonViewMixin,
    ExportCenterViewMixin,
    OverviewViewMixin,
    PolicySimulatorViewMixin,
    QualityMapViewMixin,
    RetrievalLabViewMixin,
    RiskBoardViewMixin,
    TraceExplorerViewMixin,
)


@st.cache_data(show_spinner="Loading RAG QA logs and corpus tables...")
def cached_load_bundle(data_dir: str, docs_dir: str) -> DataBundle:
    """Load the data bundle through Streamlit's data cache.

    Thin wrapper around ``load_bundle`` so repeated script reruns with the
    same ``data_dir`` / ``docs_dir`` pair reuse the cached result.
    """
    return load_bundle(data_dir, docs_dir)


class CommandCenterApp(
    OverviewViewMixin,
    QualityMapViewMixin,
    RetrievalLabViewMixin,
    RiskBoardViewMixin,
    ConfigComparisonViewMixin,
    PolicySimulatorViewMixin,
    TraceExplorerViewMixin,
    ExportCenterViewMixin,
):
    """Thin Streamlit controller for the RAG QA Command Center.

    The controller owns application state and page orchestration. Individual
    tabs live in focused view mixins under ``src/views`` so UI concerns stay
    separated from data loading, analytics, chart construction, and
    formatting helpers.
    """

    def __init__(self, settings: AppSettings | None = None) -> None:
        """Store the settings and initialise the per-run state to ``None``.

        Args:
            settings: Application settings; a default ``AppSettings()`` is
                created when omitted.
        """
        self.settings = settings or AppSettings()
        # Populated by ``run``; exposed through the ``data`` / ``filters`` /
        # ``ctx`` properties, which raise if accessed too early.
        self.bundle: DataBundle | None = None
        self.filter_state: FilterState | None = None
        self.context: DashboardContext | None = None

    def run(self) -> None:
        """Render the whole dashboard for one Streamlit script run.

        Configures the page, loads the data bundle, collects sidebar filters,
        builds the dashboard context, then renders the header, KPI cards and
        tabs. Execution halts via ``st.stop()`` when loading fails or when
        the active filters leave no evaluation rows.
        """
        st.set_page_config(
            page_title=APP_TITLE,
            page_icon=self.settings.page_icon,
            layout="wide",
            initial_sidebar_state="expanded",
        )
        inject_css()

        try:
            self.bundle = cached_load_bundle(self.settings.data_dir, self.settings.docs_dir)
        except Exception as exc:  # pragma: no cover - rendered in Streamlit runtime
            # Top-level boundary: show the failure in the page instead of a traceback.
            st.error(str(exc))
            st.stop()

        self.filter_state = self._render_sidebar(self.bundle.eval_runs)
        self.context = self._build_context(self.bundle, self.filter_state)
        self._render_header(self.context, self.filter_state)

        if self.context.filtered_eval.empty:
            st.warning("No rows match the current filters. Relax the sidebar filters to continue.")
            st.stop()

        self._render_kpis(self.context.metrics)
        self._render_pages()

    def _render_sidebar(self, eval_df: pd.DataFrame) -> FilterState:
        """Draw the sidebar filter widgets and return the selected values.

        Args:
            eval_df: Evaluation rows used to populate the multiselect options.

        Returns:
            A ``FilterState`` holding the chosen preset, the multiselect
            selections and the two minimum-sample-size thresholds.
        """
        st.sidebar.markdown("### Filters")
        preset = st.sidebar.selectbox(
            "Focus preset",
            ["All", "Error review", "Hallucination risk", "Retrieval failure", "High latency", "High cost"],
            index=0,
            help="Quickly narrow the evaluation rows to a common review queue.",
        )
        domains = st.sidebar.multiselect("Domain", option_values(eval_df, "domain"))
        difficulties = st.sidebar.multiselect("Difficulty", option_values(eval_df, "difficulty"))
        scenario_types = st.sidebar.multiselect("Scenario type", option_values(eval_df, "scenario_type"))
        retrievers = st.sidebar.multiselect("Retriever", option_values(eval_df, "retrieval_strategy"))
        generators = st.sidebar.multiselect("Generator", option_values(eval_df, "generator_model"))
        splits = st.sidebar.multiselect("Split", option_values(eval_df, "split"))
        min_slice_n = st.sidebar.slider(
            "Minimum rows per risk slice",
            min_value=10,
            max_value=200,
            value=30,
            step=5,
            help="Higher values reduce noisy small-sample slices.",
        )

        # The default comes from an imported constant; clamp it into the
        # slider range so a retuned MIN_CONFIG_N cannot make the widget raise.
        config_n_min, config_n_max = 20, 250
        config_n_default = min(max(MIN_CONFIG_N, config_n_min), config_n_max)
        min_config_n = st.sidebar.slider(
            "Minimum rows per config",
            min_value=config_n_min,
            max_value=config_n_max,
            value=config_n_default,
            step=5,
            help="Higher values make configuration comparisons more reliable.",
        )
        return FilterState(
            preset=preset,
            domains=domains,
            difficulties=difficulties,
            scenario_types=scenario_types,
            retrievers=retrievers,
            generators=generators,
            splits=splits,
            min_slice_n=min_slice_n,
            min_config_n=min_config_n,
        )

    def _build_context(self, bundle: DataBundle, filters: FilterState) -> DashboardContext:
        """Apply the filters to the bundle and compute the derived tables.

        Args:
            bundle: Loaded data bundle.
            filters: Sidebar selections returned by ``_render_sidebar``.

        Returns:
            A ``DashboardContext`` with the filtered evaluation and retrieval
            frames, overview metrics, decision brief, risk slices, retrieval
            outcomes and demand coverage.
        """
        filtered_eval = filter_eval(
            bundle.eval_runs,
            domains=filters.domains,
            difficulties=filters.difficulties,
            scenario_types=filters.scenario_types,
            retrievers=filters.retrievers,
            generators=filters.generators,
            splits=filters.splits,
        )
        filtered_eval = self._apply_preset(filtered_eval, filters.preset)
        filtered_retrieval = filter_retrieval_events(bundle.retrieval_events, filtered_eval)

        metrics = overview_metrics(filtered_eval, bundle.documents, bundle.chunks, filtered_retrieval)
        risk_table = risk_slices(filtered_eval, min_n=filters.min_slice_n)
        retrieval_table = retrieval_outcomes(filtered_eval)
        # The risk and retrieval tables computed above are handed to the brief builder.
        brief = make_decision_brief(
            filtered_eval,
            bundle.documents,
            bundle.chunks,
            filtered_retrieval,
            min_slice_n=filters.min_slice_n,
            min_config_n=filters.min_config_n,
            risk_table=risk_table,
            retrieval_table=retrieval_table,
        )
        return DashboardContext(
            filtered_eval=filtered_eval,
            filtered_retrieval=filtered_retrieval,
            metrics=metrics,
            brief=brief,
            risk_slices=risk_table,
            retrieval_outcomes=retrieval_table,
            demand_coverage=demand_coverage(filtered_eval, bundle.documents),
        )

    @staticmethod
    def _apply_preset(df: pd.DataFrame, preset: str) -> pd.DataFrame:
        """Narrow ``df`` to the rows selected by a focus preset.

        Each preset only applies when its column is present; otherwise, and
        for any other preset value, an unfiltered copy of ``df`` is returned.
        Column values are coerced with ``pd.to_numeric(errors="coerce")``.

        Args:
            df: Evaluation rows to filter.
            preset: One of the "Focus preset" sidebar options.

        Returns:
            A new frame; the input frame is never returned directly.
        """
        out = df.copy()

        def numeric(column: str) -> pd.Series:
            # Unparseable values become NaN.
            return pd.to_numeric(out[column], errors="coerce")

        if preset == "Error review" and "is_correct" in out.columns:
            # Missing correctness counts as incorrect.
            return out[numeric("is_correct").fillna(0.0) < 0.5]
        if preset == "Hallucination risk" and "hallucination_flag" in out.columns:
            return out[numeric("hallucination_flag").fillna(0.0) >= 0.5]
        if preset == "Retrieval failure" and "recall_at_10" in out.columns:
            # Missing recall is treated as 0, i.e. below the threshold when it is positive.
            return out[numeric("recall_at_10").fillna(0.0) < RETRIEVAL_OK_THRESHOLD]
        if preset == "High latency" and "total_latency_ms" in out.columns:
            latency = numeric("total_latency_ms")
            # Keep rows at or above the 90th percentile; NaN rows never match.
            return out[latency >= latency.quantile(0.90)]
        if preset == "High cost" and "total_cost_usd" in out.columns:
            cost = numeric("total_cost_usd")
            return out[cost >= cost.quantile(0.90)]
        return out

    def _render_header(self, context: DashboardContext, filters: FilterState) -> None:
        """Render the hero banner with status badges and the decision strip.

        Args:
            context: Dashboard context supplying the brief and metrics.
            filters: Active sidebar selections (only the preset is shown).
        """
        # "Stable" -> good, "High Risk" -> bad, any other posture -> warn.
        posture_kind = {"Stable": "good", "High Risk": "bad"}.get(context.brief.posture, "warn")
        hero(
            APP_TITLE,
            (
                "Inspect offline RAG QA evaluation logs across quality, retrieval behavior, "
                "hallucination exposure, latency, cost, review thresholds, and example-level evidence."
            ),
            badges=[
                badge(f"Posture: {context.brief.posture}", posture_kind),
                badge(f"Preset: {filters.preset}", "info"),
                badge(f"Evaluations: {fmt_int(context.metrics['evaluations'])}", "info"),
                badge(f"Retrieval events: {fmt_int(context.metrics['retrieval_events'])}", "info"),
                badge("Offline evaluation dataset", "warn"),
            ],
        )
        decision_strip(
            [
                ("Quality posture", context.brief.posture, context.brief.posture_reason),
                ("Main driver", context.brief.main_driver, "Dominant non-healthy issue pattern under the active filters."),
                ("Highest-risk slice", context.brief.worst_slice, "Uses the current minimum risk-slice size."),
                ("Best config", context.brief.best_config, "Uses the current minimum config sample size."),
                ("Recommended action", context.brief.recommended_action, "Suggested next step from the filtered evidence."),
            ]
        )

    @staticmethod
    def _render_kpis(metrics: dict[str, float]) -> None:
        """Render one KPI card per headline metric in a single row.

        Args:
            metrics: Overview metrics; reads ``correct_rate``,
                ``hallucination_rate``, ``recall_at_10``, ``mrr_at_10``,
                ``p95_latency_ms`` and ``avg_cost_usd``.
        """
        # Each spec is passed positionally to ``metric_card``.
        card_specs = [
            (
                "Answer correctness",
                fmt_pct(metrics["correct_rate"]),
                "Share of examples judged correct.",
                "stable" if metrics["correct_rate"] >= 0.75 else "watch",
            ),
            (
                "Hallucination rate",
                fmt_pct(metrics["hallucination_rate"]),
                "Lower is better; elevated segments should be reviewed.",
                "risk" if metrics["hallucination_rate"] >= 0.18 else "stable",
            ),
            (
                "Recall@10",
                fmt_pct(metrics["recall_at_10"]),
                "Retrieval coverage signal at top 10 chunks.",
                "stable" if metrics["recall_at_10"] >= 0.65 else "watch",
            ),
            (
                "MRR@10",
                fmt_pct(metrics["mrr_at_10"]),
                "Rank-sensitive retrieval quality signal.",
                "stable" if metrics["mrr_at_10"] >= 0.45 else "watch",
            ),
            (
                "P95 latency",
                fmt_latency_ms(metrics["p95_latency_ms"]),
                "Tail latency for selected evaluation rows.",
                "watch",
            ),
            (
                "Avg cost",
                fmt_money(metrics["avg_cost_usd"]),
                "Average estimated cost per run.",
                "info",
            ),
        ]
        # Size the row from the card list so adding or removing a card can
        # never silently drop one or leave an empty column.
        cols = st.columns(len(card_specs))
        for col, spec in zip(cols, card_specs, strict=True):
            with col:
                metric_card(*spec)

    def _render_pages(self) -> None:
        """Create the tab bar and render each page inside its own tab."""
        # Labels and handlers are declared together so they cannot drift
        # apart the way separately maintained positional indexes can.
        pages = [
            ("Overview", self._page_overview),
            ("Quality Map", self._page_quality_map),
            ("Retrieval Lab", self._page_retrieval_lab),
            ("Risk Board", self._page_risk_board),
            ("Config Comparison", self._page_config_comparison),
            ("Policy Simulator", self._page_policy_simulator),
            ("Trace Explorer", self._page_trace_explorer),
            ("Export Center", self._page_export_center),
        ]
        tabs = st.tabs([label for label, _ in pages])
        for tab, (_, render_page) in zip(tabs, pages, strict=True):
            with tab:
                render_page()

    def _plot(self, fig, key: str) -> None:
        """Render a figure with ``st.plotly_chart`` at container width.

        Args:
            fig: Figure object forwarded to ``st.plotly_chart``.
            key: Suffix for the widget key; the final key is ``plot_<key>``.
        """
        st.plotly_chart(
            fig,
            use_container_width=True,
            key=f"plot_{key}",
            config={"displayModeBar": False, "responsive": True},
        )

    @property
    def ctx(self) -> DashboardContext:
        """Return the dashboard context.

        Raises:
            RuntimeError: If the context has not been built yet.
        """
        if self.context is None:
            raise RuntimeError("Dashboard context has not been built.")
        return self.context

    @property
    def data(self) -> DataBundle:
        """Return the loaded data bundle.

        Raises:
            RuntimeError: If the bundle has not been loaded yet.
        """
        if self.bundle is None:
            raise RuntimeError("Data bundle has not been loaded.")
        return self.bundle

    @property
    def filters(self) -> FilterState:
        """Return the collected sidebar filter state.

        Raises:
            RuntimeError: If the filters have not been collected yet.
        """
        if self.filter_state is None:
            raise RuntimeError("Filter state has not been collected.")
        return self.filter_state