""" Terminal/Bash Execution Tool - Execute shell commands with safety constraints Language support: bash, sh, zsh, powershell (OS-dependent) """ import asyncio import logging import os import shlex from dataclasses import dataclass from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # Whitelist of allowed commands for security ALLOWED_COMMANDS = { "npm": {"allowed_args": ["install", "run", "build", "test", "start", "init", "update", "outdated"]}, "yarn": {"allowed_args": ["install", "run", "build", "test", "start", "init"]}, "pip": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]}, "pip3": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]}, "python": {"allowed_args": ["-m", "-c", "-v", "--version"]}, "python3": {"allowed_args": ["-m", "-c", "-v", "--version"]}, "git": {"allowed_args": ["clone", "commit", "push", "pull", "status", "log", "branch", "checkout", "add", "diff", "merge", "fetch", "remote"]}, "docker": {"allowed_args": ["build", "run", "ps", "stop", "start", "rm", "images", "pull", "push"]}, "curl": {"allowed_args": []}, # Allow curl for API requests and downloads "wget": {"allowed_args": []}, "node": {"allowed_args": ["-v", "--version"]}, "cat": {"allowed_args": []}, "grep": {"allowed_args": ["-r", "-n", "-i", "-v", "-E"]}, "find": {"allowed_args": ["-name", "-type", "-size", "-mtime"]}, "ls": {"allowed_args": ["-la", "-l", "-a", "-h"]}, "cd": {"allowed_args": []}, "mkdir": {"allowed_args": ["-p"]}, "cp": {"allowed_args": ["-r", "-f"]}, "mv": {"allowed_args": []}, "echo": {"allowed_args": []}, "env": {"allowed_args": []}, "which": {"allowed_args": []}, "pwd": {"allowed_args": []}, "touch": {"allowed_args": []}, "head": {"allowed_args": ["-n"]}, "tail": {"allowed_args": ["-n", "-f"]}, "sort": {"allowed_args": []}, "uniq": {"allowed_args": []}, "wc": {"allowed_args": ["-l", "-w", "-c"]}, "tar": {"allowed_args": ["-czf", "-xzf", "-cvf", "-xvf"]}, "zip": {"allowed_args": ["-r"]}, "unzip": {"allowed_args": []}, } # Commands that are completely disabled DISABLED_COMMANDS = { "sudo", "su", "rm", "rmdir", "mkfs", "fdisk", "dd", "format", "shutdown", "reboot", "halt", "poweroff", "kill", "pkill", "chmod", "chown", "chgrp", "passwd", "useradd", "userdel", } @dataclass class TerminalResult: """Result of a terminal command execution""" exit_code: int stdout: str stderr: str success: bool command: str duration_ms: float class TerminalExecutor: """ Execute shell commands with safety constraints. """ def __init__(self, cwd: Optional[str] = None, env_vars: Optional[Dict[str, str]] = None): self.cwd = cwd or os.getcwd() self.env_vars = {**os.environ, **(env_vars or {})} def _validate_command(self, command: str) -> tuple[bool, Optional[str]]: """ Validate that the command is safe to execute. Returns (is_valid, error_message) """ # Check for dangerous patterns dangerous_patterns = [ ";", "&&", "||", "|", ">", "<", "`", "$()", ] # Split command to get base command parts = shlex.split(command) if not parts: return False, "Empty command" base_cmd = parts[0] # Check if command is disabled if base_cmd in DISABLED_COMMANDS: return False, f"Command '{base_cmd}' is disabled for security reasons" # Check if command is in whitelist if base_cmd not in ALLOWED_COMMANDS: return False, f"Command '{base_cmd}' is not in the allowed list" # Check for shell operators that could be dangerous for pattern in dangerous_patterns: if pattern in command and pattern not in [">>", "<<"]: # Allow some safe redirects but warn about others if pattern in ["|", ";", "&&", "||"]: return False, f"Shell operators like '{pattern}' are not allowed for security" return True, None async def execute_command( self, command: str, timeout: int = 30, sandbox: bool = True ) -> TerminalResult: """ Execute terminal command with: - Timeout protection (prevent hanging) - Output streaming (see results in real-time) - Error capture (stderr + exit codes) - Working directory context - Environment variable injection """ start_time = asyncio.get_event_loop().time() # Validate command if sandbox mode if sandbox: is_valid, error_msg = self._validate_command(command) if not is_valid: return TerminalResult( exit_code=-1, stdout="", stderr=error_msg, success=False, command=command, duration_ms=0 ) try: # Execute with streaming output process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=self.cwd, env=self.env_vars ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 return TerminalResult( exit_code=process.returncode, stdout=stdout.decode('utf-8', errors='replace'), stderr=stderr.decode('utf-8', errors='replace'), success=process.returncode == 0, command=command, duration_ms=duration_ms ) except asyncio.TimeoutError: process.kill() await process.wait() duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 return TerminalResult( exit_code=-1, stdout="", stderr=f"Command timeout after {timeout}s", success=False, command=command, duration_ms=duration_ms ) except Exception as e: duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 return TerminalResult( exit_code=-1, stdout="", stderr=f"Execution error: {str(e)}", success=False, command=command, duration_ms=duration_ms ) async def stream_command( self, command: str, timeout: int = 30 ): """ Stream output line-by-line as it executes. Yields (line_type, line_content) tuples. """ is_valid, error_msg = self._validate_command(command) if not is_valid: yield ("stderr", error_msg) yield ("exit_code", -1) return try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=self.cwd, env=self.env_vars ) async def read_stream(stream, line_type): while True: try: line = await asyncio.wait_for(stream.readline(), timeout=0.1) if not line: break yield (line_type, line.decode('utf-8', errors='replace').rstrip()) except asyncio.TimeoutError: continue # Read both stdout and stderr concurrently stdout_task = asyncio.create_task( self._collect_stream(process.stdout, "stdout") ) stderr_task = asyncio.create_task( self._collect_stream(process.stderr, "stderr") ) # Yield lines as they come pending = {stdout_task, stderr_task} while pending: done, pending = await asyncio.wait( pending, return_when=asyncio.FIRST_COMPLETED ) for task in done: for line_type, line in task.result(): yield (line_type, line) # Wait for process to complete with timeout try: await asyncio.wait_for(process.wait(), timeout=timeout) yield ("exit_code", process.returncode) except asyncio.TimeoutError: process.kill() yield ("stderr", f"Command timeout after {timeout}s") yield ("exit_code", -1) except Exception as e: yield ("stderr", f"Execution error: {str(e)}") yield ("exit_code", -1) async def _collect_stream(self, stream, line_type): """Helper to collect stream output""" lines = [] while True: line = await stream.readline() if not line: break lines.append((line_type, line.decode('utf-8', errors='replace').rstrip())) return lines # Tool spec for LLM TERMINAL_TOOL_SPEC = { "name": "execute_terminal", "description": ( "Execute bash/shell commands with safety constraints. " "Use this to: run npm/yarn commands, git operations, docker commands, " "file operations (ls, cat, grep, find), and system utilities. " "Commands are validated against a whitelist for security. " "Examples: 'npm install', 'git status', 'docker ps', 'ls -la', 'cat file.txt'" ), "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to execute. Must be a whitelisted command.", }, "timeout": { "type": "number", "description": "Maximum execution time in seconds (default: 30, max: 300)", }, }, "required": ["command"], }, } async def execute_terminal_handler(arguments: Dict[str, Any]) -> tuple[str, bool]: """Handler for terminal command execution.""" try: command = arguments.get("command", "").strip() timeout = min(arguments.get("timeout", 30), 300) # Max 5 minutes if not command: return "Error: Command is required", False logger.info(f"Executing terminal command: {command}") executor = TerminalExecutor() result = await executor.execute_command(command, timeout=timeout) # Format output output_lines = [ f"$ {result.command}", "", ] if result.stdout: output_lines.append(result.stdout) if result.stderr: output_lines.append(f"[stderr] {result.stderr}") output_lines.append("") output_lines.append(f"Exit code: {result.exit_code}") output_lines.append(f"Duration: {result.duration_ms:.0f}ms") output = "\n".join(output_lines) if result.success: return f"✅ Command executed successfully\n{output}", True else: return f"❌ Command failed\n{output}", False except Exception as e: logger.error(f"Terminal execution error: {e}") return f"❌ Error executing command: {str(e)}", False