"""Guard-rail tests for ``LegacyCobolEnvironment``.

Covers the step budget (sync and async), terminal no-op behaviour, typing and
clamping of the final reward payload, the REST/schema surface exposed by the
FastAPI app, and the rule that repeated actions earn no additional reward.
"""

import asyncio

from fastapi.testclient import TestClient
from openenv.core.env_server.mcp_types import CallToolAction

from legacy_cobol_env.models import FinalSubmissionResult, RewardComponents
from legacy_cobol_env.server.app import app
from legacy_cobol_env.server.legacy_cobol_env_environment import (
    MAX_STEPS,
    LegacyCobolEnvironment,
)
from legacy_cobol_env.server.sandbox import CaseResult, EvaluationResult
from legacy_cobol_env.tests.test_environment import GOOD_SOLUTION


def call_obs(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Run one tool call synchronously and return the resulting observation."""
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    return env.step(action)


async def call_obs_async(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Run one tool call through ``step_async`` and return the observation."""
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    return await env.step_async(action)


def result_data(observation):
    """Return ``observation.result.data`` when present, else ``observation.result``."""
    payload = observation.result
    return getattr(payload, "data", payload)


def test_thirteenth_action_exceeds_max_steps_without_executing_tool():
    """An action issued after MAX_STEPS steps is rejected and never executed."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    # Spend the whole step budget on reads; none of them may end the episode.
    for _ in range(MAX_STEPS):
        read_obs = call_obs(env, "read_cobol_file", filename="PAYROLL.cbl")
        assert read_obs.done is False

    over_budget = call_obs(env, "write_python_solution", code=GOOD_SOLUTION)
    over_budget_payload = result_data(over_budget)

    assert over_budget.done is True
    assert over_budget.reward == 0.0
    assert over_budget_payload["ok"] is False
    assert "max_steps" in over_budget_payload["error"]

    # The rejected write must leave no trace in the episode state.
    state = env.state
    assert state.done is True
    assert env.state.step_count == MAX_STEPS
    assert env.state.draft_count == 0
    assert env.state.last_tool != "write_python_solution"


def test_post_done_steps_are_terminal_noops_without_state_mutation():
    """Once an episode is done, further steps fail and leave state untouched."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))
    submission = call_obs(env, "submit_final", draft_id=draft["draft_id"])
    assert submission.done is True

    snapshot = env.state.model_dump()

    late_write = call_obs(
        env,
        "write_python_solution",
        code="def migrate(input_record):\n return ''\n",
    )
    late_payload = result_data(late_write)

    assert late_write.done is True
    assert late_write.reward == 0.0
    assert late_payload["ok"] is False
    assert "terminal" in late_payload["error"]
    assert env.state.model_dump() == snapshot


def test_final_reward_response_is_typed_and_clamped(monkeypatch):
    """Out-of-range reward components are reported as floats within [0, 1]."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")
    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))

    def out_of_bounds_components(hidden, fresh):
        # Deliberately outside [0, 1] on both sides to exercise clamping.
        return {
            "hidden_correctness": -0.25,
            "fresh_correctness": 1.25,
            "interface_contract": 2.0,
            "type_and_layout_fidelity": -1.0,
            "anti_hardcoding": 0.5,
            "safety": 9.0,
        }

    monkeypatch.setattr(env, "_reward_components", out_of_bounds_components)

    submission = call_obs(env, "submit_final", draft_id=draft["draft_id"])
    final = result_data(submission)

    typed_final = FinalSubmissionResult.model_validate(final)
    typed_components = RewardComponents.model_validate(final["components"])
    component_values = typed_components.model_dump().values()

    assert isinstance(typed_final.public_score, float)
    assert all(isinstance(value, float) for value in component_values)
    assert all(0.0 <= value <= 1.0 for value in component_values)
    assert 0.0 <= typed_final.public_score <= 1.0


def test_fresh_timeout_counts_against_safety_component():
    """A timeout in the fresh evaluation yields a safety component of 0.0."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    case = CaseResult(
        case_id="case",
        passed=True,
        input_summary="case",
        actual="x",
        expected="x",
    )

    def passing_evaluation(timed_out):
        # Identical all-passing results; only the timeout flag varies.
        return EvaluationResult(
            syntax_ok=True,
            safety_ok=True,
            interface_ok=True,
            timed_out=timed_out,
            passed=1,
            total=1,
            case_results=[case],
        )

    hidden = passing_evaluation(False)
    fresh = passing_evaluation(True)

    raw_components = env._reward_components(hidden, fresh)
    components = RewardComponents.model_validate(raw_components)

    assert components.safety == 0.0


def test_async_max_step_guard_matches_sync_behavior():
    """``step_async`` enforces the same MAX_STEPS guard as ``step``."""

    async def scenario():
        episode = LegacyCobolEnvironment()
        episode.reset(task_id="payroll_net_pay_001")
        for _ in range(MAX_STEPS):
            read_obs = await call_obs_async(
                episode, "read_cobol_file", filename="PAYROLL.cbl"
            )
            assert read_obs.done is False
        over_budget = await call_obs_async(
            episode, "write_python_solution", code=GOOD_SOLUTION
        )
        return episode, over_budget, result_data(over_budget)

    env, blocked, blocked_result = asyncio.run(scenario())

    assert blocked.done is True
    assert blocked.reward == 0.0
    assert blocked_result["ok"] is False
    assert "max_steps" in blocked_result["error"]
    assert env.state.step_count == MAX_STEPS
    assert env.state.draft_count == 0


def test_schema_exposes_project_typed_state_fields():
    """``GET /schema`` describes the wrapper models and the project state fields."""
    client = TestClient(app)
    schema = client.get("/schema").json()

    state_properties = schema["state"]["properties"]
    action_properties = schema["action"]["properties"]
    observation_properties = schema["observation"]["properties"]

    assert schema["action"]["title"] == "ToolActionWrapper"
    assert schema["observation"]["title"] == "ToolObservationWrapper"
    assert "tool_name" in action_properties
    assert "result" in observation_properties

    expected_state_fields = (
        "task_id",
        "draft_count",
        "visible_runs",
        "final_score",
        "reward_components",
    )
    missing = [name for name in expected_state_fields if name not in state_properties]
    assert not missing


def test_rest_reset_step_and_state_share_one_episode():
    """``/reset``, ``/step`` and ``/state`` all act on the same episode."""
    client = TestClient(app)

    reset = client.post("/reset", json={"task_id": "invoice_occurs_001"})
    assert reset.status_code == 200
    ticket = reset.json()["observation"]["result"]["ticket"]
    assert ticket["task_id"] == "invoice_occurs_001"

    step_request = {
        "action": {
            "tool_name": "read_cobol_file",
            "arguments": {"filename": "INVTOTAL.cbl"},
        }
    }
    step = client.post("/step", json=step_request)
    assert step.status_code == 200
    step_body = step.json()
    assert step_body["observation"]["result"]["data"]["ok"] is True
    assert step_body["reward"] == 0.02

    state = client.get("/state")
    assert state.status_code == 200
    state_body = state.json()
    assert state_body["task_id"] == "invoice_occurs_001"
    assert state_body["files_read"] == ["INVTOTAL.cbl"]
    assert state_body["last_tool"] == "read_cobol_file"
    assert state_body["step_count"] == 1


def test_redundant_discovery_actions_do_not_repeat_progress_reward():
    """Each discovery tool is rewarded on first use and earns 0.0 when repeated."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    # (tool name, keyword arguments, reward expected for the first call)
    probes = (
        ("read_cobol_file", {"filename": "PAYROLL.cbl"}, 0.02),
        ("parse_copybook_layout", {"filename": "EMPLOYEE_PAY.cpy"}, 0.03),
        ("inspect_business_rules", {}, 0.01),
    )

    # Issue every call before asserting anything, each tool twice in a row.
    observations = []
    for tool_name, arguments, _ in probes:
        first = call_obs(env, tool_name, **arguments)
        duplicate = call_obs(env, tool_name, **arguments)
        observations.append((first, duplicate))

    for (_, _, first_reward), (first, duplicate) in zip(probes, observations):
        assert first.reward == first_reward
        assert duplicate.reward == 0.0


def test_visible_test_reward_only_pays_for_new_progress():
    """Re-running visible tests on the same draft earns no further reward."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))
    draft_id = draft["draft_id"]

    first_visible = call_obs(env, "run_visible_tests", draft_id=draft_id)
    repeated_visible = call_obs(env, "run_visible_tests", draft_id=draft_id)

    assert first_visible.reward == 0.1
    assert repeated_visible.reward == 0.0