| | import subprocess |
| | import sys |
| | from typing import Optional |
| |
|
| | from langchain.tools import tool |
| | from langchain_experimental.tools import PythonREPLTool |
| |
|
| |
|
@tool
def execute_python_code(intent: str, code: Optional[str] = None, file_reference: Optional[str] = None) -> str:
    """
    Executes the provided python code snippet or python file identified by its reference and returns the outcome of
    the execution
    :param intent: this parameter should be set to either code_snippet or file_execution depending on
    the intent of the user
    :param code: if the intent is code_snippet, this parameter should be populated with the
    python code snippet to be executed
    :param file_reference: if the intent is file_execution, this parameter should
    be populated with the reference of the file to be executed
    :return: the outcome of the python code execution
    """
    # NOTE: the docstring wording above is deliberately left unchanged. Per the
    # LangChain documentation, @tool uses the function docstring as the tool
    # description, so editing it would change what the model is told.
    #
    # Raises ValueError (a subclass of Exception, so existing
    # `except Exception` handlers still apply) when:
    #   * intent is "code_snippet" but `code` is missing or empty,
    #   * intent is "file_execution" but `file_reference` is missing or empty,
    #   * intent is neither of the two supported values.
    if intent == "code_snippet":
        # Validate before building the REPL tool so bad input fails fast.
        if not code:
            raise ValueError("Invalid arguments. Tool intent is code_snippet but no value provided for code argument")
        return PythonREPLTool().run(code, verbose=True)

    if intent == "file_execution":
        # Mirror the code_snippet validation: without this check a missing
        # reference only surfaces as an opaque TypeError text from subprocess.
        if not file_reference:
            raise ValueError(
                "Invalid arguments. Tool intent is file_execution but no value provided for file_reference argument"
            )
        return subprocess_python_exec(file_reference)

    raise ValueError("Invalid arguments. Invalid value for intent parameter")
| |
|
| |
|
def subprocess_python_exec(file_reference: str) -> str:
    """Run a Python file in a separate interpreter process and report the outcome.

    The file is executed with the same interpreter that runs this module
    (``sys.executable``), with stdout and stderr captured as text and a
    60-second timeout.

    :param file_reference: path of the Python file to execute
    :return: the captured stdout on success (or ``"Code executed successfully"``
        when the script printed nothing); otherwise a string starting with
        ``"Error: "`` that carries the captured stderr, the exit code when
        stderr is empty, a timeout notice, or the text of the exception raised
        while launching the process
    """
    # Only the process launch can raise, so keep the try body minimal.
    try:
        completed = subprocess.run(
            [sys.executable, file_reference],
            capture_output=True,
            text=True,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as exc:
        # Best-effort boundary: the caller expects a string result, so launch
        # failures are reported as text rather than propagated.
        return f"Error: {exc}"

    if completed.returncode == 0:
        return completed.stdout or "Code executed successfully"

    if completed.stderr:
        return f"Error: {completed.stderr}"

    # A script can fail without writing to stderr (e.g. sys.exit(3)); report
    # the exit code, and any stdout, instead of returning a bare "Error: ".
    detail = f"process exited with code {completed.returncode}"
    if completed.stdout:
        detail = f"{detail}\n{completed.stdout}"
    return f"Error: {detail}"
| |
|