from __future__ import annotations import argparse import json import os from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import pandas as pd from src.core.cleaning import clean_dataframe from src.core.query import QuerySpec, FilterSpec, execute_query, plan_query_with_llm @dataclass class CaseResult: case_id: str passed: bool details: str def _load_benchmarks(path: Path) -> Dict[str, Any]: return json.loads(path.read_text()) def _spec_from_dict(d: Dict[str, Any]) -> QuerySpec: filters = [FilterSpec(**f) for f in d.get("filters", [])] return QuerySpec( select=d.get("select", []), filters=filters, distinct=bool(d.get("distinct", True)), limit=int(d.get("limit", 50)), ) def _check_expected(result_df: pd.DataFrame, expected: Dict[str, Any]) -> Tuple[bool, str]: et = expected.get("type") if et == "set_equals": col = expected["column"] want = set(expected["values"]) if col not in result_df.columns: return False, f"Missing expected column '{col}'. Columns: {list(result_df.columns)}" got = set([x for x in result_df[col].dropna().astype(str).tolist()]) missing = want - got extra = got - want if missing or extra: return False, f"Set mismatch. Missing={sorted(missing)} Extra={sorted(extra)}" return True, "OK" if et == "row_count_gte": min_rows = int(expected["min_rows"]) n = len(result_df) return (n >= min_rows), f"Rows={n}, expected >= {min_rows}" if et == "row_count_equals": want = int(expected["rows"]) n = len(result_df) return (n == want), f"Rows={n}, expected == {want}" return False, f"Unknown expected.type '{et}'" def run(args: argparse.Namespace) -> int: bench = _load_benchmarks(Path(args.benchmarks)) df_raw = pd.read_csv(args.csv) df, report = clean_dataframe(df_raw) results: List[CaseResult] = [] for case in bench["cases"]: cid = case["id"] mode = case.get("mode", "spec") expected = case["expected"] try: if mode == "spec": spec = _spec_from_dict(case["spec"]) elif mode == "llm": if not args.api_key and not os.getenv("OPENAI_API_KEY"): results.append(CaseResult(cid, False, "No API key for LLM mode")) continue api_key = args.api_key or os.getenv("OPENAI_API_KEY", "") spec = plan_query_with_llm(case["question"], df, api_key=api_key, model=args.model) else: results.append(CaseResult(cid, False, f"Unknown mode '{mode}'")) continue out = execute_query(spec, df) ok, details = _check_expected(out, expected) results.append(CaseResult(cid, ok, details)) except Exception as e: results.append(CaseResult(cid, False, f"Exception: {e}")) passed = sum(1 for r in results if r.passed) total = len(results) print("\n=== Cleaning report ===") print({"rows": report.rows, "fixes": report.fixes, "warnings": report.warnings}) print("\n=== Benchmark results ===") for r in results: status = "PASS" if r.passed else "FAIL" print(f"[{status}] {r.case_id}: {r.details}") print(f"\nSummary: {passed}/{total} passed") return 0 if passed == total else 1 if __name__ == "__main__": p = argparse.ArgumentParser(description="Run benchmark evaluation for the AI Data Validation Agent") p.add_argument("--csv", required=True, help="Path to CSV dataset") p.add_argument("--benchmarks", default="src/eval/benchmarks.json", help="Path to benchmarks.json") p.add_argument("--api-key", default="", help="OpenAI API key (optional; only needed for llm-mode cases)") p.add_argument("--model", default="gpt-4.1-mini", help="Model for llm-mode cases") raise SystemExit(run(p.parse_args()))