File size: 4,195 Bytes
67d287e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
Code execution tools for the GAIA Agent.
Includes Python, JavaScript, and Bash executors that run code in a subprocess with a 60-second timeout (no further isolation is applied).
"""

import os
import subprocess
import tempfile
import shutil
from langchain_core.tools import tool


@tool
def python_executor(code: str) -> str:
    """Execute Python code and return the output.
    Use this for calculations, data processing, string manipulation, or any computation.
    The code should print() any results you want to see.
    
    Args:
        code: Python code to execute
    """
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator presumably exposes it as the tool description - confirm
    # before rewording it.
    #
    # Result format: the child's stripped stdout, then "Stderr: ..." when the
    # child wrote to stderr, then "Exit code: N" when it exited non-zero.
    # Timeouts and other failures are returned as a message string, never raised.
    import sys  # function-scope import keeps this block self-contained

    temp_path = None
    try:
        # delete=False: the file must be closed before the child process opens
        # it, so removal is handled manually in the finally clause below.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
            # Record the path before writing so a failed write still gets cleaned up.
            temp_path = f.name
            f.write(code)

        result = subprocess.run(
            # Run under the interpreter hosting this process; a bare "python"
            # is not guaranteed to exist on PATH. Fall back to it only when
            # the interpreter path is unknown.
            [sys.executable or 'python', temp_path],
            capture_output=True,
            text=True,
            timeout=60
        )

        output = result.stdout.strip()
        if result.stderr:
            output += f"\nStderr: {result.stderr}"
        if result.returncode != 0:
            # Without this, a script that fails silently would be reported as
            # "Code executed with no output."
            output += f"\nExit code: {result.returncode}"
        return output or "Code executed with no output."
    except subprocess.TimeoutExpired:
        return "Execution timed out (60s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"
    finally:
        # Single cleanup point for every exit path. Removal is best-effort: a
        # failure to delete the script must not replace the result or escape
        # the tool as an exception.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass


@tool
def javascript_executor(code: str) -> str:
    """Execute JavaScript code using Node.js and return the output.
    Use this for JSON processing, string manipulation, or JavaScript-specific operations.
    Use console.log() to output results.
    
    Args:
        code: JavaScript code to execute
    """
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator presumably exposes it as the tool description - confirm
    # before rewording it.
    #
    # Result format: the child's stripped stdout, then "Stderr: ..." when the
    # child wrote to stderr, then "Exit code: N" when it exited non-zero.
    # Timeouts and other failures are returned as a message string, never raised.

    # Fail early with an actionable message when Node.js cannot be found.
    if shutil.which("node") is None:
        return "Error: Node.js is not installed or not in PATH. Please install Node.js to use this tool."

    temp_path = None
    try:
        # delete=False: the file must be closed before node opens it, so
        # removal is handled manually in the finally clause below.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False, encoding='utf-8') as f:
            # Record the path before writing so a failed write still gets cleaned up.
            temp_path = f.name
            f.write(code)

        result = subprocess.run(
            ['node', temp_path],
            capture_output=True,
            text=True,
            timeout=60
        )

        output = result.stdout.strip()
        if result.stderr:
            output += f"\nStderr: {result.stderr}"
        if result.returncode != 0:
            # Without this, a script that fails silently would be reported as
            # "Code executed with no output."
            output += f"\nExit code: {result.returncode}"
        return output or "Code executed with no output."
    except subprocess.TimeoutExpired:
        return "Execution timed out (60s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"
    finally:
        # Single cleanup point for every exit path. Removal is best-effort: a
        # failure to delete the script must not replace the result or escape
        # the tool as an exception.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass


@tool
def bash_executor(command: str) -> str:
    """Execute a Bash/Shell command and return the output.
    Use this for file operations, text processing with sed/awk/grep, or system commands.
    
    On Windows, this uses PowerShell. On Unix/Linux/Mac, this uses bash.
    
    Args:
        command: Shell command to execute
    """
    # NOTE(review): the docstring above is kept verbatim because the @tool
    # decorator presumably exposes it as the tool description - confirm
    # before rewording it.
    #
    # Result format: stripped stdout, then "Stderr: ..." when stderr is
    # non-empty, then "Exit code: N" when the command exited non-zero.
    # Timeouts and other failures are returned as a message string, never raised.
    try:
        import platform

        # PowerShell on Windows, bash on every other platform.
        if platform.system().lower() == "windows":
            argv = ["powershell", "-NoProfile", "-NonInteractive", "-Command", command]
        else:
            argv = ["bash", "-c", command]

        # The command runs with the system temp directory as its working directory.
        completed = subprocess.run(
            argv,
            cwd=tempfile.gettempdir(),
            timeout=60,
            text=True,
            capture_output=True,
        )

        stderr_note = f"\nStderr: {completed.stderr}" if completed.stderr else ""
        exit_note = f"\nExit code: {completed.returncode}" if completed.returncode != 0 else ""
        report = completed.stdout.strip() + stderr_note + exit_note
        if not report:
            return "Command executed with no output."
        return report
    except subprocess.TimeoutExpired:
        return "Execution timed out (60s limit)"
    except Exception as exc:
        return f"Execution error: {str(exc)}"