# rag-qa-command-cente / src /dashboard.py
# Tarek Masryo
# chore: update project files
# 6bef416
from __future__ import annotations
import pandas as pd
import streamlit as st
from src.analytics import (
MIN_CONFIG_N,
RETRIEVAL_OK_THRESHOLD,
demand_coverage,
make_decision_brief,
overview_metrics,
retrieval_outcomes,
risk_slices,
)
from src.app_state import DashboardContext, FilterState
from src.data import DataBundle, filter_eval, filter_retrieval_events, load_bundle, option_values
from src.formatting import fmt_int, fmt_latency_ms, fmt_money, fmt_pct
from src.models import APP_TITLE, AppSettings
from src.ui import badge, decision_strip, hero, inject_css, metric_card
from src.views import (
ConfigComparisonViewMixin,
ExportCenterViewMixin,
OverviewViewMixin,
PolicySimulatorViewMixin,
QualityMapViewMixin,
RetrievalLabViewMixin,
RiskBoardViewMixin,
TraceExplorerViewMixin,
)
@st.cache_data(show_spinner="Loading RAG QA logs and corpus tables...")
def cached_load_bundle(data_dir: str, docs_dir: str) -> DataBundle:
    """Load the dashboard data bundle through Streamlit's data cache.

    This is a thin wrapper around ``load_bundle`` decorated with
    ``st.cache_data``; the spinner text above is shown while the wrapped
    call runs.

    Args:
        data_dir: Forwarded to ``load_bundle`` as its first argument.
        docs_dir: Forwarded to ``load_bundle`` as its second argument.

    Returns:
        Whatever ``load_bundle`` returns for the two directories.
    """
    bundle = load_bundle(data_dir, docs_dir)
    return bundle
class CommandCenterApp(
    OverviewViewMixin,
    QualityMapViewMixin,
    RetrievalLabViewMixin,
    RiskBoardViewMixin,
    ConfigComparisonViewMixin,
    PolicySimulatorViewMixin,
    TraceExplorerViewMixin,
    ExportCenterViewMixin,
):
    """Thin Streamlit controller for the RAG QA Command Center.

    The controller owns application state and page orchestration. Individual tabs live
    in focused view mixins under ``src/views`` so UI concerns stay separated from
    data loading, analytics, chart construction, and formatting helpers.

    ``bundle``, ``filter_state`` and ``context`` start as ``None`` and are assigned
    by :meth:`run`. The ``data``, ``filters`` and ``ctx`` properties return the same
    objects but raise ``RuntimeError`` while they are still unset.
    """

    def __init__(self, settings: AppSettings | None = None) -> None:
        """Store the settings and initialise the per-run state to ``None``.

        Args:
            settings: Application settings. A default ``AppSettings()`` is
                created when this is ``None`` (or otherwise falsy).
        """
        self.settings = settings or AppSettings()
        self.bundle: DataBundle | None = None
        self.filter_state: FilterState | None = None
        self.context: DashboardContext | None = None

    def run(self) -> None:
        """Render one pass of the dashboard.

        Configures the page, loads the data bundle, collects the sidebar
        filters, builds the derived context and then draws the header, the
        KPI row and the tabs. A load failure is reported with ``st.error``
        and an empty filtered result with ``st.warning``; both paths end the
        run with ``st.stop()``.
        """
        st.set_page_config(
            page_title=APP_TITLE,
            page_icon=self.settings.page_icon,
            layout="wide",
            initial_sidebar_state="expanded",
        )
        inject_css()

        try:
            self.bundle = cached_load_bundle(self.settings.data_dir, self.settings.docs_dir)
        except Exception as exc:  # pragma: no cover - rendered in Streamlit runtime
            # Top-level boundary: surface the message in the page and halt.
            st.error(str(exc))
            st.stop()

        self.filter_state = self._render_sidebar(self.bundle.eval_runs)
        self.context = self._build_context(self.bundle, self.filter_state)
        self._render_header(self.context, self.filter_state)

        if self.context.filtered_eval.empty:
            st.warning("No rows match the current filters. Relax the sidebar filters to continue.")
            st.stop()

        self._render_kpis(self.context.metrics)
        self._render_pages()

    def _render_sidebar(self, eval_df: pd.DataFrame) -> FilterState:
        """Draw the sidebar filter widgets and return the current selections.

        Args:
            eval_df: Frame passed to ``option_values`` to obtain the choices
                for each multiselect.

        Returns:
            A ``FilterState`` holding the preset, the six multiselect
            selections and the two minimum-sample-size sliders.
        """
        st.sidebar.markdown("### Filters")
        preset = st.sidebar.selectbox(
            "Focus preset",
            [
                "All",
                "Error review",
                "Hallucination risk",
                "Retrieval failure",
                "High latency",
                "High cost",
            ],
            index=0,
            help="Quickly narrow the evaluation rows to a common review queue.",
        )

        # Widgets are created in display order; keep this sequence stable.
        domains = st.sidebar.multiselect("Domain", option_values(eval_df, "domain"))
        difficulties = st.sidebar.multiselect("Difficulty", option_values(eval_df, "difficulty"))
        scenario_types = st.sidebar.multiselect("Scenario type", option_values(eval_df, "scenario_type"))
        retrievers = st.sidebar.multiselect("Retriever", option_values(eval_df, "retrieval_strategy"))
        generators = st.sidebar.multiselect("Generator", option_values(eval_df, "generator_model"))
        splits = st.sidebar.multiselect("Split", option_values(eval_df, "split"))

        min_slice_n = st.sidebar.slider(
            "Minimum rows per risk slice",
            min_value=10,
            max_value=200,
            value=30,
            step=5,
            help="Higher values reduce noisy small-sample slices.",
        )
        min_config_n = st.sidebar.slider(
            "Minimum rows per config",
            min_value=20,
            max_value=250,
            value=MIN_CONFIG_N,
            step=5,
            help="Higher values make configuration comparisons more reliable.",
        )

        return FilterState(
            preset=preset,
            domains=domains,
            difficulties=difficulties,
            scenario_types=scenario_types,
            retrievers=retrievers,
            generators=generators,
            splits=splits,
            min_slice_n=min_slice_n,
            min_config_n=min_config_n,
        )

    def _build_context(self, bundle: DataBundle, filters: FilterState) -> DashboardContext:
        """Apply the filters and compute everything the views read.

        The evaluation rows are filtered by the sidebar selections and then
        by the focus preset; retrieval events are filtered against the
        resulting rows. The risk and retrieval tables are computed once and
        handed to ``make_decision_brief`` as well as stored on the context.

        Args:
            bundle: Loaded data bundle.
            filters: Selections collected from the sidebar.

        Returns:
            The ``DashboardContext`` for the current filter state.
        """
        filtered_eval = filter_eval(
            bundle.eval_runs,
            domains=filters.domains,
            difficulties=filters.difficulties,
            scenario_types=filters.scenario_types,
            retrievers=filters.retrievers,
            generators=filters.generators,
            splits=filters.splits,
        )
        filtered_eval = self._apply_preset(filtered_eval, filters.preset)
        filtered_retrieval = filter_retrieval_events(bundle.retrieval_events, filtered_eval)

        metrics = overview_metrics(filtered_eval, bundle.documents, bundle.chunks, filtered_retrieval)
        risk_table = risk_slices(filtered_eval, min_n=filters.min_slice_n)
        retrieval_table = retrieval_outcomes(filtered_eval)
        brief = make_decision_brief(
            filtered_eval,
            bundle.documents,
            bundle.chunks,
            filtered_retrieval,
            min_slice_n=filters.min_slice_n,
            min_config_n=filters.min_config_n,
            risk_table=risk_table,
            retrieval_table=retrieval_table,
        )

        return DashboardContext(
            filtered_eval=filtered_eval,
            filtered_retrieval=filtered_retrieval,
            metrics=metrics,
            brief=brief,
            risk_slices=risk_table,
            retrieval_outcomes=retrieval_table,
            demand_coverage=demand_coverage(filtered_eval, bundle.documents),
        )

    @staticmethod
    def _apply_preset(df: pd.DataFrame, preset: str) -> pd.DataFrame:
        """Narrow ``df`` to the rows selected by a focus preset.

        Presets and their row conditions (columns are coerced to numeric,
        with unparseable entries becoming NaN):

        * ``"Error review"``: ``is_correct`` (NaN treated as 0) below 0.5.
        * ``"Hallucination risk"``: ``hallucination_flag`` (NaN as 0) at
          least 0.5.
        * ``"Retrieval failure"``: ``recall_at_10`` (NaN as 0) below
          ``RETRIEVAL_OK_THRESHOLD``.
        * ``"High latency"``: ``total_latency_ms`` at or above its 90th
          percentile.
        * ``"High cost"``: ``total_cost_usd`` at or above its 90th
          percentile.

        In the two percentile presets NaN values never satisfy the
        comparison, so those rows are excluded.

        Args:
            df: Evaluation rows; never modified.
            preset: Preset label from the sidebar.

        Returns:
            The filtered rows, or an unfiltered copy of ``df`` when the
            preset is not one of the above or its column is missing.
        """
        # Work on a copy so the result is never derived from the caller's frame.
        out = df.copy()

        def numeric(column: str) -> pd.Series:
            # One coercion per column; reused for both cutoff and comparison.
            return pd.to_numeric(out[column], errors="coerce")

        if preset == "Error review" and "is_correct" in out.columns:
            return out[numeric("is_correct").fillna(0.0) < 0.5]
        if preset == "Hallucination risk" and "hallucination_flag" in out.columns:
            return out[numeric("hallucination_flag").fillna(0.0) >= 0.5]
        if preset == "Retrieval failure" and "recall_at_10" in out.columns:
            return out[numeric("recall_at_10").fillna(0.0) < RETRIEVAL_OK_THRESHOLD]
        if preset == "High latency" and "total_latency_ms" in out.columns:
            latency = numeric("total_latency_ms")
            return out[latency >= latency.quantile(0.90)]
        if preset == "High cost" and "total_cost_usd" in out.columns:
            cost = numeric("total_cost_usd")
            return out[cost >= cost.quantile(0.90)]
        return out

    def _render_header(self, context: DashboardContext, filters: FilterState) -> None:
        """Draw the hero banner with status badges and the decision strip.

        The posture badge is styled ``"good"`` for a ``"Stable"`` posture,
        ``"bad"`` for ``"High Risk"`` and ``"warn"`` for anything else.

        Args:
            context: Supplies the decision brief and the metric counts.
            filters: Supplies the active preset label.
        """
        brief = context.brief
        if brief.posture == "Stable":
            posture_kind = "good"
        elif brief.posture == "High Risk":
            posture_kind = "bad"
        else:
            posture_kind = "warn"

        hero(
            APP_TITLE,
            "Inspect offline RAG QA evaluation logs across quality, retrieval behavior, hallucination exposure, latency, cost, review thresholds, and example-level evidence.",
            badges=[
                badge(f"Posture: {brief.posture}", posture_kind),
                badge(f"Preset: {filters.preset}", "info"),
                badge(f"Evaluations: {fmt_int(context.metrics['evaluations'])}", "info"),
                badge(f"Retrieval events: {fmt_int(context.metrics['retrieval_events'])}", "info"),
                badge("Offline evaluation dataset", "warn"),
            ],
        )
        decision_strip(
            [
                ("Quality posture", brief.posture, brief.posture_reason),
                ("Main driver", brief.main_driver, "Dominant non-healthy issue pattern under the active filters."),
                ("Highest-risk slice", brief.worst_slice, "Uses the current minimum risk-slice size."),
                ("Best config", brief.best_config, "Uses the current minimum config sample size."),
                ("Recommended action", brief.recommended_action, "Suggested next step from the filtered evidence."),
            ]
        )

    @staticmethod
    def _render_kpis(metrics: dict[str, float]) -> None:
        """Draw one metric card per KPI in a single row of columns.

        Each spec is ``(title, formatted value, caption, tone)`` and is
        passed positionally to ``metric_card``. The tone for the rate
        metrics depends on the fixed thresholds below; latency is always
        ``"watch"`` and cost always ``"info"``.

        Args:
            metrics: Must contain ``correct_rate``, ``hallucination_rate``,
                ``recall_at_10``, ``mrr_at_10``, ``p95_latency_ms`` and
                ``avg_cost_usd``.
        """
        card_specs = [
            (
                "Answer correctness",
                fmt_pct(metrics["correct_rate"]),
                "Share of examples judged correct.",
                "stable" if metrics["correct_rate"] >= 0.75 else "watch",
            ),
            (
                "Hallucination rate",
                fmt_pct(metrics["hallucination_rate"]),
                "Lower is better; elevated segments should be reviewed.",
                "risk" if metrics["hallucination_rate"] >= 0.18 else "stable",
            ),
            (
                "Recall@10",
                fmt_pct(metrics["recall_at_10"]),
                "Retrieval coverage signal at top 10 chunks.",
                "stable" if metrics["recall_at_10"] >= 0.65 else "watch",
            ),
            (
                "MRR@10",
                fmt_pct(metrics["mrr_at_10"]),
                "Rank-sensitive retrieval quality signal.",
                "stable" if metrics["mrr_at_10"] >= 0.45 else "watch",
            ),
            (
                "P95 latency",
                fmt_latency_ms(metrics["p95_latency_ms"]),
                "Tail latency for selected evaluation rows.",
                "watch",
            ),
            (
                "Avg cost",
                fmt_money(metrics["avg_cost_usd"]),
                "Average estimated cost per run.",
                "info",
            ),
        ]
        # Size the grid from the specs so a card can never be silently dropped.
        cols = st.columns(len(card_specs))
        for col, spec in zip(cols, card_specs, strict=True):
            with col:
                metric_card(*spec)

    def _render_pages(self) -> None:
        """Create the tab bar and render each page inside its tab.

        Labels and page methods are kept together in one table so the two
        cannot drift out of step.
        """
        pages = [
            ("Overview", self._page_overview),
            ("Quality Map", self._page_quality_map),
            ("Retrieval Lab", self._page_retrieval_lab),
            ("Risk Board", self._page_risk_board),
            ("Config Comparison", self._page_config_comparison),
            ("Policy Simulator", self._page_policy_simulator),
            ("Trace Explorer", self._page_trace_explorer),
            ("Export Center", self._page_export_center),
        ]
        tabs = st.tabs([label for label, _ in pages])
        for tab, (_, render_page) in zip(tabs, pages, strict=True):
            with tab:
                render_page()

    def _plot(self, fig, key: str) -> None:
        """Render a figure with ``st.plotly_chart`` at container width.

        Args:
            fig: Figure object handed to ``st.plotly_chart``.
            key: Suffix for the widget key; the final key is ``plot_<key>``.
        """
        st.plotly_chart(
            fig,
            use_container_width=True,
            key=f"plot_{key}",
            config={"displayModeBar": False, "responsive": True},
        )

    @property
    def ctx(self) -> DashboardContext:
        """Return the built dashboard context.

        Raises:
            RuntimeError: If the context has not been built yet.
        """
        if self.context is None:
            raise RuntimeError("Dashboard context has not been built.")
        return self.context

    @property
    def data(self) -> DataBundle:
        """Return the loaded data bundle.

        Raises:
            RuntimeError: If the bundle has not been loaded yet.
        """
        if self.bundle is None:
            raise RuntimeError("Data bundle has not been loaded.")
        return self.bundle

    @property
    def filters(self) -> FilterState:
        """Return the collected sidebar filter state.

        Raises:
            RuntimeError: If the filters have not been collected yet.
        """
        if self.filter_state is None:
            raise RuntimeError("Filter state has not been collected.")
        return self.filter_state