"""
Core environment logic for Code Debugging Challenge.
"""

import uuid
import random
import sys
from contextlib import redirect_stdout
from io import StringIO
from typing import Optional

from openenv.core.env_server import Environment

from ..models import DebugAction, DebugObservation, DebugState

# Bug database with various Python bugs.
#
# Each entry holds the broken snippet shown to the agent ("buggy_code"), a
# reference repair ("fixed_code"), the stdout the repaired program must print
# ("expected_output"), a hint and a bug category.
#
# NOTE: snippets are run through exec(), so a nested block (e.g. the body of an
# `if` inside a `def`) must be indented deeper than its parent; otherwise the
# snippet fails with IndentationError instead of showing the intended bug.
BUG_DATABASE = [
    {
        "buggy_code": "def add_numbers(a, b):\n return a + b\n\nresult = add_numbers(5)\nprint(result)",
        "fixed_code": "def add_numbers(a, b):\n return a + b\n\nresult = add_numbers(5, 3)\nprint(result)",
        "expected_output": "8",
        "test_inputs": [],
        "hint": "Function is called with wrong number of arguments",
        "bug_type": "argument_count",
    },
    {
        "buggy_code": "numbers = [1, 2, 3, 4, 5]\ntotal = 0\nfor i in range(len(numbers)):\n total += i\nprint(total)",
        "fixed_code": "numbers = [1, 2, 3, 4, 5]\ntotal = 0\nfor i in range(len(numbers)):\n total += numbers[i]\nprint(total)",
        "expected_output": "15",
        "test_inputs": [],
        "hint": "Loop variable is not being used correctly",
        "bug_type": "logic_error",
    },
    {
        "buggy_code": "def divide(a, b):\n return a / b\n\nprint(divide(10, 0))",
        "fixed_code": "def divide(a, b):\n    if b == 0:\n        return 'Error: Division by zero'\n    return a / b\n\nprint(divide(10, 0))",
        "expected_output": "Error: Division by zero",
        "test_inputs": [],
        "hint": "Need to handle edge case when dividing by zero",
        "bug_type": "exception_handling",
    },
    {
        "buggy_code": "text = 'Hello World'\nprint(text[100])",
        "fixed_code": "text = 'Hello World'\nif len(text) > 100:\n print(text[100])\nelse:\n print('Index out of range')",
        "expected_output": "Index out of range",
        "test_inputs": [],
        "hint": "Index is out of bounds for the string",
        "bug_type": "index_error",
    },
    {
        "buggy_code": "def factorial(n):\n    if n == 0:\n        return 1\n    return n * factorial(n)\n\nprint(factorial(5))",
        "fixed_code": "def factorial(n):\n    if n == 0:\n        return 1\n    return n * factorial(n - 1)\n\nprint(factorial(5))",
        "expected_output": "120",
        "test_inputs": [],
        "hint": "Recursive call is not reducing the problem size",
        "bug_type": "infinite_recursion",
    },
    {
        "buggy_code": "name = 'Alice'\nage = 25\nprint('My name is ' + name + ' and I am ' + age + ' years old')",
        "fixed_code": "name = 'Alice'\nage = 25\nprint('My name is ' + name + ' and I am ' + str(age) + ' years old')",
        "expected_output": "My name is Alice and I am 25 years old",
        "test_inputs": [],
        "hint": "Cannot concatenate string and integer directly",
        "bug_type": "type_error",
    },
    {
        "buggy_code": "my_dict = {'a': 1, 'b': 2}\nprint(my_dict['c'])",
        "fixed_code": "my_dict = {'a': 1, 'b': 2}\nprint(my_dict.get('c', 'Key not found'))",
        "expected_output": "Key not found",
        "test_inputs": [],
        "hint": "Key does not exist in dictionary",
        "bug_type": "key_error",
    },
]


class DebugEnvironment(Environment):
    """Code Debugging Challenge Environment.

    Each episode draws one entry from ``BUG_DATABASE``. The agent then issues
    ``analyze``, ``fix``, ``test`` or ``submit`` actions through :meth:`step`;
    a ``fix`` whose printed output matches the problem's ``expected_output``
    solves the episode.
    """

    supports_concurrent_sessions = True

    def __init__(self):
        super().__init__()
        self._state = DebugState(
            episode_id=str(uuid.uuid4()),
            total_problems=len(BUG_DATABASE),
        )
        # Stays None until reset() (or the first step()) picks a problem.
        self.current_problem = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _attempts_remaining(self) -> int:
        """Return the number of attempts left, never below zero."""
        return max(0, self._state.max_attempts - self._state.attempts_made)

    def _current_hint(self) -> Optional[str]:
        """Return the problem's hint once at least two attempts were made."""
        if self._state.attempts_made >= 2:
            return self.current_problem["hint"]
        return None

    def _make_observation(
        self,
        *,
        success: bool = False,
        attempts_remaining: Optional[int] = None,
        **extras,
    ) -> DebugObservation:
        """Build an observation for the current problem.

        Args:
            success: Whether the episode has been solved.
            attempts_remaining: Explicit override; when ``None`` the value is
                derived from the state via :meth:`_attempts_remaining`.
            **extras: Optional observation fields (``current_output``,
                ``error_message``, ``hint``) forwarded only when supplied, so
                omitted fields keep whatever default the model declares.
        """
        problem = self.current_problem
        if attempts_remaining is None:
            attempts_remaining = self._attempts_remaining()
        return DebugObservation(
            buggy_code=problem["buggy_code"],
            expected_output=problem["expected_output"],
            # Copy so consumers cannot mutate the shared BUG_DATABASE entry.
            test_inputs=list(problem.get("test_inputs", [])),
            attempts_remaining=attempts_remaining,
            success=success,
            **extras,
        )

    # ------------------------------------------------------------------
    # Environment API
    # ------------------------------------------------------------------

    def reset(self) -> DebugObservation:
        """Reset environment and return initial observation.

        Creates a fresh ``DebugState`` with a new episode id and picks a random
        problem from ``BUG_DATABASE``.
        """
        self._state = DebugState(
            episode_id=str(uuid.uuid4()),
            total_problems=len(BUG_DATABASE),
        )
        self.current_problem = random.choice(BUG_DATABASE)
        return self._make_observation(
            attempts_remaining=self._state.max_attempts,
        )

    def step(self, action: DebugAction) -> DebugObservation:
        """Execute one step in the environment.

        Every call counts as one attempt, whatever the action type. Unknown
        action types yield an observation with ``error_message`` set to
        ``"Invalid action type"``.
        """
        # Calling step() before reset() used to crash on a None problem;
        # start an episode lazily instead.
        if self.current_problem is None:
            self.reset()

        self._state.attempts_made += 1

        kind = action.action_type
        if kind == "analyze":
            return self._handle_analyze()
        if kind == "fix":
            return self._handle_fix(action.content)
        if kind == "test":
            return self._handle_test()
        if kind == "submit":
            return self._handle_submit()
        return self._make_observation(error_message="Invalid action type")

    def _handle_analyze(self) -> DebugObservation:
        """Handle analyze action: re-present the problem without running code."""
        return self._make_observation()

    def _handle_fix(self, code_fix: Optional[str]) -> DebugObservation:
        """Handle fix action.

        Runs ``code_fix`` and compares its stripped stdout with the stripped
        expected output. On a match the state is marked solved and the score
        is increased by 1.0.

        Args:
            code_fix: Candidate replacement program, or ``None`` when the
                action carried no content.
        """
        if code_fix is None:
            return self._make_observation(error_message="No fix provided")

        output, error = self._execute_code(code_fix)
        if error:
            return self._make_observation(
                current_output=output,
                error_message=error,
                hint=self._current_hint(),
            )

        expected = self.current_problem["expected_output"].strip()
        produced = output.strip()
        if produced == expected:
            self._state.solved = True
            self._state.score += 1.0
            return self._make_observation(current_output=output, success=True)

        return self._make_observation(
            current_output=output,
            error_message=f"Output mismatch. Got: {produced}, Expected: {expected}",
            hint=self._current_hint(),
        )

    def _handle_test(self) -> DebugObservation:
        """Handle test action - run the buggy code to see the error."""
        output, error = self._execute_code(self.current_problem["buggy_code"])
        return self._make_observation(current_output=output, error_message=error)

    def _handle_submit(self) -> DebugObservation:
        """Handle early submission without fixing.

        Reports zero attempts remaining, which ends the episode (see
        :meth:`terminated`).
        """
        return self._make_observation(attempts_remaining=0)

    def _execute_code(self, code: str) -> tuple[str, Optional[str]]:
        """Execute code and capture output/errors.

        Args:
            code: Python source to run in a fresh, empty global namespace.

        Returns:
            ``(stdout_text, error)`` where ``error`` is ``None`` on success or
            ``"<ExceptionType>: <message>"`` when the code raised.
        """
        # SECURITY: exec() on agent-supplied code is NOT a sandbox. The code
        # runs in-process with full builtins and no time limit; isolate it
        # (subprocess / container with a timeout) before exposing this to
        # untrusted callers.
        # NOTE(review): redirect_stdout swaps the process-wide sys.stdout, so
        # with supports_concurrent_sessions = True, executions running at the
        # same time in one process may capture each other's output - confirm
        # how sessions are scheduled.
        captured = StringIO()
        error = None
        try:
            with redirect_stdout(captured):
                exec(code, {})
        except (Exception, SystemExit) as exc:
            # SystemExit is caught too: a snippet calling sys.exit()/exit()
            # must not take the hosting process down with it.
            error = f"{type(exc).__name__}: {exc}"
        return captured.getvalue(), error

    @property
    def state(self) -> DebugState:
        """Return current environment state."""
        return self._state

    def reward(self, observation: DebugObservation) -> float:
        """Compute reward based on observation.

        Checks run from most to least specific so that no case is shadowed:

        *  1.0 - solved.
        * -1.0 - no attempts remaining (including early ``submit``).
        * -0.5 - a ``fix`` action without content.
        * -0.3 - an error message that does not contain ``"Error:"`` (e.g.
          output mismatch, invalid action).
        * -0.2 - some output was produced but the problem is unsolved.
        *  0.1 - neither output nor error (e.g. ``analyze``).
        *  0.0 - anything else.
        """
        if observation.success:
            return 1.0
        if observation.attempts_remaining <= 0:
            return -1.0

        message = observation.error_message
        if message == "No fix provided":
            return -0.5
        if message and "Error:" not in message:
            return -0.3
        if observation.current_output:
            return -0.2
        if observation.current_output is None and message is None:
            return 0.1
        return 0.0

    def terminated(self, observation: DebugObservation) -> bool:
        """Episode terminates on success, early submit, or max attempts."""
        if observation.success:
            return True
        # An early submit reports zero attempts remaining without exhausting
        # the state's attempt counter, so the observation is checked as well.
        if observation.attempts_remaining <= 0:
            return True
        return self._state.attempts_made >= self._state.max_attempts

    def truncated(self, observation: DebugObservation) -> bool:
        """Episode is truncated if max attempts reached without success."""
        out_of_attempts = self._state.attempts_made >= self._state.max_attempts
        return out_of_attempts and not observation.success