"""High-signal regression tests for seeded grading and public API shape.""" from __future__ import annotations import json import shutil import sqlite3 import sys from pathlib import Path from fastapi.testclient import TestClient _ROOT = Path(__file__).resolve().parents[1] if str(_ROOT) not in sys.path: sys.path.insert(0, str(_ROOT)) from models import DataOpsAction # noqa: E402 from server.app import app # noqa: E402 from server.dataops_env_environment import DataOpsEnvironment # noqa: E402 from server.grading import evaluate_task # noqa: E402 from server.task_specs import build_task_3_report # noqa: E402 def _fixed_pipeline_script(visible_batch: list[dict[str, object]]) -> str: return f'''\ import json def process_data_stream(payloads): processed_records = [] for payload in payloads: if payload["status"] != "ready" or int(payload["amount_cents"]) <= 0: continue amount_usd = round(int(payload["amount_cents"]) / 100.0, 2) priority_band = ( "high" if int(payload["priority"]) >= 8 or amount_usd >= 500.0 else "normal" ) processed_records.append( {{ "order_id": payload["order_id"], "region": payload["region"], "amount_usd": amount_usd, "priority_band": priority_band, }} ) processed_records.sort(key=lambda item: (-item["amount_usd"], item["order_id"])) return processed_records if __name__ == "__main__": mock_batch = {visible_batch!r} print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True)) ''' def _visible_only_pipeline_stub( visible_batch: list[dict[str, object]], visible_expected: list[dict[str, object]], ) -> str: return f'''\ import json def process_data_stream(payloads): visible = {visible_batch!r} if payloads == visible: return {visible_expected!r} return [] if __name__ == "__main__": print(json.dumps({visible_expected!r}, indent=2, sort_keys=True)) ''' def _fixed_format_script(target_date: str) -> str: return f'''\ import json import sys def format_report(input_path): with open(input_path, encoding="utf-8") as f: records = json.load(f) lines = ["=== Daily Revenue Report ({target_date}) ===", ""] total_revenue = 0.0 for rec in records: dept = rec["department"] rev = float(rec["revenue"]) exp = float(rec["expenses"]) net = rev - exp lines.append(f"Department: {{dept}}") lines.append(f" Revenue: ${{rev:.2f}}") lines.append(f" Expenses: ${{exp:.2f}}") lines.append(f" Net: ${{net:.2f}}") lines.append("") total_revenue += rev lines.append(f"Total Revenue: ${{total_revenue:.2f}}") lines.append("=== End of Report ===") out = "\\n".join(lines) print(out) return out if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python format_report.py ", file=sys.stderr) sys.exit(1) format_report(sys.argv[1]) ''' def test_seeded_task_3_scenario_is_deterministic() -> None: env_a = DataOpsEnvironment() env_b = DataOpsEnvironment() try: env_a.reset(task_id="task_3_hard_e2e", seed=17) env_b.reset(task_id="task_3_hard_e2e", seed=17) assert env_a.scenario.task_3 == env_b.scenario.task_3 finally: env_a.close() env_b.close() def test_task_1_perfect_score_seeded() -> None: env = DataOpsEnvironment() env.reset(task_id="task_1_easy_anomaly", seed=7) try: obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={"query": "DELETE FROM transactions WHERE amount IS NULL"}, ) ) assert obs.status == "success" out = evaluate_task("task_1_easy_anomaly", env) assert out["score"] == 1.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_1_seeded_valid_rows_include_non_null_edge_amounts() -> None: env = DataOpsEnvironment() env.reset(task_id="task_1_easy_anomaly", seed=7) try: scenario = env.scenario.task_1 assert scenario is not None amounts = [float(row["amount"]) for row in scenario.expected_rows] assert any(amount == 0.0 for amount in amounts) assert any(amount < 0.0 for amount in amounts) finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_1_rewriting_corrupted_rows_scores_zero() -> None: env = DataOpsEnvironment() env.reset(task_id="task_1_easy_anomaly", seed=7) try: with sqlite3.connect(env.db_path) as conn: conn.execute("UPDATE transactions SET amount = 0 WHERE amount IS NULL") conn.commit() out = evaluate_task("task_1_easy_anomaly", env) assert out["score"] == 0.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_1_deleting_non_null_adjustments_is_penalized() -> None: env = DataOpsEnvironment() env.reset(task_id="task_1_easy_anomaly", seed=7) try: obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={ "query": "DELETE FROM transactions WHERE amount IS NULL OR amount <= 0" }, ) ) assert obs.status == "success" assert obs.reward is not None and obs.reward < 0 out = evaluate_task("task_1_easy_anomaly", env) assert out["score"] == 0.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_reset_only_scores_zero_across_tasks() -> None: for task_id in ( "task_1_easy_anomaly", "task_2_medium_syntax", "task_3_hard_e2e", ): env = DataOpsEnvironment() try: env.reset(task_id=task_id, seed=7) out = evaluate_task(task_id, env) assert out["score"] == 0.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_1_broad_delete_with_where_is_penalized() -> None: env = DataOpsEnvironment() env.reset(task_id="task_1_easy_anomaly", seed=7) try: obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={ "query": "DELETE FROM transactions WHERE amount IS NULL OR 1 = 1" }, ) ) assert obs.status == "success" assert obs.reward is not None and obs.reward < 0 assert env.evidence["task_1"]["destructive_sql_attempted"] is True out = evaluate_task("task_1_easy_anomaly", env) assert out["score"] == 0.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_2_script_run_does_not_inherit_server_secrets(monkeypatch) -> None: monkeypatch.setenv("API_KEY", "super-secret-value") env = DataOpsEnvironment() env.reset(task_id="task_2_medium_syntax", seed=11) script = """\ import json import os def process_data_stream(payloads): return [] if __name__ == "__main__": print(json.dumps({"api_key": os.getenv("API_KEY"), "home": os.getenv("HOME")})) """ try: env.step( DataOpsAction( action_type="WriteFile", payload={"filepath": "broken_pipeline.py", "content": script}, ) ) run_obs = env.step( DataOpsAction( action_type="RunScript", payload={"filepath": "broken_pipeline.py", "args": []}, ) ) assert run_obs.status == "success" payload = json.loads((run_obs.stdout or "").strip()) assert payload["api_key"] is None assert payload["home"] == env.workspace_dir finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_2_perfect_score_seeded() -> None: env = DataOpsEnvironment() env.reset(task_id="task_2_medium_syntax", seed=11) scenario = env.scenario.task_2 assert scenario is not None try: read_obs = env.step( DataOpsAction( action_type="ReadFile", payload={"filepath": "broken_pipeline.py"}, ) ) assert read_obs.status == "success" write_obs = env.step( DataOpsAction( action_type="WriteFile", payload={ "filepath": "broken_pipeline.py", "content": _fixed_pipeline_script(list(scenario.visible_batch)), }, ) ) assert write_obs.status == "success" pre_run = evaluate_task("task_2_medium_syntax", env) assert 0.0 < pre_run["score"] < 1.0 run_obs = env.step( DataOpsAction( action_type="RunScript", payload={"filepath": "broken_pipeline.py", "args": []}, ) ) assert run_obs.status == "success" out = evaluate_task("task_2_medium_syntax", env) assert out["score"] == 1.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_2_print_only_stub_does_not_get_full_credit() -> None: env = DataOpsEnvironment() env.reset(task_id="task_2_medium_syntax", seed=11) scenario = env.scenario.task_2 assert scenario is not None stub = _visible_only_pipeline_stub( list(scenario.visible_batch), list(scenario.visible_expected), ) try: env.step( DataOpsAction( action_type="WriteFile", payload={"filepath": "broken_pipeline.py", "content": stub}, ) ) out = evaluate_task("task_2_medium_syntax", env) assert out["score"] < 0.5 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_sql_policy_rejects_literal_table_name_bypass() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=19) try: obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={ "query": ( "SELECT name FROM sqlite_master " "WHERE 'daily_reports' = 'daily_reports'" ) }, ) ) assert obs.status == "error" assert "disallowed" in obs.message.lower() finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_sql_policy_allows_cte_queries_over_daily_reports() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=19) scenario = env.scenario.task_3 assert scenario is not None try: obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={ "query": ( "WITH scoped AS (" "SELECT department, revenue, expenses, headcount " "FROM daily_reports " f"WHERE report_date = '{scenario.target_date}'" ") " "SELECT department, revenue, expenses, headcount " "FROM scoped ORDER BY department" ) }, ) ) assert obs.status == "success" assert obs.sql_results finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_perfect_score_requires_proven_workflow() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=19) scenario = env.scenario.task_3 assert scenario is not None try: query = ( "SELECT department, revenue, expenses, headcount " "FROM daily_reports " f"WHERE report_date = '{scenario.target_date}' " "ORDER BY department" ) sql_obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={"query": query}, ) ) assert sql_obs.status == "success" rows = sql_obs.sql_results assert rows is not None write_json = env.step( DataOpsAction( action_type="WriteFile", payload={"filepath": "report_data.json", "content": json.dumps(rows)}, ) ) assert write_json.status == "success" write_script = env.step( DataOpsAction( action_type="WriteFile", payload={ "filepath": "format_report.py", "content": _fixed_format_script(scenario.target_date), }, ) ) assert write_script.status == "success" run_obs = env.step( DataOpsAction( action_type="RunScript", payload={"filepath": "format_report.py", "args": ["report_data.json"]}, ) ) assert run_obs.status == "success" body = (run_obs.stdout or "").strip() email_obs = env.step( DataOpsAction( action_type="SendEmail", payload={ "to_email": scenario.recipient, "subject": scenario.subject, "body": body, }, ) ) assert email_obs.status == "success" out = evaluate_task("task_3_hard_e2e", env) assert out["score"] == 1.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_equivalent_relative_input_path_still_scores_perfect() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=29) scenario = env.scenario.task_3 assert scenario is not None try: query = ( "SELECT department, revenue, expenses, headcount " "FROM daily_reports " f"WHERE report_date = '{scenario.target_date}' " "ORDER BY department" ) sql_obs = env.step( DataOpsAction( action_type="ExecuteSQL", payload={"query": query}, ) ) assert sql_obs.status == "success" rows = sql_obs.sql_results assert rows is not None env.step( DataOpsAction( action_type="WriteFile", payload={"filepath": "report_data.json", "content": json.dumps(rows)}, ) ) env.step( DataOpsAction( action_type="WriteFile", payload={ "filepath": "format_report.py", "content": _fixed_format_script(scenario.target_date), }, ) ) run_obs = env.step( DataOpsAction( action_type="RunScript", payload={"filepath": "format_report.py", "args": ["./report_data.json"]}, ) ) assert run_obs.status == "success" env.step( DataOpsAction( action_type="SendEmail", payload={ "to_email": scenario.recipient, "subject": scenario.subject, "body": (run_obs.stdout or "").strip(), }, ) ) out = evaluate_task("task_3_hard_e2e", env) assert out["score"] == 1.0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_fabricated_email_only_scores_low() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=23) scenario = env.scenario.task_3 assert scenario is not None try: fake_body = build_task_3_report(list(scenario.expected_rows), scenario.target_date) email_obs = env.step( DataOpsAction( action_type="SendEmail", payload={ "to_email": scenario.recipient, "subject": scenario.subject, "body": fake_body, }, ) ) assert email_obs.status == "success" out = evaluate_task("task_3_hard_e2e", env) assert out["score"] <= 0.10 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_task_3_reading_formatter_source_awards_progress_signal() -> None: env = DataOpsEnvironment() env.reset(task_id="task_3_hard_e2e", seed=31) try: obs = env.step( DataOpsAction( action_type="ReadFile", payload={"filepath": "format_report.py"}, ) ) assert obs.status == "success" assert obs.reward is not None and obs.reward > 0 finally: env.close() shutil.rmtree(env.workspace_dir, ignore_errors=True) def test_tasks_endpoint_exposes_manifest_metadata() -> None: with TestClient(app) as client: response = client.get("/tasks") payload = response.json() assert response.status_code == 200 assert len(payload["tasks"]) == 3 assert payload["tasks"][0]["difficulty"] == "easy" assert "action_schema" in payload def test_public_grader_hides_details_by_default(monkeypatch) -> None: # Do not leak grader details when PUBLIC_GRADER_DETAILS is unset/false (ignore dev .env). monkeypatch.setenv("PUBLIC_GRADER_DETAILS", "false") with TestClient(app) as client: reset = client.post("/reset?task_id=task_1_easy_anomaly", json={"seed": 5}) assert reset.status_code == 200 grade = client.get("/grader") assert grade.status_code == 200 payload = grade.json() assert "score" in payload assert 0.0 < float(payload["score"]) < 1.0 assert "details" not in payload