scider / scievo /tools /shell_tool.py
harry-lu-0708's picture
clean HF Space commit (no binary history)
0913c52
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from .registry import register_tool, register_toolset_desc
# Register the description for the "shell" toolset; the tools defined below are
# registered under this same toolset name.
register_toolset_desc("shell", "Shell command execution toolset.")
def _detect_shell() -> str:
    """Pick the shell used by this toolset.

    Returns:
        ``"bash"`` when a ``bash`` executable can be located on ``PATH``,
        otherwise the fallback ``"sh"``.
    """
    bash_location = shutil.which("bash")
    return "sh" if not bash_location else "bash"
# Detect the available shell once, at module load time. The value is interpolated
# into the registered tool names/descriptions below and is the interpreter that
# every command and script in this module is run with.
_SHELL = _detect_shell()
@register_tool(
    "shell",
    {
        "type": "function",
        "function": {
            "name": f"run_{_SHELL}_cmd",
            "description": f"Execute a {_SHELL} command and return its output (stdout and stderr combined).",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": f"The {_SHELL} command to execute",
                    },
                    "cwd": {
                        "type": "string",
                        "description": "Working directory for the command (optional)",
                        "default": None,
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds (default 30)",
                        "default": 30,
                    },
                },
                "required": ["command"],
            },
        },
    },
)
def run_shell_cmd(command: str, cwd: str | None = None, timeout: int = 30) -> str:
    """Run a single command line through the detected shell and report the result.

    NOTE: this executes arbitrary shell code by design; callers are responsible
    for deciding what may be passed in.

    Args:
        command: Command line handed to the shell via ``-c``.
        cwd: Optional working directory. Environment variables and a leading
            ``~`` are expanded before use.
        timeout: Maximum number of seconds to wait for the command.

    Returns:
        A text report containing a ``STDOUT:`` section and/or a ``STDERR:``
        section (each only when non-empty), or the line
        ``Command executed with no output`` when both are empty, always
        followed by ``Return code: N``. Failures (bad working directory,
        timeout, any other exception) are returned as ``Error...`` strings
        instead of being raised.
    """
    try:
        working_dir = None
        if cwd:
            cwd_path = Path(os.path.expandvars(cwd)).expanduser()
            if not cwd_path.exists():
                return f"Error: Working directory '{cwd}' does not exist"
            # A regular file passes exists() but cannot be used as a cwd;
            # report that clearly instead of surfacing a raw OSError text.
            if not cwd_path.is_dir():
                return f"Error: Working directory '{cwd}' is not a directory"
            working_dir = str(cwd_path)

        result = subprocess.run(
            [_SHELL, "-c", command],
            capture_output=True,
            text=True,
            # Commands may emit bytes that are not valid in the locale
            # encoding; substitute them rather than losing the whole output
            # to a UnicodeDecodeError.
            errors="replace",
            cwd=working_dir,
            timeout=timeout,
        )

        sections = []
        if result.stdout:
            sections.append(f"STDOUT:\n{result.stdout}")
        if result.stderr:
            sections.append(f"STDERR:\n{result.stderr}")
        if not sections:
            # Previously unreachable because the return-code line was always
            # appended first; make the "no output" case explicit.
            sections.append("Command executed with no output")
        sections.append(f"\nReturn code: {result.returncode}")
        return "\n".join(sections)
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {timeout} seconds"
    except Exception as e:
        # Tool boundary: always answer with a string rather than raising.
        return f"Error executing command: {e}"
@register_tool(
    "shell",
    {
        "type": "function",
        "function": {
            "name": f"run_{_SHELL}_script",
            "description": f"Execute a {_SHELL} script from a string and return its output (stdout and stderr combined).",
            "parameters": {
                "type": "object",
                "properties": {
                    "script": {
                        "type": "string",
                        "description": f"The {_SHELL} script content to execute",
                    },
                    "cwd": {
                        "type": "string",
                        "description": "Working directory for the script (optional)",
                        "default": None,
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "Timeout in seconds (default 30)",
                        "default": 30,
                    },
                },
                "required": ["script"],
            },
        },
    },
)
def run_shell_script(script: str, cwd: str | None = None, timeout: int = 30) -> str:
    """Write ``script`` to a temporary file, run it with the detected shell, and report the result.

    NOTE: this executes arbitrary shell code by design; callers are responsible
    for deciding what may be passed in.

    Args:
        script: Script text. A shebang line is written before it, so the
            caller's first line is line 2 of the executed file.
        cwd: Optional working directory. Environment variables and a leading
            ``~`` are expanded before use.
        timeout: Maximum number of seconds to wait for the script.

    Returns:
        A text report containing a ``STDOUT:`` section and/or a ``STDERR:``
        section (each only when non-empty), or the line
        ``Script executed with no output`` when both are empty, always
        followed by ``Return code: N``. Failures (bad working directory,
        timeout, any other exception) are returned as ``Error...`` strings
        instead of being raised.
    """
    try:
        working_dir = None
        if cwd:
            cwd_path = Path(os.path.expandvars(cwd)).expanduser()
            if not cwd_path.exists():
                return f"Error: Working directory '{cwd}' does not exist"
            # A regular file passes exists() but cannot be used as a cwd;
            # report that clearly instead of surfacing a raw OSError text.
            if not cwd_path.is_dir():
                return f"Error: Working directory '{cwd}' is not a directory"
            working_dir = str(cwd_path)

        # delete=False because the file must outlive the handle so the shell
        # can read it; removal is handled by the finally block below.
        script_file = tempfile.NamedTemporaryFile(
            mode="w",
            suffix=f".{_SHELL}",
            delete=False,
        )
        # Capture the path immediately so the file is removed even when
        # writing the script fails part-way.
        script_path = script_file.name
        try:
            with script_file:
                # Add shebang
                script_file.write(f"#!/usr/bin/{_SHELL}\n")
                script_file.write(script)

            # The script is run via the interpreter by this same user, so
            # owner-only permissions are enough; do not expose its contents
            # to other users.
            os.chmod(script_path, 0o700)

            result = subprocess.run(
                [_SHELL, script_path],
                capture_output=True,
                text=True,
                # Scripts may emit bytes that are not valid in the locale
                # encoding; substitute them rather than losing the whole
                # output to a UnicodeDecodeError.
                errors="replace",
                cwd=working_dir,
                timeout=timeout,
            )

            sections = []
            if result.stdout:
                sections.append(f"STDOUT:\n{result.stdout}")
            if result.stderr:
                sections.append(f"STDERR:\n{result.stderr}")
            if not sections:
                # Previously unreachable because the return-code line was
                # always appended first; make the "no output" case explicit.
                sections.append("Script executed with no output")
            sections.append(f"\nReturn code: {result.returncode}")
            return "\n".join(sections)
        finally:
            # Best-effort cleanup of the temporary script file; a failure to
            # delete must not mask the script's own result.
            try:
                os.unlink(script_path)
            except OSError:
                pass
    except subprocess.TimeoutExpired:
        return f"Error: Script timed out after {timeout} seconds"
    except Exception as e:
        # Tool boundary: always answer with a string rather than raising.
        return f"Error executing script: {e}"