Spaces:
Sleeping
Sleeping
| """High-signal regression tests for seeded grading and public API shape.""" | |
| from __future__ import annotations | |
| import json | |
| import shutil | |
| import sqlite3 | |
| import sys | |
| from pathlib import Path | |
| from fastapi.testclient import TestClient | |
# Put the directory one level above this file's directory at the front of
# sys.path (only if it is not already listed) so the project-level imports
# that follow this bootstrap can be resolved.
_ROOT = Path(__file__).resolve().parents[1]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))
| from models import DataOpsAction # noqa: E402 | |
| from server.app import app # noqa: E402 | |
| from server.dataops_env_environment import DataOpsEnvironment # noqa: E402 | |
| from server.grading import evaluate_task # noqa: E402 | |
| from server.task_specs import build_task_3_report # noqa: E402 | |
def _fixed_pipeline_script(visible_batch: list[dict[str, object]]) -> str:
    """Return source for a working ``process_data_stream`` pipeline script.

    The generated script keeps only payloads whose status is ``"ready"`` and
    whose ``amount_cents`` is positive, converts cents to dollars, assigns a
    priority band, and sorts by descending amount then order id.  When run as
    ``__main__`` it processes ``visible_batch`` (embedded via ``repr``) and
    prints the result as JSON.
    """
    batch_literal = repr(visible_batch)
    script = f'''\
import json
def process_data_stream(payloads):
    processed_records = []
    for payload in payloads:
        if payload["status"] != "ready" or int(payload["amount_cents"]) <= 0:
            continue
        amount_usd = round(int(payload["amount_cents"]) / 100.0, 2)
        priority_band = (
            "high"
            if int(payload["priority"]) >= 8 or amount_usd >= 500.0
            else "normal"
        )
        processed_records.append(
            {{
                "order_id": payload["order_id"],
                "region": payload["region"],
                "amount_usd": amount_usd,
                "priority_band": priority_band,
            }}
        )
    processed_records.sort(key=lambda item: (-item["amount_usd"], item["order_id"]))
    return processed_records
if __name__ == "__main__":
    mock_batch = {batch_literal}
    print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True))
'''
    return script
def _visible_only_pipeline_stub(
    visible_batch: list[dict[str, object]],
    visible_expected: list[dict[str, object]],
) -> str:
    """Return source for a stub that only handles the visible batch.

    The generated ``process_data_stream`` returns ``visible_expected`` when
    called with exactly ``visible_batch`` and an empty list for anything else;
    run as ``__main__`` it just prints ``visible_expected`` as JSON.
    """
    batch_literal = repr(visible_batch)
    expected_literal = repr(visible_expected)
    stub = f'''\
import json
def process_data_stream(payloads):
    visible = {batch_literal}
    if payloads == visible:
        return {expected_literal}
    return []
if __name__ == "__main__":
    print(json.dumps({expected_literal}, indent=2, sort_keys=True))
'''
    return stub
def _fixed_format_script(target_date: str) -> str:
    """Return source for a working ``format_report`` script.

    The generated script loads a JSON list of department records from the
    path given on the command line, renders a per-department revenue /
    expenses / net section plus a total-revenue footer, prints the report and
    returns it.  ``target_date`` is baked into the report header.
    """
    script = f'''\
import json
import sys
def format_report(input_path):
    with open(input_path, encoding="utf-8") as f:
        records = json.load(f)
    lines = ["=== Daily Revenue Report ({target_date}) ===", ""]
    total_revenue = 0.0
    for rec in records:
        dept = rec["department"]
        rev = float(rec["revenue"])
        exp = float(rec["expenses"])
        net = rev - exp
        lines.append(f"Department: {{dept}}")
        lines.append(f" Revenue: ${{rev:.2f}}")
        lines.append(f" Expenses: ${{exp:.2f}}")
        lines.append(f" Net: ${{net:.2f}}")
        lines.append("")
        total_revenue += rev
    lines.append(f"Total Revenue: ${{total_revenue:.2f}}")
    lines.append("=== End of Report ===")
    out = "\\n".join(lines)
    print(out)
    return out
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python format_report.py <input.json>", file=sys.stderr)
        sys.exit(1)
    format_report(sys.argv[1])
'''
    return script
def test_seeded_task_3_scenario_is_deterministic() -> None:
    """Two environments reset with the same task and seed get equal task 3 scenarios."""
    env_a = DataOpsEnvironment()
    env_b = DataOpsEnvironment()
    try:
        env_a.reset(task_id="task_3_hard_e2e", seed=17)
        env_b.reset(task_id="task_3_hard_e2e", seed=17)
        # Guard against a vacuous pass where both scenarios are missing.
        assert env_a.scenario.task_3 is not None
        assert env_a.scenario.task_3 == env_b.scenario.task_3
    finally:
        # Close both environments even if the first close fails, then remove
        # their workspaces like every other test in this module does.
        try:
            env_a.close()
        finally:
            env_b.close()
            shutil.rmtree(env_a.workspace_dir, ignore_errors=True)
            shutil.rmtree(env_b.workspace_dir, ignore_errors=True)
def test_task_1_perfect_score_seeded() -> None:
    """Deleting exactly the NULL-amount rows yields a task 1 score of 1.0."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        delete_nulls = DataOpsAction(
            action_type="ExecuteSQL",
            payload={"query": "DELETE FROM transactions WHERE amount IS NULL"},
        )
        step_result = env.step(delete_nulls)
        assert step_result.status == "success"

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_seeded_valid_rows_include_non_null_edge_amounts() -> None:
    """The seeded task 1 expected rows contain both a zero and a negative amount."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        task_1 = env.scenario.task_1
        assert task_1 is not None

        expected_amounts = [float(row["amount"]) for row in task_1.expected_rows]
        has_zero = any(value == 0.0 for value in expected_amounts)
        has_negative = any(value < 0.0 for value in expected_amounts)
        assert has_zero
        assert has_negative
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_rewriting_corrupted_rows_scores_zero() -> None:
    """Overwriting NULL amounts with 0 instead of deleting the rows scores 0.0.

    The database is edited directly through ``sqlite3`` rather than through an
    environment action.
    """
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        # ``with connection`` only commits/rolls back the transaction; it does
        # not close the connection, so close it explicitly before grading and
        # before the workspace is removed.
        conn = sqlite3.connect(env.db_path)
        try:
            with conn:
                conn.execute("UPDATE transactions SET amount = 0 WHERE amount IS NULL")
        finally:
            conn.close()

        out = evaluate_task("task_1_easy_anomaly", env)
        assert out["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_deleting_non_null_adjustments_is_penalized() -> None:
    """Also deleting rows with amount <= 0 gives a negative reward and a 0.0 score."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        overreaching_sql = "DELETE FROM transactions WHERE amount IS NULL OR amount <= 0"
        step_result = env.step(
            DataOpsAction(action_type="ExecuteSQL", payload={"query": overreaching_sql})
        )
        # The statement itself runs; the penalty shows up in the reward.
        assert step_result.status == "success"
        assert step_result.reward is not None and step_result.reward < 0

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_reset_only_scores_zero_across_tasks() -> None:
    """Grading straight after reset, with no actions taken, scores 0.0 for every task."""
    all_task_ids = (
        "task_1_easy_anomaly",
        "task_2_medium_syntax",
        "task_3_hard_e2e",
    )
    for current_task in all_task_ids:
        env = DataOpsEnvironment()
        try:
            env.reset(task_id=current_task, seed=7)
            grade = evaluate_task(current_task, env)
            assert grade["score"] == 0.0
        finally:
            env.close()
            shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_1_broad_delete_with_where_is_penalized() -> None:
    """A DELETE whose WHERE clause is always true is flagged as destructive and scores 0.0."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_1_easy_anomaly", seed=7)
    try:
        always_true_delete = "DELETE FROM transactions WHERE amount IS NULL OR 1 = 1"
        step_result = env.step(
            DataOpsAction(action_type="ExecuteSQL", payload={"query": always_true_delete})
        )
        assert step_result.status == "success"
        assert step_result.reward is not None and step_result.reward < 0

        task_1_evidence = env.evidence["task_1"]
        assert task_1_evidence["destructive_sql_attempted"] is True

        grade = evaluate_task("task_1_easy_anomaly", env)
        assert grade["score"] == 0.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_script_run_does_not_inherit_server_secrets(monkeypatch) -> None:
    """A script run by the environment cannot read the server's API_KEY.

    The probe script prints the ``API_KEY`` and ``HOME`` values it sees; the
    test expects no API key and ``HOME`` equal to the workspace directory.
    """
    monkeypatch.setenv("API_KEY", "super-secret-value")
    env = DataOpsEnvironment()
    env.reset(task_id="task_2_medium_syntax", seed=11)
    probe_source = """\
import json
import os
def process_data_stream(payloads):
    return []
if __name__ == "__main__":
    print(json.dumps({"api_key": os.getenv("API_KEY"), "home": os.getenv("HOME")}))
"""
    try:
        write_probe = DataOpsAction(
            action_type="WriteFile",
            payload={"filepath": "broken_pipeline.py", "content": probe_source},
        )
        env.step(write_probe)

        run_probe = DataOpsAction(
            action_type="RunScript",
            payload={"filepath": "broken_pipeline.py", "args": []},
        )
        run_result = env.step(run_probe)
        assert run_result.status == "success"

        seen_env = json.loads((run_result.stdout or "").strip())
        assert seen_env["api_key"] is None
        assert seen_env["home"] == env.workspace_dir
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_perfect_score_seeded() -> None:
    """Read, fix and run the pipeline: partial credit before the run, 1.0 after it."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_2_medium_syntax", seed=11)
    scenario = env.scenario.task_2
    assert scenario is not None
    try:
        target_file = "broken_pipeline.py"

        read_result = env.step(
            DataOpsAction(action_type="ReadFile", payload={"filepath": target_file})
        )
        assert read_result.status == "success"

        repaired_source = _fixed_pipeline_script(list(scenario.visible_batch))
        write_result = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": target_file, "content": repaired_source},
            )
        )
        assert write_result.status == "success"

        # Writing the fix alone earns some credit, but not all of it.
        grade_before_run = evaluate_task("task_2_medium_syntax", env)
        assert 0.0 < grade_before_run["score"] < 1.0

        run_result = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": target_file, "args": []},
            )
        )
        assert run_result.status == "success"

        grade_after_run = evaluate_task("task_2_medium_syntax", env)
        assert grade_after_run["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_2_print_only_stub_does_not_get_full_credit() -> None:
    """A stub that only echoes the visible batch's expected output scores below 0.5."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_2_medium_syntax", seed=11)
    scenario = env.scenario.task_2
    assert scenario is not None
    stub = _visible_only_pipeline_stub(
        list(scenario.visible_batch),
        list(scenario.visible_expected),
    )
    try:
        write_obs = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "broken_pipeline.py", "content": stub},
            )
        )
        # Without this check a failed write would leave a reset-only
        # environment, and the low-score assertion below would pass without
        # the stub ever being graded.
        assert write_obs.status == "success"

        out = evaluate_task("task_2_medium_syntax", env)
        assert out["score"] < 0.5
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_sql_policy_rejects_literal_table_name_bypass() -> None:
    """A sqlite_master query that only mentions 'daily_reports' as a string literal is refused."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=19)
    try:
        bypass_sql = (
            "SELECT name FROM sqlite_master "
            "WHERE 'daily_reports' = 'daily_reports'"
        )
        step_result = env.step(
            DataOpsAction(action_type="ExecuteSQL", payload={"query": bypass_sql})
        )
        assert step_result.status == "error"
        assert "disallowed" in step_result.message.lower()
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_sql_policy_allows_cte_queries_over_daily_reports() -> None:
    """A CTE that selects from daily_reports is accepted and returns rows."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=19)
    scenario = env.scenario.task_3
    assert scenario is not None
    try:
        columns = "department, revenue, expenses, headcount"
        scoped_select = (
            f"SELECT {columns} "
            "FROM daily_reports "
            f"WHERE report_date = '{scenario.target_date}'"
        )
        cte_sql = (
            f"WITH scoped AS ({scoped_select}) "
            f"SELECT {columns} "
            "FROM scoped ORDER BY department"
        )
        step_result = env.step(
            DataOpsAction(action_type="ExecuteSQL", payload={"query": cte_sql})
        )
        assert step_result.status == "success"
        assert step_result.sql_results
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_perfect_score_requires_proven_workflow() -> None:
    """Query, save, fix the formatter, run it and email its output for a 1.0 score."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=19)
    scenario = env.scenario.task_3
    assert scenario is not None
    try:
        # 1. Pull the rows for the target date.
        select_sql = " ".join(
            [
                "SELECT department, revenue, expenses, headcount",
                "FROM daily_reports",
                f"WHERE report_date = '{scenario.target_date}'",
                "ORDER BY department",
            ]
        )
        sql_result = env.step(
            DataOpsAction(action_type="ExecuteSQL", payload={"query": select_sql})
        )
        assert sql_result.status == "success"
        fetched_rows = sql_result.sql_results
        assert fetched_rows is not None

        # 2. Persist the rows as the formatter's input file.
        json_write = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={
                    "filepath": "report_data.json",
                    "content": json.dumps(fetched_rows),
                },
            )
        )
        assert json_write.status == "success"

        # 3. Replace the formatter with a working version.
        formatter_source = _fixed_format_script(scenario.target_date)
        script_write = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "format_report.py", "content": formatter_source},
            )
        )
        assert script_write.status == "success"

        # 4. Run the formatter against the saved rows.
        run_result = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "format_report.py", "args": ["report_data.json"]},
            )
        )
        assert run_result.status == "success"
        report_body = (run_result.stdout or "").strip()

        # 5. Email the formatter's output to the scenario recipient.
        email_result = env.step(
            DataOpsAction(
                action_type="SendEmail",
                payload={
                    "to_email": scenario.recipient,
                    "subject": scenario.subject,
                    "body": report_body,
                },
            )
        )
        assert email_result.status == "success"

        grade = evaluate_task("task_3_hard_e2e", env)
        assert grade["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_equivalent_relative_input_path_still_scores_perfect() -> None:
    """Passing ``./report_data.json`` instead of ``report_data.json`` still scores 1.0.

    Every intermediate step is asserted so that a failure is reported at the
    step that broke rather than only as a wrong final score.
    """
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=29)
    scenario = env.scenario.task_3
    assert scenario is not None
    try:
        query = (
            "SELECT department, revenue, expenses, headcount "
            "FROM daily_reports "
            f"WHERE report_date = '{scenario.target_date}' "
            "ORDER BY department"
        )
        sql_obs = env.step(
            DataOpsAction(
                action_type="ExecuteSQL",
                payload={"query": query},
            )
        )
        assert sql_obs.status == "success"
        rows = sql_obs.sql_results
        assert rows is not None

        write_json = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={"filepath": "report_data.json", "content": json.dumps(rows)},
            )
        )
        assert write_json.status == "success"

        write_script = env.step(
            DataOpsAction(
                action_type="WriteFile",
                payload={
                    "filepath": "format_report.py",
                    "content": _fixed_format_script(scenario.target_date),
                },
            )
        )
        assert write_script.status == "success"

        # The only difference from the plain workflow: a "./"-prefixed path.
        run_obs = env.step(
            DataOpsAction(
                action_type="RunScript",
                payload={"filepath": "format_report.py", "args": ["./report_data.json"]},
            )
        )
        assert run_obs.status == "success"

        email_obs = env.step(
            DataOpsAction(
                action_type="SendEmail",
                payload={
                    "to_email": scenario.recipient,
                    "subject": scenario.subject,
                    "body": (run_obs.stdout or "").strip(),
                },
            )
        )
        assert email_obs.status == "success"

        out = evaluate_task("task_3_hard_e2e", env)
        assert out["score"] == 1.0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_fabricated_email_only_scores_low() -> None:
    """Emailing a pre-built report without running the workflow scores at most 0.10."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=23)
    scenario = env.scenario.task_3
    assert scenario is not None
    try:
        # Build the report body directly from the scenario's expected rows,
        # skipping the query / write / run steps entirely.
        shortcut_body = build_task_3_report(
            list(scenario.expected_rows),
            scenario.target_date,
        )
        send_action = DataOpsAction(
            action_type="SendEmail",
            payload={
                "to_email": scenario.recipient,
                "subject": scenario.subject,
                "body": shortcut_body,
            },
        )
        send_result = env.step(send_action)
        assert send_result.status == "success"

        grade = evaluate_task("task_3_hard_e2e", env)
        assert grade["score"] <= 0.10
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_task_3_reading_formatter_source_awards_progress_signal() -> None:
    """Reading format_report.py in task 3 succeeds and returns a positive reward."""
    env = DataOpsEnvironment()
    env.reset(task_id="task_3_hard_e2e", seed=31)
    try:
        read_formatter = DataOpsAction(
            action_type="ReadFile",
            payload={"filepath": "format_report.py"},
        )
        read_result = env.step(read_formatter)
        assert read_result.status == "success"
        assert read_result.reward is not None and read_result.reward > 0
    finally:
        env.close()
        shutil.rmtree(env.workspace_dir, ignore_errors=True)
def test_tasks_endpoint_exposes_manifest_metadata() -> None:
    """GET /tasks returns three tasks, the first marked easy, plus an action schema."""
    with TestClient(app) as client:
        response = client.get("/tasks")
        # Check the status before decoding so an error response is reported
        # as a bad status code, not as a JSON decode or missing-key failure.
        assert response.status_code == 200
        payload = response.json()
        assert len(payload["tasks"]) == 3
        assert payload["tasks"][0]["difficulty"] == "easy"
        assert "action_schema" in payload
def test_public_grader_hides_details_by_default(monkeypatch) -> None:
    """With PUBLIC_GRADER_DETAILS set to "false", /grader returns a score and no details."""
    # Pin the flag explicitly so a developer's local .env cannot switch details on.
    monkeypatch.setenv("PUBLIC_GRADER_DETAILS", "false")
    with TestClient(app) as client:
        reset_response = client.post(
            "/reset?task_id=task_1_easy_anomaly",
            json={"seed": 5},
        )
        assert reset_response.status_code == 200

        grader_response = client.get("/grader")
        assert grader_response.status_code == 200

        body = grader_response.json()
        assert "score" in body
        public_score = float(body["score"])
        assert 0.0 < public_score < 1.0
        assert "details" not in body