File size: 5,430 Bytes
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

from .registry import register_tool, register_toolset_desc

# Register the human-readable description for the "shell" toolset; the tools
# defined in this module are registered under the same "shell" key.
register_toolset_desc("shell", "Shell command execution toolset.")


def _detect_shell() -> str:
    """Return the name of the shell to use: ``"bash"`` when on PATH, else ``"sh"``."""
    bash_path = shutil.which("bash")
    return "bash" if bash_path else "sh"


# Resolve the shell once at import time: the tool names and descriptions
# registered below are built from this value, and both tools invoke it.
_SHELL = _detect_shell()


@register_tool(
    "shell",
    {
        "type": "function",
        "function": {
            "name": f"run_{_SHELL}_cmd",
            "description": f"Execute a {_SHELL} command and return its output (stdout and stderr combined).",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": f"The {_SHELL} command to execute",
                    },
                    "cwd": {
                        "type": "string",
                        "description": "Working directory for the command (optional)",
                        "default": None,
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds (default 30)",
                        "default": 30,
                    },
                },
                "required": ["command"],
            },
        },
    },
)
def run_shell_cmd(command: str, cwd: str | None = None, timeout: int = 30) -> str:
    """Run ``command`` via ``_SHELL -c`` and describe the outcome as text.

    Errors are never raised to the caller; every failure is reported as a
    string starting with ``Error``.

    Args:
        command: Command line handed to the shell.
        cwd: Optional working directory. Environment variables and a leading
            ``~`` are expanded before use. Falsy values mean "inherit".
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        A report containing a ``STDOUT:`` section (if stdout was non-empty),
        a ``STDERR:`` section (if stderr was non-empty) and a trailing
        ``Return code: N`` line, or an ``Error...`` message when the working
        directory is unusable, the command times out, or execution fails.
    """
    try:
        working_dir = None
        if cwd:
            cwd_path = Path(os.path.expandvars(cwd)).expanduser()
            if not cwd_path.exists():
                return f"Error: Working directory '{cwd}' does not exist"
            # A path to a regular file passes exists() but cannot be used as
            # a cwd; report that here rather than as a generic spawn failure.
            if not cwd_path.is_dir():
                return f"Error: Working directory '{cwd}' is not a directory"
            working_dir = str(cwd_path)

        result = subprocess.run(
            [_SHELL, "-c", command],
            capture_output=True,
            text=True,
            cwd=working_dir,
            timeout=timeout,
        )

        output = []
        if result.stdout:
            output.append(f"STDOUT:\n{result.stdout}")
        if result.stderr:
            output.append(f"STDERR:\n{result.stderr}")

        # The return code is always reported, so the report is never empty.
        output.append(f"\nReturn code: {result.returncode}")

        return "\n".join(output)

    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {timeout} seconds"
    except Exception as e:
        # Tool boundary: surface any failure as text instead of raising.
        return f"Error executing command: {e}"


@register_tool(
    "shell",
    {
        "type": "function",
        "function": {
            "name": f"run_{_SHELL}_script",
            "description": f"Execute a {_SHELL} script from a string and return its output (stdout and stderr combined).",
            "parameters": {
                "type": "object",
                "properties": {
                    "script": {
                        "type": "string",
                        "description": f"The {_SHELL} script content to execute",
                    },
                    "cwd": {
                        "type": "string",
                        "description": "Working directory for the script (optional)",
                        "default": None,
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds (default 30)",
                        "default": 30,
                    },
                },
                "required": ["script"],
            },
        },
    },
)
def run_shell_script(script: str, cwd: str | None = None, timeout: int = 30) -> str:
    """Write ``script`` to a temporary file, run it with ``_SHELL`` and report the outcome.

    Errors are never raised to the caller; every failure is reported as a
    string starting with ``Error``. The temporary file is removed afterwards
    on every path, including when writing it fails.

    Args:
        script: Script text to execute.
        cwd: Optional working directory. Environment variables and a leading
            ``~`` are expanded before use. Falsy values mean "inherit".
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        A report containing a ``STDOUT:`` section (if stdout was non-empty),
        a ``STDERR:`` section (if stderr was non-empty) and a trailing
        ``Return code: N`` line, or an ``Error...`` message when the working
        directory is unusable, the script times out, or execution fails.
    """
    try:
        working_dir = None
        if cwd:
            cwd_path = Path(os.path.expandvars(cwd)).expanduser()
            if not cwd_path.exists():
                return f"Error: Working directory '{cwd}' does not exist"
            # A path to a regular file passes exists() but cannot be used as
            # a cwd; report that here rather than as a generic spawn failure.
            if not cwd_path.is_dir():
                return f"Error: Working directory '{cwd}' is not a directory"
            working_dir = str(cwd_path)

        script_path = None
        try:
            # Created inside the try/finally so that a failed write (for
            # example an encoding error) cannot leave the file behind.
            with tempfile.NamedTemporaryFile(
                mode="w",
                suffix=f".{_SHELL}",
                delete=False,
            ) as f:
                script_path = f.name
                # The shebang is only a comment here because the file is
                # passed to the shell explicitly below.
                f.write(f"#!/usr/bin/{_SHELL}\n")
                f.write(script)

            # Owner-only permissions: the script is read by the shell we
            # start ourselves, so other users need no access to its contents.
            os.chmod(script_path, 0o700)

            result = subprocess.run(
                [_SHELL, script_path],
                capture_output=True,
                text=True,
                cwd=working_dir,
                timeout=timeout,
            )

            output = []
            if result.stdout:
                output.append(f"STDOUT:\n{result.stdout}")
            if result.stderr:
                output.append(f"STDERR:\n{result.stderr}")

            # The return code is always reported, so the report is never empty.
            output.append(f"\nReturn code: {result.returncode}")

            return "\n".join(output)

        finally:
            # Best-effort cleanup; a failed removal must not mask the result.
            if script_path is not None:
                try:
                    os.unlink(script_path)
                except OSError:
                    pass

    except subprocess.TimeoutExpired:
        return f"Error: Script timed out after {timeout} seconds"
    except Exception as e:
        # Tool boundary: surface any failure as text instead of raising.
        return f"Error executing script: {e}"