| | """ |
| | AI Agent for executing and formatting code responses. |
| | |
| | This module provides functionality to execute Python code and format the results |
| | in a consistent way. It's designed to be used by the expert system for handling |
| | code-related tasks. |
| | """ |
| | import ast |
| | import sys |
| | import traceback |
| | from typing import Any, Dict, Optional, Tuple |
| | import json |
| | import logging |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | class CodeExecutionError(Exception): |
| | """Custom exception for code execution errors.""" |
| | pass |
| |
|
| | class CodeAgent: |
| | """ |
| | Agent for executing and formatting Python code. |
| | |
| | This agent is responsible for: |
| | 1. Executing Python code in a controlled environment |
| | 2. Capturing and formatting the output |
| | 3. Handling errors gracefully |
| | 4. Providing rich output formatting |
| | """ |
| | |
| | def __init__(self): |
| | """Initialize the code agent with a clean environment.""" |
| | self.environment = self._create_clean_environment() |
| | |
| | def _create_clean_environment(self) -> Dict[str, Any]: |
| | """Create a clean execution environment with essential built-ins.""" |
| | |
| | env = { |
| | '__builtins__': { |
| | |
| | 'abs': abs, 'all': all, 'any': any, 'ascii': ascii, |
| | 'bin': bin, 'bool': bool, 'bytearray': bytearray, 'bytes': bytes, |
| | 'chr': chr, 'complex': complex, 'dict': dict, 'divmod': divmod, |
| | 'enumerate': enumerate, 'filter': filter, 'float': float, |
| | 'format': format, 'frozenset': frozenset, 'hash': hash, |
| | 'hex': hex, 'int': int, 'iter': iter, 'len': len, 'list': list, |
| | 'map': map, 'max': max, 'min': min, 'next': next, 'oct': oct, |
| | 'ord': ord, 'pow': pow, 'range': range, 'repr': repr, |
| | 'reversed': reversed, 'round': round, 'set': set, 'slice': slice, |
| | 'sorted': sorted, 'str': str, 'sum': sum, 'tuple': tuple, |
| | 'zip': zip, |
| | |
| | 'True': True, 'False': False, 'None': None |
| | }, |
| | |
| | 'math': __import__('math'), |
| | 'random': __import__('random'), |
| | 'datetime': __import__('datetime'), |
| | 'json': __import__('json'), |
| | 'collections': __import__('collections'), |
| | 'itertools': __import__('itertools'), |
| | 'functools': __import__('functools'), |
| | 'operator': __import__('operator'), |
| | 're': __import__('re'), |
| | } |
| | return env |
| | |
| | def execute_code(self, code: str) -> Tuple[Any, Optional[str], bool]: |
| | """ |
| | Execute Python code and return the result, output, and success status. |
| | |
| | Args: |
| | code: The Python code to execute |
| | |
| | Returns: |
| | Tuple of (result, output, success) where: |
| | - result: The result of the last expression (or None) |
| | - output: Any captured stdout/stderr output |
| | - success: Boolean indicating if execution was successful |
| | """ |
| | |
| | from io import StringIO |
| | |
| | |
| | class PrintCapture(StringIO): |
| | def __init__(self): |
| | super().__init__() |
| | self.output = [] |
| | |
| | def write(self, text): |
| | self.output.append(text) |
| | return super().write(text) |
| | |
| | def getvalue(self): |
| | return ''.join(self.output) |
| | |
| | |
| | output_capture = PrintCapture() |
| | old_stdout = sys.stdout |
| | old_stderr = sys.stderr |
| | sys.stdout = output_capture |
| | sys.stderr = output_capture |
| | |
| | try: |
| | |
| | tree = ast.parse(code) |
| | |
| | |
| | result = None |
| | if tree.body and isinstance(tree.body[-1], ast.Expr): |
| | |
| | last_expr = tree.body.pop() |
| | |
| | if tree.body: |
| | exec( |
| | compile( |
| | ast.Module(body=tree.body, type_ignores=[]), |
| | '<string>', 'exec' |
| | ), |
| | self.environment |
| | ) |
| | |
| | result = eval( |
| | compile( |
| | ast.Expression(body=last_expr.value), |
| | '<string>', 'eval' |
| | ), |
| | self.environment |
| | ) |
| | else: |
| | |
| | exec(code, self.environment) |
| | |
| | |
| | output = output_capture.getvalue().strip() |
| | |
| | return result, output, True |
| | |
| | except Exception as e: |
| | |
| | exc_type, exc_value, exc_traceback = sys.exc_info() |
| | tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback) |
| | error_msg = ''.join(tb_lines) |
| | |
| | |
| | output = output_buffer.getvalue() + '\n' + error_msg |
| | |
| | return None, output, False |
| | |
| | finally: |
| | |
| | sys.stdout = old_stdout |
| | sys.stderr = old_stderr |
| | |
| | def format_code_response(self, code: str, result: Any, output: str, success: bool) -> str: |
| | """ |
| | Format the code execution results into a human-readable string. |
| | |
| | Args: |
| | code: The original code that was executed |
| | result: The result of the last expression (if any) |
| | output: Any captured stdout/stderr output |
| | success: Whether the execution was successful |
| | |
| | Returns: |
| | Formatted string with the code and its results |
| | """ |
| | response = [] |
| | |
| | |
| | response.append("```python") |
| | response.append(code) |
| | response.append("```") |
| | |
| | |
| | if output.strip(): |
| | response.append("\n**Output:**") |
| | response.append("```") |
| | response.append(output.strip()) |
| | response.append("```") |
| | |
| | |
| | if result is not None: |
| | response.append("\n**Result:**") |
| | response.append("```") |
| | response.append(str(result)) |
| | response.append("```") |
| | |
| | |
| | status = "✅ Success" if success else "❌ Error" |
| | response.append(f"\n**Status:** {status}") |
| | |
| | return '\n'.join(response) |
| | |
| | def process_code(self, code: str) -> str: |
| | """ |
| | Process and execute Python code, returning a formatted response. |
| | |
| | Args: |
| | code: The Python code to execute |
| | |
| | Returns: |
| | Formatted string with the code and its results |
| | """ |
| | try: |
| | |
| | code = code.strip() |
| | if not code: |
| | return "Error: No code provided" |
| | |
| | |
| | result, output, success = self.execute_code(code) |
| | |
| | |
| | return self.format_code_response(code, result, output, success) |
| | |
| | except Exception as e: |
| | error_msg = f"Error processing code: {str(e)}" |
| | logger.error(error_msg, exc_info=True) |
| | return f"Error: {error_msg}" |
| |
|
| |
|
| | def create_code_agent() -> 'CodeAgent': |
| | """ |
| | Factory function to create a new CodeAgent instance. |
| | |
| | Returns: |
| | A new instance of CodeAgent |
| | """ |
| | return CodeAgent() |
| |
|
| |
|
| | |
| | agent = create_code_agent() |
| |
|
| |
|
| | def execute_code(code: str) -> str: |
| | """ |
| | Execute Python code and return the formatted results. |
| | |
| | This is a convenience function that uses the default agent instance. |
| | |
| | Args: |
| | code: The Python code to execute |
| | |
| | Returns: |
| | Formatted string with the code and its results |
| | """ |
| | return agent.process_code(code) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | |
| | test_code = """ |
| | # Calculate Fibonacci sequence |
| | def fibonacci(n): |
| | a, b = 0, 1 |
| | result = [] |
| | while a < n: |
| | result.append(a) |
| | a, b = b, a + b |
| | return result |
| | |
| | # Print first 10 Fibonacci numbers |
| | print("First 10 Fibonacci numbers:") |
| | fib_sequence = fibonacci(100) |
| | fib_sequence |
| | """ |
| | |
| | print(execute_code(test_code)) |