dataops-env / tests /test_grading.py
visheshrathi's picture
Upload folder using huggingface_hub
a1b343c verified
"""High-signal regression tests for seeded grading and public API shape."""
from __future__ import annotations
import json
import shutil
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

from fastapi.testclient import TestClient
# Put the directory one level above this file's folder on sys.path (once) so
# the `models` / `server` imports that follow can be resolved.
_ROOT = Path(__file__).resolve().parents[1]
if sys.path.count(str(_ROOT)) == 0:
    sys.path.insert(0, str(_ROOT))
from models import DataOpsAction # noqa: E402
from server.app import app # noqa: E402
from server.dataops_env_environment import DataOpsEnvironment # noqa: E402
from server.grading import evaluate_task # noqa: E402
from server.task_specs import build_task_3_report # noqa: E402
def _fixed_pipeline_script(visible_batch: list[dict[str, object]]) -> str:
    """Return Python source for a working ``process_data_stream`` pipeline.

    The generated module keeps only payloads whose status is ``"ready"`` and
    whose ``amount_cents`` is positive, converts cents to dollars, assigns a
    ``priority_band`` and sorts by descending amount, then by ``order_id``.

    Args:
        visible_batch: Payloads embedded (via ``repr``) as the ``mock_batch``
            that the generated script processes when run as ``__main__``.

    Returns:
        The script text. Block indentation inside the template is significant:
        without it the generated file is not valid Python.
    """
    # Doubled braces are literal braces in the generated source; the only
    # interpolated value is the repr of ``visible_batch``.
    return f'''\
import json
def process_data_stream(payloads):
    processed_records = []
    for payload in payloads:
        if payload["status"] != "ready" or int(payload["amount_cents"]) <= 0:
            continue
        amount_usd = round(int(payload["amount_cents"]) / 100.0, 2)
        priority_band = (
            "high"
            if int(payload["priority"]) >= 8 or amount_usd >= 500.0
            else "normal"
        )
        processed_records.append(
            {{
                "order_id": payload["order_id"],
                "region": payload["region"],
                "amount_usd": amount_usd,
                "priority_band": priority_band,
            }}
        )
    processed_records.sort(key=lambda item: (-item["amount_usd"], item["order_id"]))
    return processed_records
if __name__ == "__main__":
    mock_batch = {visible_batch!r}
    print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True))
'''
def _visible_only_pipeline_stub(
    visible_batch: list[dict[str, object]],
    visible_expected: list[dict[str, object]],
) -> str:
    """Return Python source for a stub that only handles the visible batch.

    The generated ``process_data_stream`` returns ``visible_expected`` when it
    is called with exactly ``visible_batch`` and an empty list for any other
    input; run as ``__main__`` it simply prints ``visible_expected`` as JSON.

    Args:
        visible_batch: Payload list the stub recognises (embedded via ``repr``).
        visible_expected: Records the stub returns/prints for that batch.

    Returns:
        The script text. Block indentation inside the template is significant:
        without it the generated file is not valid Python.
    """
    return f'''\
import json
def process_data_stream(payloads):
    visible = {visible_batch!r}
    if payloads == visible:
        return {visible_expected!r}
    return []
if __name__ == "__main__":
    print(json.dumps({visible_expected!r}, indent=2, sort_keys=True))
'''
def _fixed_format_script(target_date: str) -> str:
    """Return Python source for a working ``format_report.py``.

    The generated ``format_report(input_path)`` loads a JSON list of records,
    emits one section per record (department, revenue, expenses, net), a total
    revenue line and a footer, prints the text and returns it. Run as
    ``__main__`` it expects the input path as its first argument and exits
    with status 1 and a usage message when it is missing.

    Args:
        target_date: Date text interpolated into the report header line.

    Returns:
        The script text. Block indentation inside the template is significant:
        without it the generated file is not valid Python.
    """
    # Doubled braces survive as single braces so the generated code keeps its
    # own f-strings; "\\n" becomes the two-character escape "\n" in the script.
    return f'''\
import json
import sys
def format_report(input_path):
    with open(input_path, encoding="utf-8") as f:
        records = json.load(f)
    lines = ["=== Daily Revenue Report ({target_date}) ===", ""]
    total_revenue = 0.0
    for rec in records:
        dept = rec["department"]
        rev = float(rec["revenue"])
        exp = float(rec["expenses"])
        net = rev - exp
        lines.append(f"Department: {{dept}}")
        lines.append(f" Revenue: ${{rev:.2f}}")
        lines.append(f" Expenses: ${{exp:.2f}}")
        lines.append(f" Net: ${{net:.2f}}")
        lines.append("")
        total_revenue += rev
    lines.append(f"Total Revenue: ${{total_revenue:.2f}}")
    lines.append("=== End of Report ===")
    out = "\\n".join(lines)
    print(out)
    return out
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python format_report.py <input.json>", file=sys.stderr)
        sys.exit(1)
    format_report(sys.argv[1])
'''
def test_seeded_task_3_scenario_is_deterministic() -> None:
    """Two environments reset with the same task and seed expose equal task-3 scenarios."""
    env_a = DataOpsEnvironment()
    env_b = DataOpsEnvironment()
    try:
        env_a.reset(task_id="task_3_hard_e2e", seed=17)
        env_b.reset(task_id="task_3_hard_e2e", seed=17)
        assert env_a.scenario.task_3 == env_b.scenario.task_3
    finally:
        # Tear down both environments the same way the other tests do: close,
        # then remove the workspace. The nested try/finally keeps a failing
        # close on env_a from skipping env_b's cleanup.
        try:
            env_a.close()
        finally:
            shutil.rmtree(env_a.workspace_dir, ignore_errors=True)
            try:
                env_b.close()
            finally:
                shutil.rmtree(env_b.workspace_dir, ignore_errors=True)
def test_task_1_perfect_score_seeded() -> None:
    """Deleting exactly the NULL-amount rows yields a task-1 score of 1.0 (seed 7)."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        delete_nulls = DataOpsAction(
            action_type="ExecuteSQL",
            payload={"query": "DELETE FROM transactions WHERE amount IS NULL"},
        )
        step_result = env.step(delete_nulls)
        assert step_result.status == "success"

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_seeded_valid_rows_include_non_null_edge_amounts() -> None:
    """The seeded task-1 expected rows contain both a zero and a negative amount."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        task_1 = env.scenario.task_1
        assert task_1 is not None

        expected_amounts = [float(row["amount"]) for row in task_1.expected_rows]
        has_zero = 0.0 in expected_amounts
        assert has_zero
        has_negative = any(value < 0.0 for value in expected_amounts)
        assert has_negative
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_rewriting_corrupted_rows_scores_zero() -> None:
    """Overwriting NULL amounts with 0 directly in the database scores 0.0."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        # A sqlite3 connection used directly as a context manager only commits
        # or rolls back; it does not close. closing() releases the handle
        # before grading and before the workspace is removed.
        with closing(sqlite3.connect(env.db_path)) as conn:
            conn.execute("UPDATE transactions SET amount = 0 WHERE amount IS NULL")
            conn.commit()
        out = evaluate_task("task_1_easy_anomaly", env)
        assert out["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_deleting_non_null_adjustments_is_penalized() -> None:
    """Also deleting rows with amount <= 0 gives a negative reward and a 0.0 score."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        overreaching_delete = DataOpsAction(
            action_type="ExecuteSQL",
            payload={
                "query": "DELETE FROM transactions WHERE amount IS NULL OR amount <= 0"
            },
        )
        step_result = env.step(overreaching_delete)
        assert step_result.status == "success"
        assert step_result.reward is not None and step_result.reward < 0

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_reset_only_scores_zero_across_tasks() -> None:
    """Grading immediately after reset, with no actions taken, scores 0.0 for each task."""
    task_ids = ("task_1_easy_anomaly", "task_2_medium_syntax", "task_3_hard_e2e")
    for task_id in task_ids:
        env = DataOpsEnvironment()
        try:
            env.reset(task_id=task_id, seed=7)
            grade = evaluate_task(task_id, env)
            assert grade["score"] == 0.0
        finally:
            env.close()
            shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_broad_delete_with_where_is_penalized() -> None:
    """A DELETE whose WHERE clause matches every row is flagged and scores 0.0."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        match_all_delete = DataOpsAction(
            action_type="ExecuteSQL",
            payload={
                "query": "DELETE FROM transactions WHERE amount IS NULL OR 1 = 1"
            },
        )
        step_result = env.step(match_all_delete)
        assert step_result.status == "success"
        assert step_result.reward is not None and step_result.reward < 0
        # The environment records the attempt in its task-1 evidence.
        assert env.evidence["task_1"]["destructive_sql_attempted"] is True

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_script_run_does_not_inherit_server_secrets(monkeypatch) -> None:
    """A script run by the environment sees no ``API_KEY`` and has HOME set to the workspace.

    ``API_KEY`` is set in the test process; the script written into the
    workspace prints the ``API_KEY`` and ``HOME`` values it observes as JSON.
    """
    monkeypatch.setenv("API_KEY", "super-secret-value")
    env = DataOpsEnvironment()
    env.reset(task_id="task_2_medium_syntax", seed=11)
    # The embedded script needs real block indentation, otherwise it cannot
    # run and the status assertion below could never hold.
    script = """\
import json
import os
def process_data_stream(payloads):
    return []
if __name__ == "__main__":
    print(json.dumps({"api_key": os.getenv("API_KEY"), "home": os.getenv("HOME")}))
"""
    try:
        env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "broken_pipeline.py", "content": script},
            )
        )
        run_obs = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "broken_pipeline.py", "args": []},
            )
        )
        assert run_obs.status == "success"
        payload = json.loads((run_obs.stdout or "").strip())
        assert payload["api_key"] is None
        assert payload["home"] == env.workspace_dir
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_perfect_score_seeded() -> None:
    """Read, rewrite, then run the pipeline: partial credit before the run, 1.0 after."""
    env = DataOpsEnvironment()
    try:
        # Reset and the scenario precondition live inside the try so the
        # workspace is still cleaned up if either of them fails.
        env.reset(task_id="task_2_medium_syntax", seed=11)
        scenario = env.scenario.task_2
        assert scenario is not None

        read_obs = env.step(
            DataOpsAction(
                action_type="ReadFile",
                payload={"filepath": "broken_pipeline.py"},
            )
        )
        assert read_obs.status == "success"

        write_obs = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={
                    "filepath": "broken_pipeline.py",
                    "content": _fixed_pipeline_script(list(scenario.visible_batch)),
                },
            )
        )
        assert write_obs.status == "success"

        # Writing the fix without running it earns only partial credit.
        pre_run = evaluate_task("task_2_medium_syntax", env)
        assert 0.0 < pre_run["score"] < 1.0

        run_obs = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "broken_pipeline.py", "args": []},
            )
        )
        assert run_obs.status == "success"

        out = evaluate_task("task_2_medium_syntax", env)
        assert out["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_print_only_stub_does_not_get_full_credit() -> None:
    """A stub hard-coded to the visible batch scores below 0.5 on task 2."""
    env = DataOpsEnvironment()
    try:
        # Reset, the scenario precondition and stub construction live inside
        # the try so the workspace is still cleaned up if any of them fails.
        env.reset(task_id="task_2_medium_syntax", seed=11)
        scenario = env.scenario.task_2
        assert scenario is not None
        stub = _visible_only_pipeline_stub(
            list(scenario.visible_batch),
            list(scenario.visible_expected),
        )
        env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "broken_pipeline.py", "content": stub},
            )
        )
        out = evaluate_task("task_2_medium_syntax", env)
        assert out["score"] < 0.5
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_sql_policy_rejects_literal_table_name_bypass() -> None:
    """A sqlite_master query that only mentions 'daily_reports' as a literal is refused."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=19)
    try:
        bypass_query = (
            "SELECT name FROM sqlite_master "
            "WHERE 'daily_reports' = 'daily_reports'"
        )
        probe = DataOpsAction(
            action_type="ExecuteSQL",
            payload={"query": bypass_query},
        )
        result = env.step(probe)
        assert result.status == "error"
        assert "disallowed" in result.message.lower()
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_sql_policy_allows_cte_queries_over_daily_reports() -> None:
    """A CTE that reads ``daily_reports`` for the target date runs and returns rows."""
    env = DataOpsEnvironment()
    try:
        # Reset and the scenario precondition live inside the try so the
        # workspace is still cleaned up if either of them fails.
        env.reset(task_id="task_3_hard_e2e", seed=19)
        scenario = env.scenario.task_3
        assert scenario is not None
        obs = env.step(
            DataOpsAction(
                action_type="ExecuteSQL",
                payload={
                    "query": (
                        "WITH scoped AS ("
                        "SELECT department, revenue, expenses, headcount "
                        "FROM daily_reports "
                        f"WHERE report_date = '{scenario.target_date}'"
                        ") "
                        "SELECT department, revenue, expenses, headcount "
                        "FROM scoped ORDER BY department"
                    )
                },
            )
        )
        assert obs.status == "success"
        assert obs.sql_results
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_perfect_score_requires_proven_workflow() -> None:
    """Query, save, format and email the report through the environment; score is 1.0.

    Steps: run the SQL query for the target date, write its rows to
    ``report_data.json``, write a working ``format_report.py``, run it on the
    JSON file, and email the script's stdout to the scenario recipient.
    """
    env = DataOpsEnvironment()
    try:
        # Reset and the scenario precondition live inside the try so the
        # workspace is still cleaned up if either of them fails.
        env.reset(task_id="task_3_hard_e2e", seed=19)
        scenario = env.scenario.task_3
        assert scenario is not None

        query = (
            "SELECT department, revenue, expenses, headcount "
            "FROM daily_reports "
            f"WHERE report_date = '{scenario.target_date}' "
            "ORDER BY department"
        )
        sql_obs = env.step(
            DataOpsAction(
                action_type="ExecuteSQL",
                payload={"query": query},
            )
        )
        assert sql_obs.status == "success"
        rows = sql_obs.sql_results
        assert rows is not None

        write_json = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "report_data.json", "content": json.dumps(rows)},
            )
        )
        assert write_json.status == "success"

        write_script = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={
                    "filepath": "format_report.py",
                    "content": _fixed_format_script(scenario.target_date),
                },
            )
        )
        assert write_script.status == "success"

        run_obs = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "format_report.py", "args": ["report_data.json"]},
            )
        )
        assert run_obs.status == "success"

        # The email body is exactly what the formatter printed.
        body = (run_obs.stdout or "").strip()
        email_obs = env.step(
            DataOpsAction(
                action_type="SendEmail",
                payload={
                    "to_email": scenario.recipient,
                    "subject": scenario.subject,
                    "body": body,
                },
            )
        )
        assert email_obs.status == "success"

        out = evaluate_task("task_3_hard_e2e", env)
        assert out["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_equivalent_relative_input_path_still_scores_perfect() -> None:
    """Passing ``./report_data.json`` to the formatter still yields a 1.0 score.

    Same workflow as the plain-path task-3 test, except the RunScript
    argument is spelled with a leading ``./``.
    """
    env = DataOpsEnvironment()
    try:
        # Reset and the scenario precondition live inside the try so the
        # workspace is still cleaned up if either of them fails.
        env.reset(task_id="task_3_hard_e2e", seed=29)
        scenario = env.scenario.task_3
        assert scenario is not None

        query = (
            "SELECT department, revenue, expenses, headcount "
            "FROM daily_reports "
            f"WHERE report_date = '{scenario.target_date}' "
            "ORDER BY department"
        )
        sql_obs = env.step(
            DataOpsAction(
                action_type="ExecuteSQL",
                payload={"query": query},
            )
        )
        assert sql_obs.status == "success"
        rows = sql_obs.sql_results
        assert rows is not None

        # Each intermediate step is checked so a failure is reported where it
        # happens instead of surfacing only as a score mismatch at the end.
        write_json = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "report_data.json", "content": json.dumps(rows)},
            )
        )
        assert write_json.status == "success"

        write_script = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={
                    "filepath": "format_report.py",
                    "content": _fixed_format_script(scenario.target_date),
                },
            )
        )
        assert write_script.status == "success"

        run_obs = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "format_report.py", "args": ["./report_data.json"]},
            )
        )
        assert run_obs.status == "success"

        email_obs = env.step(
            DataOpsAction(
                action_type="SendEmail",
                payload={
                    "to_email": scenario.recipient,
                    "subject": scenario.subject,
                    "body": (run_obs.stdout or "").strip(),
                },
            )
        )
        assert email_obs.status == "success"

        out = evaluate_task("task_3_hard_e2e", env)
        assert out["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_fabricated_email_only_scores_low() -> None:
    """Emailing a report built outside the environment, with no other steps, scores <= 0.10.

    The body comes from ``build_task_3_report`` applied to the scenario's
    expected rows; no query, file write or script run happens in the episode.
    """
    env = DataOpsEnvironment()
    try:
        # Reset and the scenario precondition live inside the try so the
        # workspace is still cleaned up if either of them fails.
        env.reset(task_id="task_3_hard_e2e", seed=23)
        scenario = env.scenario.task_3
        assert scenario is not None

        fake_body = build_task_3_report(list(scenario.expected_rows), scenario.target_date)
        email_obs = env.step(
            DataOpsAction(
                action_type="SendEmail",
                payload={
                    "to_email": scenario.recipient,
                    "subject": scenario.subject,
                    "body": fake_body,
                },
            )
        )
        assert email_obs.status == "success"

        out = evaluate_task("task_3_hard_e2e", env)
        assert out["score"] <= 0.10
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_reading_formatter_source_awards_progress_signal() -> None:
    """Reading ``format_report.py`` in task 3 succeeds and carries a positive reward."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=31)
    try:
        read_formatter = DataOpsAction(
            action_type="ReadFile",
            payload={"filepath": "format_report.py"},
        )
        result = env.step(read_formatter)
        assert result.status == "success"
        assert result.reward is not None and result.reward > 0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_tasks_endpoint_exposes_manifest_metadata() -> None:
    """GET /tasks returns 200 with three tasks, the first marked easy, plus an action schema."""
    with TestClient(app) as client:
        response = client.get("/tasks")
        # Check the status before decoding so a failing request is reported
        # as a status mismatch rather than as a JSON or key error.
        assert response.status_code == 200
        payload = response.json()
        assert len(payload["tasks"]) == 3
        assert payload["tasks"][0]["difficulty"] == "easy"
        assert "action_schema" in payload
def test_public_grader_hides_details_by_default(monkeypatch) -> None:
    """With PUBLIC_GRADER_DETAILS=false, GET /grader returns a score but no details."""
    # Force the flag off explicitly so a developer's local .env cannot turn
    # grader details on for this test.
    monkeypatch.setenv("PUBLIC_GRADER_DETAILS", "false")
    with TestClient(app) as client:
        reset_response = client.post(
            "/reset?task_id=task_1_easy_anomaly",
            json={"seed": 5},
        )
        assert reset_response.status_code == 200

        grader_response = client.get("/grader")
        assert grader_response.status_code == 200

        body = grader_response.json()
        assert "score" in body
        public_score = float(body["score"])
        assert 0.0 < public_score < 1.0
        assert "details" not in body