import asyncio
from fastapi.testclient import TestClient
from openenv.core.env_server.mcp_types import CallToolAction
from legacy_cobol_env.models import FinalSubmissionResult, RewardComponents
from legacy_cobol_env.server.app import app
from legacy_cobol_env.server.legacy_cobol_env_environment import (
MAX_STEPS,
LegacyCobolEnvironment,
)
from legacy_cobol_env.server.sandbox import CaseResult, EvaluationResult
from legacy_cobol_env.tests.test_environment import GOOD_SOLUTION
def call_obs(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Step ``env`` synchronously with a single tool call.

    The keyword arguments become the ``arguments`` mapping of a
    ``CallToolAction``; whatever ``env.step`` returns is handed back as-is.
    """
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    return env.step(action)
async def call_obs_async(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Step ``env`` through ``step_async`` with a single tool call.

    The keyword arguments become the ``arguments`` mapping of a
    ``CallToolAction``; the awaited result of ``env.step_async`` is returned.
    """
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    observation = await env.step_async(action)
    return observation
def result_data(observation):
    """Return ``observation.result.data`` if the result has a ``data`` attribute.

    When the result has no ``data`` attribute, the result itself is returned.
    """
    payload = observation.result
    return getattr(payload, "data", payload)
def test_thirteenth_action_exceeds_max_steps_without_executing_tool():
    """Spend the full step budget, then check the next call is refused unexecuted."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    # Use up exactly MAX_STEPS actions; none of them may end the episode.
    for _ in range(MAX_STEPS):
        read_obs = call_obs(env, "read_cobol_file", filename="PAYROLL.cbl")
        assert read_obs.done is False

    over_budget = call_obs(env, "write_python_solution", code=GOOD_SOLUTION)
    payload = result_data(over_budget)

    # The extra call ends the episode with zero reward and a max_steps error.
    assert over_budget.done is True
    assert over_budget.reward == 0.0
    assert payload["ok"] is False
    assert "max_steps" in payload["error"]

    # The state must show that the write was never carried out.
    assert env.state.done is True
    assert env.state.step_count == MAX_STEPS
    assert env.state.draft_count == 0
    assert env.state.last_tool != "write_python_solution"
def test_post_done_steps_are_terminal_noops_without_state_mutation():
    """After a final submission, a further tool call is refused and state is unchanged."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    # Finish the episode: write a draft, then submit it.
    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))
    submitted = call_obs(env, "submit_final", draft_id=draft["draft_id"])
    assert submitted.done is True

    # Snapshot taken before the late call so any mutation is detectable.
    snapshot = env.state.model_dump()

    late_call = call_obs(
        env,
        "write_python_solution",
        code="def migrate(input_record):\n return ''\n",
    )
    late_payload = result_data(late_call)

    assert late_call.done is True
    assert late_call.reward == 0.0
    assert late_payload["ok"] is False
    assert "terminal" in late_payload["error"]
    assert env.state.model_dump() == snapshot
def test_final_reward_response_is_typed_and_clamped(monkeypatch):
    """Out-of-range reward components must come back as floats within [0, 1]."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")
    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))

    def fake_components(hidden, fresh):
        # Every value except anti_hardcoding lies outside the [0, 1] range.
        return dict(
            hidden_correctness=-0.25,
            fresh_correctness=1.25,
            interface_contract=2.0,
            type_and_layout_fidelity=-1.0,
            anti_hardcoding=0.5,
            safety=9.0,
        )

    monkeypatch.setattr(env, "_reward_components", fake_components)

    submission = call_obs(env, "submit_final", draft_id=draft["draft_id"])
    final = result_data(submission)

    # Both payloads must validate against the project's typed models.
    typed_final = FinalSubmissionResult.model_validate(final)
    typed_components = RewardComponents.model_validate(final["components"])
    component_values = list(typed_components.model_dump().values())

    assert isinstance(typed_final.public_score, float)
    for value in component_values:
        assert isinstance(value, float)
    for value in component_values:
        assert 0.0 <= value <= 1.0
    assert 0.0 <= typed_final.public_score <= 1.0
def test_fresh_timeout_counts_against_safety_component():
    """A timed-out fresh evaluation must yield a safety component of 0.0."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    passing_case = CaseResult(
        case_id="case",
        passed=True,
        input_summary="case",
        actual="x",
        expected="x",
    )

    def evaluation(timed_out):
        # Identical passing evaluations; only the timeout flag varies.
        return EvaluationResult(
            syntax_ok=True,
            safety_ok=True,
            interface_ok=True,
            timed_out=timed_out,
            passed=1,
            total=1,
            case_results=[passing_case],
        )

    hidden = evaluation(False)
    fresh = evaluation(True)

    raw_components = env._reward_components(hidden, fresh)
    components = RewardComponents.model_validate(raw_components)
    assert components.safety == 0.0
def test_async_max_step_guard_matches_sync_behavior():
    """The async step path must enforce the step budget like the sync path does.

    The scenario spends MAX_STEPS actions through ``step_async`` and then
    issues one more call. That call has to come back done, with zero reward
    and a ``max_steps`` error, and the environment state has to show the
    blocked tool never ran. The state checks mirror the ones made by
    ``test_thirteenth_action_exceeds_max_steps_without_executing_tool``,
    including the episode being marked done and ``last_tool`` not being the
    blocked tool.
    """

    async def scenario():
        env = LegacyCobolEnvironment()
        env.reset(task_id="payroll_net_pay_001")
        # Use up the whole budget; none of these steps may end the episode.
        for _ in range(MAX_STEPS):
            obs = await call_obs_async(env, "read_cobol_file", filename="PAYROLL.cbl")
            assert obs.done is False
        blocked = await call_obs_async(env, "write_python_solution", code=GOOD_SOLUTION)
        return env, blocked, result_data(blocked)

    env, blocked, blocked_result = asyncio.run(scenario())

    assert blocked.done is True
    assert blocked.reward == 0.0
    assert blocked_result["ok"] is False
    assert "max_steps" in blocked_result["error"]

    # Same state checks as the sync test, so the two paths are held to parity.
    assert env.state.done is True
    assert env.state.step_count == MAX_STEPS
    assert env.state.draft_count == 0
    assert env.state.last_tool != "write_python_solution"
def test_schema_exposes_project_typed_state_fields():
    """``/schema`` must describe the wrapper action/observation and the state fields."""
    response = TestClient(app).get("/schema")
    schema = response.json()

    state_properties = schema["state"]["properties"]
    action_properties = schema["action"]["properties"]
    observation_properties = schema["observation"]["properties"]

    assert schema["action"]["title"] == "ToolActionWrapper"
    assert schema["observation"]["title"] == "ToolObservationWrapper"
    assert "tool_name" in action_properties
    assert "result" in observation_properties

    expected_state_fields = (
        "task_id",
        "draft_count",
        "visible_runs",
        "final_score",
        "reward_components",
    )
    # Gather every absent field so a failure lists all of them at once.
    missing = [name for name in expected_state_fields if name not in state_properties]
    assert not missing
def test_rest_reset_step_and_state_share_one_episode():
    """``/reset``, ``/step`` and ``/state`` on one client must reflect the same episode."""
    client = TestClient(app)
    task_id = "invoice_occurs_001"

    # Start an episode for the chosen task.
    reset = client.post("/reset", json={"task_id": task_id})
    assert reset.status_code == 200
    reset_body = reset.json()
    assert reset_body["observation"]["result"]["ticket"]["task_id"] == "invoice_occurs_001"

    # Take one step: read a COBOL file.
    step_payload = {
        "action": {
            "tool_name": "read_cobol_file",
            "arguments": {"filename": "INVTOTAL.cbl"},
        }
    }
    step = client.post("/step", json=step_payload)
    assert step.status_code == 200
    step_body = step.json()
    assert step_body["observation"]["result"]["data"]["ok"] is True
    assert step_body["reward"] == 0.02

    # The state endpoint must report the reset task and the step just taken.
    state = client.get("/state")
    assert state.status_code == 200
    state_body = state.json()
    assert state_body["task_id"] == "invoice_occurs_001"
    assert state_body["files_read"] == ["INVTOTAL.cbl"]
    assert state_body["last_tool"] == "read_cobol_file"
    assert state_body["step_count"] == 1
def test_redundant_discovery_actions_do_not_repeat_progress_reward():
    """Each discovery tool pays its reward on first use and 0.0 when repeated."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    # (tool name, tool arguments, reward expected on the first call)
    probes = [
        ("read_cobol_file", {"filename": "PAYROLL.cbl"}, 0.02),
        ("parse_copybook_layout", {"filename": "EMPLOYEE_PAY.cpy"}, 0.03),
        ("inspect_business_rules", {}, 0.01),
    ]

    # Issue every call first (each tool twice in a row), then check rewards.
    outcomes = []
    for tool_name, tool_args, _ in probes:
        first = call_obs(env, tool_name, **tool_args)
        repeat = call_obs(env, tool_name, **tool_args)
        outcomes.append((first, repeat))

    for (_, _, first_reward), (first, repeat) in zip(probes, outcomes):
        assert first.reward == first_reward
        assert repeat.reward == 0.0
def test_visible_test_reward_only_pays_for_new_progress():
    """Running visible tests on the same draft twice is rewarded only the first time."""
    env = LegacyCobolEnvironment()
    env.reset(task_id="payroll_net_pay_001")

    draft = result_data(call_obs(env, "write_python_solution", code=GOOD_SOLUTION))
    draft_id = draft["draft_id"]

    initial_run = call_obs(env, "run_visible_tests", draft_id=draft_id)
    rerun = call_obs(env, "run_visible_tests", draft_id=draft_id)

    assert initial_run.reward == 0.1
    assert rerun.reward == 0.0
|