# File size: 8,137 Bytes
# a537615
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import asyncio

from fastapi.testclient import TestClient
from openenv.core.env_server.mcp_types import CallToolAction

from legacy_cobol_env.models import FinalSubmissionResult, RewardComponents
from legacy_cobol_env.server.app import app
from legacy_cobol_env.server.legacy_cobol_env_environment import (
    MAX_STEPS,
    LegacyCobolEnvironment,
)
from legacy_cobol_env.server.sandbox import CaseResult, EvaluationResult
from legacy_cobol_env.tests.test_environment import GOOD_SOLUTION


def call_obs(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Run one synchronous environment step that invokes ``tool_name``.

    The extra keyword arguments are forwarded as the tool's ``arguments``
    mapping, and whatever ``env.step`` returns is handed back unchanged.
    """
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    return env.step(action)


async def call_obs_async(env: LegacyCobolEnvironment, tool_name: str, **arguments):
    """Await one environment step that invokes ``tool_name``.

    Mirrors ``call_obs`` but goes through ``env.step_async``; the extra
    keyword arguments become the tool's ``arguments`` mapping.
    """
    action = CallToolAction(tool_name=tool_name, arguments=arguments)
    observation = await env.step_async(action)
    return observation


def result_data(observation):
    """Return the payload carried by an observation's ``result``.

    When ``observation.result`` exposes a ``data`` attribute, that attribute
    is returned; otherwise the ``result`` object itself is returned.
    """
    payload = observation.result
    # Fall back to the result object itself when it has no ``data`` attribute.
    return getattr(payload, "data", payload)


def test_thirteenth_action_exceeds_max_steps_without_executing_tool():
    """A step issued after ``MAX_STEPS`` steps is rejected without running the tool.

    The step budget is spent on file reads, then a write is attempted. The
    write must come back as a terminal, zero-reward error mentioning
    ``max_steps``, and the environment state must show no draft was created.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")

    # Use up the whole step budget; none of these steps may end the episode.
    steps_taken = 0
    while steps_taken < MAX_STEPS:
        observation = call_obs(environment, "read_cobol_file", filename="PAYROLL.cbl")
        assert observation.done is False
        steps_taken += 1

    rejected = call_obs(environment, "write_python_solution", code=GOOD_SOLUTION)
    payload = result_data(rejected)

    # The over-budget step is reported as a terminal failure with no reward.
    assert rejected.done is True
    assert rejected.reward == 0.0
    assert payload["ok"] is False
    assert "max_steps" in payload["error"]

    # The rejected write must not have touched the episode state.
    final_state = environment.state
    assert final_state.done is True
    assert final_state.step_count == MAX_STEPS
    assert final_state.draft_count == 0
    assert final_state.last_tool != "write_python_solution"


def test_post_done_steps_are_terminal_noops_without_state_mutation():
    """Steps taken after ``submit_final`` are rejected and leave state untouched.

    A draft is written and submitted, which must finish the episode. A further
    write is then attempted; it must return a zero-reward error mentioning
    ``terminal`` and the dumped state must equal the pre-attempt snapshot.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")

    draft = result_data(
        call_obs(environment, "write_python_solution", code=GOOD_SOLUTION)
    )
    submission = call_obs(environment, "submit_final", draft_id=draft["draft_id"])
    assert submission.done is True

    # Snapshot taken after the episode finished, before the extra step.
    snapshot = environment.state.model_dump()

    rejected = call_obs(
        environment,
        "write_python_solution",
        code="def migrate(input_record):\n    return ''\n",
    )
    payload = result_data(rejected)

    assert rejected.done is True
    assert rejected.reward == 0.0
    assert payload["ok"] is False
    assert "terminal" in payload["error"]
    assert environment.state.model_dump() == snapshot


def test_final_reward_response_is_typed_and_clamped(monkeypatch):
    """Out-of-range reward components must come back typed and within [0, 1].

    The environment's ``_reward_components`` hook is patched to emit values
    below 0 and above 1. The ``submit_final`` payload is then validated
    against ``FinalSubmissionResult`` and ``RewardComponents``, and every
    score is checked to be a float inside the closed unit interval.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")
    draft = result_data(
        call_obs(environment, "write_python_solution", code=GOOD_SOLUTION)
    )

    # Deliberately invalid scores: several fall outside the [0, 1] range.
    wild_components = {
        "hidden_correctness": -0.25,
        "fresh_correctness": 1.25,
        "interface_contract": 2.0,
        "type_and_layout_fidelity": -1.0,
        "anti_hardcoding": 0.5,
        "safety": 9.0,
    }
    # A fresh dict per call so the stub never shares mutable state.
    monkeypatch.setattr(
        environment,
        "_reward_components",
        lambda hidden, fresh: dict(wild_components),
    )

    submission = result_data(
        call_obs(environment, "submit_final", draft_id=draft["draft_id"])
    )

    typed_submission = FinalSubmissionResult.model_validate(submission)
    typed_components = RewardComponents.model_validate(submission["components"])

    assert isinstance(typed_submission.public_score, float)
    for score in typed_components.model_dump().values():
        assert isinstance(score, float)
        assert 0.0 <= score <= 1.0
    assert 0.0 <= typed_submission.public_score <= 1.0


def test_fresh_timeout_counts_against_safety_component():
    """A timeout in the fresh evaluation alone must zero the safety component.

    Two otherwise identical, fully passing evaluation results are built; only
    the fresh one has ``timed_out=True``. The components produced by
    ``_reward_components`` must then report ``safety == 0.0``.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")

    passing_case = CaseResult(
        case_id="case",
        passed=True,
        input_summary="case",
        actual="x",
        expected="x",
    )

    def build_evaluation(timed_out):
        # Each result gets its own case list; only the timeout flag varies.
        return EvaluationResult(
            syntax_ok=True,
            safety_ok=True,
            interface_ok=True,
            timed_out=timed_out,
            passed=1,
            total=1,
            case_results=[passing_case],
        )

    hidden_eval = build_evaluation(False)
    fresh_eval = build_evaluation(True)

    raw_components = environment._reward_components(hidden_eval, fresh_eval)
    components = RewardComponents.model_validate(raw_components)

    assert components.safety == 0.0


def test_async_max_step_guard_matches_sync_behavior():
    """The async step path enforces the same ``MAX_STEPS`` limit as the sync one.

    Inside a coroutine the step budget is spent on reads through
    ``step_async``, then a write is attempted. The write must be a terminal,
    zero-reward error mentioning ``max_steps`` and must not create a draft.
    """

    async def exhaust_budget_then_write():
        environment = LegacyCobolEnvironment()
        environment.reset(task_id="payroll_net_pay_001")

        # Spend every allowed step; the episode must stay open throughout.
        steps_taken = 0
        while steps_taken < MAX_STEPS:
            observation = await call_obs_async(
                environment, "read_cobol_file", filename="PAYROLL.cbl"
            )
            assert observation.done is False
            steps_taken += 1

        rejected_obs = await call_obs_async(
            environment, "write_python_solution", code=GOOD_SOLUTION
        )
        return environment, rejected_obs, result_data(rejected_obs)

    environment, rejected, payload = asyncio.run(exhaust_budget_then_write())

    assert rejected.done is True
    assert rejected.reward == 0.0
    assert payload["ok"] is False
    assert "max_steps" in payload["error"]
    assert environment.state.step_count == MAX_STEPS
    assert environment.state.draft_count == 0


def test_schema_exposes_project_typed_state_fields():
    """``GET /schema`` must describe the wrapper models and project state fields.

    Checks the action/observation schema titles, that ``tool_name`` and
    ``result`` are listed as properties, and that the state schema includes
    each of the project-specific fields named below.
    """
    response = TestClient(app).get("/schema")
    schema = response.json()

    state_fields = schema["state"]["properties"]
    action_fields = schema["action"]["properties"]
    observation_fields = schema["observation"]["properties"]

    assert schema["action"]["title"] == "ToolActionWrapper"
    assert schema["observation"]["title"] == "ToolObservationWrapper"
    assert "tool_name" in action_fields
    assert "result" in observation_fields

    expected_state_fields = (
        "task_id",
        "draft_count",
        "visible_runs",
        "final_score",
        "reward_components",
    )
    missing = [name for name in expected_state_fields if name not in state_fields]
    assert not missing


def test_rest_reset_step_and_state_share_one_episode():
    """``/reset``, ``/step`` and ``/state`` must all operate on the same episode.

    Resets to the invoice task over HTTP, performs one file read, and then
    verifies that ``/state`` reflects that task, that read, and a step count
    of one.
    """
    http = TestClient(app)

    reset_response = http.post("/reset", json={"task_id": "invoice_occurs_001"})
    assert reset_response.status_code == 200
    ticket = reset_response.json()["observation"]["result"]["ticket"]
    assert ticket["task_id"] == "invoice_occurs_001"

    read_action = {
        "tool_name": "read_cobol_file",
        "arguments": {"filename": "INVTOTAL.cbl"},
    }
    step_response = http.post("/step", json={"action": read_action})
    assert step_response.status_code == 200
    step_payload = step_response.json()
    assert step_payload["observation"]["result"]["data"]["ok"] is True
    assert step_payload["reward"] == 0.02

    # The state endpoint must see the effects of the step made above.
    state_response = http.get("/state")
    assert state_response.status_code == 200
    state_payload = state_response.json()
    assert state_payload["task_id"] == "invoice_occurs_001"
    assert state_payload["files_read"] == ["INVTOTAL.cbl"]
    assert state_payload["last_tool"] == "read_cobol_file"
    assert state_payload["step_count"] == 1


def test_redundant_discovery_actions_do_not_repeat_progress_reward():
    """Repeating a discovery action must earn nothing the second time.

    Each discovery tool is called twice in a row with the same arguments. The
    first call must pay the listed reward and the immediate repeat must pay
    ``0.0``.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")

    # (tool name, tool arguments, reward expected for the first call)
    discovery_actions = (
        ("read_cobol_file", {"filename": "PAYROLL.cbl"}, 0.02),
        ("parse_copybook_layout", {"filename": "EMPLOYEE_PAY.cpy"}, 0.03),
        ("inspect_business_rules", {}, 0.01),
    )

    # Run every call first, in first/duplicate order, then check the rewards.
    observed = []
    for tool_name, tool_arguments, first_reward in discovery_actions:
        first = call_obs(environment, tool_name, **tool_arguments)
        duplicate = call_obs(environment, tool_name, **tool_arguments)
        observed.append((first, duplicate, first_reward))

    for first, duplicate, first_reward in observed:
        assert first.reward == first_reward
        assert duplicate.reward == 0.0


def test_visible_test_reward_only_pays_for_new_progress():
    """Re-running visible tests on the same draft must not pay out again.

    The first ``run_visible_tests`` call on the known-good draft must earn
    ``0.1``; an identical second call must earn ``0.0``.
    """
    environment = LegacyCobolEnvironment()
    environment.reset(task_id="payroll_net_pay_001")

    draft = result_data(
        call_obs(environment, "write_python_solution", code=GOOD_SOLUTION)
    )
    draft_id = draft["draft_id"]

    initial_run = call_obs(environment, "run_visible_tests", draft_id=draft_id)
    repeat_run = call_obs(environment, "run_visible_tests", draft_id=draft_id)

    assert initial_run.reward == 0.1
    assert repeat_run.reward == 0.0