File size: 9,322 Bytes
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
e56d042
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e56d042
2073b3f
 
 
 
 
 
 
0f8f2c1
 
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0f8f2c1
 
 
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eea2be5
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eea2be5
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0f8f2c1
 
2073b3f
0f8f2c1
2073b3f
0f8f2c1
 
 
2073b3f
 
 
 
 
 
 
 
 
 
 
 
 
0f8f2c1
 
 
 
 
 
 
 
 
 
2073b3f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""Task grading engine — evaluates task completion and computes shaped rewards.

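Rewards lie in the [0.0, 1.0] range, except that completing a task after a
chaos event uses a 1.05 base (before hint decay). Rewards without full task
completion are capped at 0.99, so only a completed task can reach 1.0.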
Includes anti-reward-hacking defenses.
"""

from __future__ import annotations

import logging

from pydantic import BaseModel, Field

from models import SuccessCriteria, Task
from server.services.environment_strategy import EnvironmentStrategy
from server.services.episode_tracker import EpisodeTracker, StepRecord
from server.services.resource_verifier import ResourceVerifier

logger = logging.getLogger(__name__)


class GradeResult(BaseModel):
    """Outcome of grading a single step.

    Instances are built by the ``TaskGrader`` grading strategies; ``reward``
    is filled in afterwards by ``TaskGrader.grade``.
    """

    # Whether the grading strategy judged the task complete.
    task_achieved: bool = False
    # Fraction of the task completed, as reported by the grading strategy.
    partial_progress: float = Field(default=0.0, ge=0.0, le=1.0)
    # Shaped reward. The upper bound is 1.05 rather than 1.0 because
    # TaskGrader._compute_reward uses a 1.05 base when chaos occurred, and a
    # 1.0 bound would reject that value whenever the model is re-validated.
    reward: float = Field(default=0.0, ge=0.0, le=1.05)
    # Human-readable summary of how the grade was derived.
    reason: str = ""


class TaskGrader:
    """Evaluates task completion and computes shaped rewards.

    Dispatches to different grading strategies based on which fields
    are populated on the task's ``SuccessCriteria``.
    """

    def __init__(self, backend: EnvironmentStrategy) -> None:
        """Create the grader with a ``ResourceVerifier`` bound to *backend*."""
        verifier = ResourceVerifier(backend)
        self._verifier = verifier

    def grade(
        self,
        task: Task,
        tracker: EpisodeTracker,
        latest_step: StepRecord,
        chaos_occurred: bool = False,
        hints_used: int = 0,
    ) -> GradeResult:
        """Grade the latest step of an episode against *task*'s criteria.

        The grading strategy is picked from the first populated field on
        ``task.success_criteria``, in this priority order: ``state_checks``,
        ``steps``, ``resource_exists``, ``command_contains``. The shaped
        reward is then attached to the result, and the tracker's
        ``previous_progress`` is raised if this step made more progress.
        """
        spec = task.success_criteria

        # Pick the strategy; the order of these tests is the dispatch priority.
        if spec.state_checks:
            outcome = self._grade_state_checks(spec, tracker)
        elif spec.steps:
            outcome = self._grade_multi_step(spec, tracker)
        elif spec.resource_exists is not None:
            outcome = self._grade_resource_creation(spec, latest_step)
        elif spec.command_contains is not None:
            outcome = self._grade_command_match(spec, latest_step)
        else:
            outcome = GradeResult(reason="no recognised success_criteria fields")

        # The reward must be computed before previous_progress is updated,
        # because the shaping compares against the progress of earlier steps.
        outcome.reward = self._compute_reward(
            outcome,
            latest_step,
            tracker,
            chaos_occurred,
            hints_used,
        )

        # Recorded progress is monotonic: only ever move it upwards.
        if outcome.partial_progress > tracker.previous_progress:
            tracker.previous_progress = outcome.partial_progress

        return outcome

    # -- Grading strategies ---------------------------------------------------

    def _grade_command_match(
        self, criteria: SuccessCriteria, latest_step: StepRecord
    ) -> GradeResult:
        """Warmup: check the latest command matches expected service + operation.

        Matching is a case-insensitive substring test. An unset or empty
        criterion never matches, so both ``command_contains`` and
        ``operation`` must be provided and found, and the step itself must
        have succeeded. Progress is all-or-nothing.
        """
        command_text = latest_step.command.lower()
        needle = (criteria.command_contains or "").lower()
        op_name = (criteria.operation or "").lower()

        contains_ok = bool(needle) and needle in command_text
        operation_ok = bool(op_name) and op_name in command_text
        succeeded = latest_step.success
        achieved = contains_ok and operation_ok and succeeded

        summary = (
            f"command_match: contains={contains_ok}, "
            f"op={operation_ok}, success={succeeded}"
        )
        return GradeResult(
            task_achieved=achieved,
            partial_progress=1.0 if achieved else 0.0,
            reason=summary,
        )

    def _grade_resource_creation(
        self,
        criteria: SuccessCriteria,
        latest_step: StepRecord,
    ) -> GradeResult:
        """Beginner: verify the resource actually exists in MiniStack.

        The task is achieved only if the verifier reports that the resource
        named by ``criteria.resource_exists`` exists. A successful command
        that matches the command criteria earns partial credit (0.5).

        Partial credit requires at least one of ``command_contains`` /
        ``operation`` to be set. Otherwise the empty string would match every
        command and any successful command would earn 0.5.
        """
        re_spec = criteria.resource_exists
        assert re_spec is not None
        service = re_spec.service
        name = re_spec.name

        exists = self._verifier.resource_exists(service, name)

        # Command matching gives partial credit (0.5)
        contains = (criteria.command_contains or "").lower()
        operation = (criteria.operation or "").lower()
        cmd = latest_step.command.lower()
        # A criterion that is unset is skipped ("" is in every string), but
        # at least one must be present for the command to count as a match.
        has_cmd_criteria = contains != "" or operation != ""
        cmd_ok = bool(
            has_cmd_criteria
            and contains in cmd
            and operation in cmd
            and latest_step.success
        )

        if exists:
            progress = 1.0
        elif cmd_ok:
            progress = 0.5
        else:
            progress = 0.0

        return GradeResult(
            task_achieved=exists,
            partial_progress=progress,
            reason=(
                f"resource_creation: exists={exists}, "
                f"cmd_ok={cmd_ok}, service={service}, name={name}"
            ),
        )

    def _grade_multi_step(
        self, criteria: SuccessCriteria, tracker: EpisodeTracker
    ) -> GradeResult:
        """Intermediate/Advanced: check ordered step completion.

        Progress is the fraction of steps completed, counted from the start
        of the list up to the first step the tracker has not seen. The task
        is achieved when every step is complete and, if ``criteria.services``
        is non-empty, the tracker reports every listed service as used.
        """
        steps = criteria.steps
        if not steps:
            return GradeResult(reason="empty steps list")

        total = len(steps)

        # Length of the completed prefix: the index of the first step that has
        # not been executed, or the full length if none is missing.
        completed = 0
        for completed, step in enumerate(steps):
            if not tracker.has_executed_operation(step.operation, step.resource):
                break
        else:
            completed = total

        if total > 0:
            progress = completed / total
        else:
            progress = 0.0

        # For advanced tasks with services requirement, also check services
        services_required = criteria.services
        services_met = all(tracker.has_used_service(svc) for svc in services_required)
        services_ok = not services_required or services_met

        achieved = completed == total and services_ok
        services_label = services_met if services_required else "n/a"

        return GradeResult(
            task_achieved=achieved,
            partial_progress=progress,
            reason=(
                f"multi_step: {completed}/{total} steps, "
                f"services_met={services_label}"
            ),
        )

    def _grade_state_checks(
        self, criteria: SuccessCriteria, tracker: EpisodeTracker
    ) -> GradeResult:
        """Expert/SRE: verify end-state via arbitrary commands.

        state_checks are the source of truth for task completion.
        steps (if present) provide partial progress signals only.

        Progress weighting:

        * steps and checks both present: ``0.7 * steps + 0.3 * checks``
        * only checks present: the fraction of checks passed
        * only steps present: the fraction of steps completed

        An achieved task always reports a progress of 1.0, matching the other
        grading strategies, even if the end state was reached without
        executing every listed step.
        """
        state_checks = criteria.state_checks
        steps = criteria.steps

        # Evaluate state checks (ground truth)
        checks_passed = sum(
            1
            for check in state_checks
            if self._verifier.check_state(check.model_dump(exclude_none=True))
        )
        total_checks = len(state_checks)
        all_checks_pass = total_checks > 0 and checks_passed == total_checks

        # Evaluate steps for partial progress signal (ordered: stop at the
        # first step that has not been executed)
        steps_completed = 0
        for step in steps:
            if not tracker.has_executed_operation(step.operation, step.resource):
                break
            steps_completed += 1
        total_steps = len(steps)

        step_progress = steps_completed / total_steps if total_steps > 0 else 0.0
        check_progress = checks_passed / total_checks if total_checks > 0 else 0.0

        # Weight: steps give up to 0.7, state checks give the remaining 0.3.
        # When only one signal exists it carries the full weight; otherwise a
        # task without steps could never report more than 0.3 progress.
        if total_steps > 0 and total_checks > 0:
            progress = step_progress * 0.7 + check_progress * 0.3
        elif total_checks > 0:
            progress = check_progress
        else:
            progress = step_progress

        # Check services requirement
        services_required = criteria.services
        services_met = all(tracker.has_used_service(svc) for svc in services_required)

        # Task achieved only when ALL state checks pass
        achieved = all_checks_pass and (not services_required or services_met)
        if achieved:
            # Completion is decided by the end state, not by the step list.
            progress = 1.0

        return GradeResult(
            task_achieved=achieved,
            partial_progress=min(progress, 1.0),
            reason=(
                f"state_checks: {checks_passed}/{total_checks} passed, "
                f"steps: {steps_completed}/{total_steps}, "
                f"services_met={services_met if services_required else 'n/a'}"
            ),
        )

    # -- Reward shaping -------------------------------------------------------

    def _compute_reward(
        self,
        result: GradeResult,
        latest_step: StepRecord,
        tracker: EpisodeTracker,
        chaos_occurred: bool = False,
        hints_used: int = 0,
    ) -> float:
        """Compute a shaped reward in [0.0, 1.05]."""
        if result.task_achieved:
            base = 1.05 if chaos_occurred else 1.0
            # Hint decay: 0.85^hints_used
            return base * (0.85**hints_used)

        # Base: partial progress scaled to 0.0–0.8 range
        progress_reward = result.partial_progress * 0.8

        # Bonus for advancing progress (dense signal)
        progress_delta = result.partial_progress - tracker.previous_progress
        if progress_delta > 0:
            progress_reward += 0.1

        # Penalty for failed commands
        if not latest_step.success:
            progress_reward *= 0.5

        # Rollback penalty: wasteful create→delete pairs
        progress_reward -= 0.1 * tracker.detect_rollbacks()

        # Idempotency bonus: graceful "already exists" handling
        progress_reward += 0.02 * tracker.detect_idempotent_retries()

        # Hint decay: 0.85^hints_used
        if hints_used > 0:
            progress_reward *= 0.85**hints_used

        # Clamp to [0.0, 0.99] — never reach 1.0 without achieving
        return min(max(progress_reward, 0.0), 0.99)