Spaces:
Running
Running
| """Task grading engine — evaluates task completion and computes shaped rewards. | |
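| Rewards are normally in the [0.0, 1.0] range; a completed task that survived chaos can score up to 1.05. Only full task completion can reach 1.0 or above: incomplete tasks are capped at 0.99. | |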
| Includes anti-reward-hacking defenses. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from pydantic import BaseModel, Field | |
| from models import SuccessCriteria, Task | |
| from server.services.environment_strategy import EnvironmentStrategy | |
| from server.services.episode_tracker import EpisodeTracker, StepRecord | |
| from server.services.resource_verifier import ResourceVerifier | |
| logger = logging.getLogger(__name__) | |
class GradeResult(BaseModel):
    """Outcome of grading a single step."""

    # True only when the selected grading strategy judged the task complete.
    task_achieved: bool = False

    # Fraction of the task judged complete, constrained to [0.0, 1.0].
    partial_progress: float = Field(default=0.0, ge=0.0, le=1.0)

    # Shaped reward for the step. The upper bound is 1.05 rather than 1.0
    # because TaskGrader._compute_reward returns 1.05 for a task completed
    # while chaos occurred; with a 1.0 bound such a result could not be
    # re-validated (e.g. rebuilt from its own dumped fields).
    reward: float = Field(default=0.0, ge=0.0, le=1.05)

    # Human-readable explanation of how the grade was reached.
    reason: str = ""
class TaskGrader:
    """Evaluates task completion and computes shaped rewards.

    ``grade`` picks exactly one grading strategy based on which fields are
    populated on the task's ``SuccessCriteria``. They are checked in this
    order: ``state_checks``, ``steps``, ``resource_exists``,
    ``command_contains``. The strategy's outcome is then turned into a
    scalar reward by ``_compute_reward``.
    """

    def __init__(self, backend: EnvironmentStrategy) -> None:
        """Create a grader whose resource/state checks go through *backend*."""
        self._verifier = ResourceVerifier(backend)

    def grade(
        self,
        task: Task,
        tracker: EpisodeTracker,
        latest_step: StepRecord,
        chaos_occurred: bool = False,
        hints_used: int = 0,
    ) -> GradeResult:
        """Grade the latest step of an episode against *task*.

        Args:
            task: Task whose ``success_criteria`` selects the strategy.
            tracker: Episode history; its ``previous_progress`` is raised
                (never lowered) to the progress measured here.
            latest_step: The most recently executed step.
            chaos_occurred: Whether chaos occurred; raises the reward of a
                completed task from 1.0 to 1.05.
            hints_used: Number of hints consumed; each one multiplies the
                reward by 0.85.

        Returns:
            The strategy's ``GradeResult`` with ``reward`` filled in.
        """
        criteria = task.success_criteria

        # Dispatch based on populated criteria fields (first match wins).
        if criteria.state_checks:
            result = self._grade_state_checks(criteria, tracker)
        elif criteria.steps:
            result = self._grade_multi_step(criteria, tracker)
        elif criteria.resource_exists is not None:
            result = self._grade_resource_creation(criteria, latest_step)
        elif criteria.command_contains is not None:
            result = self._grade_command_match(criteria, latest_step)
        else:
            result = GradeResult(reason="no recognised success_criteria fields")

        # The reward must be computed BEFORE previous_progress is updated:
        # the "advanced this step" bonus compares against the old value.
        result.reward = self._compute_reward(
            result, latest_step, tracker, chaos_occurred, hints_used
        )

        # Update tracker's previous progress (monotonic — never decrease).
        if result.partial_progress > tracker.previous_progress:
            tracker.previous_progress = result.partial_progress
        return result

    # -- Shared helpers -------------------------------------------------------

    @staticmethod
    def _matches_specified(cmd: str, *fragments: str) -> bool:
        """Return True when every non-empty fragment occurs in *cmd*.

        At least one fragment must be non-empty. Without that requirement an
        unspecified criterion ("") would match every command, because the
        empty string is a substring of any string.
        """
        wanted = [fragment for fragment in fragments if fragment]
        return bool(wanted) and all(fragment in cmd for fragment in wanted)

    @staticmethod
    def _count_ordered_steps(steps, tracker: EpisodeTracker) -> int:
        """Count leading steps the tracker reports as executed.

        Steps are ordered, so counting stops at the first step for which
        ``tracker.has_executed_operation`` is falsy.
        """
        completed = 0
        for step in steps:
            if not tracker.has_executed_operation(step.operation, step.resource):
                break
            completed += 1
        return completed

    @staticmethod
    def _blend_progress(
        steps_completed: int,
        total_steps: int,
        checks_passed: int,
        total_checks: int,
    ) -> float:
        """Combine step and state-check completion into one progress value.

        With both present, steps contribute up to 0.7 and state checks the
        remaining 0.3. When only one kind is present it carries the full
        weight, so a task defined purely by state checks is not capped at
        0.3. With neither present the result is 0.0.
        """
        step_progress = steps_completed / total_steps if total_steps > 0 else 0.0
        check_progress = checks_passed / total_checks if total_checks > 0 else 0.0
        if total_steps > 0 and total_checks > 0:
            return step_progress * 0.7 + check_progress * 0.3
        if total_checks > 0:
            return check_progress
        return step_progress

    # -- Grading strategies ---------------------------------------------------

    def _grade_command_match(
        self, criteria: SuccessCriteria, latest_step: StepRecord
    ) -> GradeResult:
        """Warmup: check the latest command matches expected service + operation.

        The task is achieved only when both ``command_contains`` and
        ``operation`` are non-empty, both occur (case-insensitively) in the
        latest command, and that command succeeded. There is no partial
        credit: progress is either 0.0 or 1.0.
        """
        cmd = latest_step.command.lower()
        contains = (criteria.command_contains or "").lower()
        operation = (criteria.operation or "").lower()

        contains_ok = contains != "" and contains in cmd
        operation_ok = operation != "" and operation in cmd
        succeeded = latest_step.success
        achieved = contains_ok and operation_ok and succeeded

        return GradeResult(
            task_achieved=achieved,
            partial_progress=1.0 if achieved else 0.0,
            reason=(
                f"command_match: contains={contains_ok}, "
                f"op={operation_ok}, success={succeeded}"
            ),
        )

    def _grade_resource_creation(
        self,
        criteria: SuccessCriteria,
        latest_step: StepRecord,
    ) -> GradeResult:
        """Beginner: verify the resource actually exists in MiniStack.

        Existence (as reported by the verifier) is the only way to achieve
        the task. A successful latest command that matches the specified
        ``command_contains`` / ``operation`` fragments earns 0.5 partial
        credit when the resource does not exist yet.
        """
        re_spec = criteria.resource_exists
        if re_spec is None:
            # Explicit guard instead of ``assert``, which is stripped under -O.
            return GradeResult(reason="resource_creation: no resource_exists spec")

        service = re_spec.service
        name = re_spec.name
        exists = self._verifier.resource_exists(service, name)

        # Command matching gives partial credit (0.5). At least one fragment
        # must be specified, otherwise any successful command would qualify.
        contains = (criteria.command_contains or "").lower()
        operation = (criteria.operation or "").lower()
        cmd = latest_step.command.lower()
        cmd_ok = bool(latest_step.success) and self._matches_specified(
            cmd, contains, operation
        )

        if exists:
            progress = 1.0
        elif cmd_ok:
            progress = 0.5
        else:
            progress = 0.0

        return GradeResult(
            task_achieved=exists,
            partial_progress=progress,
            reason=(
                f"resource_creation: exists={exists}, "
                f"cmd_ok={cmd_ok}, service={service}, name={name}"
            ),
        )

    def _grade_multi_step(
        self, criteria: SuccessCriteria, tracker: EpisodeTracker
    ) -> GradeResult:
        """Intermediate/Advanced: check ordered step completion.

        Progress is the fraction of leading steps already executed. The
        task is achieved when every step is done and every service listed
        in ``criteria.services`` has been used.
        """
        steps = criteria.steps
        if not steps:
            return GradeResult(reason="empty steps list")

        total = len(steps)
        completed = self._count_ordered_steps(steps, tracker)
        progress = completed / total

        # For advanced tasks with a services requirement, also check services.
        # An empty requirement is trivially met (all() of nothing is True).
        services_required = criteria.services or []
        services_met = all(tracker.has_used_service(svc) for svc in services_required)
        achieved = completed == total and services_met

        return GradeResult(
            task_achieved=achieved,
            partial_progress=progress,
            reason=(
                f"multi_step: {completed}/{total} steps, "
                f"services_met={services_met if services_required else 'n/a'}"
            ),
        )

    def _grade_state_checks(
        self, criteria: SuccessCriteria, tracker: EpisodeTracker
    ) -> GradeResult:
        """Expert/SRE: verify end-state via arbitrary commands.

        state_checks are the source of truth for task completion.
        steps (if present) provide partial progress signals only.
        """
        state_checks = criteria.state_checks or []
        steps = criteria.steps or []

        # Evaluate state checks (ground truth). Every check is evaluated;
        # there is no short-circuit on the first failure.
        checks_passed = sum(
            1
            for check in state_checks
            if self._verifier.check_state(check.model_dump(exclude_none=True))
        )
        total_checks = len(state_checks)
        all_checks_pass = total_checks > 0 and checks_passed == total_checks

        # Evaluate steps for the partial progress signal.
        steps_completed = self._count_ordered_steps(steps, tracker)
        total_steps = len(steps)

        # Check services requirement (an empty requirement is trivially met).
        services_required = criteria.services or []
        services_met = all(tracker.has_used_service(svc) for svc in services_required)

        # Task achieved only when ALL state checks pass.
        achieved = all_checks_pass and services_met

        # An achieved task always reports full progress, matching the other
        # strategies; otherwise blend the step and state-check signals.
        if achieved:
            progress = 1.0
        else:
            progress = self._blend_progress(
                steps_completed, total_steps, checks_passed, total_checks
            )

        return GradeResult(
            task_achieved=achieved,
            partial_progress=min(progress, 1.0),
            reason=(
                f"state_checks: {checks_passed}/{total_checks} passed, "
                f"steps: {steps_completed}/{total_steps}, "
                f"services_met={services_met if services_required else 'n/a'}"
            ),
        )

    # -- Reward shaping -------------------------------------------------------

    def _compute_reward(
        self,
        result: GradeResult,
        latest_step: StepRecord,
        tracker: EpisodeTracker,
        chaos_occurred: bool = False,
        hints_used: int = 0,
    ) -> float:
        """Compute a shaped reward in [0.0, 1.05].

        A completed task scores 1.0 (1.05 if chaos occurred). An incomplete
        task scores its partial progress scaled to 0.0-0.8, adjusted by the
        bonuses and penalties below and clamped to [0.0, 0.99]. Either way
        the value is multiplied by 0.85 per hint used.
        """
        # Hint decay: 0.85^hints_used. A negative count is treated as zero so
        # it can never inflate the reward above its base value.
        hint_factor = 0.85 ** max(hints_used, 0)

        if result.task_achieved:
            base = 1.05 if chaos_occurred else 1.0
            return base * hint_factor

        # Base: partial progress scaled to 0.0–0.8 range.
        progress_reward = result.partial_progress * 0.8

        # Bonus for advancing progress (dense signal).
        progress_delta = result.partial_progress - tracker.previous_progress
        if progress_delta > 0:
            progress_reward += 0.1

        # Penalty for failed commands.
        if not latest_step.success:
            progress_reward *= 0.5

        # Rollback penalty: wasteful create→delete pairs.
        progress_reward -= 0.1 * tracker.detect_rollbacks()

        # Idempotency bonus: graceful "already exists" handling.
        # NOTE(review): this bonus is uncapped, so repeated retries can push
        # an incomplete task's reward towards the 0.99 clamp. Confirm whether
        # a cap is wanted.
        progress_reward += 0.02 * tracker.detect_idempotent_retries()

        progress_reward *= hint_factor

        # Clamp to [0.0, 0.99] — never reach 1.0 without achieving.
        return min(max(progress_reward, 0.0), 0.99)