Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import pandas as pd | |
| import streamlit as st | |
| from src.analytics import ( | |
| MIN_CONFIG_N, | |
| RETRIEVAL_OK_THRESHOLD, | |
| demand_coverage, | |
| make_decision_brief, | |
| overview_metrics, | |
| retrieval_outcomes, | |
| risk_slices, | |
| ) | |
| from src.app_state import DashboardContext, FilterState | |
| from src.data import DataBundle, filter_eval, filter_retrieval_events, load_bundle, option_values | |
| from src.formatting import fmt_int, fmt_latency_ms, fmt_money, fmt_pct | |
| from src.models import APP_TITLE, AppSettings | |
| from src.ui import badge, decision_strip, hero, inject_css, metric_card | |
| from src.views import ( | |
| ConfigComparisonViewMixin, | |
| ExportCenterViewMixin, | |
| OverviewViewMixin, | |
| PolicySimulatorViewMixin, | |
| QualityMapViewMixin, | |
| RetrievalLabViewMixin, | |
| RiskBoardViewMixin, | |
| TraceExplorerViewMixin, | |
| ) | |
# Streamlit re-executes the script on every widget interaction; memoising the
# loader keeps the bundle from being rebuilt on each rerun.
# NOTE(review): ``st.cache_data`` serialises the return value, so this assumes
# ``DataBundle`` is picklable — confirm, or switch to ``st.cache_resource``.
@st.cache_data
def cached_load_bundle(data_dir: str, docs_dir: str) -> DataBundle:
    """Return the data bundle for the given directories, cached across reruns.

    Args:
        data_dir: Directory passed through to ``load_bundle`` as its first argument.
        docs_dir: Directory passed through to ``load_bundle`` as its second argument.

    Returns:
        The ``DataBundle`` produced by ``load_bundle(data_dir, docs_dir)``.

    Raises:
        Whatever ``load_bundle`` raises; the caller is responsible for handling it.
    """
    return load_bundle(data_dir, docs_dir)
class CommandCenterApp(
    OverviewViewMixin,
    QualityMapViewMixin,
    RetrievalLabViewMixin,
    RiskBoardViewMixin,
    ConfigComparisonViewMixin,
    PolicySimulatorViewMixin,
    TraceExplorerViewMixin,
    ExportCenterViewMixin,
):
    """Thin Streamlit controller for the RAG QA Command Center.

    The controller owns application state and page orchestration. Individual tabs live
    in focused view mixins under ``src/views`` so UI concerns stay separated from
    data loading, analytics, chart construction, and formatting helpers.
    """

    def __init__(self, settings: AppSettings | None = None) -> None:
        """Store the settings and reset the per-run state slots.

        Args:
            settings: Application settings; a default ``AppSettings()`` is created
                when omitted (or when a falsy value is passed).
        """
        self.settings = settings or AppSettings()
        # Populated by ``run``; ``None`` until the corresponding step has executed.
        self.bundle: DataBundle | None = None
        self.filter_state: FilterState | None = None
        self.context: DashboardContext | None = None

    def run(self) -> None:
        """Render one full pass of the dashboard.

        Configures the page, loads the data bundle, collects sidebar filters, builds
        the dashboard context, then renders the header, KPI cards and tabs. Rendering
        halts via ``st.stop()`` when loading fails or when the filters match no rows.
        """
        st.set_page_config(
            page_title=APP_TITLE,
            page_icon=self.settings.page_icon,
            layout="wide",
            initial_sidebar_state="expanded",
        )
        inject_css()

        try:
            self.bundle = cached_load_bundle(self.settings.data_dir, self.settings.docs_dir)
        except Exception as exc:  # pragma: no cover - rendered in Streamlit runtime
            # Top-level UI boundary: surface the failure on the page instead of crashing.
            st.error(str(exc))
            st.stop()

        self.filter_state = self._render_sidebar(self.bundle.eval_runs)
        self.context = self._build_context(self.bundle, self.filter_state)
        self._render_header(self.context, self.filter_state)

        if self.context.filtered_eval.empty:
            st.warning("No rows match the current filters. Relax the sidebar filters to continue.")
            st.stop()

        self._render_kpis(self.context.metrics)
        self._render_pages()

    def _render_sidebar(self, eval_df: pd.DataFrame) -> FilterState:
        """Draw the sidebar filter widgets and return the current selections.

        Args:
            eval_df: Evaluation rows from which the multiselect options are derived
                via ``option_values``.

        Returns:
            A ``FilterState`` holding the preset, the six multiselect selections and
            the two minimum-sample-size slider values.
        """
        st.sidebar.markdown("### Filters")
        preset = st.sidebar.selectbox(
            "Focus preset",
            ["All", "Error review", "Hallucination risk", "Retrieval failure", "High latency", "High cost"],
            index=0,
            help="Quickly narrow the evaluation rows to a common review queue.",
        )
        domains = st.sidebar.multiselect("Domain", option_values(eval_df, "domain"))
        difficulties = st.sidebar.multiselect("Difficulty", option_values(eval_df, "difficulty"))
        scenario_types = st.sidebar.multiselect("Scenario type", option_values(eval_df, "scenario_type"))
        retrievers = st.sidebar.multiselect("Retriever", option_values(eval_df, "retrieval_strategy"))
        generators = st.sidebar.multiselect("Generator", option_values(eval_df, "generator_model"))
        splits = st.sidebar.multiselect("Split", option_values(eval_df, "split"))
        min_slice_n = st.sidebar.slider(
            "Minimum rows per risk slice",
            min_value=10,
            max_value=200,
            value=30,
            step=5,
            help="Higher values reduce noisy small-sample slices.",
        )
        min_config_n = st.sidebar.slider(
            "Minimum rows per config",
            min_value=20,
            max_value=250,
            value=MIN_CONFIG_N,
            step=5,
            help="Higher values make configuration comparisons more reliable.",
        )
        return FilterState(
            preset=preset,
            domains=domains,
            difficulties=difficulties,
            scenario_types=scenario_types,
            retrievers=retrievers,
            generators=generators,
            splits=splits,
            min_slice_n=min_slice_n,
            min_config_n=min_config_n,
        )

    def _build_context(self, bundle: DataBundle, filters: FilterState) -> DashboardContext:
        """Apply the filters to the bundle and compute everything the views consume.

        Args:
            bundle: Loaded data bundle (evaluation runs, retrieval events, documents,
                chunks).
            filters: Sidebar selections to apply.

        Returns:
            A ``DashboardContext`` with the filtered frames, overview metrics,
            decision brief, risk slices, retrieval outcomes and demand coverage.
        """
        filtered_eval = filter_eval(
            bundle.eval_runs,
            domains=filters.domains,
            difficulties=filters.difficulties,
            scenario_types=filters.scenario_types,
            retrievers=filters.retrievers,
            generators=filters.generators,
            splits=filters.splits,
        )
        # The preset narrows the rows further, after the categorical filters.
        filtered_eval = self._apply_preset(filtered_eval, filters.preset)
        filtered_retrieval = filter_retrieval_events(bundle.retrieval_events, filtered_eval)

        metrics = overview_metrics(filtered_eval, bundle.documents, bundle.chunks, filtered_retrieval)
        risk_table = risk_slices(filtered_eval, min_n=filters.min_slice_n)
        retrieval_table = retrieval_outcomes(filtered_eval)
        # The already-computed tables are handed to the brief builder alongside the raw frames.
        brief = make_decision_brief(
            filtered_eval,
            bundle.documents,
            bundle.chunks,
            filtered_retrieval,
            min_slice_n=filters.min_slice_n,
            min_config_n=filters.min_config_n,
            risk_table=risk_table,
            retrieval_table=retrieval_table,
        )
        return DashboardContext(
            filtered_eval=filtered_eval,
            filtered_retrieval=filtered_retrieval,
            metrics=metrics,
            brief=brief,
            risk_slices=risk_table,
            retrieval_outcomes=retrieval_table,
            demand_coverage=demand_coverage(filtered_eval, bundle.documents),
        )

    @staticmethod
    def _apply_preset(df: pd.DataFrame, preset: str) -> pd.DataFrame:
        """Narrow ``df`` to the review queue selected by ``preset``.

        Declared static because it takes no ``self`` yet is invoked as
        ``self._apply_preset(df, preset)``; without the decorator that call would
        bind the instance to ``df`` and fail with a ``TypeError``.

        Args:
            df: Evaluation rows; never mutated (a copy is taken first).
            preset: One of the "Focus preset" labels offered in the sidebar.

        Returns:
            The matching rows. When the preset is "All", unrecognised, or the column
            it relies on is absent, an unfiltered copy of ``df`` is returned.
        """
        out = df.copy()

        def numeric(column: str) -> pd.Series:
            # Non-numeric entries become NaN rather than raising.
            return pd.to_numeric(out[column], errors="coerce")

        if preset == "Error review" and "is_correct" in out.columns:
            # Missing/unparseable correctness counts as incorrect (0.0).
            return out[numeric("is_correct").fillna(0.0) < 0.5]
        if preset == "Hallucination risk" and "hallucination_flag" in out.columns:
            return out[numeric("hallucination_flag").fillna(0.0) >= 0.5]
        if preset == "Retrieval failure" and "recall_at_10" in out.columns:
            return out[numeric("recall_at_10").fillna(0.0) < RETRIEVAL_OK_THRESHOLD]
        if preset == "High latency" and "total_latency_ms" in out.columns:
            # Rows at or above the 90th percentile; NaN rows never satisfy the comparison.
            latency = numeric("total_latency_ms")
            return out[latency >= latency.quantile(0.90)]
        if preset == "High cost" and "total_cost_usd" in out.columns:
            cost = numeric("total_cost_usd")
            return out[cost >= cost.quantile(0.90)]
        return out

    def _render_header(self, context: DashboardContext, filters: FilterState) -> None:
        """Render the hero banner with status badges and the decision strip.

        Args:
            context: Dashboard context supplying the decision brief and metrics.
            filters: Active filters; only the preset label is displayed here.
        """
        brief = context.brief
        # "Stable" -> good, "High Risk" -> bad, any other posture -> warn.
        posture_kind = {"Stable": "good", "High Risk": "bad"}.get(brief.posture, "warn")
        hero(
            APP_TITLE,
            "Inspect offline RAG QA evaluation logs across quality, retrieval behavior, hallucination exposure, latency, cost, review thresholds, and example-level evidence.",
            badges=[
                badge(f"Posture: {brief.posture}", posture_kind),
                badge(f"Preset: {filters.preset}", "info"),
                badge(f"Evaluations: {fmt_int(context.metrics['evaluations'])}", "info"),
                badge(f"Retrieval events: {fmt_int(context.metrics['retrieval_events'])}", "info"),
                badge("Offline evaluation dataset", "warn"),
            ],
        )
        decision_strip(
            [
                ("Quality posture", brief.posture, brief.posture_reason),
                ("Main driver", brief.main_driver, "Dominant non-healthy issue pattern under the active filters."),
                ("Highest-risk slice", brief.worst_slice, "Uses the current minimum risk-slice size."),
                ("Best config", brief.best_config, "Uses the current minimum config sample size."),
                ("Recommended action", brief.recommended_action, "Suggested next step from the filtered evidence."),
            ]
        )

    @staticmethod
    def _render_kpis(metrics: dict[str, float]) -> None:
        """Render one metric card per headline KPI in a single row of columns.

        Declared static because it takes no ``self`` yet is invoked as
        ``self._render_kpis(metrics)``.

        Args:
            metrics: Mapping that must contain ``correct_rate``,
                ``hallucination_rate``, ``recall_at_10``, ``mrr_at_10``,
                ``p95_latency_ms`` and ``avg_cost_usd``.

        Raises:
            KeyError: If any of the keys above is missing.
        """
        correct_rate = metrics["correct_rate"]
        hallucination_rate = metrics["hallucination_rate"]
        recall = metrics["recall_at_10"]
        mrr = metrics["mrr_at_10"]

        # Each spec is (title, formatted value, caption, status) as passed to ``metric_card``.
        card_specs = [
            ("Answer correctness", fmt_pct(correct_rate), "Share of examples judged correct.", "stable" if correct_rate >= 0.75 else "watch"),
            ("Hallucination rate", fmt_pct(hallucination_rate), "Lower is better; elevated segments should be reviewed.", "risk" if hallucination_rate >= 0.18 else "stable"),
            ("Recall@10", fmt_pct(recall), "Retrieval coverage signal at top 10 chunks.", "stable" if recall >= 0.65 else "watch"),
            ("MRR@10", fmt_pct(mrr), "Rank-sensitive retrieval quality signal.", "stable" if mrr >= 0.45 else "watch"),
            ("P95 latency", fmt_latency_ms(metrics["p95_latency_ms"]), "Tail latency for selected evaluation rows.", "watch"),
            ("Avg cost", fmt_money(metrics["avg_cost_usd"]), "Average estimated cost per run.", "info"),
        ]
        # Derive the column count from the specs so adding a card cannot silently drop it.
        cols = st.columns(len(card_specs))
        for col, spec in zip(cols, card_specs, strict=True):
            with col:
                metric_card(*spec)

    def _render_pages(self) -> None:
        """Create the tab bar and render each page inside its own tab.

        The ``_page_*`` renderers are not defined in this class body; they are
        expected to be supplied by the view mixins this class inherits from.
        """
        # Label and renderer are kept side by side so they cannot drift out of order.
        pages = [
            ("Overview", self._page_overview),
            ("Quality Map", self._page_quality_map),
            ("Retrieval Lab", self._page_retrieval_lab),
            ("Risk Board", self._page_risk_board),
            ("Config Comparison", self._page_config_comparison),
            ("Policy Simulator", self._page_policy_simulator),
            ("Trace Explorer", self._page_trace_explorer),
            ("Export Center", self._page_export_center),
        ]
        tabs = st.tabs([label for label, _ in pages])
        for tab, (_, render_page) in zip(tabs, pages, strict=True):
            with tab:
                render_page()

    def _plot(self, fig, key: str) -> None:
        """Render a Plotly figure at container width with the mode bar hidden.

        Args:
            fig: Figure object handed to ``st.plotly_chart``.
            key: Suffix for the Streamlit element key; the element is keyed
                ``plot_<key>``.
        """
        st.plotly_chart(
            fig,
            use_container_width=True,
            key=f"plot_{key}",
            config={"displayModeBar": False, "responsive": True},
        )

    # NOTE(review): the three accessors below are plain methods in this file, so
    # callers must invoke them (``self.ctx()``). Their shape suggests ``@property``
    # may have been intended — confirm against the view mixins before changing.

    def ctx(self) -> DashboardContext:
        """Return the dashboard context built by ``run``.

        Raises:
            RuntimeError: If the context has not been built yet.
        """
        if self.context is None:
            raise RuntimeError("Dashboard context has not been built.")
        return self.context

    def data(self) -> DataBundle:
        """Return the data bundle loaded by ``run``.

        Raises:
            RuntimeError: If the bundle has not been loaded yet.
        """
        if self.bundle is None:
            raise RuntimeError("Data bundle has not been loaded.")
        return self.bundle

    def filters(self) -> FilterState:
        """Return the sidebar filter state collected by ``run``.

        Raises:
            RuntimeError: If the filter state has not been collected yet.
        """
        if self.filter_state is None:
            raise RuntimeError("Filter state has not been collected.")
        return self.filter_state