from __future__ import annotations

from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from pathlib import Path

import pandas as pd

# Label substituted for missing categorical values in standardized tables.
MISSING_LABEL = "Missing / Not provided"


@dataclass(frozen=True)
class DataBundle:
    """Immutable group of the tables produced by ``DataRepository.load``."""

    eval_runs: pd.DataFrame
    retrieval_events: pd.DataFrame
    documents: pd.DataFrame
    chunks: pd.DataFrame
    scenarios: pd.DataFrame
    dictionary: pd.DataFrame


@dataclass(frozen=True)
class DataPaths:
    """Directories searched for the data CSVs and the data dictionary."""

    data_dir: Path = Path("data")
    docs_dir: Path = Path("docs")


# Accepted file names per table, in lookup priority order.
DATA_FILES: dict[str, list[str]] = {
    "eval_runs": ["eval_runs.csv", "rag_qa_eval_runs.csv"],
    "retrieval_events": ["rag_retrieval_events.csv", "retrieval_events.csv"],
    "documents": ["rag_corpus_documents.csv", "documents.csv"],
    "chunks": ["rag_corpus_chunks.csv", "chunks.csv"],
    "scenarios": ["scenarios.csv", "rag_qa_scenarios.csv"],
    "dictionary": ["data_dictionary.csv"],
}

# Columns that must be present in each table (checked by ``validate_bundle``).
REQUIRED_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "example_id",
        "run_id",
        "scenario_id",
        "domain",
        "difficulty",
        "is_correct",
        "hallucination_flag",
        "retrieval_strategy",
        "generator_model",
        "recall_at_10",
        "mrr_at_10",
        "total_latency_ms",
        "total_cost_usd",
    ],
    "retrieval_events": ["example_id", "rank", "chunk_id", "retrieval_score", "is_relevant"],
    "documents": ["doc_id", "domain", "title", "n_chunks", "n_tokens"],
    "chunks": ["chunk_id", "doc_id", "domain", "chunk_text"],
    "scenarios": ["scenario_id", "scenario_type", "difficulty_level"],
}

# Primary-key column per table; values must be present and unique.
UNIQUE_KEYS: dict[str, str] = {
    "eval_runs": "example_id",
    "documents": "doc_id",
    "chunks": "chunk_id",
    "scenarios": "scenario_id",
}

# eval_runs columns whose values are validated to lie in [0, 1].
EVAL_RATE_COLUMNS = [
    "is_correct",
    "hallucination_flag",
    "recall_at_5",
    "recall_at_10",
    "mrr_at_10",
    "has_answer_in_corpus",
    "is_noanswer_probe",
    "has_relevant_in_top5",
    "has_relevant_in_top10",
    "answered_without_retrieval",
]

# Numeric columns in which missing values are NOT allowed.
REQUIRED_NUMERIC_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "is_correct",
        "hallucination_flag",
        "recall_at_10",
        "mrr_at_10",
        "total_latency_ms",
        "total_cost_usd",
    ],
    "retrieval_events": ["rank", "retrieval_score", "is_relevant"],
    "documents": ["n_chunks", "n_tokens"],
}

# Numeric columns that may be absent or contain missing values.
OPTIONAL_NUMERIC_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "recall_at_5",
        "top1_score",
        "mean_retrieved_score",
        "has_answer_in_corpus",
        "is_noanswer_probe",
        "has_relevant_in_top5",
        "has_relevant_in_top10",
        "answered_without_retrieval",
    ],
    "chunks": ["estimated_tokens"],
}


class DataContractError(ValueError):
    """Raised when packaged tables violate a required dashboard contract."""


class DataRepository:
    """Load and standardize the packaged RAG QA tables."""

    def __init__(self, paths: DataPaths | None = None) -> None:
        """Store the lookup directories, defaulting to ``DataPaths()``."""
        self.paths = paths or DataPaths()

    def load(self) -> DataBundle:
        """Read every table, standardize it, validate the result and return it.

        Raises:
            FileNotFoundError: if the data directory or a required CSV is missing.
            DataContractError: if standardization or ``validate_bundle`` fails.
        """
        data_root = self.paths.data_dir
        docs_root = self.paths.docs_dir
        if not data_root.exists():
            raise FileNotFoundError(
                "Data directory was not found. Expected a local ./data directory or a Kaggle attached dataset. "
                "On Kaggle, attach the RAG QA Logs & Corpus dataset from Data > Add Data and keep the expected CSV files available."
            )

        eval_runs_raw = self._read_csv(self._find_file(data_root, DATA_FILES["eval_runs"]))
        retrieval_events = standardize_retrieval_events(
            self._read_csv(self._find_file(data_root, DATA_FILES["retrieval_events"]))
        )
        documents = self._read_csv(self._find_file(data_root, DATA_FILES["documents"]))
        chunks = self._read_csv(self._find_file(data_root, DATA_FILES["chunks"]))
        scenarios = self._read_csv(self._find_file(data_root, DATA_FILES["scenarios"]))
        dictionary = self._load_dictionary(docs_root)

        bundle = DataBundle(
            eval_runs=standardize_eval(eval_runs_raw, scenarios),
            retrieval_events=retrieval_events,
            documents=documents,
            chunks=chunks,
            scenarios=scenarios,
            dictionary=dictionary,
        )
        validate_bundle(bundle)
        return bundle

    @staticmethod
    def _find_file(root: Path, candidates: Iterable[str]) -> Path:
        """Return the first candidate file found under ``root``.

        Direct children of ``root`` are preferred for every candidate name; only
        when none exists is a recursive search performed (first match in sorted
        order wins).

        Raises:
            FileNotFoundError: if no candidate is found anywhere under ``root``.
        """
        # Materialize once: the names are walked twice and again for the error
        # message, which would silently fail for a one-shot iterator.
        names = list(candidates)
        for name in names:
            direct = root / name
            if direct.exists():
                return direct
        for name in names:
            matches = sorted(root.rglob(name))
            if matches:
                return matches[0]
        raise FileNotFoundError(
            f"Missing file. Expected one of: {', '.join(names)} under {root}. "
            "If you are running this on Kaggle, attach the RAG QA Logs & Corpus dataset via Data > Add Data."
        )

    @staticmethod
    def _read_csv(path: Path) -> pd.DataFrame:
        """Read ``path`` with pandas' default CSV settings."""
        return pd.read_csv(path)

    def _load_dictionary(self, docs_root: Path) -> pd.DataFrame:
        """Load the optional data dictionary; return an empty frame if absent."""
        if not docs_root.exists():
            return pd.DataFrame()
        try:
            return self._read_csv(self._find_file(docs_root, DATA_FILES["dictionary"]))
        except FileNotFoundError:
            # The dictionary is optional, so a missing file is not an error.
            return pd.DataFrame()


def _missing_columns(df: pd.DataFrame, required: Sequence[str]) -> list[str]:
    """Return the names in ``required`` that are not columns of ``df``."""
    return [col for col in required if col not in df.columns]


def _require_columns(table: str, df: pd.DataFrame) -> None:
    """Raise ``DataContractError`` if ``df`` lacks a required column of ``table``."""
    missing = _missing_columns(df, REQUIRED_COLUMNS.get(table, []))
    if missing:
        raise DataContractError(f"{table} is missing required columns: {', '.join(missing)}")


def _require_non_empty(table: str, df: pd.DataFrame) -> None:
    """Raise ``DataContractError`` if ``df`` is empty."""
    if df.empty:
        raise DataContractError(f"{table} must not be empty.")


def _missing_value_mask(series: pd.Series) -> pd.Series:
    """Return a boolean mask marking the missing entries of ``series``.

    Numeric series use ``isna`` only. Other series additionally treat blank
    (after stripping) text and the literal strings ``"nan"`` / ``"None"`` as
    missing.
    """
    if pd.api.types.is_numeric_dtype(series):
        return series.isna()
    as_text = series.astype("string").str.strip()
    # NOTE(review): the previous sentinel list contained the empty string twice;
    # one entry may have been a different marker originally - confirm.
    return series.isna() | as_text.isin(["", "nan", "None"])


def _require_no_missing(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Raise ``DataContractError`` if any listed column present in ``df`` has missing values.

    Columns that are not in ``df`` are skipped.
    """
    for col in columns:
        if col not in df.columns:
            continue
        if _missing_value_mask(df[col]).any():
            raise DataContractError(f"{table}.{col} contains missing values.")


def _require_unique(table: str, df: pd.DataFrame, key: str) -> None:
    """Raise ``DataContractError`` unless ``key`` exists, has no missing values and is unique.

    Uniqueness is compared on the string form of the values.
    """
    if key not in df.columns:
        raise DataContractError(f"{table} is missing primary key column: {key}")
    _require_no_missing(table, df, [key])
    duplicated = df[key].astype(str).duplicated().sum()
    if duplicated:
        raise DataContractError(f"{table}.{key} contains {duplicated} duplicate values.")


def _require_subset(
    child_table: str,
    child_df: pd.DataFrame,
    child_col: str,
    parent_table: str,
    parent_df: pd.DataFrame,
    parent_col: str,
    *,
    allow_missing_child: bool = False,
) -> None:
    """Check that every child value also appears in the parent column.

    Values are compared as strings after dropping NaN. The parent column must
    never contain missing values; the child column may only when
    ``allow_missing_child`` is true.

    Raises:
        DataContractError: if either column is absent, a disallowed missing
            value is found, or a child value has no parent (up to five sample
            values are included in the message).
    """
    if child_col not in child_df.columns or parent_col not in parent_df.columns:
        raise DataContractError(
            f"Cannot validate {child_table}.{child_col} -> {parent_table}.{parent_col}; one column is missing."
        )
    if not allow_missing_child:
        _require_no_missing(child_table, child_df, [child_col])
    _require_no_missing(parent_table, parent_df, [parent_col])

    child_values = set(child_df[child_col].dropna().astype(str))
    parent_values = set(parent_df[parent_col].dropna().astype(str))
    missing = child_values - parent_values
    if missing:
        sample = ", ".join(sorted(missing)[:5])
        raise DataContractError(
            f"{child_table}.{child_col} contains {len(missing)} values missing from {parent_table}.{parent_col}: {sample}"
        )


def _coerce_numeric_series(table: str, series: pd.Series, col: str, *, allow_missing: bool) -> pd.Series:
    """Convert ``series`` to numeric values, enforcing the data contract.

    Entries flagged by ``_missing_value_mask`` become NaN. Any other entry that
    cannot be parsed as a number is a contract violation.

    Raises:
        DataContractError: on non-numeric values, or on missing values when
            ``allow_missing`` is false.
    """
    missing = _missing_value_mask(series)
    if pd.api.types.is_numeric_dtype(series):
        converted = pd.to_numeric(series, errors="coerce")
        non_numeric = pd.Series(False, index=series.index)
    else:
        # Blank out missing markers first so that only genuinely unparsable
        # text is reported as non-numeric.
        normalized = series.astype("object").where(~missing, pd.NA)
        converted = pd.to_numeric(normalized, errors="coerce")
        non_numeric = ~missing & converted.isna()
    if non_numeric.any():
        raise DataContractError(f"{table}.{col} contains non-numeric values.")
    if not allow_missing and converted.isna().any():
        raise DataContractError(f"{table}.{col} contains missing numeric values.")
    return converted


def _numeric_values(table: str, df: pd.DataFrame, col: str, *, allow_missing: bool) -> pd.Series:
    """Return the non-missing numeric values of ``df[col]``.

    An empty float series is returned when the column is absent.
    """
    if col not in df.columns:
        return pd.Series(dtype="float64")
    return _coerce_numeric_series(table, df[col], col, allow_missing=allow_missing).dropna()


def _require_numeric_columns(
    table: str, df: pd.DataFrame, columns: Sequence[str], *, allow_missing: bool
) -> None:
    """Validate that each listed column present in ``df`` is numeric."""
    for col in columns:
        if col in df.columns:
            # Called for its validation side effect; the values are discarded.
            _numeric_values(table, df, col, allow_missing=allow_missing)


def _require_unit_interval(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Raise ``DataContractError`` if a listed column has values outside [0, 1].

    Absent columns and missing values are ignored.
    """
    for col in columns:
        if col not in df.columns:
            continue
        values = _numeric_values(table, df, col, allow_missing=True)
        if values.empty:
            continue
        invalid = ~values.between(0, 1)
        if invalid.any():
            raise DataContractError(f"{table}.{col} contains values outside [0, 1].")


def _require_numeric_minimum(
    table: str, df: pd.DataFrame, col: str, minimum: float, *, inclusive: bool = True
) -> None:
    """Raise ``DataContractError`` if ``df[col]`` has values below ``minimum``.

    With ``inclusive=False`` values equal to ``minimum`` are rejected as well.
    Absent columns and missing values are ignored.
    """
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    invalid = values < minimum if inclusive else values <= minimum
    if invalid.any():
        qualifier = "at least" if inclusive else "greater than"
        raise DataContractError(f"{table}.{col} must be {qualifier} {minimum}.")


def _require_integer_numeric(table: str, df: pd.DataFrame, col: str) -> None:
    """Raise ``DataContractError`` if ``df[col]`` has non-integral values.

    Absent columns and missing values are ignored.
    """
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    if not (values % 1 == 0).all():
        raise DataContractError(f"{table}.{col} must contain integer values.")


def validate_bundle(bundle: DataBundle) -> None:
    """Fail fast when packaged RAG QA data is internally inconsistent.

    The dashboard is intentionally self-contained, so the startup path validates
    schema, primary keys, referential integrity, and basic metric ranges before
    any review UI is rendered.

    Raises:
        DataContractError: on the first violated check.
    """
    frames = {
        "eval_runs": bundle.eval_runs,
        "retrieval_events": bundle.retrieval_events,
        "documents": bundle.documents,
        "chunks": bundle.chunks,
        "scenarios": bundle.scenarios,
    }
    for table, df in frames.items():
        _require_non_empty(table, df)
        _require_columns(table, df)

    for table, key in UNIQUE_KEYS.items():
        _require_unique(table, frames[table], key)

    # Referential integrity between the tables.
    _require_subset("eval_runs", bundle.eval_runs, "scenario_id", "scenarios", bundle.scenarios, "scenario_id")
    _require_subset("retrieval_events", bundle.retrieval_events, "example_id", "eval_runs", bundle.eval_runs, "example_id")
    _require_subset("retrieval_events", bundle.retrieval_events, "chunk_id", "chunks", bundle.chunks, "chunk_id")
    _require_subset("chunks", bundle.chunks, "doc_id", "documents", bundle.documents, "doc_id")
    if "primary_doc_id" in bundle.scenarios.columns and "doc_id" in bundle.documents.columns:
        _require_subset(
            "scenarios",
            bundle.scenarios,
            "primary_doc_id",
            "documents",
            bundle.documents,
            "doc_id",
            allow_missing_child=True,
        )

    for table, columns in REQUIRED_NUMERIC_COLUMNS.items():
        _require_numeric_columns(table, frames[table], columns, allow_missing=False)
    for table, columns in OPTIONAL_NUMERIC_COLUMNS.items():
        _require_numeric_columns(table, frames[table], columns, allow_missing=True)

    _require_unit_interval("eval_runs", bundle.eval_runs, EVAL_RATE_COLUMNS)
    _require_unit_interval("retrieval_events", bundle.retrieval_events, ["is_relevant"])

    for table, df, col, minimum, inclusive in [
        ("eval_runs", bundle.eval_runs, "total_latency_ms", 0.0, True),
        ("eval_runs", bundle.eval_runs, "total_cost_usd", 0.0, True),
        ("retrieval_events", bundle.retrieval_events, "rank", 1.0, True),
        ("chunks", bundle.chunks, "estimated_tokens", 1.0, True),
        ("documents", bundle.documents, "n_chunks", 1.0, True),
        ("documents", bundle.documents, "n_tokens", 1.0, True),
    ]:
        _require_numeric_minimum(table, df, col, minimum, inclusive=inclusive)

    _require_integer_numeric("retrieval_events", bundle.retrieval_events, "rank")


def load_bundle(data_dir: str | Path = "data", docs_dir: str | Path = "docs") -> DataBundle:
    """Load and validate a ``DataBundle`` from the given directories."""
    return DataRepository(DataPaths(Path(data_dir), Path(docs_dir))).load()


def standardize_eval(eval_runs: pd.DataFrame, scenarios: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of ``eval_runs``.

    * ``scenario_type`` is taken from ``scenarios`` (joined on ``scenario_id``)
      when eval_runs lacks it, then from ``task_type``, then ``MISSING_LABEL``.
    * ``difficulty`` falls back to ``difficulty_level``, then ``MISSING_LABEL``.
    * Known text columns become pandas strings with missing values labelled.
    * ``total_latency_ms`` is derived from the per-stage latency columns when
      absent; ``total_cost_usd`` is created as missing when absent.
    * Known numeric columns are coerced via ``_coerce_numeric_series``.

    The original column names are recorded in ``df.attrs["raw_columns"]``.

    Raises:
        DataContractError: if a numeric column is non-numeric, or a required
            numeric column contains missing values.
    """
    raw_columns = list(eval_runs.columns)
    df = eval_runs.copy()

    can_join_scenarios = (
        "scenario_type" not in df.columns
        # Without the join key the merge would raise a bare KeyError; skipping
        # it lets validate_bundle report the missing column as a contract error.
        and "scenario_id" in df.columns
        and {"scenario_id", "scenario_type"}.issubset(scenarios.columns)
    )
    if can_join_scenarios:
        merge_cols = ["scenario_id", "scenario_type"]
        # Only pull difficulty_level when eval_runs lacks it; otherwise the merge
        # would emit suffixed _x/_y columns and the difficulty fallback is lost.
        if "difficulty_level" in scenarios.columns and "difficulty_level" not in df.columns:
            merge_cols.append("difficulty_level")
        df = df.merge(scenarios[merge_cols].drop_duplicates("scenario_id"), on="scenario_id", how="left")

    if "scenario_type" not in df.columns:
        df["scenario_type"] = df["task_type"] if "task_type" in df.columns else MISSING_LABEL
    if "difficulty" not in df.columns and "difficulty_level" in df.columns:
        df["difficulty"] = df["difficulty_level"]
    if "difficulty" not in df.columns:
        df["difficulty"] = MISSING_LABEL

    text_cols = ["domain", "scenario_type", "difficulty", "retrieval_strategy", "generator_model", "split"]
    for col in text_cols:
        if col in df.columns:
            df[col] = df[col].astype("string").fillna(MISSING_LABEL)

    if "total_latency_ms" not in df.columns:
        latency_cols = [c for c in ["latency_ms_retrieval", "latency_ms_generation"] if c in df.columns]
        # min_count=1 keeps the total missing when every component is missing.
        df["total_latency_ms"] = df[latency_cols].sum(axis=1, min_count=1) if latency_cols else pd.NA
    if "total_cost_usd" not in df.columns:
        df["total_cost_usd"] = pd.NA

    required_numeric = REQUIRED_NUMERIC_COLUMNS["eval_runs"]
    # dict.fromkeys de-duplicates while preserving order.
    numeric_cols = list(dict.fromkeys(required_numeric + OPTIONAL_NUMERIC_COLUMNS["eval_runs"]))
    for col in numeric_cols:
        if col in df.columns:
            df[col] = _coerce_numeric_series(
                "eval_runs",
                df[col],
                col,
                allow_missing=col not in required_numeric,
            )

    df.attrs["raw_columns"] = raw_columns
    return df


def standardize_retrieval_events(retrieval_events: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of ``retrieval_events``.

    Known text columns become pandas strings with missing values labelled, and
    the required numeric columns are coerced (missing values are rejected).
    The original column names are recorded in ``df.attrs["raw_columns"]``.

    Raises:
        DataContractError: if a required numeric column is non-numeric or has
            missing values.
    """
    raw_columns = list(retrieval_events.columns)
    df = retrieval_events.copy()
    for col in ["query_domain", "difficulty", "retrieval_strategy", "split"]:
        if col in df.columns:
            df[col] = df[col].astype("string").fillna(MISSING_LABEL)
    for col in REQUIRED_NUMERIC_COLUMNS["retrieval_events"]:
        if col in df.columns:
            df[col] = _coerce_numeric_series("retrieval_events", df[col], col, allow_missing=False)
    df.attrs["raw_columns"] = raw_columns
    return df


def schema_report(bundle: DataBundle) -> pd.DataFrame:
    """Summarize each table's size and which required columns its raw input lacked.

    Raw columns come from ``df.attrs["raw_columns"]`` when present, otherwise
    from the current columns. ``status`` is ``"pass"`` when nothing is missing
    and ``"review"`` otherwise.
    """
    frames = {
        "eval_runs": bundle.eval_runs,
        "retrieval_events": bundle.retrieval_events,
        "documents": bundle.documents,
        "chunks": bundle.chunks,
        "scenarios": bundle.scenarios,
    }
    rows = []
    for name, df in frames.items():
        required = REQUIRED_COLUMNS.get(name, [])
        source_columns = set(df.attrs.get("raw_columns", list(df.columns)))
        missing = [col for col in required if col not in source_columns]
        rows.append(
            {
                "table": name,
                "rows": len(df),
                "columns": df.shape[1],
                "required_missing_raw": ", ".join(missing) if missing else "none",
                "status": "pass" if not missing else "review",
            }
        )
    return pd.DataFrame(rows)


def filter_eval(
    df: pd.DataFrame,
    domains: Sequence[str] | None = None,
    difficulties: Sequence[str] | None = None,
    scenario_types: Sequence[str] | None = None,
    retrievers: Sequence[str] | None = None,
    generators: Sequence[str] | None = None,
    splits: Sequence[str] | None = None,
) -> pd.DataFrame:
    """Return the rows of ``df`` matching every supplied filter.

    A filter that is ``None`` or empty, or whose column is absent from ``df``,
    is skipped. Values are compared by their string form.
    """
    out = df.copy()
    filters = {
        "domain": domains,
        "difficulty": difficulties,
        "scenario_type": scenario_types,
        "retrieval_strategy": retrievers,
        "generator_model": generators,
        "split": splits,
    }
    for col, values in filters.items():
        if values and col in out.columns:
            out = out[out[col].astype(str).isin([str(v) for v in values])]
    return out


def filter_retrieval_events(retrieval_df: pd.DataFrame, eval_df: pd.DataFrame) -> pd.DataFrame:
    """Return retrieval rows that belong to the currently filtered evaluation examples.

    An empty frame (same columns) is returned when either input is empty; an
    unfiltered copy is returned when ``example_id`` is absent from either input.
    """
    if retrieval_df.empty or eval_df.empty:
        return retrieval_df.iloc[0:0].copy()
    if "example_id" not in retrieval_df.columns or "example_id" not in eval_df.columns:
        return retrieval_df.copy()
    allowed = set(eval_df["example_id"].astype(str))
    return retrieval_df[retrieval_df["example_id"].astype(str).isin(allowed)].copy()


def option_values(df: pd.DataFrame, col: str) -> list[str]:
    """Return the sorted distinct values of ``df[col]`` as strings.

    Missing values are reported as ``MISSING_LABEL``; an absent column yields
    an empty list.
    """
    if col not in df.columns:
        return []
    values = df[col].astype("string").fillna(MISSING_LABEL).unique().tolist()
    return sorted(str(value) for value in values)