Spaces:
Sleeping
Sleeping
| """Terminal graders for the seeded DataOpsEnv benchmark.""" | |
| from __future__ import annotations | |
| import ast | |
| import json | |
| import logging | |
| import os | |
| import sqlite3 | |
| from typing import Any | |
| from server.dataops_env_environment import DataOpsEnvironment | |
| from server.safe_exec import run_python_code, run_python_script | |
| from server.task_specs import ( | |
| build_task_3_report, | |
| normalize_task_2_output_rows, | |
| normalize_task_3_rows, | |
| report_matches_expected, | |
| task_3_data_matches_expected, | |
| task_3_semantic_match_fraction_rows, | |
| task_3_semantic_match_fraction_text, | |
| ) | |
logger = logging.getLogger(__name__)

# Wall-clock budget (seconds) handed to every script the graders launch.
SCRIPT_TIMEOUT_S: int = 10
# Output caps passed to the sandboxed runners; the graders themselves only keep
# short prefixes of the captured streams in their details.
INTERNAL_STDOUT_LIMIT: int = 50000
INTERNAL_STDERR_LIMIT: int = 10000
def evaluate_task(task_id: str, env: DataOpsEnvironment) -> dict[str, Any]:
    """Grade the final state of ``env`` for the task identified by ``task_id``.

    Returns a mapping with the ``task_id``, the grader's ``score`` rounded to
    two decimals, and the grader-specific ``details``. An unrecognised task id
    scores 0.0 with an ``"Unknown task"`` error instead of raising.
    """
    dispatch = {
        "task_1_easy_anomaly": _grade_task_1,
        "task_2_medium_syntax": _grade_task_2,
        "task_3_hard_e2e": _grade_task_3,
    }
    if task_id not in dispatch:
        return {"task_id": task_id, "score": 0.0, "details": {"error": "Unknown task"}}
    raw_score, details = dispatch[task_id](env)
    return {"task_id": task_id, "score": round(raw_score, 2), "details": details}
def _grade_task_1(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Grade task 1: corrupted transactions must be deleted, valid ones kept.

    Reads the live ``transactions`` table and compares it with the scenario's
    ``expected_rows`` and ``corrupted_row_ids``:

    * 1.0 when the table equals ``expected_rows`` exactly (list equality).
    * 0.0 when a corrupted id survives with a non-NULL ``amount``
      ("rewritten instead of deleted").
    * Up to 0.25 when every expected row is still present verbatim but some
      corrupted ids remain; credit is proportional to the share removed.
    * 0.0 otherwise.

    Returns:
        ``(score, details)``. ``details`` lists the id sets, a ``reason`` and a
        ``components`` breakdown, or holds only an ``error`` when the scenario
        is missing or the database could not be read.
    """
    if env.scenario.task_1 is None:
        return 0.0, {"error": "Task 1 scenario missing."}
    try:
        actual_rows = _current_transactions_rows(env.db_path)
    except Exception:
        logger.exception("Task 1 grading error")
        return 0.0, {"error": "Internal grading error."}
    expected_rows = list(env.scenario.task_1.expected_rows)
    corrupted_ids = set(env.scenario.task_1.corrupted_row_ids)
    actual_ids = {row["id"] for row in actual_rows}
    expected_ids = {row["id"] for row in expected_rows}
    corrupted_remaining = sorted(actual_ids & corrupted_ids)
    # NOTE(review): a surviving corrupted id with a non-NULL amount is treated
    # as "rewritten", which presumes the seeded corruption leaves amount NULL —
    # confirm against the scenario builder.
    rewritten_corrupted = [
        row
        for row in actual_rows
        if row["id"] in corrupted_ids and row["amount"] is not None
    ]
    # Every expected row must still exist with all of its columns unchanged.
    valid_rows_intact = all(expected in actual_rows for expected in expected_rows)
    details: dict[str, Any] = {
        "expected_row_ids": sorted(expected_ids),
        "actual_row_ids": sorted(actual_ids),
        "corrupted_row_ids": sorted(corrupted_ids),
        "corrupted_remaining": corrupted_remaining,
        "valid_rows_intact": valid_rows_intact,
    }
    if actual_rows == expected_rows:
        details["reason"] = "Perfect - corrupted rows were deleted and all valid rows were preserved."
        details["components"] = {
            "exact_cleanup": {"score": 1.0, "max": 1.0, "passed": True},
        }
        return 1.0, details
    if rewritten_corrupted:
        details["reason"] = "Corrupted rows were rewritten instead of being deleted."
        details["components"] = {
            "exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
        }
        return 0.0, details
    if valid_rows_intact and corrupted_remaining:
        # corrupted_remaining is a non-empty subset of corrupted_ids, so the
        # fraction below is always within [0, 1).
        fraction_removed = 1.0 - (len(corrupted_remaining) / max(len(corrupted_ids), 1))
        score = round(0.25 * fraction_removed, 4)
        if len(corrupted_remaining) < len(corrupted_ids):
            details["reason"] = "Some corrupted rows were removed, but cleanup is incomplete."
        else:
            # Nothing was deleted yet; don't claim partial progress.
            details["reason"] = "None of the corrupted rows were removed."
        details["components"] = {
            "partial_cleanup": {"score": score, "max": 0.25, "passed": False},
        }
        return score, details
    details["reason"] = "The transaction table does not match the required cleaned state."
    details["components"] = {
        "exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
    }
    return 0.0, details
def _grade_task_2(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Grade task 2: repair ``broken_pipeline.py`` so its ETL transform works.

    Runs the workspace script as-is, then imports its ``process_data_stream``
    on the visible batch and on the seeded hidden cases. Components:

    * ``hidden_functional`` (0.60) - proportional to hidden cases passed.
    * ``visible_pipeline`` (0.25) - visible batch matches *and* the script
      entrypoint exits 0.
    * ``execution_provenance`` (0.15) - evidence records a verified fix.

    Any timeout scores 0.0. Unexpected errors while grading are logged and
    reported as an internal grading error with score 0.0.
    """
    if env.scenario.task_2 is None:
        return 0.0, {"error": "Task 2 scenario missing."}
    script = os.path.join(env.workspace_dir, "broken_pipeline.py")
    if not os.path.isfile(script):
        return 0.0, {
            "reason": "broken_pipeline.py not found.",
            "components": {
                "script_present": {"score": 0.0, "max": 1.0, "passed": False},
            },
        }
    try:
        with open(script, encoding="utf-8") as f:
            source = f.read()
        static = _inspect_task_2_source(source)
        main_result = run_python_script(
            "broken_pipeline.py",
            cwd=env.workspace_dir,
            args=[],
            timeout_s=SCRIPT_TIMEOUT_S,
            stdout_limit=INTERNAL_STDOUT_LIMIT,
            stderr_limit=INTERNAL_STDERR_LIMIT,
        )
        visible_result = _run_task_2_case_check(
            env.workspace_dir,
            env.scenario.task_2.visible_batch,
            env.scenario.task_2.visible_expected,
        )
        hidden_result = _run_task_2_hidden_tests(
            env.workspace_dir,
            env.scenario.task_2.hidden_cases,
            env.scenario.task_2.hidden_expected,
        )
    except Exception:
        logger.exception("Task 2 grading error")
        return 0.0, {"error": "Internal grading error."}
    if main_result.timed_out or visible_result["timed_out"] or hidden_result["timed_out"]:
        return 0.0, {"reason": "Script timed out.", "components": {}}
    visible_ok = visible_result["passed"] and main_result.returncode == 0
    verified = bool(env.evidence.get("task_2", {}).get("verified_fix"))
    hidden_score = round(0.60 * hidden_result["pass_fraction"], 4)
    visible_score = 0.25 if visible_ok else 0.0
    execution_score = 0.15 if verified else 0.0
    components: dict[str, Any] = {
        "hidden_functional": {
            "score": hidden_score,
            "max": 0.60,
            "passed": hidden_result["passed"],
        },
        "visible_pipeline": {
            "score": visible_score,
            "max": 0.25,
            "passed": visible_ok,
        },
        "execution_provenance": {
            "score": execution_score,
            "max": 0.15,
            "passed": verified,
        },
    }
    score = round(sum(component["score"] for component in components.values()), 4)
    details = {
        "main_exit_code": main_result.returncode,
        "main_stdout": main_result.stdout[:500],
        "main_stderr": main_result.stderr[:500],
        "visible_batch_ok": visible_result["passed"],
        "hidden_tests_passed": hidden_result["passed"],
        "hidden_pass_fraction": hidden_result["pass_fraction"],
        "hidden_case_passes": hidden_result["case_passes"],
        "static_checks": static,
        "components": components,
    }
    if score == 1.0:
        details["reason"] = "Seeded hidden tests and the visible verification run both pass."
    elif hidden_result["passed"] and visible_ok:
        # Both functional components are full, so the only missing piece is
        # the provenance evidence.
        details["reason"] = "The ETL transform is correct, but the agent never verified it through the run action."
    elif hidden_result["pass_fraction"] > 0 and main_result.returncode == 0:
        details["reason"] = "The repair improves the ETL transform, but it still fails some seeded cases."
    elif hidden_result["pass_fraction"] > 0:
        details["reason"] = "The core transform improved, but the runnable script entrypoint still drifts."
    elif main_result.returncode == 0:
        details["reason"] = "The script runs, but it does not yet produce the required normalized records."
    else:
        details["reason"] = "The repair is still incorrect or incomplete."
    return score, details
def _grade_task_3(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
    """Grade task 3: SQL slice -> report_data.json -> formatter -> email.

    Component weights (summing to 1.0):

    * ``sql_provenance`` (0.20) - evidence shows the matching SQL was executed.
    * ``report_data`` (0.20) - ``report_data.json`` matches the expected rows
      and evidence ties it to the SQL result; 0.05 if it matches unproven.
    * ``formatter`` (0.25) - ``format_report.py`` output matches and is
      proven; 0.05 if the formatter merely exits 0.
    * ``email`` (0.35) - as scored by ``_score_task_3_email``.

    The artifact checks read agent-written files and run agent code, so any
    unexpected exception from them is logged and reported as an internal
    grading error with score 0.0 instead of propagating to the caller.
    """
    if env.scenario.task_3 is None:
        return 0.0, {"error": "Task 3 scenario missing."}
    scenario = env.scenario.task_3
    evidence = env.evidence.get("task_3", {})
    expected_rows = list(scenario.expected_rows)
    try:
        expected_report = build_task_3_report(expected_rows, scenario.target_date)
        report_data = _load_task_3_data(env.workspace_dir, expected_rows)
        formatter = _run_task_3_formatter(env.workspace_dir, expected_rows, scenario.target_date)
        email = _score_task_3_email(env, expected_report)
    except Exception:
        logger.exception("Task 3 grading error")
        return 0.0, {"error": "Internal grading error."}
    report_exact_and_proven = bool(
        report_data["matches_expected"] and evidence.get("report_data_matches_sql")
    )
    formatter_exact_and_proven = bool(
        formatter["matches_expected"] and evidence.get("format_output_matches_expected")
    )
    components: dict[str, Any] = {
        "sql_provenance": {
            "score": 0.20 if evidence.get("matching_sql_executed") else 0.0,
            "max": 0.20,
            "passed": bool(evidence.get("matching_sql_executed")),
        },
        "report_data": {
            "score": 0.20 if report_exact_and_proven else 0.05 if report_data["matches_expected"] else 0.0,
            "max": 0.20,
            "passed": report_exact_and_proven,
        },
        "formatter": {
            "score": 0.25 if formatter_exact_and_proven else 0.05 if formatter["runs"] else 0.0,
            "max": 0.25,
            "passed": formatter_exact_and_proven,
        },
        "email": {
            "score": email["score"],
            "max": 0.35,
            "passed": email["passed"],
        },
    }
    score = round(sum(component["score"] for component in components.values()), 4)
    details: dict[str, Any] = {
        "target_date": scenario.target_date,
        "expected_recipient": scenario.recipient,
        "expected_subject": scenario.subject,
        "report_data": report_data["details"],
        "formatter": formatter["details"],
        "email": email["details"],
        "evidence": evidence,
        "components": components,
    }
    if score == 1.0:
        details["reason"] = "Perfect - the seeded SQL slice, JSON output, formatter run, and final email all align."
    elif score >= 0.55:
        details["reason"] = "Strong progress - some of the seeded workflow is correct, but provenance is incomplete."
    elif score > 0:
        details["reason"] = "Partial progress - artifacts exist, but the end-to-end incident workflow is not proven."
    else:
        details["reason"] = "The seeded hard task is still unsolved."
    return score, details
def _inspect_task_2_source(source: str) -> dict[str, Any]:
    """Statically check that ``source`` defines a usable ``process_data_stream``.

    The graders call ``module.process_data_stream(batch)`` with exactly one
    positional argument, so the check passes when the function can accept
    such a call. If the name is defined by several top-level ``def``
    statements, the last one is inspected because it is the one bound once
    the module has executed.

    Returns:
        ``{"passed": bool, "has_function": bool}``. When ``source`` cannot be
        parsed, both flags are False and ``error`` holds the parser message.
    """
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError) as exc:
        # Python < 3.12 raises ValueError (not SyntaxError) for NUL bytes.
        return {"passed": False, "error": str(exc), "has_function": False}
    target = None
    for node in tree.body:
        if isinstance(node, ast.FunctionDef) and node.name == "process_data_stream":
            target = node  # a later def rebinds the name, so keep the last one
    if target is None:
        return {"passed": False, "has_function": False}
    params = target.args
    # ``defaults`` applies to the tail of positional-only + regular params.
    positional = [*params.posonlyargs, *params.args]
    required_positional = len(positional) - len(params.defaults)
    # ``kw_defaults`` holds None for keyword-only params without a default.
    required_keyword_only = sum(1 for default in params.kw_defaults if default is None)
    passed = (
        (bool(positional) or params.vararg is not None)
        and required_positional <= 1
        and required_keyword_only == 0
    )
    return {"passed": passed, "has_function": True}
def _run_task_2_case_check(
    workspace_dir: str,
    batch: tuple[dict[str, Any], ...],
    expected: tuple[dict[str, Any], ...],
) -> dict[str, Any]:
    """Run the candidate's ``process_data_stream`` on one batch and compare.

    Imports ``broken_pipeline.py`` from ``workspace_dir`` in a child process,
    feeds it ``batch`` and compares the normalised output with ``expected``.

    Returns:
        ``passed`` (clean exit and exact match), ``timed_out``, truncated
        ``stdout``/``stderr`` and the normalised ``actual`` rows.
    """
    marker = "__RESULT__="
    # Ship the batch as a Python string literal holding JSON and decode it in
    # the child: splicing raw JSON into Python source turns null/true/false
    # into NameErrors.
    batch_json = json.dumps(list(batch))
    wrapper = f"""
import importlib.util
import json
spec = importlib.util.spec_from_file_location("candidate_pipeline", "broken_pipeline.py")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
batch = json.loads({batch_json!r})
results = module.process_data_stream(batch)
print("__RESULT__=" + json.dumps(results))
"""
    result = run_python_code(
        wrapper,
        cwd=workspace_dir,
        timeout_s=SCRIPT_TIMEOUT_S,
        stdout_limit=INTERNAL_STDOUT_LIMIT,
        stderr_limit=INTERNAL_STDERR_LIMIT,
    )
    stdout = result.stdout or ""
    # The wrapper prints its marker last; keep the last match so marker-like
    # lines printed earlier by the candidate module are ignored.
    payload = ""
    for line in stdout.splitlines():
        if line.startswith(marker):
            payload = line[len(marker):]
    try:
        parsed = json.loads(payload) if payload else None
    except json.JSONDecodeError:
        parsed = None
    normalised = normalize_task_2_output_rows(parsed)
    ok = result.returncode == 0 and normalised == list(expected)
    return {
        "passed": ok,
        "timed_out": result.timed_out,
        "stdout": stdout[:500],
        "stderr": (result.stderr or "")[:500],
        "actual": normalised,
    }
def _run_task_2_hidden_tests(
    workspace_dir: str,
    hidden_cases: tuple[tuple[dict[str, Any], ...], ...],
    hidden_expected: tuple[tuple[dict[str, Any], ...], ...],
) -> dict[str, Any]:
    """Run ``process_data_stream`` on every hidden case in one child process.

    All cases share one subprocess, so an exception while importing the module
    or in any single case leaves no parsable result and every case fails.

    Returns:
        ``case_passes`` (one bool per expected case), ``pass_fraction`` (share
        of expected cases matched, 0.0 when there are none), ``passed`` (clean
        exit, one result per expected case, all matching), ``timed_out``,
        truncated ``stdout``/``stderr`` and the normalised ``actual`` batches.
    """
    marker = "__RESULT__="
    # Ship the cases as a Python string literal holding JSON and decode it in
    # the child: splicing raw JSON into Python source turns null/true/false
    # into NameErrors.
    cases_json = json.dumps([list(batch) for batch in hidden_cases])
    wrapper = f"""
import importlib.util
import json
spec = importlib.util.spec_from_file_location("candidate_pipeline", "broken_pipeline.py")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
cases = json.loads({cases_json!r})
results = [module.process_data_stream(case) for case in cases]
print("__RESULT__=" + json.dumps(results))
"""
    result = run_python_code(
        wrapper,
        cwd=workspace_dir,
        timeout_s=SCRIPT_TIMEOUT_S,
        stdout_limit=INTERNAL_STDOUT_LIMIT,
        stderr_limit=INTERNAL_STDERR_LIMIT,
    )
    stdout = result.stdout or ""
    # The wrapper prints its marker last; keep the last match so marker-like
    # lines printed earlier by the candidate module are ignored.
    payload = ""
    for line in stdout.splitlines():
        if line.startswith(marker):
            payload = line[len(marker):]
    try:
        parsed = json.loads(payload) if payload else None
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, list):
        parsed = []
    actual_batches = [normalize_task_2_output_rows(batch) for batch in parsed]
    expected = [list(batch) for batch in hidden_expected]
    case_passes = [
        actual == expected_case
        for actual, expected_case in zip(actual_batches, expected, strict=False)
    ]
    # Missing results (fewer outputs than cases) count as failures.
    if len(case_passes) < len(expected):
        case_passes.extend([False] * (len(expected) - len(case_passes)))
    pass_fraction = sum(case_passes) / len(expected) if expected else 0.0
    return {
        "passed": result.returncode == 0 and len(actual_batches) == len(expected) and all(case_passes),
        "timed_out": result.timed_out,
        "stdout": stdout[:500],
        "stderr": (result.stderr or "")[:500],
        "actual": actual_batches,
        "case_passes": case_passes,
        "pass_fraction": round(pass_fraction, 4),
    }
def _load_task_3_data(
    workspace_dir: str, expected_rows: list[dict[str, Any]]
) -> dict[str, Any]:
    """Load and check the agent-written ``report_data.json`` in ``workspace_dir``.

    Returns:
        ``{"matches_expected": bool, "details": {...}}``. A missing file, an
        unreadable or non-UTF-8 file, invalid JSON, or a payload that is not a
        JSON list all yield ``matches_expected=False`` with a ``reason``
        instead of raising.
    """
    report_json = os.path.join(workspace_dir, "report_data.json")
    if not os.path.isfile(report_json):
        return {
            "matches_expected": False,
            "details": {"exists": False, "reason": "report_data.json not found."},
        }
    try:
        with open(report_json, encoding="utf-8") as f:
            payload = json.load(f)
    except (OSError, ValueError, RecursionError) as exc:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (non-UTF-8 bytes); RecursionError comes from pathologically deep
        # nesting. The file is agent-controlled, so none may crash the grader.
        return {
            "matches_expected": False,
            "details": {"exists": True, "reason": str(exc)},
        }
    if not isinstance(payload, list):
        return {
            "matches_expected": False,
            "details": {
                "exists": True,
                "reason": "report_data.json must contain a JSON list.",
            },
        }
    rows = normalize_task_3_rows(payload, require_headcount=True)
    matches_expected = bool(rows) and task_3_data_matches_expected(
        rows,
        expected_rows,
        require_headcount=True,
    )
    semantic_fraction = task_3_semantic_match_fraction_rows(rows, expected_rows)
    return {
        "matches_expected": matches_expected,
        "details": {
            "exists": True,
            "rows_valid": bool(rows),
            "rows_match_expected": matches_expected,
            "semantic_fraction": round(semantic_fraction, 4),
        },
    }
def _run_task_3_formatter(
    workspace_dir: str,
    expected_rows: list[dict[str, Any]],
    target_date: str,
) -> dict[str, Any]:
    """Execute ``format_report.py report_data.json`` in the workspace and judge it.

    Returns:
        ``runs`` (the script exited 0), ``matches_expected`` (exited 0 and its
        stripped stdout is accepted by ``report_matches_expected``) and a
        ``details`` mapping. A missing script, a launch error or a timeout all
        report ``runs=False`` with a single ``reason``.
    """

    def failure(reason: str) -> dict[str, Any]:
        return {"runs": False, "matches_expected": False, "details": {"reason": reason}}

    if not os.path.isfile(os.path.join(workspace_dir, "format_report.py")):
        return failure("format_report.py not found.")
    try:
        run = run_python_script(
            "format_report.py",
            cwd=workspace_dir,
            args=["report_data.json"],
            timeout_s=SCRIPT_TIMEOUT_S,
            stdout_limit=INTERNAL_STDOUT_LIMIT,
            stderr_limit=INTERNAL_STDERR_LIMIT,
        )
    except Exception as exc:
        return failure(str(exc))
    if run.timed_out:
        return failure("Formatter timed out.")
    exited_cleanly = run.returncode == 0
    output = (run.stdout or "").strip()
    matched = exited_cleanly and report_matches_expected(output, expected_rows, target_date)
    fraction = task_3_semantic_match_fraction_text(output, expected_rows, target_date)
    diagnostics = {
        "exit_code": run.returncode,
        "stdout": output[:500],
        "stderr": (run.stderr or "")[:500],
        "semantic_fraction": round(fraction, 4),
    }
    return {"runs": exited_cleanly, "matches_expected": matched, "details": diagnostics}
def _score_task_3_email(
    env: DataOpsEnvironment, expected_report: str
) -> dict[str, Any]:
    """Score the most recent email in ``env.email_outbox`` (max 0.35).

    * 0.05 for the scenario recipient, 0.05 for the scenario subject.
    * 0.25 when the stripped body equals ``expected_report`` *and* evidence
      shows it matched the formatter output with exactly one email sent.

    Returns:
        ``{"score", "passed", "details"}``; ``passed`` is True only when all
        three parts earn credit. An empty outbox scores 0.0.
    """
    scenario = env.scenario.task_3
    assert scenario is not None
    evidence = env.evidence.get("task_3", {})
    outbox = env.email_outbox
    if not outbox:
        return {
            "score": 0.0,
            "passed": False,
            "details": {"reason": "No email sent."},
        }
    email = outbox[-1]
    recipient_ok = email.get("to_email") == scenario.recipient
    subject_ok = email.get("subject") == scenario.subject
    body = str(email.get("body", "")).strip()
    body_ok = body == expected_report.strip()
    # Re-sending is penalised: provenance only counts with a single email.
    proven = bool(evidence.get("email_matches_formatter_output")) and len(outbox) == 1
    score = 0.0
    if recipient_ok:
        score += 0.05
    if subject_ok:
        score += 0.05
    if body_ok and proven:
        score += 0.25
    # State the full-credit condition directly instead of comparing an
    # accumulated float sum against 0.35.
    passed = recipient_ok and subject_ok and body_ok and proven
    return {
        "score": score,
        "passed": passed,
        "details": {
            "emails_sent": len(outbox),
            "recipient_ok": recipient_ok,
            "subject_ok": subject_ok,
            "body_ok": body_ok,
            "proven": proven,
            "semantic_fraction": round(
                task_3_semantic_match_fraction_text(
                    body,
                    list(scenario.expected_rows),
                    scenario.target_date,
                ),
                4,
            ),
        },
    }
def _current_transactions_rows(db_path: str) -> list[dict[str, Any]]:
    """Return every row of the ``transactions`` table at ``db_path``, ordered by id.

    Values are coerced to plain Python types (``amount`` rounded to 2
    decimals, NULL amounts kept as ``None``). A database without a
    ``transactions`` table yields ``[]``. ``sqlite3.connect`` creates an empty
    database file when ``db_path`` does not exist yet.
    """
    # ``with sqlite3.connect(...)`` only scopes a transaction and never closes
    # the connection, so close it explicitly to avoid leaking the handle.
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        table_exists = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
        ).fetchone()
        if not table_exists:
            return []
        rows = conn.execute(
            "SELECT id, user_id, amount, status FROM transactions ORDER BY id"
        ).fetchall()
    finally:
        conn.close()
    return [
        {
            "id": int(row["id"]),
            "user_id": int(row["user_id"]),
            "amount": None if row["amount"] is None else round(float(row["amount"]), 2),
            "status": str(row["status"]),
        }
        for row in rows
    ]