| """Validate Karthik's EmpathRAG evaluation dataset delivery. |
| |
| Run from repo root: |
| python eval/validate_eval_delivery.py path/to/empathrag_eval_delivery_v1 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| from pathlib import Path |
|
|
|
|
# Files that must all exist directly inside the delivery directory.
REQUIRED_FILES = {
    "eval_queries.csv",
    "risky_or_ambiguous_cases.csv",
    "source_target_map.csv",
    "README_eval_notes.md",
}

# Exact header (names and order) required for eval_queries.csv.
EVAL_QUERY_COLUMNS = [
    "query_id",
    "query_text",
    "scenario_category",
    "risk_category",
    "expected_usage_mode",
    "expected_topics",
    "expected_source_types",
    "expected_source_names",
    "should_intercept",
    "ideal_behavior",
    "notes",
]

# Exact header (names and order) required for source_target_map.csv.
SOURCE_TARGET_COLUMNS = [
    "need_id",
    "user_need",
    "preferred_topics",
    "preferred_source_names",
    "avoid_source_names",
    "notes",
]

# Exact header (names and order) required for risky_or_ambiguous_cases.csv.
RISKY_CASE_COLUMNS = [
    "case_id",
    "query_text",
    "why_it_is_tricky",
    "correct_risk_category",
    "should_intercept",
    "expected_handling",
]

# Values accepted in the scenario_category column.
SCENARIO_CATEGORIES = {
    "counseling_navigation",
    "after_hours_support",
    "crisis_immediate_help",
    "anxiety_stress",
    "depression_support",
    "academic_burnout",
    "advisor_conflict",
    "graduate_student_support",
    "accessibility_disability",
    "isolation_loneliness",
    "therapy_expectations",
    "help_seeking_script",
    "grounding_or_wellbeing",
    "campus_navigation",
    "out_of_scope",
}

# Values accepted in risk_category / correct_risk_category.
RISK_CATEGORIES = {
    "normal",
    "wellbeing",
    "crisis",
    "emergency",
    "ambiguous",
    "out_of_scope",
}

# Values accepted in expected_usage_mode.
USAGE_MODES = {
    "retrieval",
    "wellbeing_only",
    "crisis_only",
    "none",
}

# Values accepted in should_intercept.
YES_NO = {"yes", "no"}
|
|
|
|
def main() -> int:
    """Run the validator from the command line and print a report.

    Reads one positional argument, ``delivery_dir``, from the process
    arguments, validates it, and prints either a success line or a bulleted
    list of issues.

    Returns:
        0 when validation found no issues, 1 otherwise.
    """
    cli = argparse.ArgumentParser(description="Validate EmpathRAG eval delivery.")
    cli.add_argument("delivery_dir", type=Path)
    delivery_dir = cli.parse_args().delivery_dir

    problems = validate_delivery(delivery_dir)
    if not problems:
        print("Validation passed.")
        return 0

    print(f"Validation failed with {len(problems)} issue(s):")
    for problem in problems:
        print(f"- {problem}")
    return 1
|
|
|
|
def validate_delivery(delivery_dir: Path) -> list[str]:
    """Validate a delivery directory and return every problem found.

    Checks, in order: the directory exists and is a directory; all
    ``REQUIRED_FILES`` are present (stops here if any is missing); each CSV
    has the expected header; ID columns are unique and non-empty; row counts
    are within range (50-70 eval queries, 15-25 source targets, 15-25 risky
    cases); per-row categorical values are allowed and mutually consistent;
    and ``query_text`` / ``ideal_behavior`` are non-empty for eval queries.

    Args:
        delivery_dir: Path to the delivery directory.

    Returns:
        Human-readable issue messages; an empty list means the delivery
        passed validation.
    """
    if not delivery_dir.exists():
        return [f"delivery directory not found: {delivery_dir}"]
    if not delivery_dir.is_dir():
        # Path.iterdir() would raise NotADirectoryError on a regular file.
        return [f"delivery path is not a directory: {delivery_dir}"]

    issues: list[str] = []
    present = {path.name for path in delivery_dir.iterdir() if path.is_file()}
    missing = REQUIRED_FILES - present
    for name in sorted(missing):
        issues.append(f"missing required file: {name}")
    if missing:
        # The remaining checks need all files, so stop at the first gap.
        return issues

    eval_rows, eval_loaded = _load_rows(
        delivery_dir / "eval_queries.csv", EVAL_QUERY_COLUMNS, issues
    )
    source_rows, source_loaded = _load_rows(
        delivery_dir / "source_target_map.csv", SOURCE_TARGET_COLUMNS, issues
    )
    risky_rows, risky_loaded = _load_rows(
        delivery_dir / "risky_or_ambiguous_cases.csv", RISKY_CASE_COLUMNS, issues
    )

    _check_unique(eval_rows, "query_id", issues)
    _check_unique(source_rows, "need_id", issues)
    _check_unique(risky_rows, "case_id", issues)

    _check_row_count("eval_queries.csv", eval_rows, eval_loaded, 50, 70, issues)
    _check_row_count("source_target_map.csv", source_rows, source_loaded, 15, 25, issues)
    _check_row_count(
        "risky_or_ambiguous_cases.csv", risky_rows, risky_loaded, 15, 25, issues
    )

    for row in eval_rows:
        row_id = row.get("query_id") or ""
        if not _row_is_complete(row, row_id, issues):
            # Field checks below call .strip() and would crash on None.
            continue
        _check_allowed(row, "scenario_category", SCENARIO_CATEGORIES, row_id, issues)
        _check_allowed(row, "risk_category", RISK_CATEGORIES, row_id, issues)
        _check_allowed(row, "expected_usage_mode", USAGE_MODES, row_id, issues)
        _check_allowed(row, "should_intercept", YES_NO, row_id, issues)
        _check_risk_consistency(row, row_id, issues)
        if not row["query_text"].strip():
            issues.append(f"{row_id}: query_text is empty")
        if not row["ideal_behavior"].strip():
            issues.append(f"{row_id}: ideal_behavior is empty")

    for row in source_rows:
        # No per-field rules for this table; only report malformed rows.
        _row_is_complete(row, row.get("need_id") or "", issues)

    for row in risky_rows:
        row_id = row.get("case_id") or ""
        if not _row_is_complete(row, row_id, issues):
            continue
        _check_allowed(row, "correct_risk_category", RISK_CATEGORIES, row_id, issues)
        _check_allowed(row, "should_intercept", YES_NO, row_id, issues)

    return issues


def _load_rows(
    path: Path, expected_columns: list[str], issues: list[str]
) -> tuple[list[dict[str, str]], bool]:
    """Read a CSV via ``_read_csv`` and tell whether its row count is meaningful.

    ``_read_csv`` returns ``[]`` both when the file could not be used (an
    issue is appended) and when it holds only a header (no issue). The flag
    is True when rows were returned or when the read added no issue, so a
    header-only file is still subject to the row-count check.
    """
    before = len(issues)
    rows = _read_csv(path, expected_columns, issues)
    return rows, bool(rows) or len(issues) == before


def _check_row_count(
    filename: str,
    rows: list[dict[str, str]],
    loaded: bool,
    low: int,
    high: int,
    issues: list[str],
) -> None:
    """Append an issue when a successfully loaded file has a row count outside low..high."""
    if loaded and not (low <= len(rows) <= high):
        issues.append(f"{filename} should contain {low}-{high} rows; found {len(rows)}")


def _row_is_complete(row: dict[str, str], row_id: str, issues: list[str]) -> bool:
    """Report a row whose field count differs from the header.

    ``csv.DictReader`` stores surplus fields under the key ``None`` and fills
    missing fields with the value ``None``.

    Returns:
        False when any field is missing (its value is ``None``), True otherwise.
    """
    has_extra = None in row
    has_missing = any(value is None for value in row.values())
    if has_extra or has_missing:
        issues.append(f"{row_id}: row has missing or extra fields")
    return not has_missing
|
|
|
|
def _read_csv(path: Path, expected_columns: list[str], issues: list[str]) -> list[dict[str, str]]:
    """Load a CSV file as dict rows after verifying its header.

    Args:
        path: CSV file to read. It is decoded as UTF-8 and a leading BOM is
            ignored.
        expected_columns: Exact header (names and order) the file must have.
        issues: Problem list; messages are appended in place.

    Returns:
        One dict per data row, keyed by column name. An empty list is
        returned when the header does not match or the file cannot be read;
        an issue is recorded in both cases. A row with too few fields is
        padded with ``""`` and surplus fields are dropped, each with an
        issue, so every key and value in the result is a string.
    """
    rows: list[dict[str, str]] = []
    try:
        with path.open(encoding="utf-8-sig", newline="") as handle:
            reader = csv.DictReader(handle)
            actual = reader.fieldnames or []
            if actual != expected_columns:
                issues.append(
                    f"{path.name}: columns must be {expected_columns}; found {actual}"
                )
                return []
            for record in reader:
                # DictReader files surplus fields under the key None and
                # fills fields missing from a short row with the value None.
                surplus = record.pop(None, None)
                absent = [name for name, value in record.items() if value is None]
                if surplus or absent:
                    found = len(expected_columns) - len(absent) + len(surplus or ())
                    issues.append(
                        f"{path.name}: line {reader.line_num}: expected "
                        f"{len(expected_columns)} fields; found {found}"
                    )
                    for name in absent:
                        record[name] = ""
                rows.append(record)
    except (OSError, UnicodeError, csv.Error) as exc:
        # Unreadable file, bad encoding, or malformed CSV syntax.
        issues.append(f"{path.name}: failed to read CSV: {exc}")
        return []
    return rows
|
|
|
|
def _check_unique(rows: list[dict[str, str]], field: str, issues: list[str]) -> None:
    """Report empty and repeated values in an ID column.

    Values are compared after stripping surrounding whitespace. A missing
    key or a ``None`` value is treated as an empty ID.

    Args:
        rows: Parsed CSV rows.
        field: Name of the ID column to inspect.
        issues: Problem list; messages are appended in place.
    """
    seen: set[str] = set()
    for row in rows:
        # ``get``'s default does not help when the key exists with a None
        # value (csv.DictReader does that for short rows), hence ``or ""``.
        value = (row.get(field) or "").strip()
        if not value:
            issues.append(f"{field}: empty ID")
            continue
        if value in seen:
            issues.append(f"{field}: duplicate ID {value}")
        seen.add(value)
|
|
|
|
def _check_allowed(
    row: dict[str, str],
    field: str,
    allowed: set[str],
    row_id: str,
    issues: list[str],
) -> None:
    """Report a field whose stripped value is not in the allowed set.

    Args:
        row: Parsed CSV row.
        field: Column to check.
        allowed: Accepted values for the column.
        row_id: Identifier used to prefix the issue message.
        issues: Problem list; a message is appended in place on failure.
    """
    # A short CSV row leaves trailing columns as None (csv.DictReader's
    # fill value); treat that like an empty string instead of crashing.
    value = (row.get(field) or "").strip()
    if value not in allowed:
        issues.append(f"{row_id}: {field}={value!r} must be one of {sorted(allowed)}")
|
|
|
|
def _check_risk_consistency(row: dict[str, str], row_id: str, issues: list[str]) -> None:
    """Report combinations of risk, usage mode and intercept flag that conflict.

    Rules enforced:
      * crisis / emergency rows must have ``should_intercept=yes``
      * emergency rows must have ``expected_usage_mode=crisis_only``
      * normal rows must have ``should_intercept=no``
      * wellbeing rows must use ``wellbeing_only`` or ``retrieval``
      * out_of_scope rows must have ``expected_usage_mode=none``

    Args:
        row: Parsed eval-query row.
        row_id: Identifier used to prefix issue messages.
        issues: Problem list; messages are appended in place.
    """
    # Missing keys and None values (csv.DictReader's fill for short rows)
    # are read as "" so a malformed row yields messages, not an exception.
    risk = (row.get("risk_category") or "").strip()
    usage = (row.get("expected_usage_mode") or "").strip()
    intercept = (row.get("should_intercept") or "").strip()

    if risk in {"crisis", "emergency"} and intercept != "yes":
        issues.append(f"{row_id}: crisis/emergency rows should use should_intercept=yes")
    if risk == "emergency" and usage != "crisis_only":
        issues.append(f"{row_id}: emergency rows should use expected_usage_mode=crisis_only")
    if risk == "normal" and intercept != "no":
        issues.append(f"{row_id}: normal rows should use should_intercept=no")
    if risk == "wellbeing" and usage not in {"wellbeing_only", "retrieval"}:
        issues.append(f"{row_id}: wellbeing rows should use wellbeing_only or retrieval")
    if risk == "out_of_scope" and usage != "none":
        issues.append(f"{row_id}: out_of_scope rows should use expected_usage_mode=none")
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
|