| from __future__ import annotations |
|
|
| from collections.abc import Iterable, Sequence |
| from dataclasses import dataclass |
| from pathlib import Path |
|
|
| import pandas as pd |
|
|
# Label substituted for blank/NaN categorical values by the standardize_*
# transforms and by option_values, so filters always have a selectable bucket.
MISSING_LABEL = "Missing / Not provided"
|
|
|
|
@dataclass(frozen=True)
class DataBundle:
    """Immutable container for the six packaged RAG QA tables."""

    eval_runs: pd.DataFrame  # one row per evaluated example (key: example_id)
    retrieval_events: pd.DataFrame  # ranked retrieved chunks per example
    documents: pd.DataFrame  # corpus documents (key: doc_id)
    chunks: pd.DataFrame  # corpus chunks, children of documents (key: chunk_id)
    scenarios: pd.DataFrame  # evaluation scenarios (key: scenario_id)
    dictionary: pd.DataFrame  # data dictionary; may be empty when docs/ is absent
|
|
|
|
@dataclass(frozen=True)
class DataPaths:
    """Filesystem roots the repository reads from."""

    data_dir: Path = Path("data")  # CSV tables; searched recursively for candidates
    docs_dir: Path = Path("docs")  # optional location of the data dictionary CSV
|
|
|
|
# Candidate filenames for each table, in priority order. The loader takes the
# first candidate that exists (direct children of the data dir win over
# recursive matches).
DATA_FILES: dict[str, list[str]] = {
    "eval_runs": ["eval_runs.csv", "rag_qa_eval_runs.csv"],
    "retrieval_events": ["rag_retrieval_events.csv", "retrieval_events.csv"],
    "documents": ["rag_corpus_documents.csv", "documents.csv"],
    "chunks": ["rag_corpus_chunks.csv", "chunks.csv"],
    "scenarios": ["scenarios.csv", "rag_qa_scenarios.csv"],
    "dictionary": ["data_dictionary.csv"],
}
|
|
|
|
# Columns each table must provide; validate_bundle raises DataContractError
# when any are missing.
REQUIRED_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "example_id",
        "run_id",
        "scenario_id",
        "domain",
        "difficulty",
        "is_correct",
        "hallucination_flag",
        "retrieval_strategy",
        "generator_model",
        "recall_at_10",
        "mrr_at_10",
        "total_latency_ms",
        "total_cost_usd",
    ],
    "retrieval_events": ["example_id", "rank", "chunk_id", "retrieval_score", "is_relevant"],
    "documents": ["doc_id", "domain", "title", "n_chunks", "n_tokens"],
    "chunks": ["chunk_id", "doc_id", "domain", "chunk_text"],
    "scenarios": ["scenario_id", "scenario_type", "difficulty_level"],
}
|
|
|
|
# Primary-key column per table; each is validated to be present, fully
# populated, and duplicate-free.
UNIQUE_KEYS: dict[str, str] = {
    "eval_runs": "example_id",
    "documents": "doc_id",
    "chunks": "chunk_id",
    "scenarios": "scenario_id",
}
|
|
# eval_runs columns expected to be rates or 0/1 flags; validated (when present)
# to lie within [0, 1].
EVAL_RATE_COLUMNS = [
    "is_correct",
    "hallucination_flag",
    "recall_at_5",
    "recall_at_10",
    "mrr_at_10",
    "has_answer_in_corpus",
    "is_noanswer_probe",
    "has_relevant_in_top5",
    "has_relevant_in_top10",
    "answered_without_retrieval",
]
|
|
# Numeric columns that must parse cleanly AND contain no missing values.
REQUIRED_NUMERIC_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "is_correct",
        "hallucination_flag",
        "recall_at_10",
        "mrr_at_10",
        "total_latency_ms",
        "total_cost_usd",
    ],
    "retrieval_events": ["rank", "retrieval_score", "is_relevant"],
    "documents": ["n_chunks", "n_tokens"],
}
|
|
# Numeric columns validated only when present; missing values are tolerated.
OPTIONAL_NUMERIC_COLUMNS: dict[str, list[str]] = {
    "eval_runs": [
        "recall_at_5",
        "top1_score",
        "mean_retrieved_score",
        "has_answer_in_corpus",
        "is_noanswer_probe",
        "has_relevant_in_top5",
        "has_relevant_in_top10",
        "answered_without_retrieval",
    ],
    "chunks": ["estimated_tokens"],
}
|
|
|
|
class DataContractError(ValueError):
    """Raised when packaged tables violate a required dashboard contract.

    Subclass of ValueError; raised by the validation helpers and by
    validate_bundle on the first contract violation found.
    """
|
|
|
|
class DataRepository:
    """Load and standardize the packaged RAG QA tables.

    Each table is resolved from a list of candidate filenames (direct children
    of the data dir first, then a recursive search), passed through its
    standardize_* transform where applicable, and the resulting bundle is
    validated before being returned.
    """

    def __init__(self, paths: DataPaths | None = None) -> None:
        self.paths = paths or DataPaths()

    def load(self) -> DataBundle:
        """Read every packaged table, standardize it, and validate the bundle.

        Returns:
            A validated DataBundle.

        Raises:
            FileNotFoundError: when the data directory or a required CSV is absent.
            DataContractError: when a loaded table violates the dashboard contract.
        """
        data_root = self.paths.data_dir
        docs_root = self.paths.docs_dir
        if not data_root.exists():
            raise FileNotFoundError(
                "Data directory was not found. Expected a local ./data directory or a Kaggle attached dataset. "
                "On Kaggle, attach the RAG QA Logs & Corpus dataset from Data > Add Data and keep the expected CSV files available."
            )

        eval_runs_raw = self._read_csv(self._find_file(data_root, DATA_FILES["eval_runs"]))
        retrieval_events = standardize_retrieval_events(self._read_csv(self._find_file(data_root, DATA_FILES["retrieval_events"])))
        documents = self._read_csv(self._find_file(data_root, DATA_FILES["documents"]))
        chunks = self._read_csv(self._find_file(data_root, DATA_FILES["chunks"]))
        scenarios = self._read_csv(self._find_file(data_root, DATA_FILES["scenarios"]))
        dictionary = self._load_dictionary(docs_root)

        bundle = DataBundle(
            eval_runs=standardize_eval(eval_runs_raw, scenarios),
            retrieval_events=retrieval_events,
            documents=documents,
            chunks=chunks,
            scenarios=scenarios,
            dictionary=dictionary,
        )
        validate_bundle(bundle)
        return bundle

    @staticmethod
    def _find_file(root: Path, candidates: Iterable[str]) -> Path:
        """Return the path of the first existing candidate under *root*.

        Direct children are preferred over recursive matches; within the
        recursive pass, the lexicographically first match wins for
        determinism.

        Raises:
            FileNotFoundError: when no candidate exists anywhere under *root*.
        """
        # Materialize once: *candidates* may be a one-shot iterable (e.g. a
        # generator), and we traverse it up to three times below.
        names = list(candidates)
        for name in names:
            direct = root / name
            if direct.exists():
                return direct
        for name in names:
            matches = sorted(root.rglob(name))
            if matches:
                return matches[0]
        raise FileNotFoundError(
            f"Missing file. Expected one of: {', '.join(names)} under {root}. "
            "If you are running this on Kaggle, attach the RAG QA Logs & Corpus dataset via Data > Add Data."
        )

    @staticmethod
    def _read_csv(path: Path) -> pd.DataFrame:
        """Single choke point for CSV parsing so read options stay consistent."""
        return pd.read_csv(path)

    def _load_dictionary(self, docs_root: Path) -> pd.DataFrame:
        """Best-effort load of the data dictionary; empty frame when absent."""
        if not docs_root.exists():
            return pd.DataFrame()
        try:
            return self._read_csv(self._find_file(docs_root, DATA_FILES["dictionary"]))
        except FileNotFoundError:
            return pd.DataFrame()
|
|
|
|
| def _missing_columns(df: pd.DataFrame, required: Sequence[str]) -> list[str]: |
| return [col for col in required if col not in df.columns] |
|
|
|
|
def _require_columns(table: str, df: pd.DataFrame) -> None:
    """Raise DataContractError when *df* lacks any column required for *table*."""
    absent = _missing_columns(df, REQUIRED_COLUMNS.get(table, []))
    if not absent:
        return
    raise DataContractError(f"{table} is missing required columns: {', '.join(absent)}")
|
|
|
|
def _require_non_empty(table: str, df: pd.DataFrame) -> None:
    """Fail fast when a packaged table carries no data at all."""
    if not df.empty:
        return
    raise DataContractError(f"{table} must not be empty.")
|
|
|
|
| def _missing_value_mask(series: pd.Series) -> pd.Series: |
| if pd.api.types.is_numeric_dtype(series): |
| return series.isna() |
| as_text = series.astype("string").str.strip() |
| return series.isna() | as_text.isin(["", "<NA>", "nan", "None"]) |
|
|
|
|
def _require_no_missing(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Raise when any listed column (if present in *df*) holds missing values."""
    for col in (c for c in columns if c in df.columns):
        if _missing_value_mask(df[col]).any():
            raise DataContractError(f"{table}.{col} contains missing values.")
|
|
|
|
def _require_unique(table: str, df: pd.DataFrame, key: str) -> None:
    """Validate *key* as a primary key: present, fully populated, duplicate-free."""
    if key not in df.columns:
        raise DataContractError(f"{table} is missing primary key column: {key}")
    _require_no_missing(table, df, [key])
    dupe_count = int(df[key].astype(str).duplicated().sum())
    if dupe_count > 0:
        raise DataContractError(f"{table}.{key} contains {dupe_count} duplicate values.")
|
|
|
|
def _require_subset(
    child_table: str,
    child_df: pd.DataFrame,
    child_col: str,
    parent_table: str,
    parent_df: pd.DataFrame,
    parent_col: str,
    *,
    allow_missing_child: bool = False,
) -> None:
    """Enforce the foreign-key relation child.child_col subset-of parent.parent_col.

    With *allow_missing_child* the child column may contain blanks (optional
    references); the parent column must always be fully populated. Values are
    compared as strings. Raises DataContractError on any violation.
    """
    have_child = child_col in child_df.columns
    have_parent = parent_col in parent_df.columns
    if not (have_child and have_parent):
        raise DataContractError(f"Cannot validate {child_table}.{child_col} -> {parent_table}.{parent_col}; one column is missing.")
    if not allow_missing_child:
        _require_no_missing(child_table, child_df, [child_col])
    _require_no_missing(parent_table, parent_df, [parent_col])
    orphans = set(child_df[child_col].dropna().astype(str)) - set(parent_df[parent_col].dropna().astype(str))
    if orphans:
        sample = ", ".join(sorted(orphans)[:5])
        raise DataContractError(
            f"{child_table}.{child_col} contains {len(orphans)} values missing from {parent_table}.{parent_col}: {sample}"
        )
|
|
|
|
def _coerce_numeric_series(table: str, series: pd.Series, col: str, *, allow_missing: bool) -> pd.Series:
    """Convert *series* to numeric, enforcing the contract for *table*.*col*.

    Raises DataContractError when a non-missing value fails to parse as a
    number, or (unless *allow_missing*) when any value is missing.
    """
    missing = _missing_value_mask(series)
    if pd.api.types.is_numeric_dtype(series):
        numeric = pd.to_numeric(series, errors="coerce")
        unparsable = pd.Series(False, index=series.index)
    else:
        # Blank/sentinel text becomes NA first so it is not flagged as non-numeric.
        cleaned = series.astype("object").where(~missing, pd.NA)
        numeric = pd.to_numeric(cleaned, errors="coerce")
        unparsable = numeric.isna() & ~missing

    if unparsable.any():
        raise DataContractError(f"{table}.{col} contains non-numeric values.")
    if numeric.isna().any() and not allow_missing:
        raise DataContractError(f"{table}.{col} contains missing numeric values.")
    return numeric
|
|
|
|
def _numeric_values(table: str, df: pd.DataFrame, col: str, *, allow_missing: bool) -> pd.Series:
    """Numeric values of *col* with NA dropped; an empty float series when absent."""
    if col in df.columns:
        coerced = _coerce_numeric_series(table, df[col], col, allow_missing=allow_missing)
        return coerced.dropna()
    return pd.Series(dtype="float64")
|
|
|
|
def _require_numeric_columns(table: str, df: pd.DataFrame, columns: Sequence[str], *, allow_missing: bool) -> None:
    """Validate that each listed column present in *df* parses cleanly as numeric."""
    for col in (c for c in columns if c in df.columns):
        _numeric_values(table, df, col, allow_missing=allow_missing)
|
|
|
|
def _require_unit_interval(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Ensure every present rate/flag column stays within [0, 1]."""
    for col in columns:
        if col not in df.columns:
            continue
        values = _numeric_values(table, df, col, allow_missing=True)
        if values.empty:
            continue
        if not values.between(0, 1).all():
            raise DataContractError(f"{table}.{col} contains values outside [0, 1].")
|
|
|
|
def _require_numeric_minimum(table: str, df: pd.DataFrame, col: str, minimum: float, *, inclusive: bool = True) -> None:
    """Ensure present values of *col* respect a lower bound (>= or strictly >)."""
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    if inclusive:
        violated = bool((values < minimum).any())
        qualifier = "at least"
    else:
        violated = bool((values <= minimum).any())
        qualifier = "greater than"
    if violated:
        raise DataContractError(f"{table}.{col} must be {qualifier} {minimum}.")
|
|
|
|
def _require_integer_numeric(table: str, df: pd.DataFrame, col: str) -> None:
    """Ensure present values of *col* are whole numbers (e.g. retrieval ranks)."""
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    fractional = values % 1 != 0
    if fractional.any():
        raise DataContractError(f"{table}.{col} must contain integer values.")
|
|
|
|
def validate_bundle(bundle: DataBundle) -> None:
    """Run the full data contract against a loaded bundle, raising on failure.

    The dashboard is intentionally self-contained, so the startup path checks
    everything before any review UI is rendered, in this order: table
    presence and schema, primary-key uniqueness, referential integrity,
    numeric parseability, and basic value ranges. Raises DataContractError on
    the first violation found.
    """
    tables = {
        "eval_runs": bundle.eval_runs,
        "retrieval_events": bundle.retrieval_events,
        "documents": bundle.documents,
        "chunks": bundle.chunks,
        "scenarios": bundle.scenarios,
    }
    # Non-emptiness and required schema for every table.
    for name, frame in tables.items():
        _require_non_empty(name, frame)
        _require_columns(name, frame)

    # Primary keys.
    for name, key in UNIQUE_KEYS.items():
        _require_unique(name, tables[name], key)

    # Mandatory foreign-key relations (child column subset of parent column).
    relations = [
        ("eval_runs", bundle.eval_runs, "scenario_id", "scenarios", bundle.scenarios, "scenario_id"),
        ("retrieval_events", bundle.retrieval_events, "example_id", "eval_runs", bundle.eval_runs, "example_id"),
        ("retrieval_events", bundle.retrieval_events, "chunk_id", "chunks", bundle.chunks, "chunk_id"),
        ("chunks", bundle.chunks, "doc_id", "documents", bundle.documents, "doc_id"),
    ]
    for child, child_df, child_col, parent, parent_df, parent_col in relations:
        _require_subset(child, child_df, child_col, parent, parent_df, parent_col)
    # Optional link: scenarios may reference a primary document, blanks allowed.
    if "primary_doc_id" in bundle.scenarios.columns and "doc_id" in bundle.documents.columns:
        _require_subset(
            "scenarios",
            bundle.scenarios,
            "primary_doc_id",
            "documents",
            bundle.documents,
            "doc_id",
            allow_missing_child=True,
        )

    # Numeric parseability (required columns reject missing values too).
    for name, cols in REQUIRED_NUMERIC_COLUMNS.items():
        _require_numeric_columns(name, tables[name], cols, allow_missing=False)
    for name, cols in OPTIONAL_NUMERIC_COLUMNS.items():
        _require_numeric_columns(name, tables[name], cols, allow_missing=True)

    # Basic metric ranges.
    _require_unit_interval("eval_runs", bundle.eval_runs, EVAL_RATE_COLUMNS)
    _require_unit_interval("retrieval_events", bundle.retrieval_events, ["is_relevant"])

    lower_bounds = [
        ("eval_runs", bundle.eval_runs, "total_latency_ms", 0.0),
        ("eval_runs", bundle.eval_runs, "total_cost_usd", 0.0),
        ("retrieval_events", bundle.retrieval_events, "rank", 1.0),
        ("chunks", bundle.chunks, "estimated_tokens", 1.0),
        ("documents", bundle.documents, "n_chunks", 1.0),
        ("documents", bundle.documents, "n_tokens", 1.0),
    ]
    for name, frame, col, minimum in lower_bounds:
        _require_numeric_minimum(name, frame, col, minimum, inclusive=True)
    _require_integer_numeric("retrieval_events", bundle.retrieval_events, "rank")
|
|
|
|
def load_bundle(data_dir: str | Path = "data", docs_dir: str | Path = "docs") -> DataBundle:
    """Convenience wrapper: build a repository for the given roots and load it."""
    paths = DataPaths(Path(data_dir), Path(docs_dir))
    return DataRepository(paths).load()
|
|
|
|
def standardize_eval(eval_runs: pd.DataFrame, scenarios: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of the eval-runs table.

    Backfills scenario metadata from *scenarios* when the raw file lacks it,
    guarantees the scenario_type/difficulty columns exist, normalizes
    categorical text columns (missing -> MISSING_LABEL), derives
    total_latency_ms / total_cost_usd when absent, and coerces metric columns
    to numeric. The raw column list is preserved in ``attrs['raw_columns']``.
    """
    original_columns = list(eval_runs.columns)
    out = eval_runs.copy()

    # Backfill scenario metadata via a left join when possible.
    can_merge = {"scenario_id", "scenario_type"}.issubset(scenarios.columns)
    if "scenario_type" not in out.columns and can_merge:
        lookup_cols = ["scenario_id", "scenario_type"]
        if "difficulty_level" in scenarios.columns:
            lookup_cols.append("difficulty_level")
        lookup = scenarios[lookup_cols].drop_duplicates("scenario_id")
        out = out.merge(lookup, on="scenario_id", how="left")

    # Guarantee the two categorical columns the dashboard filters on.
    if "scenario_type" not in out.columns:
        out["scenario_type"] = out["task_type"] if "task_type" in out.columns else MISSING_LABEL
    if "difficulty" not in out.columns and "difficulty_level" in out.columns:
        out["difficulty"] = out["difficulty_level"]
    if "difficulty" not in out.columns:
        out["difficulty"] = MISSING_LABEL

    for col in ("domain", "scenario_type", "difficulty", "retrieval_strategy", "generator_model", "split"):
        if col in out.columns:
            out[col] = out[col].astype("string").fillna(MISSING_LABEL)

    # Derive totals when the raw export only carries component columns.
    if "total_latency_ms" not in out.columns:
        parts = [c for c in ("latency_ms_retrieval", "latency_ms_generation") if c in out.columns]
        out["total_latency_ms"] = out[parts].sum(axis=1, min_count=1) if parts else pd.NA
    if "total_cost_usd" not in out.columns:
        out["total_cost_usd"] = pd.NA

    required = REQUIRED_NUMERIC_COLUMNS["eval_runs"]
    for col in dict.fromkeys(required + OPTIONAL_NUMERIC_COLUMNS["eval_runs"]):
        if col in out.columns:
            out[col] = _coerce_numeric_series(
                "eval_runs",
                out[col],
                col,
                allow_missing=col not in required,
            )

    out.attrs["raw_columns"] = original_columns
    return out
|
|
|
|
def standardize_retrieval_events(retrieval_events: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of the retrieval-events table.

    Normalizes categorical text columns (missing -> MISSING_LABEL) and coerces
    the required numeric columns; the raw column list is preserved in
    ``attrs['raw_columns']``.
    """
    original_columns = list(retrieval_events.columns)
    out = retrieval_events.copy()
    for col in ("query_domain", "difficulty", "retrieval_strategy", "split"):
        if col in out.columns:
            out[col] = out[col].astype("string").fillna(MISSING_LABEL)
    for col in REQUIRED_NUMERIC_COLUMNS["retrieval_events"]:
        if col in out.columns:
            out[col] = _coerce_numeric_series("retrieval_events", out[col], col, allow_missing=False)
    out.attrs["raw_columns"] = original_columns
    return out
|
|
|
|
def schema_report(bundle: DataBundle) -> pd.DataFrame:
    """One row per table summarizing shape and required-column coverage.

    Coverage is checked against the raw (pre-standardization) column list when
    a table recorded it in ``attrs['raw_columns']``.
    """
    tables = {
        "eval_runs": bundle.eval_runs,
        "retrieval_events": bundle.retrieval_events,
        "documents": bundle.documents,
        "chunks": bundle.chunks,
        "scenarios": bundle.scenarios,
    }
    records = []
    for name, df in tables.items():
        raw_cols = set(df.attrs.get("raw_columns", list(df.columns)))
        absent = [col for col in REQUIRED_COLUMNS.get(name, []) if col not in raw_cols]
        records.append(
            {
                "table": name,
                "rows": len(df),
                "columns": df.shape[1],
                "required_missing_raw": ", ".join(absent) if absent else "none",
                "status": "review" if absent else "pass",
            }
        )
    return pd.DataFrame(records)
|
|
|
|
def filter_eval(
    df: pd.DataFrame,
    domains: Sequence[str] | None = None,
    difficulties: Sequence[str] | None = None,
    scenario_types: Sequence[str] | None = None,
    retrievers: Sequence[str] | None = None,
    generators: Sequence[str] | None = None,
    splits: Sequence[str] | None = None,
) -> pd.DataFrame:
    """Return rows of *df* matching every supplied categorical filter.

    A filter that is None/empty, or whose column is absent from *df*, leaves
    that dimension unrestricted. Values are compared as strings.
    """
    column_filters = (
        ("domain", domains),
        ("difficulty", difficulties),
        ("scenario_type", scenario_types),
        ("retrieval_strategy", retrievers),
        ("generator_model", generators),
        ("split", splits),
    )
    result = df.copy()
    for column, selected in column_filters:
        if not selected or column not in result.columns:
            continue
        wanted = {str(v) for v in selected}
        result = result[result[column].astype(str).isin(wanted)]
    return result
|
|
|
|
def filter_retrieval_events(retrieval_df: pd.DataFrame, eval_df: pd.DataFrame) -> pd.DataFrame:
    """Restrict retrieval rows to the examples present in the filtered eval frame.

    Returns an empty frame (same columns) when either input is empty, and a
    plain copy when ``example_id`` is unavailable on either side.
    """
    if retrieval_df.empty or eval_df.empty:
        return retrieval_df.head(0).copy()
    has_key = "example_id" in retrieval_df.columns and "example_id" in eval_df.columns
    if not has_key:
        return retrieval_df.copy()
    keep_ids = set(eval_df["example_id"].astype(str))
    keep_mask = retrieval_df["example_id"].astype(str).isin(keep_ids)
    return retrieval_df[keep_mask].copy()
|
|
|
|
def option_values(df: pd.DataFrame, col: str) -> list[str]:
    """Sorted distinct values of *col* as strings, with missing mapped to MISSING_LABEL."""
    if col not in df.columns:
        return []
    labels = df[col].astype("string").fillna(MISSING_LABEL).unique()
    return sorted(str(label) for label in labels)
|
|