dataops-env / server /grading.py
visheshrathi's picture
Upload folder using huggingface_hub
f89b1ac verified
"""Terminal graders for the seeded DataOpsEnv benchmark."""
from __future__ import annotations
import ast
import json
import logging
import os
import sqlite3
from typing import Any
from server.dataops_env_environment import DataOpsEnvironment
from server.safe_exec import run_python_code, run_python_script
from server.task_specs import (
build_task_3_report,
normalize_task_2_output_rows,
normalize_task_3_rows,
report_matches_expected,
task_3_data_matches_expected,
task_3_semantic_match_fraction_rows,
task_3_semantic_match_fraction_text,
)
logger = logging.getLogger(__name__)

# Wall-clock budget handed to the sandbox runners as ``timeout_s`` for every
# grader-internal script execution.
SCRIPT_TIMEOUT_S: int = 10
# Output caps handed to the sandbox runners as ``stdout_limit`` /
# ``stderr_limit`` for grader-internal executions.
INTERNAL_STDOUT_LIMIT: int = 50 * 1_000
INTERNAL_STDERR_LIMIT: int = 10 * 1_000
def evaluate_task(task_id: str, env: DataOpsEnvironment) -> dict[str, Any]:
    """Run the terminal grader registered for *task_id* against *env*.

    Returns a ``{"task_id", "score", "details"}`` mapping whose score is the
    grader's score rounded to two decimals. An unrecognised *task_id* does
    not raise; it yields a 0.0 score with an ``"Unknown task"`` error.
    """
    registry = {
        "task_1_easy_anomaly": _grade_task_1,
        "task_2_medium_syntax": _grade_task_2,
        "task_3_hard_e2e": _grade_task_3,
    }
    if task_id not in registry:
        return {"task_id": task_id, "score": 0.0, "details": {"error": "Unknown task"}}
    raw_score, details = registry[task_id](env)
    return {"task_id": task_id, "score": round(raw_score, 2), "details": details}
def _grade_task_1(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
if env.scenario.task_1 is None:
return 0.0, {"error": "Task 1 scenario missing."}
try:
actual_rows = _current_transactions_rows(env.db_path)
except Exception:
logger.exception("Task 1 grading error")
return 0.0, {"error": "Internal grading error."}
expected_rows = list(env.scenario.task_1.expected_rows)
corrupted_ids = set(env.scenario.task_1.corrupted_row_ids)
actual_ids = {row["id"] for row in actual_rows}
expected_ids = {row["id"] for row in expected_rows}
corrupted_remaining = sorted(actual_ids & corrupted_ids)
rewritten_corrupted = [
row for row in actual_rows if row["id"] in corrupted_ids and row["amount"] is not None
]
valid_rows_intact = all(
any(actual == expected for actual in actual_rows) for expected in expected_rows
)
details: dict[str, Any] = {
"expected_row_ids": sorted(expected_ids),
"actual_row_ids": sorted(actual_ids),
"corrupted_row_ids": sorted(corrupted_ids),
"corrupted_remaining": corrupted_remaining,
"valid_rows_intact": valid_rows_intact,
}
if actual_rows == expected_rows:
details["reason"] = "Perfect - corrupted rows were deleted and all valid rows were preserved."
details["components"] = {
"exact_cleanup": {"score": 1.0, "max": 1.0, "passed": True},
}
return 1.0, details
if rewritten_corrupted:
details["reason"] = "Corrupted rows were rewritten instead of being deleted."
details["components"] = {
"exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
}
return 0.0, details
if valid_rows_intact and corrupted_remaining:
fraction_removed = 1.0 - (len(corrupted_remaining) / max(len(corrupted_ids), 1))
score = round(0.25 * max(fraction_removed, 0.0), 4)
details["reason"] = "Some corrupted rows were removed, but cleanup is incomplete."
details["components"] = {
"partial_cleanup": {"score": score, "max": 0.25, "passed": False},
}
return score, details
details["reason"] = "The transaction table does not match the required cleaned state."
details["components"] = {
"exact_cleanup": {"score": 0.0, "max": 1.0, "passed": False},
}
return 0.0, details
def _grade_task_2(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
if env.scenario.task_2 is None:
return 0.0, {"error": "Task 2 scenario missing."}
script = os.path.join(env.workspace_dir, "broken_pipeline.py")
if not os.path.isfile(script):
return 0.0, {
"reason": "broken_pipeline.py not found.",
"components": {
"script_present": {"score": 0.0, "max": 1.0, "passed": False},
},
}
try:
with open(script, encoding="utf-8") as f:
source = f.read()
static = _inspect_task_2_source(source)
main_result = run_python_script(
"broken_pipeline.py",
cwd=env.workspace_dir,
args=[],
timeout_s=SCRIPT_TIMEOUT_S,
stdout_limit=INTERNAL_STDOUT_LIMIT,
stderr_limit=INTERNAL_STDERR_LIMIT,
)
visible_result = _run_task_2_case_check(
env.workspace_dir,
env.scenario.task_2.visible_batch,
env.scenario.task_2.visible_expected,
)
hidden_result = _run_task_2_hidden_tests(
env.workspace_dir,
env.scenario.task_2.hidden_cases,
env.scenario.task_2.hidden_expected,
)
except Exception:
logger.exception("Task 2 grading error")
return 0.0, {"error": "Internal grading error."}
if main_result.timed_out or visible_result["timed_out"] or hidden_result["timed_out"]:
return 0.0, {"reason": "Script timed out.", "components": {}}
hidden_score = round(0.60 * hidden_result["pass_fraction"], 4)
visible_score = 0.25 if visible_result["passed"] and main_result.returncode == 0 else 0.0
execution_score = 0.15 if env.evidence.get("task_2", {}).get("verified_fix") else 0.0
components: dict[str, Any] = {
"hidden_functional": {
"score": hidden_score,
"max": 0.60,
"passed": hidden_result["passed"],
},
"visible_pipeline": {
"score": visible_score,
"max": 0.25,
"passed": visible_result["passed"] and main_result.returncode == 0,
},
"execution_provenance": {
"score": execution_score,
"max": 0.15,
"passed": bool(env.evidence.get("task_2", {}).get("verified_fix")),
},
}
score = round(sum(component["score"] for component in components.values()), 4)
details = {
"main_exit_code": main_result.returncode,
"main_stdout": main_result.stdout[:500],
"main_stderr": main_result.stderr[:500],
"visible_batch_ok": visible_result["passed"],
"hidden_tests_passed": hidden_result["passed"],
"hidden_pass_fraction": hidden_result["pass_fraction"],
"hidden_case_passes": hidden_result["case_passes"],
"static_checks": static,
"components": components,
}
if score == 1.0:
details["reason"] = "Seeded hidden tests and the visible verification run both pass."
elif hidden_result["passed"] and main_result.returncode == 0:
details["reason"] = "The ETL transform is correct, but the agent never verified it through the run action."
elif hidden_result["pass_fraction"] > 0 and main_result.returncode == 0:
details["reason"] = "The repair improves the ETL transform, but it still fails some seeded cases."
elif hidden_result["pass_fraction"] > 0:
details["reason"] = "The core transform improved, but the runnable script entrypoint still drifts."
elif main_result.returncode == 0:
details["reason"] = "The script runs, but it does not yet produce the required normalized records."
else:
details["reason"] = "The repair is still incorrect or incomplete."
return score, details
def _grade_task_3(env: DataOpsEnvironment) -> tuple[float, dict[str, Any]]:
if env.scenario.task_3 is None:
return 0.0, {"error": "Task 3 scenario missing."}
scenario = env.scenario.task_3
evidence = env.evidence.get("task_3", {})
expected_rows = list(scenario.expected_rows)
expected_report = build_task_3_report(expected_rows, scenario.target_date)
report_data = _load_task_3_data(env.workspace_dir, expected_rows)
formatter = _run_task_3_formatter(env.workspace_dir, expected_rows, scenario.target_date)
email = _score_task_3_email(env, expected_report)
report_exact_and_proven = bool(
report_data["matches_expected"] and evidence.get("report_data_matches_sql")
)
formatter_exact_and_proven = bool(
formatter["matches_expected"] and evidence.get("format_output_matches_expected")
)
components: dict[str, Any] = {
"sql_provenance": {
"score": 0.20 if evidence.get("matching_sql_executed") else 0.0,
"max": 0.20,
"passed": bool(evidence.get("matching_sql_executed")),
},
"report_data": {
"score": 0.20 if report_exact_and_proven else 0.05 if report_data["matches_expected"] else 0.0,
"max": 0.20,
"passed": report_exact_and_proven,
},
"formatter": {
"score": 0.25 if formatter_exact_and_proven else 0.05 if formatter["runs"] else 0.0,
"max": 0.25,
"passed": formatter_exact_and_proven,
},
"email": {
"score": email["score"],
"max": 0.35,
"passed": email["passed"],
},
}
score = round(sum(component["score"] for component in components.values()), 4)
details: dict[str, Any] = {
"target_date": scenario.target_date,
"expected_recipient": scenario.recipient,
"expected_subject": scenario.subject,
"report_data": report_data["details"],
"formatter": formatter["details"],
"email": email["details"],
"evidence": evidence,
"components": components,
}
if score == 1.0:
details["reason"] = "Perfect - the seeded SQL slice, JSON output, formatter run, and final email all align."
elif score >= 0.55:
details["reason"] = "Strong progress - some of the seeded workflow is correct, but provenance is incomplete."
elif score > 0:
details["reason"] = "Partial progress - artifacts exist, but the end-to-end incident workflow is not proven."
else:
details["reason"] = "The seeded hard task is still unsolved."
return score, details
def _inspect_task_2_source(source: str) -> dict[str, Any]:
try:
tree = ast.parse(source)
except SyntaxError as exc:
return {"passed": False, "error": str(exc), "has_function": False}
functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
target = next((node for node in functions if node.name == "process_data_stream"), None)
passed = target is not None and len(target.args.args) == 1
return {"passed": passed, "has_function": target is not None}
def _run_task_2_case_check(
workspace_dir: str,
batch: tuple[dict[str, Any], ...],
expected: tuple[dict[str, Any], ...],
) -> dict[str, Any]:
wrapper = f"""
import importlib.util
import json
spec = importlib.util.spec_from_file_location("candidate_pipeline", "broken_pipeline.py")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
batch = {json.dumps(list(batch))}
results = module.process_data_stream(batch)
print("__RESULT__=" + json.dumps(results))
"""
result = run_python_code(
wrapper,
cwd=workspace_dir,
timeout_s=SCRIPT_TIMEOUT_S,
stdout_limit=INTERNAL_STDOUT_LIMIT,
stderr_limit=INTERNAL_STDERR_LIMIT,
)
payload = next(
(
line[len("__RESULT__=") :]
for line in result.stdout.splitlines()
if line.startswith("__RESULT__=")
),
"",
)
try:
parsed = json.loads(payload) if payload else None
except json.JSONDecodeError:
parsed = None
normalised = normalize_task_2_output_rows(parsed)
ok = result.returncode == 0 and normalised == list(expected)
return {
"passed": ok,
"timed_out": result.timed_out,
"stdout": result.stdout[:500],
"stderr": result.stderr[:500],
"actual": normalised,
}
def _run_task_2_hidden_tests(
workspace_dir: str,
hidden_cases: tuple[tuple[dict[str, Any], ...], ...],
hidden_expected: tuple[tuple[dict[str, Any], ...], ...],
) -> dict[str, Any]:
wrapper = f"""
import importlib.util
import json
spec = importlib.util.spec_from_file_location("candidate_pipeline", "broken_pipeline.py")
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
cases = {json.dumps([list(batch) for batch in hidden_cases])}
results = [module.process_data_stream(case) for case in cases]
print("__RESULT__=" + json.dumps(results))
"""
result = run_python_code(
wrapper,
cwd=workspace_dir,
timeout_s=SCRIPT_TIMEOUT_S,
stdout_limit=INTERNAL_STDOUT_LIMIT,
stderr_limit=INTERNAL_STDERR_LIMIT,
)
payload = next(
(
line[len("__RESULT__=") :]
for line in result.stdout.splitlines()
if line.startswith("__RESULT__=")
),
"",
)
try:
parsed = json.loads(payload) if payload else None
except json.JSONDecodeError:
parsed = None
if not isinstance(parsed, list):
parsed = []
actual_batches = [
normalize_task_2_output_rows(batch)
for batch in parsed
]
expected = [list(batch) for batch in hidden_expected]
case_passes = [
actual == expected_case
for actual, expected_case in zip(actual_batches, expected, strict=False)
]
if len(case_passes) < len(expected):
case_passes.extend([False] * (len(expected) - len(case_passes)))
pass_fraction = (
sum(1 for passed in case_passes if passed) / len(expected)
if expected
else 0.0
)
return {
"passed": result.returncode == 0 and len(actual_batches) == len(expected) and all(case_passes),
"timed_out": result.timed_out,
"stdout": result.stdout[:500],
"stderr": result.stderr[:500],
"actual": actual_batches,
"case_passes": case_passes,
"pass_fraction": round(pass_fraction, 4),
}
def _load_task_3_data(
workspace_dir: str, expected_rows: list[dict[str, Any]]
) -> dict[str, Any]:
report_json = os.path.join(workspace_dir, "report_data.json")
if not os.path.isfile(report_json):
return {
"matches_expected": False,
"details": {"exists": False, "reason": "report_data.json not found."},
}
try:
with open(report_json, encoding="utf-8") as f:
payload = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
return {
"matches_expected": False,
"details": {"exists": True, "reason": str(exc)},
}
if not isinstance(payload, list):
return {
"matches_expected": False,
"details": {
"exists": True,
"reason": "report_data.json must contain a JSON list.",
},
}
rows = normalize_task_3_rows(payload, require_headcount=True)
matches_expected = bool(rows) and task_3_data_matches_expected(
rows,
expected_rows,
require_headcount=True,
)
semantic_fraction = task_3_semantic_match_fraction_rows(rows, expected_rows)
return {
"matches_expected": matches_expected,
"details": {
"exists": True,
"rows_valid": bool(rows),
"rows_match_expected": matches_expected,
"semantic_fraction": round(semantic_fraction, 4),
},
}
def _run_task_3_formatter(
workspace_dir: str,
expected_rows: list[dict[str, Any]],
target_date: str,
) -> dict[str, Any]:
script = os.path.join(workspace_dir, "format_report.py")
if not os.path.isfile(script):
return {
"runs": False,
"matches_expected": False,
"details": {"reason": "format_report.py not found."},
}
try:
result = run_python_script(
"format_report.py",
cwd=workspace_dir,
args=["report_data.json"],
timeout_s=SCRIPT_TIMEOUT_S,
stdout_limit=INTERNAL_STDOUT_LIMIT,
stderr_limit=INTERNAL_STDERR_LIMIT,
)
except Exception as exc:
return {
"runs": False,
"matches_expected": False,
"details": {"reason": str(exc)},
}
if result.timed_out:
return {
"runs": False,
"matches_expected": False,
"details": {"reason": "Formatter timed out."},
}
stdout = (result.stdout or "").strip()
matches_expected = result.returncode == 0 and report_matches_expected(
stdout,
expected_rows,
target_date,
)
return {
"runs": result.returncode == 0,
"matches_expected": matches_expected,
"details": {
"exit_code": result.returncode,
"stdout": stdout[:500],
"stderr": (result.stderr or "")[:500],
"semantic_fraction": round(
task_3_semantic_match_fraction_text(stdout, expected_rows, target_date),
4,
),
},
}
def _score_task_3_email(
env: DataOpsEnvironment, expected_report: str
) -> dict[str, Any]:
scenario = env.scenario.task_3
assert scenario is not None
evidence = env.evidence.get("task_3", {})
outbox = env.email_outbox
if not outbox:
return {
"score": 0.0,
"passed": False,
"details": {"reason": "No email sent."},
}
email = outbox[-1]
recipient_ok = email.get("to_email") == scenario.recipient
subject_ok = email.get("subject") == scenario.subject
body = str(email.get("body", "")).strip()
body_ok = body == expected_report.strip()
proven = bool(evidence.get("email_matches_formatter_output")) and len(outbox) == 1
score = 0.0
if recipient_ok:
score += 0.05
if subject_ok:
score += 0.05
if body_ok and proven:
score += 0.25
return {
"score": score,
"passed": score == 0.35,
"details": {
"emails_sent": len(outbox),
"recipient_ok": recipient_ok,
"subject_ok": subject_ok,
"body_ok": body_ok,
"proven": proven,
"semantic_fraction": round(
task_3_semantic_match_fraction_text(
body,
list(scenario.expected_rows),
scenario.target_date,
),
4,
),
},
}
def _current_transactions_rows(db_path: str) -> list[dict[str, Any]]:
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
table_exists = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
).fetchone()
if not table_exists:
return []
rows = conn.execute(
"SELECT id, user_id, amount, status FROM transactions ORDER BY id"
).fetchall()
return [
{
"id": int(row["id"]),
"user_id": int(row["user_id"]),
"amount": None if row["amount"] is None else round(float(row["amount"]), 2),
"status": str(row["status"]),
}
for row in rows
]