# EmpathRAG / eval / validate_eval_delivery.py
# Author: MukulRay
# Commit: Checkpoint V2 curated support navigator (15594c0)
"""Validate Karthik's EmpathRAG evaluation dataset delivery.
Run from repo root:
python eval/validate_eval_delivery.py path/to/empathrag_eval_delivery_v1
"""
from __future__ import annotations
import argparse
import csv
from pathlib import Path
# File names that must all exist in the delivery directory; validation stops
# early with a "missing required file" issue for each one that is absent.
REQUIRED_FILES = {
    "eval_queries.csv",
    "source_target_map.csv",
    "risky_or_ambiguous_cases.csv",
    "README_eval_notes.md",
}
# Expected CSV headers. Each list is compared against the file's header row
# as an ordered list, so both the names and their order matter.

# Header of eval_queries.csv.
EVAL_QUERY_COLUMNS = [
    "query_id",
    "query_text",
    "scenario_category",
    "risk_category",
    "expected_usage_mode",
    "expected_topics",
    "expected_source_types",
    "expected_source_names",
    "should_intercept",
    "ideal_behavior",
    "notes",
]

# Header of source_target_map.csv.
SOURCE_TARGET_COLUMNS = [
    "need_id",
    "user_need",
    "preferred_topics",
    "preferred_source_names",
    "avoid_source_names",
    "notes",
]

# Header of risky_or_ambiguous_cases.csv.
RISKY_CASE_COLUMNS = [
    "case_id",
    "query_text",
    "why_it_is_tricky",
    "correct_risk_category",
    "should_intercept",
    "expected_handling",
]
# Allowed values for the ``scenario_category`` column of eval_queries.csv.
SCENARIO_CATEGORIES = {
    "academic_burnout",
    "accessibility_disability",
    "advisor_conflict",
    "after_hours_support",
    "anxiety_stress",
    "campus_navigation",
    "counseling_navigation",
    "crisis_immediate_help",
    "depression_support",
    "graduate_student_support",
    "grounding_or_wellbeing",
    "help_seeking_script",
    "isolation_loneliness",
    "out_of_scope",
    "therapy_expectations",
}

# Allowed values for ``risk_category`` / ``correct_risk_category``.
RISK_CATEGORIES = {
    "normal",
    "wellbeing",
    "crisis",
    "emergency",
    "ambiguous",
    "out_of_scope",
}

# Allowed values for ``expected_usage_mode``.
USAGE_MODES = {
    "retrieval",
    "wellbeing_only",
    "crisis_only",
    "none",
}

# Allowed values for the ``should_intercept`` flag.
YES_NO = {
    "yes",
    "no",
}
def main() -> int:
    """Run the validator from the command line.

    Parses a single positional ``delivery_dir`` argument, validates that
    directory, and prints either a success line or one line per issue.

    Returns:
        ``0`` when no issues were found, ``1`` otherwise.
    """
    cli = argparse.ArgumentParser(description="Validate EmpathRAG eval delivery.")
    cli.add_argument("delivery_dir", type=Path)
    problems = validate_delivery(cli.parse_args().delivery_dir)

    # Happy path first: nothing to report.
    if not problems:
        print("Validation passed.")
        return 0

    print(f"Validation failed with {len(problems)} issue(s):")
    for problem in problems:
        print(f"- {problem}")
    return 1
def validate_delivery(delivery_dir: Path) -> list[str]:
    """Validate a delivery directory and return every problem found.

    Checks run in this order: the path exists and is a directory, all
    ``REQUIRED_FILES`` are present, each CSV has the expected header, ID
    columns are unique, row counts are within range, and per-row values are
    allowed and mutually consistent.

    Args:
        delivery_dir: Directory expected to hold the delivery files.

    Returns:
        Human-readable issue strings; an empty list means the delivery passed.
    """
    if not delivery_dir.exists():
        return [f"delivery directory not found: {delivery_dir}"]
    if not delivery_dir.is_dir():
        # iterdir() below would raise on a regular file; report it instead.
        return [f"delivery path is not a directory: {delivery_dir}"]

    present = {path.name for path in delivery_dir.iterdir() if path.is_file()}
    missing = REQUIRED_FILES - present
    if missing:
        # Without all files the remaining checks are not meaningful.
        return [f"missing required file: {name}" for name in sorted(missing)]

    issues: list[str] = []

    def load(file_name: str, columns: list[str]) -> tuple[list[dict[str, str]], bool]:
        """Read one CSV and say whether its row count can be trusted."""
        before = len(issues)
        rows = _read_csv(delivery_dir / file_name, columns, issues)
        # An empty result with no new issue is a readable header-only file,
        # which must still fail the row-count check; an empty result with a
        # new issue means the read itself failed and was already reported.
        return rows, bool(rows) or len(issues) == before

    eval_rows, eval_ok = load("eval_queries.csv", EVAL_QUERY_COLUMNS)
    source_rows, source_ok = load("source_target_map.csv", SOURCE_TARGET_COLUMNS)
    risky_rows, risky_ok = load("risky_or_ambiguous_cases.csv", RISKY_CASE_COLUMNS)

    _check_unique(eval_rows, "query_id", issues)
    _check_unique(source_rows, "need_id", issues)
    _check_unique(risky_rows, "case_id", issues)

    row_count_rules = (
        ("eval_queries.csv", eval_rows, eval_ok, 50, 70),
        ("source_target_map.csv", source_rows, source_ok, 15, 25),
        ("risky_or_ambiguous_cases.csv", risky_rows, risky_ok, 15, 25),
    )
    for file_name, rows, readable, low, high in row_count_rules:
        if readable and not low <= len(rows) <= high:
            issues.append(
                f"{file_name} should contain {low}-{high} rows; found {len(rows)}"
            )

    for index, row in enumerate(eval_rows, start=1):
        # Fall back to the data-row position when the ID cell is blank so the
        # messages below still point somewhere useful.
        row_id = row.get("query_id") or f"eval_queries.csv row {index}"
        _check_allowed(row, "scenario_category", SCENARIO_CATEGORIES, row_id, issues)
        _check_allowed(row, "risk_category", RISK_CATEGORIES, row_id, issues)
        _check_allowed(row, "expected_usage_mode", USAGE_MODES, row_id, issues)
        _check_allowed(row, "should_intercept", YES_NO, row_id, issues)
        _check_risk_consistency(row, row_id, issues)
        # ``or ""`` guards against None cells produced by short CSV rows.
        if not (row.get("query_text") or "").strip():
            issues.append(f"{row_id}: query_text is empty")
        if not (row.get("ideal_behavior") or "").strip():
            issues.append(f"{row_id}: ideal_behavior is empty")

    for index, row in enumerate(risky_rows, start=1):
        row_id = row.get("case_id") or f"risky_or_ambiguous_cases.csv row {index}"
        _check_allowed(row, "correct_risk_category", RISK_CATEGORIES, row_id, issues)
        _check_allowed(row, "should_intercept", YES_NO, row_id, issues)

    return issues
def _read_csv(path: Path, expected_columns: list[str], issues: list[str]) -> list[dict[str, str]]:
try:
with path.open(encoding="utf-8-sig", newline="") as handle:
reader = csv.DictReader(handle)
actual = reader.fieldnames or []
if actual != expected_columns:
issues.append(
f"{path.name}: columns must be {expected_columns}; found {actual}"
)
return []
return list(reader)
except Exception as exc:
issues.append(f"{path.name}: failed to read CSV: {exc}")
return []
def _check_unique(rows: list[dict[str, str]], field: str, issues: list[str]) -> None:
seen: set[str] = set()
for row in rows:
value = row.get(field, "").strip()
if not value:
issues.append(f"{field}: empty ID")
elif value in seen:
issues.append(f"{field}: duplicate ID {value}")
seen.add(value)
def _check_allowed(
row: dict[str, str],
field: str,
allowed: set[str],
row_id: str,
issues: list[str],
) -> None:
value = row.get(field, "").strip()
if value not in allowed:
issues.append(f"{row_id}: {field}={value!r} must be one of {sorted(allowed)}")
def _check_risk_consistency(row: dict[str, str], row_id: str, issues: list[str]) -> None:
risk = row["risk_category"].strip()
usage = row["expected_usage_mode"].strip()
intercept = row["should_intercept"].strip()
if risk in {"crisis", "emergency"} and intercept != "yes":
issues.append(f"{row_id}: crisis/emergency rows should use should_intercept=yes")
if risk == "emergency" and usage != "crisis_only":
issues.append(f"{row_id}: emergency rows should use expected_usage_mode=crisis_only")
if risk == "normal" and intercept != "no":
issues.append(f"{row_id}: normal rows should use should_intercept=no")
if risk == "wellbeing" and usage not in {"wellbeing_only", "retrieval"}:
issues.append(f"{row_id}: wellbeing rows should use wellbeing_only or retrieval")
if risk == "out_of_scope" and usage != "none":
issues.append(f"{row_id}: out_of_scope rows should use expected_usage_mode=none")
if __name__ == "__main__":
    # Exit with main()'s return value as the process status:
    # 0 when validation passed, 1 when issues were found.
    status = main()
    raise SystemExit(status)