# Retrieved from the Hugging Face Hub file view (uploader: uvpatel7271).
# Commit 605cd75 (verified): "Upload folder using huggingface_hub" - 17.2 kB.
"""Core OpenEnv environment for Python code review and repair tasks."""
from __future__ import annotations
from typing import List, Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from graders import grade_task
from models import (
HealthResponse,
HistoryEntry,
PythonCodeReviewAction,
PythonCodeReviewObservation,
PythonCodeReviewState,
RewardDetails,
TaskGrade,
)
from tasks import TaskSpec, get_task, list_task_descriptors, list_task_summaries, task_ids
# Reward shaping constants
# Magnitude subtracted for an invalid action: stepping a finished episode,
# an unsupported action_type, or an edit_code action without code.
INVALID_ACTION_PENALTY = 0.1
# Multiplier applied to the quality-score delta of an edit; the resulting
# bonus is additionally clamped to [-0.1, 0.1] at the use site.
QUALITY_BONUS_SCALE = 0.15
# Failure penalty passed to _grade_penalty() by the analyze_code handler.
ANALYZE_FAILURE_PENALTY = 0.05
# Failure penalty used by run_tests and submit_solution; also the default
# failure_penalty of _grade_penalty().
RUN_FAILURE_PENALTY = 0.05
# Penalty applied (and reported in RewardDetails) when a grade timed out.
TIMEOUT_PENALTY = 0.1
# Multiplier applied to the final grade score when a solution is submitted.
SUBMIT_BASE_SCALE = 0.1
class PythonCodeReviewEnvironment(
    Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
    """Production-style environment for reviewing and fixing Python code.

    An episode is started with ``reset()`` and advanced with ``step()``.
    Four action types are understood: ``analyze_code``, ``edit_code``,
    ``run_tests`` and ``submit_solution``. Every step consumes one unit of
    the task's ``max_steps`` budget; when the budget reaches zero the current
    code is graded with hidden tests and the episode ends automatically.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create an idle environment; ``reset()`` must be called to load a task."""
        super().__init__()
        self._task_order = list(task_ids())
        # -1 so that the first cursor advance in reset() lands on index 0.
        self._task_cursor = -1
        self._task: Optional[TaskSpec] = None
        self._state = PythonCodeReviewState(episode_id=str(uuid4()))
        self._done = False
        self._last_status = "Call reset() to start."
        self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.")
        self._best_visible_test_fraction = 0.0
        self._best_quality_score = 0.0
        # One-shot flags: each of these bonuses is paid at most once per episode.
        self._full_correctness_awarded = False
        self._syntax_reward_awarded = False

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Reset the environment to the next deterministic task.

        Args:
            seed: Accepted for interface compatibility and ignored.
            episode_id: Identifier for the new episode; a fresh UUID4 string
                is generated when omitted.
            task_id: Task to load. When omitted (or empty), the cursor
                advances round-robin through the known task ids.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The initial observation of the new episode.

        Raises:
            ValueError: If ``task_id`` is not in the environment's task order
                (``get_task`` may raise its own error first).
        """
        del seed

        # Resolve the task into locals first so that a failed lookup leaves
        # the previously loaded task and cursor untouched.
        if task_id:
            task = get_task(task_id)
            cursor = self._task_order.index(task_id)
        else:
            cursor = (self._task_cursor + 1) % len(self._task_order)
            task = get_task(self._task_order[cursor])
        self._task = task
        self._task_cursor = cursor

        # Reset episode state
        self._done = False
        self._best_visible_test_fraction = 0.0
        self._best_quality_score = 0.0
        self._full_correctness_awarded = False
        self._syntax_reward_awarded = False
        self._last_status = "Inspect the code, edit it, run tests, then submit."
        self._last_reward = RewardDetails(value=0.0, reason="Episode reset.")
        self._state = PythonCodeReviewState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task.task_id,
            difficulty=task.difficulty,
            task_kind=task.task_kind,
            attempts_remaining=task.max_steps,
            current_code=task.starter_code,
            errors="",
            test_results="Not run yet.",
            history=[],
            score=0.0,
            done=False,
        )
        return self._build_observation()

    def step(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Apply one structured action.

        Args:
            action: The action to apply; dispatch is on ``action.action_type``.
            timeout_s: Accepted for interface compatibility and ignored.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The observation after the action. If no task is loaded the
            environment is reset instead and the action is not applied. If
            the episode is already done, an invalid-action penalty is
            reported and nothing else changes.
        """
        del timeout_s

        if self._task is None:
            # No episode yet: start one. The supplied action is discarded.
            return self.reset()

        if self._done:
            self._last_reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason="Episode already completed.",
            )
            self._last_status = "Episode already completed. Call reset() to continue."
            return self._build_observation()

        self._state.step_count += 1

        # Dispatch to handler based on action type
        if action.action_type == "analyze_code":
            reward, status = self._handle_analyze()
        elif action.action_type == "edit_code":
            reward, status = self._handle_edit(action)
        elif action.action_type == "run_tests":
            reward, status = self._handle_run_tests()
        elif action.action_type == "submit_solution":
            reward, status = self._handle_submit()
        else:
            reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason=f"Unsupported action_type: {action.action_type}",
            )
            status = f"Invalid action: unsupported action_type '{action.action_type}'."

        self._last_reward = reward
        self._last_status = status
        self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)
        self._state.done = self._done

        # Auto-submit if steps exhausted. _finalize_episode overwrites
        # _last_status with the "budget exhausted" message; the reward of the
        # action that used the last step is kept.
        if self._state.attempts_remaining == 0 and not self._done:
            self._finalize_episode(auto_submit=True)
            self._state.done = True

        return self._build_observation()

    @property
    def state(self) -> PythonCodeReviewState:
        """Return a deep copy of the current environment state."""
        return self._state.model_copy(deep=True)

    def list_task_summaries(self) -> List[object]:
        """Return public task metadata."""
        return list_task_summaries()

    def list_tasks(self) -> List[object]:
        """Return all public task descriptors."""
        return list_task_descriptors()

    def get_task(self, task_id: str) -> object:
        """Return a single task descriptor."""
        return get_task(task_id).to_descriptor()

    def health(self) -> HealthResponse:
        """Return a simple health model carrying the number of known tasks."""
        return HealthResponse(task_count=len(self._task_order))

    def grade_task_submission(self, task_id: str, code: str) -> TaskGrade:
        """Expose deterministic grading (hidden tests included) outside of an active episode."""
        return grade_task(code, get_task(task_id), include_hidden=True)

    def _build_observation(self) -> PythonCodeReviewObservation:
        """Build the current observation from state, the loaded task and the last reward."""
        return PythonCodeReviewObservation(
            task_id=self._state.task_id or "",
            title=self._task.title if self._task else "",
            difficulty=self._state.difficulty or "easy",
            task_kind=self._state.task_kind,
            task_description=self._task.task_description if self._task else "",
            current_code=self._state.current_code,
            errors=self._state.errors,
            test_results=self._state.test_results,
            visible_tests=self._task.visible_tests if self._task else [],
            history=self._state.history,
            attempts_remaining=self._state.attempts_remaining,
            last_action_status=self._last_status,
            score=self._state.score,
            reward=self._last_reward.value,
            reward_details=self._last_reward,
            done=self._done,
            metadata={
                "episode_id": self._state.episode_id,
                "step_count": self._state.step_count,
                "task_kind": self._state.task_kind,
            },
        )

    def _handle_analyze(self) -> tuple[RewardDetails, str]:
        """Grade the current code on visible tests and summarise errors/test status.

        Returns:
            ``(reward, status)`` where the reward is 5% of the visible grade
            score minus any grade penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        error = grade.details.get("compile_error", "")

        if error:
            self._state.errors = error
            self._state.test_results = "Compilation failed. Fix syntax first."
            summary = f"Syntax error detected: {error}"
        else:
            self._state.errors = ""
            if self._task.task_kind == "syntax_fix":
                self._state.test_results = "Code compiles successfully."
                summary = "Code compiles. Ready to submit."
            else:
                visible_total = len(self._task.visible_tests)
                visible_passed = grade.tests_passed
                self._state.test_results = f"Test run: {visible_passed}/{visible_total} passing."
                summary = self._state.test_results

        penalty = self._grade_penalty(grade, failure_penalty=ANALYZE_FAILURE_PENALTY)
        reward_value = round((grade.score * 0.05) - penalty, 6)
        reward = RewardDetails(
            value=reward_value,
            # Reported for consistency with the other handlers; the amount is
            # already included in `penalty` above.
            timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0,
            reason=summary,
        )
        self._append_history("analyze_code", summary, reward.value)
        # Reuse the grade computed above instead of grading the same code again.
        self._sync_score(include_hidden=False, grade=grade)
        return reward, summary

    def _handle_edit(self, action: PythonCodeReviewAction) -> tuple[RewardDetails, str]:
        """Replace the current code and reward progress relative to the previous code.

        Args:
            action: An ``edit_code`` action; ``action.code`` (stripped) becomes
                the new current code. Empty code is rejected with a penalty.

        Returns:
            ``(reward, status)``. The reward combines score progress, a
            one-time syntax-fix reward, a clamped quality bonus and any grade
            penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        code = (action.code or "").strip()
        if not code:
            reward = RewardDetails(
                value=-INVALID_ACTION_PENALTY,
                invalid_action_penalty=INVALID_ACTION_PENALTY,
                reason="Edit action requires non-empty code.",
            )
            status = "Invalid: edit_code requires code parameter."
            self._append_history("edit_code", status, reward.value)
            return reward, status

        # Grade before and after
        previous_grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        new_grade = grade_task(code, self._task, include_hidden=False)
        self._state.current_code = code

        # Update state
        self._state.errors = new_grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(new_grade)

        # Compute reward with shaping. The syntax reward is gated on the
        # one-shot flag so it cannot be farmed by repeatedly breaking and
        # re-fixing the syntax within one episode.
        syntax_reward = 0.0
        if (
            not self._syntax_reward_awarded
            and previous_grade.syntax_score < 1.0
            and new_grade.syntax_score == 1.0
        ):
            syntax_reward = 0.2
            self._syntax_reward_awarded = True

        quality_delta = new_grade.quality_score - previous_grade.quality_score
        quality_bonus = max(min(quality_delta * QUALITY_BONUS_SCALE, 0.1), -0.1)
        if new_grade.quality_score > self._best_quality_score:
            self._best_quality_score = new_grade.quality_score

        progress_reward = 0.2 * (new_grade.score - previous_grade.score)
        if new_grade.tests_total > 0:
            current_test_fraction = new_grade.tests_passed / new_grade.tests_total
            self._best_visible_test_fraction = max(self._best_visible_test_fraction, current_test_fraction)

        penalty = self._grade_penalty(new_grade)
        reward_value = round(progress_reward + syntax_reward + quality_bonus - penalty, 6)

        status = "Code updated."
        if self._state.errors:
            status = f"Code updated with syntax issues: {self._state.errors}"
        elif new_grade.tests_total > 0:
            status = self._state.test_results

        reward = RewardDetails(
            value=reward_value,
            syntax_reward=syntax_reward,
            quality_bonus=round(quality_bonus, 6),
            test_reward=round(progress_reward, 6),
            timeout_penalty=TIMEOUT_PENALTY if new_grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("edit_code", status, reward_value)
        # new_grade already describes the code now stored in state.
        self._sync_score(include_hidden=False, grade=new_grade)
        return reward, status

    def _handle_run_tests(self) -> tuple[RewardDetails, str]:
        """Run the visible tests on the current code and report the outcome.

        Returns:
            ``(reward, status)``. Progress is measured against the score
            stored in state before this run; a small bonus is added when all
            visible tests pass.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        self._state.errors = grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(grade)

        # Must be read before _sync_score() overwrites the stored score.
        previous_score = self._state.score
        progress_reward = 0.2 * (grade.score - previous_score)
        completion_bonus = 0.05 if grade.tests_total > 0 and grade.tests_passed == grade.tests_total else 0.0
        penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY)
        reward_value = round(progress_reward + completion_bonus - penalty, 6)

        if grade.tests_total > 0:
            current_fraction = grade.tests_passed / grade.tests_total
            self._best_visible_test_fraction = max(self._best_visible_test_fraction, current_fraction)

        status = self._state.test_results if not self._state.errors else self._state.errors
        reward = RewardDetails(
            value=reward_value,
            test_reward=round(progress_reward + completion_bonus, 6),
            timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("run_tests", status, reward.value)
        # Reuse the grade computed above instead of running the tests twice.
        self._sync_score(include_hidden=False, grade=grade)
        return reward, status

    def _handle_submit(self) -> tuple[RewardDetails, str]:
        """Grade the current code with hidden tests and finalize the episode.

        Returns:
            ``(reward, status)``. The reward is the scaled final score plus a
            one-time correctness bonus for a (near-)perfect score, minus any
            grade penalty.
        """
        if self._task is None:
            return RewardDetails(value=0.0, reason="Invalid state"), "Error: task not loaded"

        grade = grade_task(self._state.current_code, self._task, include_hidden=True)
        self._state.errors = grade.details.get("compile_error", "")
        self._state.test_results = self._format_test_results(grade)

        # Compute final reward bonuses
        correctness_bonus = 0.0
        if grade.score >= 0.999999 and not self._full_correctness_awarded:
            correctness_bonus = 0.5
            self._full_correctness_awarded = True

        penalty = self._grade_penalty(grade, failure_penalty=RUN_FAILURE_PENALTY)
        reward_value = round((grade.score * SUBMIT_BASE_SCALE) + correctness_bonus - penalty, 6)

        self._finalize_episode(auto_submit=False, grade=grade)
        status = f"Solution submitted. Final score: {grade.score:.3f}"
        reward = RewardDetails(
            value=reward_value,
            correctness_bonus=correctness_bonus,
            timeout_penalty=TIMEOUT_PENALTY if grade.timed_out else 0.0,
            reason=status,
        )
        self._append_history("submit_solution", status, reward_value)
        return reward, status

    def _finalize_episode(self, auto_submit: bool, grade: Optional[TaskGrade] = None) -> None:
        """Mark the episode as done and store the final score.

        Args:
            auto_submit: True when triggered by step-budget exhaustion; the
                last status message is then replaced accordingly.
            grade: A grade already computed with hidden tests. When omitted,
                the current code is graded here (hidden tests included) and
                errors/test results in state are refreshed from that grade.
        """
        if grade is None:
            if self._task is None:
                return
            grade = grade_task(self._state.current_code, self._task, include_hidden=True)
            self._state.errors = grade.details.get("compile_error", "")
            self._state.test_results = self._format_test_results(grade)

        self._state.score = grade.score
        self._done = True
        self._state.done = True
        if auto_submit:
            self._last_status = f"Step budget exhausted. Final score: {grade.score:.3f}"

    def _sync_score(self, include_hidden: bool, grade: Optional[TaskGrade] = None) -> None:
        """Update the visible (soft) score stored in state.

        Args:
            include_hidden: When True nothing is updated: the hidden-test
                score is only written by ``_finalize_episode``.
            grade: A visible-test grade of the current code that the caller
                has already computed. When omitted, the current code is
                graded here.
        """
        if self._task is None or include_hidden:
            return
        if grade is None:
            grade = grade_task(self._state.current_code, self._task, include_hidden=False)
        self._state.score = grade.score

    def _format_test_results(self, grade: TaskGrade) -> str:
        """Format test results for display."""
        if grade.tests_total == 0:
            return "No tests available."
        if grade.timed_out:
            return "Test execution timed out."
        return f"Tests: {grade.tests_passed}/{grade.tests_total} passing"

    def _append_history(self, action_type: str, status: str, reward: float) -> None:
        """Append one entry (current step, action type, status, reward) to the episode history."""
        entry = HistoryEntry(
            step=self._state.step_count,
            action_type=action_type,
            status=status,
            reward=reward,
        )
        self._state.history.append(entry)

    def _grade_penalty(self, grade: TaskGrade, failure_penalty: float = RUN_FAILURE_PENALTY) -> float:
        """Return the penalty magnitude for an obviously bad grade.

        The result is non-negative; callers subtract it from the reward.

        Args:
            grade: The grade to inspect.
            failure_penalty: Base amount charged for a compile error and,
                separately, for passing zero of a non-empty test set.

        Returns:
            The accumulated penalty, rounded to six decimal places.
        """
        penalty = 0.0
        if grade.details.get("compile_error"):
            # A compile error also cancels whatever score the grade carries.
            penalty += failure_penalty + grade.score
        if grade.timed_out:
            penalty += TIMEOUT_PENALTY
        if grade.tests_total > 0 and grade.tests_passed == 0:
            penalty += failure_penalty
        return round(penalty, 6)
# Backwards-compatible aliases used elsewhere in the repo.
# Both names are bound to the very same class object as
# PythonCodeReviewEnvironment; they are not subclasses.
PythonEnvironment = PythonCodeReviewEnvironment
CodeReviewEnvironment = PythonCodeReviewEnvironment