water3 / agent /tools /terminal_tool.py
onewayto's picture
Upload 187 files
070daf8 verified
"""
Terminal/Bash Execution Tool - Execute shell commands with safety constraints
Language support: bash, sh, zsh, powershell (OS-dependent)
"""
import asyncio
import logging
import os
import shlex
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Whitelist of allowed commands for security
ALLOWED_COMMANDS = {
"npm": {"allowed_args": ["install", "run", "build", "test", "start", "init", "update", "outdated"]},
"yarn": {"allowed_args": ["install", "run", "build", "test", "start", "init"]},
"pip": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]},
"pip3": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]},
"python": {"allowed_args": ["-m", "-c", "-v", "--version"]},
"python3": {"allowed_args": ["-m", "-c", "-v", "--version"]},
"git": {"allowed_args": ["clone", "commit", "push", "pull", "status", "log", "branch", "checkout", "add", "diff", "merge", "fetch", "remote"]},
"docker": {"allowed_args": ["build", "run", "ps", "stop", "start", "rm", "images", "pull", "push"]},
"curl": {"allowed_args": []}, # Allow curl for API requests and downloads
"wget": {"allowed_args": []},
"node": {"allowed_args": ["-v", "--version"]},
"cat": {"allowed_args": []},
"grep": {"allowed_args": ["-r", "-n", "-i", "-v", "-E"]},
"find": {"allowed_args": ["-name", "-type", "-size", "-mtime"]},
"ls": {"allowed_args": ["-la", "-l", "-a", "-h"]},
"cd": {"allowed_args": []},
"mkdir": {"allowed_args": ["-p"]},
"cp": {"allowed_args": ["-r", "-f"]},
"mv": {"allowed_args": []},
"echo": {"allowed_args": []},
"env": {"allowed_args": []},
"which": {"allowed_args": []},
"pwd": {"allowed_args": []},
"touch": {"allowed_args": []},
"head": {"allowed_args": ["-n"]},
"tail": {"allowed_args": ["-n", "-f"]},
"sort": {"allowed_args": []},
"uniq": {"allowed_args": []},
"wc": {"allowed_args": ["-l", "-w", "-c"]},
"tar": {"allowed_args": ["-czf", "-xzf", "-cvf", "-xvf"]},
"zip": {"allowed_args": ["-r"]},
"unzip": {"allowed_args": []},
}
# Commands that are completely disabled
DISABLED_COMMANDS = {
"sudo", "su", "rm", "rmdir", "mkfs", "fdisk", "dd", "format",
"shutdown", "reboot", "halt", "poweroff", "kill", "pkill",
"chmod", "chown", "chgrp", "passwd", "useradd", "userdel",
}
@dataclass
class TerminalResult:
"""Result of a terminal command execution"""
exit_code: int
stdout: str
stderr: str
success: bool
command: str
duration_ms: float
class TerminalExecutor:
"""
Execute shell commands with safety constraints.
"""
def __init__(self, cwd: Optional[str] = None, env_vars: Optional[Dict[str, str]] = None):
self.cwd = cwd or os.getcwd()
self.env_vars = {**os.environ, **(env_vars or {})}
def _validate_command(self, command: str) -> tuple[bool, Optional[str]]:
"""
Validate that the command is safe to execute.
Returns (is_valid, error_message)
"""
# Check for dangerous patterns
dangerous_patterns = [
";", "&&", "||", "|", ">", "<", "`", "$()",
]
# Split command to get base command
parts = shlex.split(command)
if not parts:
return False, "Empty command"
base_cmd = parts[0]
# Check if command is disabled
if base_cmd in DISABLED_COMMANDS:
return False, f"Command '{base_cmd}' is disabled for security reasons"
# Check if command is in whitelist
if base_cmd not in ALLOWED_COMMANDS:
return False, f"Command '{base_cmd}' is not in the allowed list"
# Check for shell operators that could be dangerous
for pattern in dangerous_patterns:
if pattern in command and pattern not in [">>", "<<"]:
# Allow some safe redirects but warn about others
if pattern in ["|", ";", "&&", "||"]:
return False, f"Shell operators like '{pattern}' are not allowed for security"
return True, None
async def execute_command(
self,
command: str,
timeout: int = 30,
sandbox: bool = True
) -> TerminalResult:
"""
Execute terminal command with:
- Timeout protection (prevent hanging)
- Output streaming (see results in real-time)
- Error capture (stderr + exit codes)
- Working directory context
- Environment variable injection
"""
start_time = asyncio.get_event_loop().time()
# Validate command if sandbox mode
if sandbox:
is_valid, error_msg = self._validate_command(command)
if not is_valid:
return TerminalResult(
exit_code=-1,
stdout="",
stderr=error_msg,
success=False,
command=command,
duration_ms=0
)
try:
# Execute with streaming output
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.cwd,
env=self.env_vars
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000
return TerminalResult(
exit_code=process.returncode,
stdout=stdout.decode('utf-8', errors='replace'),
stderr=stderr.decode('utf-8', errors='replace'),
success=process.returncode == 0,
command=command,
duration_ms=duration_ms
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000
return TerminalResult(
exit_code=-1,
stdout="",
stderr=f"Command timeout after {timeout}s",
success=False,
command=command,
duration_ms=duration_ms
)
except Exception as e:
duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000
return TerminalResult(
exit_code=-1,
stdout="",
stderr=f"Execution error: {str(e)}",
success=False,
command=command,
duration_ms=duration_ms
)
async def stream_command(
self,
command: str,
timeout: int = 30
):
"""
Stream output line-by-line as it executes.
Yields (line_type, line_content) tuples.
"""
is_valid, error_msg = self._validate_command(command)
if not is_valid:
yield ("stderr", error_msg)
yield ("exit_code", -1)
return
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.cwd,
env=self.env_vars
)
async def read_stream(stream, line_type):
while True:
try:
line = await asyncio.wait_for(stream.readline(), timeout=0.1)
if not line:
break
yield (line_type, line.decode('utf-8', errors='replace').rstrip())
except asyncio.TimeoutError:
continue
# Read both stdout and stderr concurrently
stdout_task = asyncio.create_task(
self._collect_stream(process.stdout, "stdout")
)
stderr_task = asyncio.create_task(
self._collect_stream(process.stderr, "stderr")
)
# Yield lines as they come
pending = {stdout_task, stderr_task}
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
for line_type, line in task.result():
yield (line_type, line)
# Wait for process to complete with timeout
try:
await asyncio.wait_for(process.wait(), timeout=timeout)
yield ("exit_code", process.returncode)
except asyncio.TimeoutError:
process.kill()
yield ("stderr", f"Command timeout after {timeout}s")
yield ("exit_code", -1)
except Exception as e:
yield ("stderr", f"Execution error: {str(e)}")
yield ("exit_code", -1)
async def _collect_stream(self, stream, line_type):
"""Helper to collect stream output"""
lines = []
while True:
line = await stream.readline()
if not line:
break
lines.append((line_type, line.decode('utf-8', errors='replace').rstrip()))
return lines
# Tool spec for LLM
TERMINAL_TOOL_SPEC = {
"name": "execute_terminal",
"description": (
"Execute bash/shell commands with safety constraints. "
"Use this to: run npm/yarn commands, git operations, docker commands, "
"file operations (ls, cat, grep, find), and system utilities. "
"Commands are validated against a whitelist for security. "
"Examples: 'npm install', 'git status', 'docker ps', 'ls -la', 'cat file.txt'"
),
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute. Must be a whitelisted command.",
},
"timeout": {
"type": "number",
"description": "Maximum execution time in seconds (default: 30, max: 300)",
},
},
"required": ["command"],
},
}
async def execute_terminal_handler(arguments: Dict[str, Any]) -> tuple[str, bool]:
"""Handler for terminal command execution."""
try:
command = arguments.get("command", "").strip()
timeout = min(arguments.get("timeout", 30), 300) # Max 5 minutes
if not command:
return "Error: Command is required", False
logger.info(f"Executing terminal command: {command}")
executor = TerminalExecutor()
result = await executor.execute_command(command, timeout=timeout)
# Format output
output_lines = [
f"$ {result.command}",
"",
]
if result.stdout:
output_lines.append(result.stdout)
if result.stderr:
output_lines.append(f"[stderr] {result.stderr}")
output_lines.append("")
output_lines.append(f"Exit code: {result.exit_code}")
output_lines.append(f"Duration: {result.duration_ms:.0f}ms")
output = "\n".join(output_lines)
if result.success:
return f"✅ Command executed successfully\n{output}", True
else:
return f"❌ Command failed\n{output}", False
except Exception as e:
logger.error(f"Terminal execution error: {e}")
return f"❌ Error executing command: {str(e)}", False