"""Terminal graders for the seeded DataOpsEnv benchmark."""

from __future__ import annotations

import ast
import json
import logging
import os
import sqlite3
from contextlib import closing
from typing import Any

from server.dataops_env_environment import DataOpsEnvironment
from server.safe_exec import run_python_code, run_python_script
from server.task_specs import (
    build_task_3_report,
    normalize_task_2_output_rows,
    normalize_task_3_rows,
    report_matches_expected,
    task_3_data_matches_expected,
    task_3_semantic_match_fraction_rows,
    task_3_semantic_match_fraction_text,
)

logger = logging.getLogger(__name__)

SCRIPT_TIMEOUT_S = 10
INTERNAL_STDOUT_LIMIT = 50_000
INTERNAL_STDERR_LIMIT = 10_000

# Prefix of the stdout line on which the task 2 wrapper reports its result.
_RESULT_MARKER = "__RESULT__="


def evaluate_task(task_id: str, env: DataOpsEnvironment) -> dict[str, Any]:
    """Grade ``task_id`` against the current state of ``env``.

    Returns a dict with ``task_id``, ``score`` (rounded to two decimals) and
    ``details``. An unrecognised ``task_id`` yields a score of 0.0 and an
    ``"Unknown task"`` error instead of raising.
    """
    graders = {
        "task_1_easy_anomaly": _grade_task_1,
        "task_2_medium_syntax": _grade_task_2,
        "task_3_hard_e2e": _grade_task_3,
    }
    grader = graders.get(task_id)
    if grader is None:
        return {"task_id": task_id, "score": 0.0, "details": {"error": "Unknown task"}}
    score, details = grader(env)
    return {"task_id": task_id, "score": round(score, 2), "details": details}


def _grade_task_1(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Score the ``transactions`` table cleanup.

    Full credit requires the table to equal the scenario's expected rows
    exactly. Up to 0.25 is awarded when every expected row is still present
    and only some corrupted rows remain. A corrupted id that is still present
    with a non-NULL amount scores 0.0.
    """
    if env.scenario.task_1 is None:
        return 0.0, {"error": "Task 1 scenario missing."}

    try:
        actual_rows = _current_transactions_rows(env.db_path)
    except Exception:
        logger.exception("Task 1 grading error")
        return 0.0, {"error": "Internal grading error."}

    expected_rows = list(env.scenario.task_1.expected_rows)
    corrupted_ids = set(env.scenario.task_1.corrupted_row_ids)
    actual_ids = {row["id"] for row in actual_rows}
    expected_ids = {row["id"] for row in expected_rows}
    corrupted_remaining = sorted(actual_ids & corrupted_ids)
    rewritten_corrupted = [
        row
        for row in actual_rows
        if row["id"] in corrupted_ids and row["amount"] is not None
    ]
    # Rows are dicts (unhashable), so containment is checked pairwise.
    valid_rows_intact = all(
        any(actual == expected for actual in actual_rows)
        for expected in expected_rows
    )

    details: dict[str, Any] = {
        "expected_row_ids": sorted(expected_ids),
        "actual_row_ids": sorted(actual_ids),
        "corrupted_row_ids": sorted(corrupted_ids),
        "corrupted_remaining": corrupted_remaining,
        "valid_rows_intact": valid_rows_intact,
    }

    if actual_rows == expected_rows:
        details["reason"] = (
            "Perfect - corrupted rows were deleted and all valid rows were preserved."
        )
        details["components"] = {
            "exact_cleanup": {"score": 1.0, "max": 1.0, "passed": True},
        }
        return 1.0, details

    if rewritten_corrupted:
        details["reason"] = "Corrupted rows were rewritten instead of being deleted."
        details["components"] = {
            "exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
        }
        return 0.0, details

    if valid_rows_intact and corrupted_remaining:
        fraction_removed = 1.0 - (len(corrupted_remaining) / max(len(corrupted_ids), 1))
        score = round(0.25 * max(fraction_removed, 0.0), 4)
        details["reason"] = "Some corrupted rows were removed, but cleanup is incomplete."
        details["components"] = {
            "partial_cleanup": {"score": score, "max": 0.25, "passed": False},
        }
        return score, details

    details["reason"] = "The transaction table does not match the required cleaned state."
    details["components"] = {
        "exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
    }
    return 0.0, details


def _grade_task_2(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Score the repaired ``broken_pipeline.py``.

    Components: hidden cases (0.60, scaled by pass fraction), visible batch
    plus a clean run of the script itself (0.25), and recorded evidence that
    the agent verified the fix (0.15). Any timeout scores 0.0.
    """
    if env.scenario.task_2 is None:
        return 0.0, {"error": "Task 2 scenario missing."}

    script = os.path.join(env.workspace_dir, "broken_pipeline.py")
    if not os.path.isfile(script):
        return 0.0, {
            "reason": "broken_pipeline.py not found.",
            "components": {
                "script_present": {"score": 0.0, "max": 1.0, "passed": False},
            },
        }

    try:
        with open(script, encoding="utf-8") as f:
            source = f.read()
        static = _inspect_task_2_source(source)
        main_result = run_python_script(
            "broken_pipeline.py",
            cwd=env.workspace_dir,
            args=[],
            timeout_s=SCRIPT_TIMEOUT_S,
            stdout_limit=INTERNAL_STDOUT_LIMIT,
            stderr_limit=INTERNAL_STDERR_LIMIT,
        )
        visible_result = _run_task_2_case_check(
            env.workspace_dir,
            env.scenario.task_2.visible_batch,
            env.scenario.task_2.visible_expected,
        )
        hidden_result = _run_task_2_hidden_tests(
            env.workspace_dir,
            env.scenario.task_2.hidden_cases,
            env.scenario.task_2.hidden_expected,
        )
    except Exception:
        logger.exception("Task 2 grading error")
        return 0.0, {"error": "Internal grading error."}

    if main_result.timed_out or visible_result["timed_out"] or hidden_result["timed_out"]:
        return 0.0, {"reason": "Script timed out.", "components": {}}

    main_ok = main_result.returncode == 0
    visible_ok = visible_result["passed"] and main_ok
    verified_fix = bool(env.evidence.get("task_2", {}).get("verified_fix"))

    hidden_score = round(0.60 * hidden_result["pass_fraction"], 4)
    visible_score = 0.25 if visible_ok else 0.0
    execution_score = 0.15 if verified_fix else 0.0

    components: dict[str, Any] = {
        "hidden_functional": {
            "score": hidden_score,
            "max": 0.60,
            "passed": hidden_result["passed"],
        },
        "visible_pipeline": {
            "score": visible_score,
            "max": 0.25,
            "passed": visible_ok,
        },
        "execution_provenance": {
            "score": execution_score,
            "max": 0.15,
            "passed": verified_fix,
        },
    }
    score = round(sum(component["score"] for component in components.values()), 4)

    details = {
        "main_exit_code": main_result.returncode,
        "main_stdout": main_result.stdout[:500],
        "main_stderr": main_result.stderr[:500],
        "visible_batch_ok": visible_result["passed"],
        "hidden_tests_passed": hidden_result["passed"],
        "hidden_pass_fraction": hidden_result["pass_fraction"],
        "hidden_case_passes": hidden_result["case_passes"],
        "static_checks": static,
        "components": components,
    }

    if score == 1.0:
        details["reason"] = "Seeded hidden tests and the visible verification run both pass."
    elif hidden_result["passed"] and main_ok:
        details["reason"] = (
            "The ETL transform is correct, but the agent never verified it through the run action."
        )
    elif hidden_result["pass_fraction"] > 0 and main_ok:
        details["reason"] = (
            "The repair improves the ETL transform, but it still fails some seeded cases."
        )
    elif hidden_result["pass_fraction"] > 0:
        details["reason"] = (
            "The core transform improved, but the runnable script entrypoint still drifts."
        )
    elif main_ok:
        details["reason"] = (
            "The script runs, but it does not yet produce the required normalized records."
        )
    else:
        details["reason"] = "The repair is still incorrect or incomplete."
    return score, details


def _grade_task_3(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Score the end-to-end report workflow.

    Components: SQL provenance (0.20), ``report_data.json`` (0.20), formatter
    run (0.25) and the final email (0.35). The report and formatter earn full
    marks only when the artifact matches *and* the matching evidence flag is
    set; otherwise they earn at most 0.05.
    """
    if env.scenario.task_3 is None:
        return 0.0, {"error": "Task 3 scenario missing."}

    scenario = env.scenario.task_3
    evidence = env.evidence.get("task_3", {})
    expected_rows = list(scenario.expected_rows)

    # Same guard as tasks 1 and 2: agent-controlled artifacts must not be able
    # to crash the grader.
    try:
        expected_report = build_task_3_report(expected_rows, scenario.target_date)
        report_data = _load_task_3_data(env.workspace_dir, expected_rows)
        formatter = _run_task_3_formatter(
            env.workspace_dir, expected_rows, scenario.target_date
        )
        email = _score_task_3_email(env, expected_report)
    except Exception:
        logger.exception("Task 3 grading error")
        return 0.0, {"error": "Internal grading error."}

    sql_proven = bool(evidence.get("matching_sql_executed"))
    report_exact_and_proven = bool(
        report_data["matches_expected"] and evidence.get("report_data_matches_sql")
    )
    formatter_exact_and_proven = bool(
        formatter["matches_expected"] and evidence.get("format_output_matches_expected")
    )

    if report_exact_and_proven:
        report_score = 0.20
    elif report_data["matches_expected"]:
        report_score = 0.05
    else:
        report_score = 0.0

    if formatter_exact_and_proven:
        formatter_score = 0.25
    elif formatter["runs"]:
        formatter_score = 0.05
    else:
        formatter_score = 0.0

    components: dict[str, Any] = {
        "sql_provenance": {
            "score": 0.20 if sql_proven else 0.0,
            "max": 0.20,
            "passed": sql_proven,
        },
        "report_data": {
            "score": report_score,
            "max": 0.20,
            "passed": report_exact_and_proven,
        },
        "formatter": {
            "score": formatter_score,
            "max": 0.25,
            "passed": formatter_exact_and_proven,
        },
        "email": {
            "score": email["score"],
            "max": 0.35,
            "passed": email["passed"],
        },
    }
    score = round(sum(component["score"] for component in components.values()), 4)

    details: dict[str, Any] = {
        "target_date": scenario.target_date,
        "expected_recipient": scenario.recipient,
        "expected_subject": scenario.subject,
        "report_data": report_data["details"],
        "formatter": formatter["details"],
        "email": email["details"],
        "evidence": evidence,
        "components": components,
    }

    if score == 1.0:
        details["reason"] = (
            "Perfect - the seeded SQL slice, JSON output, formatter run, and final email all align."
        )
    elif score >= 0.55:
        details["reason"] = (
            "Strong progress - some of the seeded workflow is correct, but provenance is incomplete."
        )
    elif score > 0:
        details["reason"] = (
            "Partial progress - artifacts exist, but the end-to-end incident workflow is not proven."
        )
    else:
        details["reason"] = "The seeded hard task is still unsolved."
    return score, details


def _inspect_task_2_source(source: str) -> dict[str, Any]:
    """Statically check for a top-level one-argument ``process_data_stream``.

    Returns ``passed``/``has_function`` flags, plus ``error`` when the source
    does not parse.
    """
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return {"passed": False, "error": str(exc), "has_function": False}

    functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
    target = next(
        (node for node in functions if node.name == "process_data_stream"), None
    )
    passed = target is not None and len(target.args.args) == 1
    return {"passed": passed, "has_function": target is not None}


def _run_task_2_wrapper(
    workspace_dir: str, payload: Any, call_expr: str
) -> tuple[Any, Any]:
    """Load ``broken_pipeline.py`` in a child process and evaluate ``call_expr``.

    ``payload`` is made available to ``call_expr`` under the name ``payload``.
    It is shipped as a JSON *string literal* and decoded with ``json.loads``
    inside the wrapper. Pasting raw JSON text into Python source would break
    on ``null``/``true``/``false`` and mis-decode surrogate-pair escapes.

    Returns ``(run_result, parsed)`` where ``parsed`` is the decoded JSON from
    the first ``__RESULT__=`` stdout line, or ``None`` if absent or invalid.
    """
    payload_literal = repr(json.dumps(payload))
    wrapper = "\n".join(
        [
            "import importlib.util",
            "import json",
            "",
            'spec = importlib.util.spec_from_file_location("candidate_pipeline", "broken_pipeline.py")',
            "module = importlib.util.module_from_spec(spec)",
            "assert spec.loader is not None",
            "spec.loader.exec_module(module)",
            f"payload = json.loads({payload_literal})",
            f"results = {call_expr}",
            f'print("{_RESULT_MARKER}" + json.dumps(results))',
            "",
        ]
    )
    result = run_python_code(
        wrapper,
        cwd=workspace_dir,
        timeout_s=SCRIPT_TIMEOUT_S,
        stdout_limit=INTERNAL_STDOUT_LIMIT,
        stderr_limit=INTERNAL_STDERR_LIMIT,
    )
    raw = next(
        (
            line[len(_RESULT_MARKER):]
            for line in (result.stdout or "").splitlines()
            if line.startswith(_RESULT_MARKER)
        ),
        "",
    )
    try:
        parsed = json.loads(raw) if raw else None
    except json.JSONDecodeError:
        parsed = None
    return result, parsed


def _run_task_2_case_check(
    workspace_dir: str,
    batch: tuple[dict[str, Any], ...],
    expected: tuple[dict[str, Any], ...],
) -> dict[str, Any]:
    """Run ``process_data_stream`` on one batch and compare with ``expected``.

    Passes only when the child process exits 0 and the normalised output
    equals ``list(expected)``.
    """
    result, parsed = _run_task_2_wrapper(
        workspace_dir,
        list(batch),
        "module.process_data_stream(payload)",
    )
    normalised = normalize_task_2_output_rows(parsed)
    ok = result.returncode == 0 and normalised == list(expected)
    return {
        "passed": ok,
        "timed_out": result.timed_out,
        "stdout": (result.stdout or "")[:500],
        "stderr": (result.stderr or "")[:500],
        "actual": normalised,
    }


def _run_task_2_hidden_tests(
    workspace_dir: str,
    hidden_cases: tuple[tuple[dict[str, Any], ...], ...],
    hidden_expected: tuple[tuple[dict[str, Any], ...], ...],
) -> dict[str, Any]:
    """Run ``process_data_stream`` on every hidden case in one child process.

    Returns per-case pass flags, the pass fraction (rounded to 4 places) and
    an overall ``passed`` flag that also requires exit code 0 and one output
    batch per expected case.
    """
    result, parsed = _run_task_2_wrapper(
        workspace_dir,
        [list(batch) for batch in hidden_cases],
        "[module.process_data_stream(case) for case in payload]",
    )
    if not isinstance(parsed, list):
        parsed = []

    actual_batches = [normalize_task_2_output_rows(batch) for batch in parsed]
    expected = [list(batch) for batch in hidden_expected]
    case_passes = [
        actual == expected_case
        for actual, expected_case in zip(actual_batches, expected, strict=False)
    ]
    # Cases the candidate produced no output for count as failures.
    if len(case_passes) < len(expected):
        case_passes.extend([False] * (len(expected) - len(case_passes)))
    pass_fraction = (
        sum(1 for passed in case_passes if passed) / len(expected) if expected else 0.0
    )
    return {
        "passed": result.returncode == 0
        and len(actual_batches) == len(expected)
        and all(case_passes),
        "timed_out": result.timed_out,
        "stdout": (result.stdout or "")[:500],
        "stderr": (result.stderr or "")[:500],
        "actual": actual_batches,
        "case_passes": case_passes,
        "pass_fraction": round(pass_fraction, 4),
    }


def _load_task_3_data(
    workspace_dir: str, expected_rows: list[dict[str, Any]]
) -> dict[str, Any]:
    """Load ``report_data.json`` and compare it with ``expected_rows``.

    Never raises for a missing, unreadable or malformed file; those cases are
    reported as ``matches_expected: False`` with a reason.
    """
    report_json = os.path.join(workspace_dir, "report_data.json")
    if not os.path.isfile(report_json):
        return {
            "matches_expected": False,
            "details": {"exists": False, "reason": "report_data.json not found."},
        }

    try:
        with open(report_json, encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError, RecursionError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError (invalid
        # UTF-8 bytes); RecursionError covers pathologically nested JSON.
        return {
            "matches_expected": False,
            "details": {"exists": True, "reason": str(exc)},
        }

    if not isinstance(payload, list):
        return {
            "matches_expected": False,
            "details": {
                "exists": True,
                "reason": "report_data.json must contain a JSON list.",
            },
        }

    rows = normalize_task_3_rows(payload, require_headcount=True)
    matches_expected = bool(rows) and task_3_data_matches_expected(
        rows,
        expected_rows,
        require_headcount=True,
    )
    semantic_fraction = task_3_semantic_match_fraction_rows(rows, expected_rows)
    return {
        "matches_expected": matches_expected,
        "details": {
            "exists": True,
            "rows_valid": bool(rows),
            "rows_match_expected": matches_expected,
            "semantic_fraction": round(semantic_fraction, 4),
        },
    }


def _run_task_3_formatter(
    workspace_dir: str,
    expected_rows: list[dict[str, Any]],
    target_date: str,
) -> dict[str, Any]:
    """Run ``format_report.py report_data.json`` and check its stdout.

    ``runs`` is true when the script exits 0; ``matches_expected`` also
    requires the stripped stdout to match the expected report.
    """
    script = os.path.join(workspace_dir, "format_report.py")
    if not os.path.isfile(script):
        return {
            "runs": False,
            "matches_expected": False,
            "details": {"reason": "format_report.py not found."},
        }

    try:
        result = run_python_script(
            "format_report.py",
            cwd=workspace_dir,
            args=["report_data.json"],
            timeout_s=SCRIPT_TIMEOUT_S,
            stdout_limit=INTERNAL_STDOUT_LIMIT,
            stderr_limit=INTERNAL_STDERR_LIMIT,
        )
    except Exception as exc:
        return {
            "runs": False,
            "matches_expected": False,
            "details": {"reason": str(exc)},
        }

    if result.timed_out:
        return {
            "runs": False,
            "matches_expected": False,
            "details": {"reason": "Formatter timed out."},
        }

    stdout = (result.stdout or "").strip()
    matches_expected = result.returncode == 0 and report_matches_expected(
        stdout,
        expected_rows,
        target_date,
    )
    return {
        "runs": result.returncode == 0,
        "matches_expected": matches_expected,
        "details": {
            "exit_code": result.returncode,
            "stdout": stdout[:500],
            "stderr": (result.stderr or "")[:500],
            "semantic_fraction": round(
                task_3_semantic_match_fraction_text(stdout, expected_rows, target_date),
                4,
            ),
        },
    }


def _score_task_3_email(
    env: DataOpsEnvironment, expected_report: str
) -> dict[str, Any]:
    """Score the last email in the outbox against the scenario.

    Recipient and subject are worth 0.05 each. The body is worth 0.25 and
    counts only when it equals ``expected_report`` (both stripped), the
    ``email_matches_formatter_output`` evidence flag is set, and exactly one
    email was sent.
    """
    scenario = env.scenario.task_3
    assert scenario is not None
    evidence = env.evidence.get("task_3", {})
    outbox = env.email_outbox
    if not outbox:
        return {
            "score": 0.0,
            "passed": False,
            "details": {"reason": "No email sent."},
        }

    email = outbox[-1]
    recipient_ok = email.get("to_email") == scenario.recipient
    subject_ok = email.get("subject") == scenario.subject
    body = str(email.get("body", "")).strip()
    body_ok = body == expected_report.strip()
    proven = bool(evidence.get("email_matches_formatter_output")) and len(outbox) == 1

    score = 0.0
    if recipient_ok:
        score += 0.05
    if subject_ok:
        score += 0.05
    if body_ok and proven:
        score += 0.25

    return {
        "score": score,
        # Derived from the checks themselves rather than float equality on
        # the accumulated score.
        "passed": bool(recipient_ok and subject_ok and body_ok and proven),
        "details": {
            "emails_sent": len(outbox),
            "recipient_ok": recipient_ok,
            "subject_ok": subject_ok,
            "body_ok": body_ok,
            "proven": proven,
            "semantic_fraction": round(
                task_3_semantic_match_fraction_text(
                    body,
                    list(scenario.expected_rows),
                    scenario.target_date,
                ),
                4,
            ),
        },
    }


def _current_transactions_rows(db_path: str) -> list[dict[str, Any]]:
    """Return the ``transactions`` table as normalised dicts ordered by id.

    Returns an empty list when the table does not exist. Amounts are rounded
    to two decimals, with NULL kept as ``None``.
    """
    # sqlite3's own context manager only commits or rolls back; closing()
    # ensures the connection is actually released.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        table_exists = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
        ).fetchone()
        if not table_exists:
            return []
        rows = conn.execute(
            "SELECT id, user_id, amount, status FROM transactions ORDER BY id"
        ).fetchall()
        return [
            {
                "id": int(row["id"]),
                "user_id": int(row["user_id"]),
                "amount": None if row["amount"] is None else round(float(row["amount"]), 2),
                "status": str(row["status"]),
            }
            for row in rows
        ]