Spaces:
No application file
No application file
| """ | |
| Core environment logic for Code Debugging Challenge. | |
| """ | |
| import uuid | |
| import random | |
| import sys | |
| from io import StringIO | |
| from typing import Optional | |
| from openenv.core.env_server import Environment | |
| from ..models import DebugAction, DebugObservation, DebugState | |
# Bug database: one entry per debugging exercise.
#
# Each entry is a dict with these keys:
#   buggy_code      -- Python source shown to the agent; it either raises or
#                      prints the wrong result when executed.
#   fixed_code      -- reference solution; executing it prints expected_output.
#   expected_output -- text a correct fix must print (compared after .strip()).
#   test_inputs     -- extra inputs for the exercise (currently always empty).
#   hint            -- short nudge revealed after repeated failed fix attempts.
#   bug_type        -- category label for the kind of bug.
#
# The snippets are executed with exec(), so the indentation inside the string
# literals must be valid Python (4 spaces per level).
BUG_DATABASE = [
    {
        "buggy_code": "def add_numbers(a, b):\n    return a + b\n\nresult = add_numbers(5)\nprint(result)",
        "fixed_code": "def add_numbers(a, b):\n    return a + b\n\nresult = add_numbers(5, 3)\nprint(result)",
        "expected_output": "8",
        "test_inputs": [],
        "hint": "Function is called with wrong number of arguments",
        "bug_type": "argument_count",
    },
    {
        "buggy_code": "numbers = [1, 2, 3, 4, 5]\ntotal = 0\nfor i in range(len(numbers)):\n    total += i\nprint(total)",
        "fixed_code": "numbers = [1, 2, 3, 4, 5]\ntotal = 0\nfor i in range(len(numbers)):\n    total += numbers[i]\nprint(total)",
        "expected_output": "15",
        "test_inputs": [],
        "hint": "Loop variable is not being used correctly",
        "bug_type": "logic_error",
    },
    {
        "buggy_code": "def divide(a, b):\n    return a / b\n\nprint(divide(10, 0))",
        "fixed_code": "def divide(a, b):\n    if b == 0:\n        return 'Error: Division by zero'\n    return a / b\n\nprint(divide(10, 0))",
        "expected_output": "Error: Division by zero",
        "test_inputs": [],
        "hint": "Need to handle edge case when dividing by zero",
        "bug_type": "exception_handling",
    },
    {
        "buggy_code": "text = 'Hello World'\nprint(text[100])",
        "fixed_code": "text = 'Hello World'\nif len(text) > 100:\n    print(text[100])\nelse:\n    print('Index out of range')",
        "expected_output": "Index out of range",
        "test_inputs": [],
        "hint": "Index is out of bounds for the string",
        "bug_type": "index_error",
    },
    {
        "buggy_code": "def factorial(n):\n    if n == 0:\n        return 1\n    return n * factorial(n)\n\nprint(factorial(5))",
        "fixed_code": "def factorial(n):\n    if n == 0:\n        return 1\n    return n * factorial(n - 1)\n\nprint(factorial(5))",
        "expected_output": "120",
        "test_inputs": [],
        "hint": "Recursive call is not reducing the problem size",
        "bug_type": "infinite_recursion",
    },
    {
        "buggy_code": "name = 'Alice'\nage = 25\nprint('My name is ' + name + ' and I am ' + age + ' years old')",
        "fixed_code": "name = 'Alice'\nage = 25\nprint('My name is ' + name + ' and I am ' + str(age) + ' years old')",
        "expected_output": "My name is Alice and I am 25 years old",
        "test_inputs": [],
        "hint": "Cannot concatenate string and integer directly",
        "bug_type": "type_error",
    },
    {
        "buggy_code": "my_dict = {'a': 1, 'b': 2}\nprint(my_dict['c'])",
        "fixed_code": "my_dict = {'a': 1, 'b': 2}\nprint(my_dict.get('c', 'Key not found'))",
        "expected_output": "Key not found",
        "test_inputs": [],
        "hint": "Key does not exist in dictionary",
        "bug_type": "key_error",
    },
]
class DebugEnvironment(Environment):
    """Code Debugging Challenge Environment.

    Each episode presents one randomly chosen entry of ``BUG_DATABASE``.
    The agent interacts through ``step`` with one of four action types:

    * ``"analyze"`` -- re-read the problem statement.
    * ``"fix"``     -- submit replacement code in ``action.content``; it is
      executed and its stdout compared with the expected output.
    * ``"test"``    -- run the original buggy code to observe its failure.
    * ``"submit"``  -- give up early.

    Every call to ``step`` (of any action type) consumes one attempt.
    """

    # NOTE(review): _execute_code temporarily rebinds the process-wide
    # sys.stdout, so sessions that really run in parallel threads could
    # capture each other's output -- confirm this flag is appropriate.
    supports_concurrent_sessions = True

    def __init__(self):
        """Create the environment with a fresh state and no active problem."""
        super().__init__()
        self._state = DebugState(
            episode_id=str(uuid.uuid4()),
            total_problems=len(BUG_DATABASE),
        )
        # Chosen by reset() (or lazily by step() if reset() was skipped).
        self.current_problem = None

    def reset(self) -> DebugObservation:
        """Start a new episode and return its initial observation.

        Replaces the state with a fresh ``DebugState`` (new episode id) and
        picks a random problem from ``BUG_DATABASE``.
        """
        self._state = DebugState(
            episode_id=str(uuid.uuid4()),
            total_problems=len(BUG_DATABASE),
        )
        self.current_problem = random.choice(BUG_DATABASE)
        return self._observe(attempts_remaining=self._state.max_attempts)

    def step(self, action: DebugAction) -> DebugObservation:
        """Apply ``action`` and return the resulting observation.

        Increments the attempt counter, then dispatches on
        ``action.action_type``. An unknown action type yields an observation
        whose ``error_message`` is ``"Invalid action type"``.
        """
        if self.current_problem is None:
            # step() called before reset(): pick a problem instead of
            # failing with a TypeError when the handlers subscript None.
            self.current_problem = random.choice(BUG_DATABASE)

        self._state.attempts_made += 1

        kind = action.action_type
        if kind == "analyze":
            return self._handle_analyze()
        if kind == "fix":
            return self._handle_fix(action.content)
        if kind == "test":
            return self._handle_test()
        if kind == "submit":
            return self._handle_submit()
        return self._observe(error_message="Invalid action type")

    def _attempts_remaining(self) -> int:
        """Return the number of attempts left, never less than zero."""
        remaining = self._state.max_attempts - self._state.attempts_made
        return max(0, remaining)

    def _hint(self) -> Optional[str]:
        """Return the problem's hint once at least two attempts were made."""
        if self._state.attempts_made >= 2:
            return self.current_problem["hint"]
        return None

    def _observe(self, **fields) -> DebugObservation:
        """Build an observation for the current problem.

        The problem description fields are always filled in.
        ``attempts_remaining`` and ``success`` default to the current
        remaining-attempt count and ``False``; any keyword passed in
        ``fields`` is forwarded to ``DebugObservation`` unchanged.
        """
        problem = self.current_problem
        fields.setdefault("attempts_remaining", self._attempts_remaining())
        fields.setdefault("success", False)
        return DebugObservation(
            buggy_code=problem["buggy_code"],
            expected_output=problem["expected_output"],
            test_inputs=problem.get("test_inputs", []),
            **fields,
        )

    def _handle_analyze(self) -> DebugObservation:
        """Handle analyze action: return the problem statement again."""
        return self._observe()

    def _handle_fix(self, code_fix: Optional[str]) -> DebugObservation:
        """Handle fix action: run ``code_fix`` and grade its output.

        Returns a successful observation (and marks the state solved, adding
        1.0 to the score) when the stripped stdout equals the stripped
        expected output. Otherwise the observation carries an error message
        and, from the second attempt on, the problem's hint.
        """
        if code_fix is None:
            return self._observe(error_message="No fix provided")

        output, error = self._execute_code(code_fix)
        if error:
            return self._observe(
                current_output=output,
                error_message=error,
                hint=self._hint(),
            )

        actual = output.strip()
        expected = self.current_problem["expected_output"].strip()
        if actual == expected:
            self._state.solved = True
            self._state.score += 1.0
            return self._observe(current_output=output, success=True)

        return self._observe(
            current_output=output,
            error_message=f"Output mismatch. Got: {actual}, Expected: {expected}",
            hint=self._hint(),
        )

    def _handle_test(self) -> DebugObservation:
        """Handle test action - run the buggy code to see the error."""
        output, error = self._execute_code(self.current_problem["buggy_code"])
        return self._observe(current_output=output, error_message=error)

    def _handle_submit(self) -> DebugObservation:
        """Handle early submission without fixing.

        Reports zero attempts remaining and ``success=False``.
        """
        return self._observe(attempts_remaining=0)

    def _execute_code(self, code: str) -> tuple[str, Optional[str]]:
        """Execute ``code`` and capture what it prints.

        Returns:
            ``(output, error)`` where ``output`` is everything written to
            stdout and ``error`` is ``"<ExceptionType>: <message>"`` if the
            code raised, else ``None``.

        SECURITY: ``exec`` runs the submitted code inside this process with
        full builtins -- there is no sandbox, resource limit or timeout, so
        hostile or non-terminating code can harm or hang the host. Only run
        this where the submitted code is trusted or the process is isolated.
        """
        captured = StringIO()
        saved_stdout = sys.stdout
        sys.stdout = captured
        error = None
        try:
            exec(code, {})
        except (Exception, SystemExit) as exc:
            # SystemExit is caught as well so that submitted code calling
            # exit()/sys.exit() is reported as a failure instead of
            # propagating out of step().
            error = f"{type(exc).__name__}: {exc}"
        finally:
            sys.stdout = saved_stdout
        # Read from the local buffer: the executed code may have rebound
        # sys.stdout itself.
        return captured.getvalue(), error

    def state(self) -> DebugState:
        """Return current environment state."""
        return self._state

    def reward(self, observation: DebugObservation) -> float:
        """Compute reward based on observation.

        The most specific outcomes are checked first so that none of them is
        shadowed by a more general rule:

        * success                                  ->  1.0
        * no attempts remaining                    -> -1.0
        * fix action without code                  -> -0.5
        * other error message lacking ``"Error:"`` -> -0.3
        * some output was produced                 -> -0.2
        * neither output nor error message         ->  0.1
        * anything else                            ->  0.0
        """
        if observation.success:
            return 1.0
        if observation.attempts_remaining == 0:
            return -1.0

        message = observation.error_message
        if message == "No fix provided":
            return -0.5
        if message and "Error:" not in message:
            return -0.3
        if observation.current_output:
            return -0.2
        if observation.current_output is None and message is None:
            return 0.1
        return 0.0

    def terminated(self, observation: DebugObservation) -> bool:
        """Episode terminates on success or max attempts."""
        if observation.success:
            return True
        return self._state.attempts_made >= self._state.max_attempts

    def truncated(self, observation: DebugObservation) -> bool:
        """Episode is truncated if max attempts reached without success."""
        out_of_attempts = self._state.attempts_made >= self._state.max_attempts
        return out_of_attempts and not observation.success