File size: 1,966 Bytes
778116a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86368de
778116a
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import subprocess
import sys
from typing import Optional

from langchain.tools import tool
from langchain_experimental.tools import PythonREPLTool


@tool
def execute_python_code(intent: str, code: Optional[str] = None, file_reference: Optional[str] = None) -> str:
    """
    Executes the provided python code snippet or python file identified by its reference and returns the outcome of
    the execution
    :param intent: this parameter should be set to either code_snippet or file_execution depending on
    the intent of the user
    :param code: if the intent is code_snippet, this parameter should be populated with the
    python code snippet to be executed
    :param file_reference: if the intent is file_execution, this parameter should
    be populated with the reference of the file to be executed
    :return: the outcome of the python code execution
    """
    # NOTE(review): the docstring above is kept verbatim because the `@tool`
    # decorator presumably exposes it as the tool description -- confirm before
    # rewording it.
    #
    # Raises Exception when `intent` is not one of the two supported values, or
    # when the argument required by the chosen intent is missing/empty.
    if intent == "code_snippet":
        # Validate first so no REPL tool is built for a request that is rejected.
        if not code:
            raise Exception("Invalid arguments. Tool intent is code_snippet but no value provided for code argument")
        python_tool = PythonREPLTool()
        return python_tool.run(code, verbose=True)

    if intent == "file_execution":
        # Mirror the code_snippet validation: without this check a missing
        # reference is handed to the subprocess helper as None.
        if not file_reference:
            raise Exception(
                "Invalid arguments. Tool intent is file_execution but no value provided for file_reference argument"
            )
        return subprocess_python_exec(file_reference)

    raise Exception("Invalid arguments. Invalid value for intent parameter")


def subprocess_python_exec(file_reference: Optional[str]) -> str:
    """Run a Python file in a separate interpreter process and report the result.

    The file is launched with the same interpreter that runs this module
    (``sys.executable``) and is given at most 90 seconds to finish. This
    function never raises; every failure is reported as a string starting
    with ``"Error: "``.

    :param file_reference: path of the Python file to execute
    :return: the captured stdout on success (or ``"Code executed successfully"``
        when the script printed nothing); otherwise an ``"Error: ..."`` message
        carrying stderr, the exit code, a timeout notice or the launch failure
    """
    # Reject a missing/empty reference up front: None would surface as an
    # opaque TypeError text, and "" would be handed to the interpreter as a
    # script path.
    if not file_reference:
        return "Error: No file reference provided for execution"

    # Only the process launch can raise, so keep the try body to that call.
    try:
        result = subprocess.run(
            [sys.executable, file_reference],
            capture_output=True,
            text=True,
            timeout=90,
        )
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as e:
        return f"Error: {str(e)}"

    if result.returncode == 0:
        return result.stdout if result.stdout else "Code executed successfully"
    if result.stderr:
        return f"Error: {result.stderr}"
    # A script can fail without writing to stderr (e.g. sys.exit(3)); report
    # the exit code rather than an empty error message.
    return f"Error: Process exited with code {result.returncode}"