Spaces:
Sleeping
Sleeping
| """Core OpenEnv environment for Python code review and repair tasks.""" | |
| from __future__ import annotations | |
| from typing import List, Optional | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from graders import grade_task | |
| from models import ( | |
| HealthResponse, | |
| HistoryEntry, | |
| PythonCodeReviewAction, | |
| PythonCodeReviewObservation, | |
| PythonCodeReviewState, | |
| RewardDetails, | |
| TaskGrade, | |
| ) | |
| from tasks import TaskSpec, get_task, list_task_descriptors, list_task_summaries, task_ids | |
# ---------------------------------------------------------------------------
# Reward shaping constants (all values are in reward units).
# ---------------------------------------------------------------------------

# Deducted for unsupported action types, empty edits, and steps taken after
# the episode has already finished.
INVALID_ACTION_PENALTY = 0.1

# Multiplier applied to the change in quality score produced by an edit.
QUALITY_BONUS_SCALE = 0.15

# Failure penalty used when an analyze_code action sees a bad grade.
ANALYZE_FAILURE_PENALTY = 0.05

# Failure penalty used by run_tests / submit_solution (and the default for edits).
RUN_FAILURE_PENALTY = 0.05

# Deducted whenever grading reports that test execution timed out.
TIMEOUT_PENALTY = 0.1

# Multiplier applied to the final graded score when a solution is submitted.
SUBMIT_BASE_SCALE = 0.1
class PythonCodeReviewEnvironment(
    Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
    """Production-style environment for reviewing and fixing Python code.

    An episode is bound to one task. The agent interacts through four action
    types -- ``analyze_code``, ``edit_code``, ``run_tests`` and
    ``submit_solution`` -- and receives a shaped reward after every step.
    The episode ends when the agent submits or when the task's step budget
    (``max_steps``) is exhausted, in which case the current code is graded
    automatically.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create an idle environment; ``reset()`` must be called to load a task."""
        super().__init__()
        self._task_order = list(task_ids())
        # -1 so that the first cursor-driven reset() lands on index 0.
        self._task_cursor = -1
        self._task: Optional[TaskSpec] = None
        self._state = PythonCodeReviewState(episode_id=str(uuid4()))
        self._done = False
        self._last_status = "Call reset() to start."
        self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.")
        # Per-episode bookkeeping (cleared again in reset()).
        self._best_visible_test_fraction = 0.0
        self._best_quality_score = 0.0
        self._full_correctness_awarded = False
        self._syntax_reward_awarded = False

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Reset the environment to the next deterministic task.

        Args:
            seed: Accepted for interface compatibility and ignored.
            episode_id: Identifier for the new episode; a fresh UUID is
                generated when omitted.
            task_id: When given, load this task and move the round-robin
                cursor to it. Otherwise advance the cursor to the next task
                (wrapping around at the end of the task list).

        Returns:
            The initial observation of the new episode.

        Raises:
            ValueError: If ``task_id`` is not part of the task order
                (``get_task`` may raise its own error first).
        """
        del seed

        # Select task
        if task_id:
            # Resolve both lookups before touching instance state so that a
            # failing lookup cannot leave the task and cursor out of sync.
            task = get_task(task_id)
            self._task_cursor = self._task_order.index(task_id)
            self._task = task
        else:
            self._task_cursor = (self._task_cursor + 1) % len(self._task_order)
            self._task = get_task(self._task_order[self._task_cursor])

        # Reset episode state
        self._done = False
        self._best_visible_test_fraction = 0.0
        self._best_quality_score = 0.0
        self._full_correctness_awarded = False
        self._syntax_reward_awarded = False
        self._last_status = "Inspect the code, edit it, run tests, then submit."
        self._last_reward = RewardDetails(value=0.0, reason="Episode reset.")
        self._state = PythonCodeReviewState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=self._task.task_id,
            difficulty=self._task.difficulty,
            task_kind=self._task.task_kind,
            attempts_remaining=self._task.max_steps,
            current_code=self._task.starter_code,
            errors="",
            test_results="Not run yet.",
            history=[],
            score=0.0,
            done=False,
        )
        return self._build_observation()

    def step(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Apply one structured action.

        Args:
            action: The action to apply; ``action.action_type`` selects the
                handler and ``action.code`` is consumed by ``edit_code``.
            timeout_s: Accepted for interface compatibility and ignored.

        Returns:
            The observation after the action. If no task is loaded the call
            behaves like ``reset()`` (the action is not applied). If the
            episode is already finished, an invalid-action penalty is
            reported and nothing else changes.
        """
        del timeout_s

        if self._task is None:
            return self.reset()

        if self._done:
            self._last_reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason="Episode already completed.",
            )
            self._last_status = "Episode already completed. Call reset() to continue."
            return self._build_observation()

        self._state.step_count += 1

        # Dispatch to handler based on action type
        if action.action_type == "analyze_code":
            reward, status = self._handle_analyze()
        elif action.action_type == "edit_code":
            reward, status = self._handle_edit(action)
        elif action.action_type == "run_tests":
            reward, status = self._handle_run_tests()
        elif action.action_type == "submit_solution":
            reward, status = self._handle_submit()
        else:
            reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason=f"Unsupported action_type: {action.action_type}",
            )
            status = f"Invalid action: unsupported action_type '{action.action_type}'."

        self._last_reward = reward
        self._last_status = status
        self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)
        self._state.done = self._done

        # Auto-submit if steps exhausted. The reward stays that of the last
        # action; only the status message and final score are replaced.
        if self._state.attempts_remaining == 0 and not self._done:
            self._finalize_episode(auto_submit=True)
            self._state.done = True

        return self._build_observation()

    # NOTE(review): upstream OpenEnv appears to declare ``Environment.state``
    # as a property, whereas this is a plain method -- confirm whether callers
    # use ``env.state`` or ``env.state()`` before changing either side.
    def state(self) -> PythonCodeReviewState:
        """Return a deep copy of the current environment state."""
        return self._state.model_copy(deep=True)

    def list_task_summaries(self) -> List[object]:
        """Return public task metadata."""
        return list_task_summaries()

    def list_tasks(self) -> List[object]:
        """Return all public task descriptors."""
        return list_task_descriptors()

    def get_task(self, task_id: str) -> object:
        """Return the public descriptor of the task identified by ``task_id``."""
        return get_task(task_id).to_descriptor()

    def health(self) -> HealthResponse:
        """Return a simple health model carrying the number of known tasks."""
        return HealthResponse(task_count=len(self._task_order))

    def grade_task_submission(self, task_id: str, code: str) -> TaskGrade:
        """Expose deterministic grading (hidden tests included) outside of an active episode."""
        return grade_task(code, get_task(task_id), include_hidden=True)

    def _build_observation(self) -> PythonCodeReviewObservation:
        """Build the current observation from state, the loaded task and the last reward."""
        task = self._task
        return PythonCodeReviewObservation(
            task_id=self._state.task_id or "",
            title=task.title if task else "",
            difficulty=self._state.difficulty or "easy",
            task_kind=self._state.task_kind,
            task_description=task.task_description if task else "",
            current_code=self._state.current_code,
            errors=self._state.errors,
            test_results=self._state.test_results,
            visible_tests=task.visible_tests if task else [],
            history=self._state.history,
            attempts_remaining=self._state.attempts_remaining,
            last_action_status=self._last_status,
            score=self._state.score,
            reward=self._last_reward.value,
            reward_details=self._last_reward,
            done=self._done,
            metadata={
                "episode_id": self._state.episode_id,
                "step_count": self._state.step_count,
                "task_kind": self._state.task_kind,
            },
        )

    def _handle_analyze(self) -> tuple[RewardDetails, str]:
        """Grade the current code against visible tests and summarize the result.

        Returns:
            ``(reward, summary)``. The reward is a small fraction of the
            visible score minus any failure/timeout penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        error = grade.details.get("compile_error", "")

        if error:
            self._state.errors = error
            self._state.test_results = "Compilation failed. Fix syntax first."
            summary = f"Syntax error detected: {error}"
        else:
            self._state.errors = ""
            if self._task.task_kind == "syntax_fix":
                self._state.test_results = "Code compiles successfully."
                summary = "Code compiles. Ready to submit."
            else:
                visible_total = len(self._task.visible_tests)
                visible_passed = grade.tests_passed
                self._state.test_results = f"Test run: {visible_passed}/{visible_total} passing."
                summary = self._state.test_results

        penalty = self._grade_penalty(grade, failure_penalty=ANALYZE_FAILURE_PENALTY)
        reward_value = round((grade.score * 0.05) - penalty, 6)
        reward = RewardDetails(value=reward_value, reason=summary)
        self._append_history("analyze_code", summary, reward.value)
        # Reuse the grade computed above instead of grading the same code again.
        self._sync_score(include_hidden=False, grade=grade)
        return reward, summary

    def _handle_edit(self, action: PythonCodeReviewAction) -> tuple[RewardDetails, str]:
        """Replace the current code and reward the resulting progress.

        The submitted code is stripped of surrounding whitespace; an empty
        result is rejected with an invalid-action penalty and the current
        code is left untouched. Otherwise the reward combines the change in
        visible score, a one-time syntax-fix reward, a clamped quality bonus
        and any failure/timeout penalty.

        Returns:
            ``(reward, status)`` for the edit.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        code = (action.code or "").strip()
        if not code:
            reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason="Edit action requires non-empty code.",
            )
            status = "Invalid: edit_code requires code parameter."
            self._append_history("edit_code", status, reward.value)
            return reward, status

        # Grade before and after
        previous_grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        new_grade = grade_task(code, self._task, include_hidden=False)
        self._state.current_code = code

        # Update state
        self._state.errors = new_grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(new_grade)

        # Compute reward with shaping.
        # The syntax reward is granted at most once per episode; without the
        # flag check an agent could repeatedly break and re-fix the code to
        # collect it on every broken -> valid transition.
        syntax_reward = 0.0
        if (
            not self._syntax_reward_awarded
            and previous_grade.syntax_score < 1.0
            and new_grade.syntax_score == 1.0
        ):
            syntax_reward = 0.2
            self._syntax_reward_awarded = True

        # Quality bonus is proportional to the quality change, clamped to [-0.1, 0.1].
        quality_delta = new_grade.quality_score - previous_grade.quality_score
        quality_bonus = max(min(quality_delta * QUALITY_BONUS_SCALE, 0.1), -0.1)
        if new_grade.quality_score > self._best_quality_score:
            self._best_quality_score = new_grade.quality_score

        progress_reward = 0.2 * (new_grade.score - previous_grade.score)
        if new_grade.tests_total > 0:
            current_test_fraction = new_grade.tests_passed / new_grade.tests_total
            self._best_visible_test_fraction = max(
                self._best_visible_test_fraction, current_test_fraction
            )

        penalty = self._grade_penalty(new_grade)
        reward_value = round(progress_reward + syntax_reward + quality_bonus - penalty, 6)

        status = "Code updated."
        if self._state.errors:
            status = f"Code updated with syntax issues: {self._state.errors}"
        elif new_grade.tests_total > 0:
            status = self._state.test_results

        reward = RewardDetails(
            value=reward_value,
            syntax_reward=syntax_reward,
            quality_bonus=round(quality_bonus, 6),
            test_reward=round(progress_reward, 6),
            timeout_penalty=TIMEOUT_PENALTY if new_grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("edit_code", status, reward_value)
        # new_grade already describes the code now stored in state.
        self._sync_score(include_hidden=False, grade=new_grade)
        return reward, status

    def _handle_run_tests(self) -> tuple[RewardDetails, str]:
        """Run the visible tests on the current code and report the outcome.

        Returns:
            ``(reward, status)``. The reward is the change relative to the
            score currently stored in state, plus a small bonus when every
            visible test passes, minus any failure/timeout penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        self._state.errors = grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(grade)

        previous_score = self._state.score
        progress_reward = 0.2 * (grade.score - previous_score)
        all_passing = grade.tests_total > 0 and grade.tests_passed == grade.tests_total
        completion_bonus = 0.05 if all_passing else 0.0
        penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY)
        reward_value = round(progress_reward + completion_bonus - penalty, 6)

        if grade.tests_total > 0:
            current_fraction = grade.tests_passed / grade.tests_total
            self._best_visible_test_fraction = max(
                self._best_visible_test_fraction, current_fraction
            )

        # A compile error takes precedence over the test summary in the status.
        status = self._state.test_results if not self._state.errors else self._state.errors
        reward = RewardDetails(
            value=reward_value,
            test_reward=round(progress_reward + completion_bonus, 6),
            timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("run_tests", status, reward.value)
        # Reuse the grade computed above instead of grading the same code again.
        self._sync_score(include_hidden=False, grade=grade)
        return reward, status

    def _handle_submit(self) -> tuple[RewardDetails, str]:
        """Grade the current code with hidden tests included and end the episode.

        Returns:
            ``(reward, status)``. The reward is the scaled final score plus a
            one-time bonus for a (near-)perfect score, minus any
            failure/timeout penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=True)
        self._state.errors = grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(grade)

        # Compute final reward bonuses
        correctness_bonus = 0.0
        if grade.score >= 0.999999 and not self._full_correctness_awarded:
            correctness_bonus = 0.5
            self._full_correctness_awarded = True

        penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY)
        reward_value = round((grade.score * SUBMIT_BASE_SCALE) + correctness_bonus - penalty, 6)

        self._finalize_episode(auto_submit=False, grade=grade)
        status = f"Solution submitted. Final score: {grade.score:.3f}"
        reward = RewardDetails(
            value=reward_value,
            correctness_bonus=correctness_bonus,
            timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("submit_solution", status, reward_value)
        return reward, status

    def _finalize_episode(self, auto_submit: bool, grade: Optional[TaskGrade] = None) -> None:
        """Mark the episode as done and store the final score.

        Args:
            auto_submit: True when the step budget ran out; the status message
                is then replaced with a budget-exhausted notice.
            grade: Final grade to record. When omitted, the current code is
                graded with hidden tests included and the error/test-result
                fields are refreshed from that grade.
        """
        if grade is None:
            if self._task is None:
                return
            grade = grade_task(self._state.current_code, self._task, include_hidden=True)
            self._state.errors = grade.details.get("compile_error", "")
            self._state.test_results = self._format_test_results(grade)

        self._state.score = grade.score
        self._done = True
        self._state.done = True
        if auto_submit:
            self._last_status = f"Step budget exhausted. Final score: {grade.score:.3f}"

    def _sync_score(self, include_hidden: bool, grade: Optional[TaskGrade] = None) -> None:
        """Update the visible score stored in state.

        Args:
            include_hidden: Only visible-test runs (``False``) update the
                score; the hidden score is set when the episode is finalized,
                so ``True`` is a no-op.
            grade: Optional grade already computed for the current code with
                the same ``include_hidden`` setting. Passing it avoids
                grading the same code a second time.
        """
        if self._task is None or include_hidden:
            return
        if grade is None:
            grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        self._state.score = grade.score

    def _format_test_results(self, grade: TaskGrade) -> str:
        """Format the test outcome of ``grade`` for display."""
        if grade.tests_total == 0:
            return "No tests available."
        if grade.timed_out:
            return "Test execution timed out."
        return f"Tests: {grade.tests_passed}/{grade.tests_total} passing"

    def _append_history(self, action_type: str, status: str, reward: float) -> None:
        """Append one action record, tagged with the current step count, to the history."""
        entry = HistoryEntry(
            step=self._state.step_count,
            action_type=action_type,
            status=status,
            reward=reward,
        )
        self._state.history.append(entry)

    def _grade_penalty(self, grade: TaskGrade, failure_penalty: float = RUN_FAILURE_PENALTY) -> float:
        """Return the penalty to subtract when an action leads to an obviously bad result.

        The penalty accumulates: a compile error costs ``failure_penalty``
        plus the grade's score, a timeout costs ``TIMEOUT_PENALTY``, and
        passing none of a non-empty test set costs ``failure_penalty`` again.

        Returns:
            A non-negative penalty rounded to six decimals.
        """
        penalty = 0.0
        if grade.details.get("compile_error"):
            penalty += failure_penalty + grade.score
        if grade.timed_out:
            penalty += TIMEOUT_PENALTY
        if grade.tests_total > 0 and grade.tests_passed == 0:
            penalty += failure_penalty
        return round(penalty, 6)
# Legacy names kept so that existing imports elsewhere in the repo keep
# resolving to the same environment class.
PythonEnvironment = PythonCodeReviewEnvironment
CodeReviewEnvironment = PythonCodeReviewEnvironment