|
|
import ast
import contextlib
import importlib
import importlib.util
import io
import os
import sys
import traceback
from typing import List, Set, Optional, Union, Dict

from pydantic import Field

from .interpreter_base import BaseInterpreter
from .storage_handler import FileStorageHandler
from .tool import Tool,Toolkit
|
|
|
|
|
|
|
|
# Default text encoding name exposed by this module.
DEFAULT_ENCODING = 'utf-8'
|
|
|
|
|
class PythonInterpreter(BaseInterpreter):
    """Vets the imports of a piece of Python code and then runs it with ``exec``.

    Import vetting only happens when ``allowed_imports`` is non-empty (see
    ``execute``). Imports whose first dotted component is listed in
    ``directory_names`` are treated as local project modules and resolved to
    ``<project_path>/<dotted/path>.py``; every other import must appear
    verbatim in ``allowed_imports``.
    """

    # Directory used as working directory and import root while code runs.
    project_path: Optional[str] = Field(default=".", description="Path to the project directory")
    # First dotted components that mark an import as a local project module.
    directory_names: Optional[List[str]] = Field(default_factory=list, description="List of directory names to check for imports")
    # Exact module names that non-project imports are allowed to use.
    allowed_imports: Optional[Set[str]] = Field(default_factory=set, description="Set of allowed imports")

    def __init__(
        self,
        name: str = 'PythonInterpreter',
        project_path: Optional[str] = ".",
        directory_names: Optional[List[str]] = None,
        allowed_imports: Optional[Set[str]] = None,
        storage_handler: FileStorageHandler = None,
        **kwargs
    ):
        """
        Initialize a Python interpreter for executing code in a controlled environment.

        Args:
            name (str): The name of the interpreter
            project_path (Optional[str]): Path to the project directory for module resolution
            directory_names (Optional[List[str]]): List of directory names to check for imports.
                ``None`` (the default) means "no local directories".
            allowed_imports (Optional[Set[str]]): Set of allowed module imports to enforce security
            storage_handler (Optional[FileStorageHandler]): Storage handler for file operations.
                When omitted, a ``LocalStorageHandler`` rooted at
                ``./workplace/interpreter`` is created.
            **kwargs: Additional data to pass to the parent class
        """
        # A fresh list per instance: a literal ``[]`` default would be shared
        # by every interpreter constructed without this argument.
        if directory_names is None:
            directory_names = []

        super().__init__(
            name=name,
            project_path=project_path,
            directory_names=directory_names,
            allowed_imports=allowed_imports,
            **kwargs
        )
        self.allowed_imports = allowed_imports or set()
        # Objects bound by the import analysis (modules, functions, classes).
        self.namespace = {}
        # Per-run cache of local modules, keyed by file path:
        #   None            -> analysis of that file is still in progress
        #   (module, names) -> loaded module and its top-level def/class names
        self.visited_modules = {}

        if storage_handler is None:
            from .storage_handler import LocalStorageHandler
            self.storage_handler = LocalStorageHandler(base_path="./workplace/interpreter")
        else:
            self.storage_handler = storage_handler

    def _get_file_and_folder_names(self, target_path: str) -> List[str]:
        """Retrieves the names of files and folders (without extensions) in a given directory.

        Args:
            target_path (str): Path to the target directory.

        Returns:
            List[str]: List of file and folder names (excluding extensions).
        """
        return [os.path.splitext(entry)[0] for entry in os.listdir(target_path)]

    def _load_module(self, module_name: str, path: str):
        """Execute the Python file at ``path`` and return the resulting module object.

        Raises:
            ImportError: If no import spec/loader can be built for ``path``.
            Exception: Anything raised by the module's own top-level code.
        """
        module_spec = importlib.util.spec_from_file_location(module_name, path)
        if module_spec is None or module_spec.loader is None:
            raise ImportError(f"Cannot load module '{module_name}' from {path}")
        loaded_module = importlib.util.module_from_spec(module_spec)
        module_spec.loader.exec_module(loaded_module)
        return loaded_module

    def _bind_definitions(
        self,
        module_name: str,
        loaded_module,
        top_level_names: List[str],
        potential_names: Optional[Set[str]],
    ) -> List[str]:
        """Copy functions/classes of a loaded local module into ``self.namespace``.

        With ``potential_names=None`` every top-level definition is bound;
        otherwise only the requested names are bound and each name that is not
        a top-level function/class of the module yields a violation.
        """
        if potential_names is None:
            for symbol in top_level_names:
                if hasattr(loaded_module, symbol):
                    self.namespace[symbol] = getattr(loaded_module, symbol)
            return []

        missing = []
        # Sorted so the violation messages come out in a deterministic order.
        for symbol in sorted(potential_names):
            if symbol in top_level_names and hasattr(loaded_module, symbol):
                self.namespace[symbol] = getattr(loaded_module, symbol)
            else:
                missing.append(f"Function or class '{symbol}' not found in {module_name}")
        return missing

    def _extract_definitions(self, module_name: str, path: str, potential_names: Optional[Set[str]] = None) -> List[str]:
        """Extracts function and class definitions from a module file while ensuring safety.

        The file's imports are analysed *before* the module is executed, and
        the module is executed at most once per run (cached by ``path``).

        Args:
            module_name (str): The name of the module.
            path (str): The file path of the module.
            potential_names (Optional[Set[str]]): The specific functions/classes to import (for ImportFrom).

        Returns:
            List[str]: A list of violations found during analysis. An empty list indicates no issues.

        Raises:
            FileNotFoundError: If the storage handler cannot read ``path``.
        """
        if path in self.visited_modules:
            cached = self.visited_modules[path]
            if cached is None:
                # The file is being analysed further up the call stack
                # (circular local import); the outer call finishes the job.
                return []
            loaded_module, top_level_names = cached
            return self._bind_definitions(module_name, loaded_module, top_level_names, potential_names)

        # Mark as in-progress so circular imports cannot recurse forever.
        self.visited_modules[path] = None

        result = self.storage_handler.read(path)
        if not result["success"]:
            raise FileNotFoundError(f"Could not read file {path}: {result.get('error', 'Unknown error')}")
        code = result["content"]

        # Vet the module's own imports before any of its code is run.
        violations = self._analyze_code(code)
        if violations:
            return violations

        # _analyze_code already rejected unparsable code, so this parse succeeds.
        top_level_names = [
            node.name
            for node in ast.parse(code).body
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
        ]

        try:
            loaded_module = self._load_module(module_name, path)
        except Exception:
            # Top-level code of the module failed; report it as a violation.
            return [traceback.format_exc()]

        self.visited_modules[path] = (loaded_module, top_level_names)
        self.namespace[module_name] = loaded_module
        return self._bind_definitions(module_name, loaded_module, top_level_names, potential_names)

    def _check_project(self, module: Union[ast.alias, ast.Import, ast.ImportFrom]) -> List[str]:
        """Checks and imports a local project module while ensuring safety.

        Args:
            module (Union[ast.alias, ast.Import, ast.ImportFrom]): The import to
                resolve. ``_execute_import`` passes the individual ``ast.alias``
                entries of an ``import`` statement; a whole ``ast.Import`` node
                is also accepted and handled entry by entry.

        Returns:
            List[str]: A list of violations found during analysis.
        """
        if isinstance(module, ast.Import):
            violations = []
            for alias in module.names:
                violations += self._check_project(alias)
            return violations

        if isinstance(module, ast.ImportFrom):
            module_name = module.module
            potential_names = {alias.name for alias in module.names}
        else:
            # ast.alias coming from ``import a.b [as c]``
            module_name = module.name
            potential_names = None

        if not module_name:
            return ["'from . import' is not supported."]

        # NOTE(review): the path is joined onto ``project_path`` as configured.
        # ``execute`` has already changed into the project directory at this
        # point, so a *relative* project_path other than "." may resolve to the
        # wrong place -- confirm how the storage handler resolves paths before
        # switching this to an absolute path.
        module_path = os.path.join(self.project_path, *module_name.split(".")) + ".py"
        if not os.path.exists(module_path):
            return [f"Module not found: {module_name}"]

        violations = self._extract_definitions(module_name, module_path, potential_names)

        # Honour ``import a.b as c`` the same way non-project imports do.
        asname = getattr(module, "asname", None)
        if not violations and asname and module_name in self.namespace:
            self.namespace[asname] = self.namespace[module_name]

        return violations

    def _execute_import(self, import_module: ast.Import) -> List[str]:
        """Processes an import statement, verifying permissions and adding modules to the namespace.

        Args:
            import_module (ast.Import): The AST node representing an import statement.

        Returns:
            List[str]: A list of violations found during import handling.
        """
        violations = []

        for alias in import_module.names:
            # Imports rooted in a project directory are resolved from disk.
            if alias.name.split(".")[0] in self.directory_names:
                violations += self._check_project(alias)
                continue

            if alias.name not in self.allowed_imports:
                violations.append(f"Unauthorized import: {alias.name}")
                # Stop at the first unauthorized module so nothing listed
                # after it gets imported.
                return violations

            try:
                imported_module = importlib.import_module(alias.name)
            except ImportError:
                violations.append(traceback.format_exc())
            else:
                self.namespace[alias.asname or alias.name] = imported_module

        return violations

    def _execute_import_from(self, import_from: ast.ImportFrom) -> List[str]:
        """Processes a 'from module import name' statement, ensuring safety and adding modules to the namespace.

        Args:
            import_from (ast.ImportFrom): The AST node representing an 'import from' statement.

        Returns:
            List[str]: A list of violations found during import handling.
        """
        source_module = import_from.module
        if source_module is None:
            return ["'from . import' is not supported."]

        if source_module.split(".")[0] in self.directory_names:
            return self._check_project(import_from)

        if source_module not in self.allowed_imports:
            return [f"Unauthorized import: {source_module}"]

        # Import once; every requested name comes from the same module.
        try:
            imported_module = importlib.import_module(source_module)
        except ImportError:
            return [traceback.format_exc()]

        violations = []
        for alias in import_from.names:
            if alias.name == "*":
                # A star import has no single object to bind; the module
                # itself was authorized above.
                continue
            try:
                imported_object = getattr(imported_module, alias.name)
            except AttributeError:
                # ``from pkg import submodule`` is valid even when the
                # submodule has not been loaded as an attribute yet.
                try:
                    imported_object = importlib.import_module(f"{source_module}.{alias.name}")
                except ImportError:
                    violations.append(f"Cannot import name '{alias.name}' from '{source_module}'")
                    continue
            self.namespace[alias.asname or alias.name] = imported_object

        return violations

    def _analyze_code(self, code: str) -> List[str]:
        """Parses and analyzes the code for import violations before execution.

        Every ``import`` / ``from ... import`` statement anywhere in the code
        (including nested scopes) is processed, which also performs the import.

        Args:
            code (str): The raw Python code to analyze.

        Returns:
            List[str]: A list of violations detected in the code.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError: some Python versions raise it for NUL bytes in source.
            return [traceback.format_exc()]

        violations = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                violations += self._execute_import(node)
            elif isinstance(node, ast.ImportFrom):
                violations += self._execute_import_from(node)
        return violations

    def execute(self, code: str, language: str = "python") -> str:
        """
        Analyzes and executes the provided Python code in a controlled environment.

        NOTE: This method only returns content printed to stdout during execution.
        It does not return any values from the code itself. To see results, use
        print statements in your code.

        WARNING: This method uses Python's exec() function internally, which executes
        code with full privileges. While safety checks are performed, there is still
        a security risk. Do not use with untrusted code. The import checks are
        skipped entirely when ``allowed_imports`` is empty.

        The working directory is switched to ``project_path`` and that directory
        is put at the front of ``sys.path`` for the duration of the call; both
        are restored before returning.

        Args:
            code (str): The Python code to execute.
            language (str, optional): The programming language of the code. Defaults to "python".

        Returns:
            str: The output of the executed code (printed content only), the
                violations joined by newlines if any were found, or the
                traceback text if the code raised.

        Raises:
            ValueError: If ``project_path`` is unset, missing, or not a directory.
        """
        if language.lower() != "python":
            return f"Error: This interpreter only supports Python language. Received: {language}"

        # Each run starts with a clean analysis state.
        self.visited_modules = {}
        self.namespace = {}

        if not self.project_path:
            raise ValueError("Project path (project_path) is not set")
        if not os.path.exists(self.project_path):
            raise ValueError(f"Project path '{self.project_path}' does not exist")
        if not os.path.isdir(self.project_path):
            raise ValueError(f"Project path '{self.project_path}' is not a directory")

        # Resolve before chdir: a relative path means something else afterwards.
        project_dir = os.path.abspath(self.project_path)
        previous_cwd = os.getcwd()
        os.chdir(project_dir)
        sys.path.insert(0, project_dir)
        try:
            if self.allowed_imports:
                violations = self._analyze_code(code)
                if violations:
                    return "\n".join(violations)

            stdout_capture = io.StringIO()
            with contextlib.redirect_stdout(stdout_capture):
                try:
                    # SECURITY: runs with the full set of builtins.
                    exec(code, {"__builtins__": __builtins__})
                except Exception:
                    return traceback.format_exc()

            return stdout_capture.getvalue().strip()
        finally:
            # Undo the process-wide changes so repeated calls neither pile up
            # sys.path entries nor leave the caller in another directory.
            with contextlib.suppress(ValueError):
                sys.path.remove(project_dir)
            with contextlib.suppress(OSError):
                os.chdir(previous_cwd)

    def execute_script(self, file_path: str, language: str = "python") -> str:
        """
        Reads Python code from a file and executes it using the `execute` method.

        NOTE: This method only returns content printed to stdout during execution.
        It does not return any values from the code itself. To see results, use
        print statements in your code.

        WARNING: This method uses Python's exec() function internally, which executes
        code with full privileges. While safety checks are performed, there is still
        a security risk. Do not use with untrusted code.

        Args:
            file_path (str): The path to the Python file to be executed.
            language (str, optional): The programming language of the code. Defaults to "python".

        Returns:
            str: The output of the executed code (printed content only), or an error message if the execution fails.
        """
        result = self.storage_handler.read(file_path)
        if not result["success"]:
            return f"Error: Could not read file '{file_path}': {result.get('error', 'Unknown error')}"
        return self.execute(result["content"], language)
|
|
|
|
|
|
|
|
class PythonExecuteTool(Tool):
    """Callable tool that forwards a code string to ``PythonInterpreter.execute``."""

    # Tool metadata; presumably consumed by the ``Tool`` base class -- verify there.
    name: str = "python_execute"
    description: str = "Execute Python code in a controlled environment with safety checks"
    inputs: Dict[str, Dict[str, str]] = {
        "code": {
            "type": "string",
            "description": "The Python code to execute",
        },
        "language": {
            "type": "string",
            "description": "The programming language of the code (only 'python' is supported)",
        },
    }
    required: Optional[List[str]] = ["code"]

    def __init__(self, python_interpreter: PythonInterpreter = None):
        """Store the interpreter that will run the code.

        Args:
            python_interpreter (PythonInterpreter): Interpreter used by ``__call__``;
                calls fail with ``RuntimeError`` while it is unset.
        """
        super().__init__()
        self.python_interpreter = python_interpreter

    def __call__(self, code: str, language: str = "python") -> str:
        """Execute Python code using the Python interpreter.

        Args:
            code (str): Source code to run.
            language (str): Language label passed through to the interpreter.

        Returns:
            str: Whatever the interpreter returns, or an ``"Error executing code: ..."``
                message when the interpreter raises.

        Raises:
            RuntimeError: If no interpreter was supplied.
        """
        interpreter = self.python_interpreter
        if not interpreter:
            raise RuntimeError("Python interpreter not initialized")

        try:
            output = interpreter.execute(code, language)
        except Exception as e:
            # Failures are reported as text instead of being propagated.
            return f"Error executing code: {str(e)}"
        return output
|
|
|
|
|
|
|
|
class PythonExecuteScriptTool(Tool):
    """Callable tool that forwards a file path to ``PythonInterpreter.execute_script``."""

    # Tool metadata; presumably consumed by the ``Tool`` base class -- verify there.
    name: str = "python_execute_script"
    description: str = "Execute Python code from a file in a controlled environment with safety checks"
    inputs: Dict[str, Dict[str, str]] = {
        "file_path": {
            "type": "string",
            "description": "The path to the Python file to be executed",
        },
        "language": {
            "type": "string",
            "description": "The programming language of the code (only 'python' is supported)",
        },
    }
    required: Optional[List[str]] = ["file_path"]

    def __init__(self, python_interpreter: PythonInterpreter = None):
        """Store the interpreter that will run the script.

        Args:
            python_interpreter (PythonInterpreter): Interpreter used by ``__call__``;
                calls fail with ``RuntimeError`` while it is unset.
        """
        super().__init__()
        self.python_interpreter = python_interpreter

    def __call__(self, file_path: str, language: str = "python") -> str:
        """Execute Python script file using the Python interpreter.

        Args:
            file_path (str): Path of the script to run.
            language (str): Language label passed through to the interpreter.

        Returns:
            str: Whatever the interpreter returns, or an ``"Error executing script: ..."``
                message when the interpreter raises.

        Raises:
            RuntimeError: If no interpreter was supplied.
        """
        interpreter = self.python_interpreter
        if not interpreter:
            raise RuntimeError("Python interpreter not initialized")

        try:
            output = interpreter.execute_script(file_path, language)
        except Exception as e:
            # Failures are reported as text instead of being propagated.
            return f"Error executing script: {str(e)}"
        return output
|
|
|
|
|
|
|
|
class PythonInterpreterToolkit(Toolkit):
    """Toolkit bundling the two Python execution tools around one shared interpreter."""

    def __init__(
        self,
        name: str = "PythonInterpreterToolkit",
        project_path: Optional[str] = ".",
        directory_names: Optional[List[str]] = None,
        allowed_imports: Optional[Set[str]] = None,
        storage_handler: FileStorageHandler = None,
        **kwargs
    ):
        """Build a ``PythonInterpreter`` and register the tools that use it.

        Args:
            name (str): Name of the toolkit.
            project_path (Optional[str]): Forwarded to ``PythonInterpreter``.
            directory_names (Optional[List[str]]): Forwarded to ``PythonInterpreter``
                (``None`` becomes an empty list).
            allowed_imports (Optional[Set[str]]): Forwarded to ``PythonInterpreter``.
            storage_handler (FileStorageHandler): Storage handler for the interpreter;
                defaults to a ``LocalStorageHandler`` rooted at ``./workplace/python``.
            **kwargs: Extra keyword arguments forwarded to ``PythonInterpreter``.
        """
        if storage_handler is None:
            # Imported here, as elsewhere in this file, rather than at module level.
            from .storage_handler import LocalStorageHandler
            storage_handler = LocalStorageHandler(base_path="./workplace/python")

        interpreter = PythonInterpreter(
            name="PythonInterpreter",
            project_path=project_path,
            directory_names=directory_names or [],
            allowed_imports=allowed_imports,
            storage_handler=storage_handler,
            **kwargs
        )

        # Both tools share the same interpreter instance.
        tool_classes = (PythonExecuteTool, PythonExecuteScriptTool)
        toolkit_tools = [tool_cls(python_interpreter=interpreter) for tool_cls in tool_classes]

        super().__init__(name=name, tools=toolkit_tools)

        # Keep a handle on the interpreter for direct access.
        self.python_interpreter = interpreter
|
|
|
|
|
|
|
|
|
|
|
|