conscious_Ai-2 / agent.py
Ret's picture
Upload 16 files
2660a90 verified
"""
AI Agent for executing and formatting code responses.
This module provides functionality to execute Python code and format the results
in a consistent way. It's designed to be used by the expert system for handling
code-related tasks.
"""
import ast
import sys
import traceback
from typing import Any, Dict, Optional, Tuple
import json
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class CodeExecutionError(Exception):
"""Custom exception for code execution errors."""
pass
class CodeAgent:
"""
Agent for executing and formatting Python code.
This agent is responsible for:
1. Executing Python code in a controlled environment
2. Capturing and formatting the output
3. Handling errors gracefully
4. Providing rich output formatting
"""
def __init__(self):
"""Initialize the code agent with a clean environment."""
self.environment = self._create_clean_environment()
def _create_clean_environment(self) -> Dict[str, Any]:
"""Create a clean execution environment with essential built-ins."""
# Start with a clean environment
env = {
'__builtins__': {
# Safe built-ins
'abs': abs, 'all': all, 'any': any, 'ascii': ascii,
'bin': bin, 'bool': bool, 'bytearray': bytearray, 'bytes': bytes,
'chr': chr, 'complex': complex, 'dict': dict, 'divmod': divmod,
'enumerate': enumerate, 'filter': filter, 'float': float,
'format': format, 'frozenset': frozenset, 'hash': hash,
'hex': hex, 'int': int, 'iter': iter, 'len': len, 'list': list,
'map': map, 'max': max, 'min': min, 'next': next, 'oct': oct,
'ord': ord, 'pow': pow, 'range': range, 'repr': repr,
'reversed': reversed, 'round': round, 'set': set, 'slice': slice,
'sorted': sorted, 'str': str, 'sum': sum, 'tuple': tuple,
'zip': zip,
# Constants
'True': True, 'False': False, 'None': None
},
# Add common modules that are generally safe
'math': __import__('math'),
'random': __import__('random'),
'datetime': __import__('datetime'),
'json': __import__('json'),
'collections': __import__('collections'),
'itertools': __import__('itertools'),
'functools': __import__('functools'),
'operator': __import__('operator'),
're': __import__('re'),
}
return env
def execute_code(self, code: str) -> Tuple[Any, Optional[str], bool]:
"""
Execute Python code and return the result, output, and success status.
Args:
code: The Python code to execute
Returns:
Tuple of (result, output, success) where:
- result: The result of the last expression (or None)
- output: Any captured stdout/stderr output
- success: Boolean indicating if execution was successful
"""
# Redirect stdout and stderr
from io import StringIO
# Create a custom class to capture print output
class PrintCapture(StringIO):
def __init__(self):
super().__init__()
self.output = []
def write(self, text):
self.output.append(text)
return super().write(text)
def getvalue(self):
return ''.join(self.output)
# Set up the capture
output_capture = PrintCapture()
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = output_capture
sys.stderr = output_capture
try:
# Parse the code to an AST
tree = ast.parse(code)
# If the last node is an expression, we'll capture its value
result = None
if tree.body and isinstance(tree.body[-1], ast.Expr):
# Extract the last expression
last_expr = tree.body.pop()
# Compile and execute the remaining code
if tree.body:
exec(
compile(
ast.Module(body=tree.body, type_ignores=[]),
'<string>', 'exec'
),
self.environment
)
# Evaluate the last expression
result = eval(
compile(
ast.Expression(body=last_expr.value),
'<string>', 'eval'
),
self.environment
)
else:
# No expression to evaluate, just execute the code
exec(code, self.environment)
# Get the captured output
output = output_capture.getvalue().strip()
return result, output, True
except Exception as e:
# Capture the full traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
error_msg = ''.join(tb_lines)
# Get any output before the error
output = output_buffer.getvalue() + '\n' + error_msg
return None, output, False
finally:
# Restore stdout and stderr
sys.stdout = old_stdout
sys.stderr = old_stderr
def format_code_response(self, code: str, result: Any, output: str, success: bool) -> str:
"""
Format the code execution results into a human-readable string.
Args:
code: The original code that was executed
result: The result of the last expression (if any)
output: Any captured stdout/stderr output
success: Whether the execution was successful
Returns:
Formatted string with the code and its results
"""
response = []
# Add the code block
response.append("```python")
response.append(code)
response.append("```")
# Add the output if any
if output.strip():
response.append("\n**Output:**")
response.append("```")
response.append(output.strip())
response.append("```")
# Add the result if any
if result is not None:
response.append("\n**Result:**")
response.append("```")
response.append(str(result))
response.append("```")
# Add status
status = "✅ Success" if success else "❌ Error"
response.append(f"\n**Status:** {status}")
return '\n'.join(response)
def process_code(self, code: str) -> str:
"""
Process and execute Python code, returning a formatted response.
Args:
code: The Python code to execute
Returns:
Formatted string with the code and its results
"""
try:
# Clean and validate the code
code = code.strip()
if not code:
return "Error: No code provided"
# Execute the code
result, output, success = self.execute_code(code)
# Format the response
return self.format_code_response(code, result, output, success)
except Exception as e:
error_msg = f"Error processing code: {str(e)}"
logger.error(error_msg, exc_info=True)
return f"Error: {error_msg}"
def create_code_agent() -> 'CodeAgent':
"""
Factory function to create a new CodeAgent instance.
Returns:
A new instance of CodeAgent
"""
return CodeAgent()
# Create a default instance for convenience
agent = create_code_agent()
def execute_code(code: str) -> str:
"""
Execute Python code and return the formatted results.
This is a convenience function that uses the default agent instance.
Args:
code: The Python code to execute
Returns:
Formatted string with the code and its results
"""
return agent.process_code(code)
if __name__ == "__main__":
# Example usage
test_code = """
# Calculate Fibonacci sequence
def fibonacci(n):
a, b = 0, 1
result = []
while a < n:
result.append(a)
a, b = b, a + b
return result
# Print first 10 Fibonacci numbers
print("First 10 Fibonacci numbers:")
fib_sequence = fibonacci(100)
fib_sequence
"""
print(execute_code(test_code))