Tarek Masryo
chore: update project files
6bef416
from __future__ import annotations
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
# Placeholder text substituted for missing values in categorical/text columns
# (used by the standardize_* helpers and option_values).
MISSING_LABEL = "Missing / Not provided"
@dataclass(frozen=True)
class DataBundle:
    """Immutable container holding every table the loader produces.

    The field order is part of the positional constructor signature.
    """

    # Evaluation runs, as returned by ``standardize_eval``.
    eval_runs: pd.DataFrame
    # Retrieval events, as returned by ``standardize_retrieval_events``.
    retrieval_events: pd.DataFrame
    # Corpus documents table, as read from CSV.
    documents: pd.DataFrame
    # Corpus chunks table, as read from CSV.
    chunks: pd.DataFrame
    # Scenario definitions table, as read from CSV.
    scenarios: pd.DataFrame
    # Data dictionary; an empty frame when no dictionary file is found.
    dictionary: pd.DataFrame
@dataclass(frozen=True)
class DataPaths:
    """Filesystem locations used by ``DataRepository``.

    Both fields default to directories relative to the current working
    directory. Plain strings (or any ``os.PathLike``) are accepted and
    normalized to ``Path`` so later ``.exists()`` / ``.rglob()`` calls cannot
    fail with ``AttributeError`` on a ``str``.
    """

    # Directory searched for the CSV tables.
    data_dir: Path = Path("data")
    # Directory searched for the optional data dictionary.
    docs_dir: Path = Path("docs")

    def __post_init__(self) -> None:
        """Coerce both fields to ``Path`` instances."""
        # The dataclass is frozen, so regular attribute assignment is blocked;
        # object.__setattr__ is the documented way to set fields here.
        object.__setattr__(self, "data_dir", Path(self.data_dir))
        object.__setattr__(self, "docs_dir", Path(self.docs_dir))
# Candidate CSV file names for each logical table. DataRepository._find_file
# tries the names in list order, so earlier entries take precedence.
# The "dictionary" entry is looked up under the docs directory, the others
# under the data directory.
DATA_FILES: dict[str, list[str]] = {
"eval_runs": ["eval_runs.csv", "rag_qa_eval_runs.csv"],
"retrieval_events": ["rag_retrieval_events.csv", "retrieval_events.csv"],
"documents": ["rag_corpus_documents.csv", "documents.csv"],
"chunks": ["rag_corpus_chunks.csv", "chunks.csv"],
"scenarios": ["scenarios.csv", "rag_qa_scenarios.csv"],
"dictionary": ["data_dictionary.csv"],
}
# Columns each table must contain. Enforced by _require_columns during
# validate_bundle and reported (against the raw source columns) by
# schema_report. Tables without an entry have no required columns.
REQUIRED_COLUMNS: dict[str, list[str]] = {
"eval_runs": [
"example_id",
"run_id",
"scenario_id",
"domain",
"difficulty",
"is_correct",
"hallucination_flag",
"retrieval_strategy",
"generator_model",
"recall_at_10",
"mrr_at_10",
"total_latency_ms",
"total_cost_usd",
],
"retrieval_events": ["example_id", "rank", "chunk_id", "retrieval_score", "is_relevant"],
"documents": ["doc_id", "domain", "title", "n_chunks", "n_tokens"],
"chunks": ["chunk_id", "doc_id", "domain", "chunk_text"],
"scenarios": ["scenario_id", "scenario_type", "difficulty_level"],
}
# Primary-key column per table. validate_bundle passes each pair to
# _require_unique, which rejects missing and duplicate key values.
UNIQUE_KEYS: dict[str, str] = {
"eval_runs": "example_id",
"documents": "doc_id",
"chunks": "chunk_id",
"scenarios": "scenario_id",
}
# eval_runs columns whose values validate_bundle requires to lie in [0, 1].
# Columns from this list that are absent from the table are skipped.
EVAL_RATE_COLUMNS = [
"is_correct",
"hallucination_flag",
"recall_at_5",
"recall_at_10",
"mrr_at_10",
"has_answer_in_corpus",
"is_noanswer_probe",
"has_relevant_in_top5",
"has_relevant_in_top10",
"answered_without_retrieval",
]
# Columns that must be numeric and must not contain missing values.
# Used for coercion in the standardize_* helpers and checked again in
# validate_bundle with allow_missing=False.
REQUIRED_NUMERIC_COLUMNS: dict[str, list[str]] = {
"eval_runs": [
"is_correct",
"hallucination_flag",
"recall_at_10",
"mrr_at_10",
"total_latency_ms",
"total_cost_usd",
],
"retrieval_events": ["rank", "retrieval_score", "is_relevant"],
"documents": ["n_chunks", "n_tokens"],
}
# Columns that must be numeric when present but may contain missing values
# (validate_bundle checks them with allow_missing=True).
OPTIONAL_NUMERIC_COLUMNS: dict[str, list[str]] = {
"eval_runs": [
"recall_at_5",
"top1_score",
"mean_retrieved_score",
"has_answer_in_corpus",
"is_noanswer_probe",
"has_relevant_in_top5",
"has_relevant_in_top10",
"answered_without_retrieval",
],
"chunks": ["estimated_tokens"],
}
class DataContractError(ValueError):
    """Signal that a loaded table breaks one of the validation rules.

    Subclasses ``ValueError`` so callers catching ``ValueError`` also catch
    contract violations.
    """
class DataRepository:
    """Load and standardize the packaged RAG QA tables.

    ``load`` locates each CSV under ``paths.data_dir`` (and the optional data
    dictionary under ``paths.docs_dir``), standardizes the evaluation and
    retrieval tables, validates the result and returns a ``DataBundle``.
    """

    def __init__(self, paths: DataPaths | None = None) -> None:
        """Store *paths*, falling back to a default ``DataPaths()``."""
        self.paths = paths or DataPaths()

    def load(self) -> DataBundle:
        """Read, standardize and validate all tables.

        Returns:
            The validated ``DataBundle``.

        Raises:
            FileNotFoundError: The data directory or a required CSV is absent.
            DataContractError: A table violates the validation rules.
        """
        data_root = self.paths.data_dir
        docs_root = self.paths.docs_dir
        if not data_root.exists():
            raise FileNotFoundError(
                "Data directory was not found. Expected a local ./data directory or a Kaggle attached dataset. "
                "On Kaggle, attach the RAG QA Logs & Corpus dataset from Data > Add Data and keep the expected CSV files available."
            )
        eval_runs_raw = self._read_csv(self._find_file(data_root, DATA_FILES["eval_runs"]))
        retrieval_events = standardize_retrieval_events(
            self._read_csv(self._find_file(data_root, DATA_FILES["retrieval_events"]))
        )
        documents = self._read_csv(self._find_file(data_root, DATA_FILES["documents"]))
        chunks = self._read_csv(self._find_file(data_root, DATA_FILES["chunks"]))
        scenarios = self._read_csv(self._find_file(data_root, DATA_FILES["scenarios"]))
        # The dictionary is optional; an empty frame is used when it is absent.
        dictionary = self._load_dictionary(docs_root)
        bundle = DataBundle(
            # eval_runs needs the scenarios table to backfill scenario columns.
            eval_runs=standardize_eval(eval_runs_raw, scenarios),
            retrieval_events=retrieval_events,
            documents=documents,
            chunks=chunks,
            scenarios=scenarios,
            dictionary=dictionary,
        )
        validate_bundle(bundle)
        return bundle

    @staticmethod
    def _find_file(root: Path, candidates: Iterable[str]) -> Path:
        """Return the first existing file matching one of *candidates*.

        Files directly under *root* win over nested ones; within each pass the
        candidates are tried in order. Nested matches are sorted so the choice
        is deterministic.

        Raises:
            FileNotFoundError: No candidate exists anywhere under *root*.
        """
        # Materialize once: the names are walked twice and joined in the error
        # message, which would silently see nothing for a one-shot iterator.
        names = list(candidates)
        for name in names:
            direct = root / name
            # is_file() rather than exists(): a directory sharing the name
            # cannot be read as a CSV.
            if direct.is_file():
                return direct
        for name in names:
            matches = sorted(path for path in root.rglob(name) if path.is_file())
            if matches:
                return matches[0]
        raise FileNotFoundError(
            f"Missing file. Expected one of: {', '.join(names)} under {root}. "
            "If you are running this on Kaggle, attach the RAG QA Logs & Corpus dataset via Data > Add Data."
        )

    @staticmethod
    def _read_csv(path: Path) -> pd.DataFrame:
        """Read *path* with pandas' default CSV settings."""
        return pd.read_csv(path)

    def _load_dictionary(self, docs_root: Path) -> pd.DataFrame:
        """Return the data dictionary, or an empty frame when unavailable."""
        if not docs_root.exists():
            return pd.DataFrame()
        try:
            return self._read_csv(self._find_file(docs_root, DATA_FILES["dictionary"]))
        except FileNotFoundError:
            # Best effort by design: a missing dictionary is not an error.
            return pd.DataFrame()
def _missing_columns(df: pd.DataFrame, required: Sequence[str]) -> list[str]:
    """Return the names in *required* that *df* lacks, in *required* order."""
    available = df.columns
    absent = [name for name in required if name not in available]
    return absent
def _require_columns(table: str, df: pd.DataFrame) -> None:
    """Raise ``DataContractError`` if *df* lacks a column required for *table*.

    Tables without an entry in ``REQUIRED_COLUMNS`` always pass.
    """
    expected = REQUIRED_COLUMNS.get(table, [])
    absent = _missing_columns(df, expected)
    if not absent:
        return
    raise DataContractError(f"{table} is missing required columns: {', '.join(absent)}")
def _require_non_empty(table: str, df: pd.DataFrame) -> None:
    """Raise ``DataContractError`` when *df* is empty (per ``DataFrame.empty``)."""
    if not df.empty:
        return
    raise DataContractError(f"{table} must not be empty.")
def _missing_value_mask(series: pd.Series) -> pd.Series:
    """Return a boolean mask marking the missing entries of *series*.

    Numeric series count only NaN/NA as missing. For any other dtype a value
    is also missing when its stripped text form is empty or one of the
    placeholder spellings ``<NA>``, ``nan`` or ``None``.
    """
    is_null = series.isna()
    if pd.api.types.is_numeric_dtype(series):
        return is_null
    placeholders = ["", "<NA>", "nan", "None"]
    stripped = series.astype("string").str.strip()
    return is_null | stripped.isin(placeholders)
def _require_no_missing(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Raise ``DataContractError`` on the first listed column with missing values.

    Columns that are not present in *df* are skipped; presence is checked
    elsewhere.
    """
    present = (name for name in columns if name in df.columns)
    for col in present:
        if _missing_value_mask(df[col]).any():
            raise DataContractError(f"{table}.{col} contains missing values.")
def _require_unique(table: str, df: pd.DataFrame, key: str) -> None:
    """Check that *key* is usable as a primary key of *df*.

    Raises:
        DataContractError: The column is absent, contains missing values, or
            has repeated values once compared as strings.
    """
    if key not in df.columns:
        raise DataContractError(f"{table} is missing primary key column: {key}")
    _require_no_missing(table, df, [key])
    # Keys are compared by their string form.
    as_text = df[key].astype(str)
    duplicated = as_text.duplicated().sum()
    if not duplicated:
        return
    raise DataContractError(f"{table}.{key} contains {duplicated} duplicate values.")
def _require_subset(
    child_table: str,
    child_df: pd.DataFrame,
    child_col: str,
    parent_table: str,
    parent_df: pd.DataFrame,
    parent_col: str,
    *,
    allow_missing_child: bool = False,
) -> None:
    """Check that every child key also exists in the parent column.

    Values are compared by their string form.

    Args:
        child_table: Name of the referencing table (used in messages).
        child_df: Frame holding the referencing column.
        child_col: Referencing column name.
        parent_table: Name of the referenced table (used in messages).
        parent_df: Frame holding the referenced column.
        parent_col: Referenced column name.
        allow_missing_child: When true, missing child values are ignored
            instead of being rejected.

    Raises:
        DataContractError: A column is absent, a disallowed missing value is
            present, or a child value has no match in the parent column.
    """
    if child_col not in child_df.columns or parent_col not in parent_df.columns:
        raise DataContractError(f"Cannot validate {child_table}.{child_col} -> {parent_table}.{parent_col}; one column is missing.")
    if not allow_missing_child:
        _require_no_missing(child_table, child_df, [child_col])
    _require_no_missing(parent_table, parent_df, [parent_col])
    child_series = child_df[child_col]
    # Drop everything the module treats as missing (NaN plus blank/placeholder
    # text), not only NaN; otherwise an optional reference left blank would be
    # reported as a dangling key.
    child_present = child_series[~_missing_value_mask(child_series)]
    child_values = set(child_present.astype(str))
    parent_values = set(parent_df[parent_col].dropna().astype(str))
    missing = child_values - parent_values
    if missing:
        # Show at most five offending values, sorted for stable messages.
        sample = ", ".join(sorted(missing)[:5])
        raise DataContractError(
            f"{child_table}.{child_col} contains {len(missing)} values missing from {parent_table}.{parent_col}: {sample}"
        )
def _coerce_numeric_series(table: str, series: pd.Series, col: str, *, allow_missing: bool) -> pd.Series:
    """Convert *series* to a numeric series, enforcing the column contract.

    Args:
        table: Table name used in error messages.
        series: Values to convert.
        col: Column name used in error messages.
        allow_missing: Whether missing values are acceptable in the result.

    Returns:
        The converted series; missing entries become NaN/NA.

    Raises:
        DataContractError: A non-missing value cannot be parsed as a number,
            or a missing value is present while ``allow_missing`` is false.
    """
    if pd.api.types.is_numeric_dtype(series):
        numeric = pd.to_numeric(series, errors="coerce")
    else:
        blank = _missing_value_mask(series)
        # Null out placeholder text first so it is not mistaken for a parse
        # failure below.
        cleaned = series.astype("object").where(~blank, pd.NA)
        numeric = pd.to_numeric(cleaned, errors="coerce")
        unparseable = numeric.isna() & ~blank
        if unparseable.any():
            raise DataContractError(f"{table}.{col} contains non-numeric values.")
    if numeric.isna().any() and not allow_missing:
        raise DataContractError(f"{table}.{col} contains missing numeric values.")
    return numeric
def _numeric_values(table: str, df: pd.DataFrame, col: str, *, allow_missing: bool) -> pd.Series:
    """Return the non-missing numeric values of ``df[col]``.

    An absent column yields an empty float64 series. Conversion errors from
    ``_coerce_numeric_series`` propagate.
    """
    if col in df.columns:
        coerced = _coerce_numeric_series(table, df[col], col, allow_missing=allow_missing)
        return coerced.dropna()
    return pd.Series(dtype="float64")
def _require_numeric_columns(table: str, df: pd.DataFrame, columns: Sequence[str], *, allow_missing: bool) -> None:
    """Validate that each listed column present in *df* is numeric.

    The converted values are discarded; the call exists for the
    ``DataContractError`` raised during conversion.
    """
    for col in columns:
        if col not in df.columns:
            continue
        _numeric_values(table, df, col, allow_missing=allow_missing)
def _require_unit_interval(table: str, df: pd.DataFrame, columns: Sequence[str]) -> None:
    """Raise ``DataContractError`` if a listed column has a value outside [0, 1].

    Absent columns and columns with no non-missing values are skipped.
    """
    for col in (name for name in columns if name in df.columns):
        values = _numeric_values(table, df, col, allow_missing=True)
        if values.empty:
            continue
        # Both bounds are inclusive.
        if not values.between(0, 1).all():
            raise DataContractError(f"{table}.{col} contains values outside [0, 1].")
def _require_numeric_minimum(table: str, df: pd.DataFrame, col: str, minimum: float, *, inclusive: bool = True) -> None:
    """Raise ``DataContractError`` when ``df[col]`` has a value below *minimum*.

    With ``inclusive=True`` values equal to *minimum* are accepted; otherwise
    they are rejected too. An absent column or a column without non-missing
    values passes.
    """
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    if inclusive:
        too_small = values < minimum
        qualifier = "at least"
    else:
        too_small = values <= minimum
        qualifier = "greater than"
    if too_small.any():
        raise DataContractError(f"{table}.{col} must be {qualifier} {minimum}.")
def _require_integer_numeric(table: str, df: pd.DataFrame, col: str) -> None:
    """Raise ``DataContractError`` if ``df[col]`` holds a non-whole number.

    An absent column or a column without non-missing values passes.
    """
    if col not in df.columns:
        return
    values = _numeric_values(table, df, col, allow_missing=True)
    if values.empty:
        return
    is_whole = (values % 1) == 0
    if is_whole.all():
        return
    raise DataContractError(f"{table}.{col} must contain integer values.")
def validate_bundle(bundle: DataBundle) -> None:
    """Run every contract check on *bundle*, raising on the first failure.

    Checks run in this order: non-empty tables with required columns, primary
    keys, foreign keys, numeric columns, [0, 1] ranges, numeric minimums, and
    whole-number retrieval ranks. The data dictionary is not validated.

    Raises:
        DataContractError: The first violated rule.
    """
    frames = {
        "eval_runs": bundle.eval_runs,
        "retrieval_events": bundle.retrieval_events,
        "documents": bundle.documents,
        "chunks": bundle.chunks,
        "scenarios": bundle.scenarios,
    }

    # Schema: every table has rows and its required columns.
    for table, df in frames.items():
        _require_non_empty(table, df)
        _require_columns(table, df)

    # Primary keys.
    for table, key in UNIQUE_KEYS.items():
        _require_unique(table, frames[table], key)

    # Mandatory foreign keys: (child table, child column, parent table, parent column).
    foreign_keys = (
        ("eval_runs", "scenario_id", "scenarios", "scenario_id"),
        ("retrieval_events", "example_id", "eval_runs", "example_id"),
        ("retrieval_events", "chunk_id", "chunks", "chunk_id"),
        ("chunks", "doc_id", "documents", "doc_id"),
    )
    for child, child_col, parent, parent_col in foreign_keys:
        _require_subset(child, frames[child], child_col, parent, frames[parent], parent_col)

    # Optional foreign key: checked only when both columns exist, and missing
    # child values are tolerated.
    has_primary_doc = "primary_doc_id" in bundle.scenarios.columns
    if has_primary_doc and "doc_id" in bundle.documents.columns:
        _require_subset(
            "scenarios",
            bundle.scenarios,
            "primary_doc_id",
            "documents",
            bundle.documents,
            "doc_id",
            allow_missing_child=True,
        )

    # Numeric typing: required columns first (no gaps), then optional ones.
    numeric_specs = ((REQUIRED_NUMERIC_COLUMNS, False), (OPTIONAL_NUMERIC_COLUMNS, True))
    for spec, allow_missing in numeric_specs:
        for table, columns in spec.items():
            _require_numeric_columns(table, frames[table], columns, allow_missing=allow_missing)

    # Range checks.
    _require_unit_interval("eval_runs", bundle.eval_runs, EVAL_RATE_COLUMNS)
    _require_unit_interval("retrieval_events", bundle.retrieval_events, ["is_relevant"])

    # Inclusive lower bounds: (table, column, minimum).
    lower_bounds = (
        ("eval_runs", "total_latency_ms", 0.0),
        ("eval_runs", "total_cost_usd", 0.0),
        ("retrieval_events", "rank", 1.0),
        ("chunks", "estimated_tokens", 1.0),
        ("documents", "n_chunks", 1.0),
        ("documents", "n_tokens", 1.0),
    )
    for table, col, minimum in lower_bounds:
        _require_numeric_minimum(table, frames[table], col, minimum, inclusive=True)

    _require_integer_numeric("retrieval_events", bundle.retrieval_events, "rank")
def load_bundle(data_dir: str | Path = "data", docs_dir: str | Path = "docs") -> DataBundle:
    """Load and validate the tables found under *data_dir* and *docs_dir*.

    Convenience wrapper around ``DataRepository(...).load()`` that accepts
    strings as well as ``Path`` objects.
    """
    locations = DataPaths(Path(data_dir), Path(docs_dir))
    repository = DataRepository(locations)
    return repository.load()
def standardize_eval(eval_runs: pd.DataFrame, scenarios: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of the evaluation-run table.

    Steps, in order:

    1. Backfill ``scenario_type`` (and ``difficulty_level`` when the run table
       lacks it) from *scenarios* via ``scenario_id``.
    2. Fall back to ``task_type`` / ``MISSING_LABEL`` for ``scenario_type`` and
       to ``difficulty_level`` / ``MISSING_LABEL`` for ``difficulty``.
    3. Cast the text columns to string dtype, filling gaps with
       ``MISSING_LABEL``.
    4. Derive ``total_latency_ms`` from the per-stage latency columns and add
       an all-missing ``total_cost_usd`` when those columns are absent.
    5. Coerce the required and optional numeric columns.

    The input frame is not modified. The original column names are stored in
    ``attrs["raw_columns"]`` of the result.

    Raises:
        DataContractError: A numeric column holds non-numeric values, or a
            required numeric column holds missing values.
    """
    raw_columns = list(eval_runs.columns)
    df = eval_runs.copy()

    # Join only when it can succeed: without scenario_id on the run table the
    # merge would raise a bare KeyError here, before validation gets a chance
    # to report the missing column as a contract error.
    can_join = (
        "scenario_type" not in df.columns
        and "scenario_id" in df.columns
        and {"scenario_id", "scenario_type"}.issubset(scenarios.columns)
    )
    if can_join:
        merge_cols = ["scenario_id", "scenario_type"]
        # Pull difficulty_level only when the run table does not already have
        # it; otherwise merge would rename both copies with _x/_y suffixes and
        # the difficulty fallback below would never find the column.
        if "difficulty_level" in scenarios.columns and "difficulty_level" not in df.columns:
            merge_cols.append("difficulty_level")
        df = df.merge(scenarios[merge_cols].drop_duplicates("scenario_id"), on="scenario_id", how="left")

    if "scenario_type" not in df.columns:
        df["scenario_type"] = df["task_type"] if "task_type" in df.columns else MISSING_LABEL
    if "difficulty" not in df.columns and "difficulty_level" in df.columns:
        df["difficulty"] = df["difficulty_level"]
    if "difficulty" not in df.columns:
        df["difficulty"] = MISSING_LABEL

    text_cols = ["domain", "scenario_type", "difficulty", "retrieval_strategy", "generator_model", "split"]
    for col in text_cols:
        if col in df.columns:
            df[col] = df[col].astype("string").fillna(MISSING_LABEL)

    if "total_latency_ms" not in df.columns:
        latency_cols = [c for c in ["latency_ms_retrieval", "latency_ms_generation"] if c in df.columns]
        # min_count=1 keeps the total missing when every component is missing.
        df["total_latency_ms"] = df[latency_cols].sum(axis=1, min_count=1) if latency_cols else pd.NA
    if "total_cost_usd" not in df.columns:
        df["total_cost_usd"] = pd.NA

    required_numeric = REQUIRED_NUMERIC_COLUMNS["eval_runs"]
    # dict.fromkeys de-duplicates while preserving order.
    numeric_cols = list(dict.fromkeys(required_numeric + OPTIONAL_NUMERIC_COLUMNS["eval_runs"]))
    for col in numeric_cols:
        if col in df.columns:
            df[col] = _coerce_numeric_series(
                "eval_runs",
                df[col],
                col,
                allow_missing=col not in required_numeric,
            )

    df.attrs["raw_columns"] = raw_columns
    return df
def standardize_retrieval_events(retrieval_events: pd.DataFrame) -> pd.DataFrame:
    """Return a standardized copy of the retrieval-event table.

    Text columns that are present are cast to string dtype with gaps replaced
    by ``MISSING_LABEL``; the required numeric columns that are present are
    coerced with missing values disallowed. The original column names are
    stored in ``attrs["raw_columns"]`` of the result.

    Raises:
        DataContractError: A required numeric column holds non-numeric or
            missing values.
    """
    original_names = list(retrieval_events.columns)
    df = retrieval_events.copy()

    label_columns = ("query_domain", "difficulty", "retrieval_strategy", "split")
    for col in label_columns:
        if col not in df.columns:
            continue
        df[col] = df[col].astype("string").fillna(MISSING_LABEL)

    for col in REQUIRED_NUMERIC_COLUMNS["retrieval_events"]:
        if col not in df.columns:
            continue
        df[col] = _coerce_numeric_series("retrieval_events", df[col], col, allow_missing=False)

    df.attrs["raw_columns"] = original_names
    return df
def schema_report(bundle: DataBundle) -> pd.DataFrame:
    """Summarize each table's size and required-column coverage.

    Coverage is measured against the raw source columns recorded in
    ``attrs["raw_columns"]`` when available, otherwise against the frame's
    current columns.

    Returns:
        One row per table with the columns ``table``, ``rows``, ``columns``,
        ``required_missing_raw`` and ``status`` (``"pass"`` or ``"review"``).
    """
    tables = (
        ("eval_runs", bundle.eval_runs),
        ("retrieval_events", bundle.retrieval_events),
        ("documents", bundle.documents),
        ("chunks", bundle.chunks),
        ("scenarios", bundle.scenarios),
    )
    report = []
    for name, frame in tables:
        source_columns = set(frame.attrs.get("raw_columns", list(frame.columns)))
        absent = [col for col in REQUIRED_COLUMNS.get(name, []) if col not in source_columns]
        n_rows, n_cols = len(frame), frame.shape[1]
        entry = {
            "table": name,
            "rows": n_rows,
            "columns": n_cols,
            "required_missing_raw": ", ".join(absent) if absent else "none",
            "status": "review" if absent else "pass",
        }
        report.append(entry)
    return pd.DataFrame(report)
def filter_eval(
    df: pd.DataFrame,
    domains: Sequence[str] | None = None,
    difficulties: Sequence[str] | None = None,
    scenario_types: Sequence[str] | None = None,
    retrievers: Sequence[str] | None = None,
    generators: Sequence[str] | None = None,
    splits: Sequence[str] | None = None,
) -> pd.DataFrame:
    """Return the rows of *df* matching every active selection.

    A selection is inactive (no filtering on that column) when it is ``None``
    or empty, or when its column is absent from *df*. Values are compared by
    their string form. The input frame is never modified.
    """
    selections = (
        ("domain", domains),
        ("difficulty", difficulties),
        ("scenario_type", scenario_types),
        ("retrieval_strategy", retrievers),
        ("generator_model", generators),
        ("split", splits),
    )
    result = df.copy()
    for column, wanted in selections:
        if not wanted or column not in result.columns:
            continue
        wanted_text = [str(value) for value in wanted]
        keep = result[column].astype(str).isin(wanted_text)
        result = result[keep]
    return result
def filter_retrieval_events(retrieval_df: pd.DataFrame, eval_df: pd.DataFrame) -> pd.DataFrame:
    """Keep the retrieval rows whose ``example_id`` appears in *eval_df*.

    Returns an empty frame (same columns) when either input is empty, and an
    unfiltered copy when either frame lacks ``example_id``. Ids are compared
    by their string form. A new frame is always returned.
    """
    if retrieval_df.empty or eval_df.empty:
        return retrieval_df.iloc[0:0].copy()
    both_have_id = "example_id" in retrieval_df.columns and "example_id" in eval_df.columns
    if not both_have_id:
        return retrieval_df.copy()
    selected_ids = set(eval_df["example_id"].astype(str))
    belongs = retrieval_df["example_id"].astype(str).isin(selected_ids)
    return retrieval_df[belongs].copy()
def option_values(df: pd.DataFrame, col: str) -> list[str]:
    """Return the sorted distinct values of ``df[col]`` as strings.

    Missing values are represented by ``MISSING_LABEL``. An absent column
    yields an empty list.
    """
    if col not in df.columns:
        return []
    labels = df[col].astype("string").fillna(MISSING_LABEL)
    distinct = labels.unique().tolist()
    return sorted(str(label) for label in distinct)