"""Core OpenEnv environment for Python code review and repair tasks.""" from __future__ import annotations from typing import List, Optional from uuid import uuid4 from openenv.core.env_server.interfaces import Environment from graders import grade_task from models import ( HealthResponse, HistoryEntry, PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState, RewardDetails, TaskGrade, ) from tasks import TaskSpec, get_task, list_task_descriptors, list_task_summaries, task_ids # Reward shaping constants INVALID_ACTION_PENALTY = 0.1 QUALITY_BONUS_SCALE = 0.15 ANALYZE_FAILURE_PENALTY = 0.05 RUN_FAILURE_PENALTY = 0.05 TIMEOUT_PENALTY = 0.1 SUBMIT_BASE_SCALE = 0.1 class PythonCodeReviewEnvironment( Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState] ): """Production-style environment for reviewing and fixing Python code.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: super().__init__() self._task_order = list(task_ids()) self._task_cursor = -1 self._task: Optional[TaskSpec] = None self._state = PythonCodeReviewState(episode_id=str(uuid4())) self._done = False self._last_status = "Call reset() to start." self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.") self._best_visible_test_fraction = 0.0 self._best_quality_score = 0.0 self._full_correctness_awarded = False self._syntax_reward_awarded = False def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, task_id: Optional[str] = None, **_: object, ) -> PythonCodeReviewObservation: """Reset the environment to the next deterministic task.""" del seed # Select task if task_id: self._task = get_task(task_id) self._task_cursor = self._task_order.index(task_id) else: self._task_cursor = (self._task_cursor + 1) % len(self._task_order) self._task = get_task(self._task_order[self._task_cursor]) # Reset episode state self._done = False self._best_visible_test_fraction = 0.0 self._best_quality_score = 0.0 self._full_correctness_awarded = False self._syntax_reward_awarded = False self._last_status = "Inspect the code, edit it, run tests, then submit." self._last_reward = RewardDetails(value=0.0, reason="Episode reset.") self._state = PythonCodeReviewState( episode_id=episode_id or str(uuid4()), step_count=0, task_id=self._task.task_id, difficulty=self._task.difficulty, task_kind=self._task.task_kind, attempts_remaining=self._task.max_steps, current_code=self._task.starter_code, errors="", test_results="Not run yet.", history=[], score=0.0, done=False, ) return self._build_observation() def step( self, action: PythonCodeReviewAction, timeout_s: Optional[float] = None, **_: object, ) -> PythonCodeReviewObservation: """Apply one structured action.""" del timeout_s if self._task is None: return self.reset() if self._done: self._last_reward = RewardDetails( value=-INVALID_ACTION_PENALTY, invalid_action_penalty=INVALID_ACTION_PENALTY, reason="Episode already completed.", ) self._last_status = "Episode already completed. Call reset() to continue." return self._build_observation() self._state.step_count += 1 status = "" reward = RewardDetails(value=0.0, reason="Action processed.") # Dispatch to handler based on action type if action.action_type == "analyze_code": reward, status = self._handle_analyze() elif action.action_type == "edit_code": reward, status = self._handle_edit(action) elif action.action_type == "run_tests": reward, status = self._handle_run_tests() elif action.action_type == "submit_solution": reward, status = self._handle_submit() else: reward = RewardDetails( value=-INVALID_ACTION_PENALTY, invalid_action_penalty=INVALID_ACTION_PENALTY, reason=f"Unsupported action_type: {action.action_type}", ) status = f"Invalid action: unsupported action_type '{action.action_type}'." self._last_reward = reward self._last_status = status self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0) self._state.done = self._done # Auto-submit if steps exhausted if self._state.attempts_remaining == 0 and not self._done: self._finalize_episode(auto_submit=True) self._state.done = True return self._build_observation() @property def state(self) -> PythonCodeReviewState: """Return the current environment state.""" return self._state.model_copy(deep=True) def list_task_summaries(self) -> List[object]: """Return public task metadata.""" return list_task_summaries() def list_tasks(self) -> List[object]: """Return all public task descriptors.""" return list_task_descriptors() def get_task(self, task_id: str) -> object: """Return a single task descriptor.""" return get_task(task_id).to_descriptor() def health(self) -> HealthResponse: """Return a simple health model.""" return HealthResponse(task_count=len(self._task_order)) def grade_task_submission(self, task_id: str, code: str) -> TaskGrade: """Expose deterministic grading outside of an active episode.""" return grade_task(code, get_task(task_id), include_hidden=True) def _build_observation(self) -> PythonCodeReviewObservation: """Build current observation from state.""" return PythonCodeReviewObservation( task_id=self._state.task_id or "", title=self._task.title if self._task else "", difficulty=self._state.difficulty or "easy", task_kind=self._state.task_kind, task_description=self._task.task_description if self._task else "", current_code=self._state.current_code, errors=self._state.errors, test_results=self._state.test_results, visible_tests=self._task.visible_tests if self._task else [], history=self._state.history, attempts_remaining=self._state.attempts_remaining, last_action_status=self._last_status, score=self._state.score, reward=self._last_reward.value, reward_details=self._last_reward, done=self._done, metadata={ "episode_id": self._state.episode_id, "step_count": self._state.step_count, "task_kind": self._state.task_kind, }, ) def _handle_analyze(self) -> tuple[RewardDetails, str]: """Analyze code for errors and test status.""" if self._task is None: return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded" grade = grade_task(self._state.current_code, self._task, include_hidden=False) error = grade.details.get("compile_error", "") if error: self._state.errors = error self._state.test_results = "Compilation failed. Fix syntax first." summary = f"Syntax error detected: {error}" else: self._state.errors = "" if self._task.task_kind == "syntax_fix": self._state.test_results = "Code compiles successfully." summary = "Code compiles. Ready to submit." else: visible_total = len(self._task.visible_tests) visible_passed = grade.tests_passed self._state.test_results = f"Test run: {visible_passed}/{visible_total} passing." summary = self._state.test_results reward_value = round((grade.score * 0.05) - self._grade_penalty(grade, failure_penalty=ANALYZE_FAILURE_PENALTY), 6) reward = RewardDetails(value=reward_value, reason=summary) self._append_history("analyze_code", summary, reward.value) self._sync_score(include_hidden=False) return reward, summary def _handle_edit(self, action: PythonCodeReviewAction) -> tuple[RewardDetails, str]: """Edit the code and compute reward for progress.""" if self._task is None: return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded" code = (action.code or "").strip() if not code: reward = RewardDetails( value=-INVALID_ACTION_PENALTY, invalid_action_penalty=INVALID_ACTION_PENALTY, reason="Edit action requires non-empty code.", ) status = "Invalid: edit_code requires code parameter." self._append_history("edit_code", status, reward.value) return reward, status # Grade before and after previous_grade = grade_task(self._state.current_code, self._task, include_hidden=False) new_grade = grade_task(code, self._task, include_hidden=False) self._state.current_code = code # Update state self._state.errors = new_grade.details.get("compile_error", "") self._state.test_results = self._format_test_results(new_grade) # Compute reward with shaping syntax_reward = 0.0 if previous_grade.syntax_score < 1.0 and new_grade.syntax_score == 1.0: syntax_reward = 0.2 self._syntax_reward_awarded = True quality_delta = new_grade.quality_score - previous_grade.quality_score quality_bonus = max(min(quality_delta * QUALITY_BONUS_SCALE, 0.1), -0.1) if new_grade.quality_score > self._best_quality_score: self._best_quality_score = new_grade.quality_score progress_reward = 0.2 * (new_grade.score - previous_grade.score) if new_grade.tests_total > 0: current_test_fraction = new_grade.tests_passed / new_grade.tests_total self._best_visible_test_fraction = max(self._best_visible_test_fraction, current_test_fraction) penalty = self._grade_penalty(new_grade) reward_value = round(progress_reward + syntax_reward + quality_bonus - penalty, 6) status = "Code updated." if self._state.errors: status = f"Code updated with syntax issues: {self._state.errors}" elif new_grade.tests_total > 0: status = self._state.test_results reward = RewardDetails( value=reward_value, syntax_reward=syntax_reward, quality_bonus=round(quality_bonus, 6), test_reward=round(progress_reward, 6), timeout_penalty=TIMEOUT_PENALTY if new_grade.timed_out else 0.0, reason=status, ) self._append_history("edit_code", status, reward_value) self._sync_score(include_hidden=False) return reward, status def _handle_run_tests(self) -> tuple[RewardDetails, str]: """Run tests and provide feedback.""" if self._task is None: return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded" grade = grade_task(self._state.current_code, self._task, include_hidden=False) self._state.errors = grade.details.get("compile_error", "") self._state.test_results = self._format_test_results(grade) previous_score = self._state.score progress_reward = 0.2 * (grade.score - previous_score) completion_bonus = 0.05 if grade.tests_total > 0 and grade.tests_passed == grade.tests_total else 0.0 penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY) reward_value = round(progress_reward + completion_bonus - penalty, 6) if grade.tests_total > 0: current_fraction = grade.tests_passed / grade.tests_total self._best_visible_test_fraction = max(self._best_visible_test_fraction, current_fraction) status = self._state.test_results if not self._state.errors else self._state.errors reward = RewardDetails( value=reward_value, test_reward=round(progress_reward + completion_bonus, 6), timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0, reason=status, ) self._append_history("run_tests", status, reward.value) self._sync_score(include_hidden=False) return reward, status def _handle_submit(self) -> tuple[RewardDetails, str]: """Submit solution and finalize episode.""" if self._task is None: return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded" grade = grade_task(self._state.current_code, self._task, include_hidden=True) self._state.errors = grade.details.get("compile_error", "") self._state.test_results = self._format_test_results(grade) # Compute final reward bonuses correctness_bonus = 0.0 if grade.score >= 0.999999 and not self._full_correctness_awarded: correctness_bonus = 0.5 self._full_correctness_awarded = True penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY) reward_value = round((grade.score * SUBMIT_BASE_SCALE) + correctness_bonus - penalty, 6) self._finalize_episode(auto_submit=False, grade=grade) status = f"Solution submitted. Final score: {grade.score:.3f}" reward = RewardDetails( value=reward_value, correctness_bonus=correctness_bonus, timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0, reason=status, ) self._append_history("submit_solution", status, reward_value) return reward, status def _finalize_episode(self, auto_submit: bool, grade: Optional[TaskGrade] = None) -> None: """Mark episode as done and set final score.""" if grade is None: if self._task is None: return grade = grade_task(self._state.current_code, self._task, include_hidden=True) self._state.errors = grade.details.get("compile_error", "") self._state.test_results = self._format_test_results(grade) self._state.score = grade.score self._done = True self._state.done = True if auto_submit: self._last_status = f"Step budget exhausted. Final score: {grade.score:.3f}" def _sync_score(self, include_hidden: bool) -> None: """Update visible score based on current code.""" if self._task is None: return grade = grade_task(self._state.current_code, self._task, include_hidden=include_hidden) # For visible runs, use a soft score; for hidden, it will be finalized on submit if not include_hidden: self._state.score = grade.score def _format_test_results(self, grade: TaskGrade) -> str: """Format test results for display.""" if grade.tests_total == 0: return "No tests available." if grade.timed_out: return "Test execution timed out." return f"Tests: {grade.tests_passed}/{grade.tests_total} passing" def _append_history(self, action_type: str, status: str, reward: float) -> None: """Append action to history.""" entry = HistoryEntry( step=self._state.step_count, action_type=action_type, status=status, reward=reward, ) self._state.history.append(entry) def _grade_penalty(self, grade: TaskGrade, failure_penalty: float = RUN_FAILURE_PENALTY) -> float: """Return a negative signal when the action leads to an obviously bad result.""" penalty = 0.0 if grade.details.get("compile_error"): penalty += failure_penalty + grade.score if grade.timed_out: penalty += TIMEOUT_PENALTY if grade.tests_total > 0 and grade.tests_passed == 0: penalty += failure_penalty return round(penalty, 6) # Backwards-compatible aliases used elsewhere in the repo. PythonEnvironment = PythonCodeReviewEnvironment CodeReviewEnvironment = PythonCodeReviewEnvironment