| | """ |
| | Terminal/Bash Execution Tool - Execute shell commands with safety constraints |
| | |
| | Language support: bash, sh, zsh, powershell (OS-dependent) |
| | """ |
| |
|
| | import asyncio |
| | import logging |
| | import os |
| | import shlex |
| | from dataclasses import dataclass |
| | from typing import Any, Dict, List, Optional |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | ALLOWED_COMMANDS = { |
| | "npm": {"allowed_args": ["install", "run", "build", "test", "start", "init", "update", "outdated"]}, |
| | "yarn": {"allowed_args": ["install", "run", "build", "test", "start", "init"]}, |
| | "pip": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]}, |
| | "pip3": {"allowed_args": ["install", "uninstall", "list", "show", "freeze", "requirements"]}, |
| | "python": {"allowed_args": ["-m", "-c", "-v", "--version"]}, |
| | "python3": {"allowed_args": ["-m", "-c", "-v", "--version"]}, |
| | "git": {"allowed_args": ["clone", "commit", "push", "pull", "status", "log", "branch", "checkout", "add", "diff", "merge", "fetch", "remote"]}, |
| | "docker": {"allowed_args": ["build", "run", "ps", "stop", "start", "rm", "images", "pull", "push"]}, |
| | "curl": {"allowed_args": []}, |
| | "wget": {"allowed_args": []}, |
| | "node": {"allowed_args": ["-v", "--version"]}, |
| | "cat": {"allowed_args": []}, |
| | "grep": {"allowed_args": ["-r", "-n", "-i", "-v", "-E"]}, |
| | "find": {"allowed_args": ["-name", "-type", "-size", "-mtime"]}, |
| | "ls": {"allowed_args": ["-la", "-l", "-a", "-h"]}, |
| | "cd": {"allowed_args": []}, |
| | "mkdir": {"allowed_args": ["-p"]}, |
| | "cp": {"allowed_args": ["-r", "-f"]}, |
| | "mv": {"allowed_args": []}, |
| | "echo": {"allowed_args": []}, |
| | "env": {"allowed_args": []}, |
| | "which": {"allowed_args": []}, |
| | "pwd": {"allowed_args": []}, |
| | "touch": {"allowed_args": []}, |
| | "head": {"allowed_args": ["-n"]}, |
| | "tail": {"allowed_args": ["-n", "-f"]}, |
| | "sort": {"allowed_args": []}, |
| | "uniq": {"allowed_args": []}, |
| | "wc": {"allowed_args": ["-l", "-w", "-c"]}, |
| | "tar": {"allowed_args": ["-czf", "-xzf", "-cvf", "-xvf"]}, |
| | "zip": {"allowed_args": ["-r"]}, |
| | "unzip": {"allowed_args": []}, |
| | } |
| |
|
| | |
| | DISABLED_COMMANDS = { |
| | "sudo", "su", "rm", "rmdir", "mkfs", "fdisk", "dd", "format", |
| | "shutdown", "reboot", "halt", "poweroff", "kill", "pkill", |
| | "chmod", "chown", "chgrp", "passwd", "useradd", "userdel", |
| | } |
| |
|
| |
|
| | @dataclass |
| | class TerminalResult: |
| | """Result of a terminal command execution""" |
| | exit_code: int |
| | stdout: str |
| | stderr: str |
| | success: bool |
| | command: str |
| | duration_ms: float |
| |
|
| |
|
| | class TerminalExecutor: |
| | """ |
| | Execute shell commands with safety constraints. |
| | """ |
| | |
| | def __init__(self, cwd: Optional[str] = None, env_vars: Optional[Dict[str, str]] = None): |
| | self.cwd = cwd or os.getcwd() |
| | self.env_vars = {**os.environ, **(env_vars or {})} |
| | |
| | def _validate_command(self, command: str) -> tuple[bool, Optional[str]]: |
| | """ |
| | Validate that the command is safe to execute. |
| | Returns (is_valid, error_message) |
| | """ |
| | |
| | dangerous_patterns = [ |
| | ";", "&&", "||", "|", ">", "<", "`", "$()", |
| | ] |
| | |
| | |
| | parts = shlex.split(command) |
| | if not parts: |
| | return False, "Empty command" |
| | |
| | base_cmd = parts[0] |
| | |
| | |
| | if base_cmd in DISABLED_COMMANDS: |
| | return False, f"Command '{base_cmd}' is disabled for security reasons" |
| | |
| | |
| | if base_cmd not in ALLOWED_COMMANDS: |
| | return False, f"Command '{base_cmd}' is not in the allowed list" |
| | |
| | |
| | for pattern in dangerous_patterns: |
| | if pattern in command and pattern not in [">>", "<<"]: |
| | |
| | if pattern in ["|", ";", "&&", "||"]: |
| | return False, f"Shell operators like '{pattern}' are not allowed for security" |
| | |
| | return True, None |
| | |
| | async def execute_command( |
| | self, |
| | command: str, |
| | timeout: int = 30, |
| | sandbox: bool = True |
| | ) -> TerminalResult: |
| | """ |
| | Execute terminal command with: |
| | - Timeout protection (prevent hanging) |
| | - Output streaming (see results in real-time) |
| | - Error capture (stderr + exit codes) |
| | - Working directory context |
| | - Environment variable injection |
| | """ |
| | start_time = asyncio.get_event_loop().time() |
| | |
| | |
| | if sandbox: |
| | is_valid, error_msg = self._validate_command(command) |
| | if not is_valid: |
| | return TerminalResult( |
| | exit_code=-1, |
| | stdout="", |
| | stderr=error_msg, |
| | success=False, |
| | command=command, |
| | duration_ms=0 |
| | ) |
| | |
| | try: |
| | |
| | process = await asyncio.create_subprocess_shell( |
| | command, |
| | stdout=asyncio.subprocess.PIPE, |
| | stderr=asyncio.subprocess.PIPE, |
| | cwd=self.cwd, |
| | env=self.env_vars |
| | ) |
| | |
| | try: |
| | stdout, stderr = await asyncio.wait_for( |
| | process.communicate(), |
| | timeout=timeout |
| | ) |
| | |
| | duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 |
| | |
| | return TerminalResult( |
| | exit_code=process.returncode, |
| | stdout=stdout.decode('utf-8', errors='replace'), |
| | stderr=stderr.decode('utf-8', errors='replace'), |
| | success=process.returncode == 0, |
| | command=command, |
| | duration_ms=duration_ms |
| | ) |
| | |
| | except asyncio.TimeoutError: |
| | process.kill() |
| | await process.wait() |
| | |
| | duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 |
| | |
| | return TerminalResult( |
| | exit_code=-1, |
| | stdout="", |
| | stderr=f"Command timeout after {timeout}s", |
| | success=False, |
| | command=command, |
| | duration_ms=duration_ms |
| | ) |
| | |
| | except Exception as e: |
| | duration_ms = (asyncio.get_event_loop().time() - start_time) * 1000 |
| | |
| | return TerminalResult( |
| | exit_code=-1, |
| | stdout="", |
| | stderr=f"Execution error: {str(e)}", |
| | success=False, |
| | command=command, |
| | duration_ms=duration_ms |
| | ) |
| | |
| | async def stream_command( |
| | self, |
| | command: str, |
| | timeout: int = 30 |
| | ): |
| | """ |
| | Stream output line-by-line as it executes. |
| | Yields (line_type, line_content) tuples. |
| | """ |
| | is_valid, error_msg = self._validate_command(command) |
| | if not is_valid: |
| | yield ("stderr", error_msg) |
| | yield ("exit_code", -1) |
| | return |
| | |
| | try: |
| | process = await asyncio.create_subprocess_shell( |
| | command, |
| | stdout=asyncio.subprocess.PIPE, |
| | stderr=asyncio.subprocess.PIPE, |
| | cwd=self.cwd, |
| | env=self.env_vars |
| | ) |
| | |
| | async def read_stream(stream, line_type): |
| | while True: |
| | try: |
| | line = await asyncio.wait_for(stream.readline(), timeout=0.1) |
| | if not line: |
| | break |
| | yield (line_type, line.decode('utf-8', errors='replace').rstrip()) |
| | except asyncio.TimeoutError: |
| | continue |
| | |
| | |
| | stdout_task = asyncio.create_task( |
| | self._collect_stream(process.stdout, "stdout") |
| | ) |
| | stderr_task = asyncio.create_task( |
| | self._collect_stream(process.stderr, "stderr") |
| | ) |
| | |
| | |
| | pending = {stdout_task, stderr_task} |
| | while pending: |
| | done, pending = await asyncio.wait( |
| | pending, return_when=asyncio.FIRST_COMPLETED |
| | ) |
| | for task in done: |
| | for line_type, line in task.result(): |
| | yield (line_type, line) |
| | |
| | |
| | try: |
| | await asyncio.wait_for(process.wait(), timeout=timeout) |
| | yield ("exit_code", process.returncode) |
| | except asyncio.TimeoutError: |
| | process.kill() |
| | yield ("stderr", f"Command timeout after {timeout}s") |
| | yield ("exit_code", -1) |
| | |
| | except Exception as e: |
| | yield ("stderr", f"Execution error: {str(e)}") |
| | yield ("exit_code", -1) |
| | |
| | async def _collect_stream(self, stream, line_type): |
| | """Helper to collect stream output""" |
| | lines = [] |
| | while True: |
| | line = await stream.readline() |
| | if not line: |
| | break |
| | lines.append((line_type, line.decode('utf-8', errors='replace').rstrip())) |
| | return lines |
| |
|
| |
|
| | |
| | TERMINAL_TOOL_SPEC = { |
| | "name": "execute_terminal", |
| | "description": ( |
| | "Execute bash/shell commands with safety constraints. " |
| | "Use this to: run npm/yarn commands, git operations, docker commands, " |
| | "file operations (ls, cat, grep, find), and system utilities. " |
| | "Commands are validated against a whitelist for security. " |
| | "Examples: 'npm install', 'git status', 'docker ps', 'ls -la', 'cat file.txt'" |
| | ), |
| | "parameters": { |
| | "type": "object", |
| | "properties": { |
| | "command": { |
| | "type": "string", |
| | "description": "The shell command to execute. Must be a whitelisted command.", |
| | }, |
| | "timeout": { |
| | "type": "number", |
| | "description": "Maximum execution time in seconds (default: 30, max: 300)", |
| | }, |
| | }, |
| | "required": ["command"], |
| | }, |
| | } |
| |
|
| |
|
| | async def execute_terminal_handler(arguments: Dict[str, Any]) -> tuple[str, bool]: |
| | """Handler for terminal command execution.""" |
| | try: |
| | command = arguments.get("command", "").strip() |
| | timeout = min(arguments.get("timeout", 30), 300) |
| | |
| | if not command: |
| | return "Error: Command is required", False |
| | |
| | logger.info(f"Executing terminal command: {command}") |
| | |
| | executor = TerminalExecutor() |
| | result = await executor.execute_command(command, timeout=timeout) |
| | |
| | |
| | output_lines = [ |
| | f"$ {result.command}", |
| | "", |
| | ] |
| | |
| | if result.stdout: |
| | output_lines.append(result.stdout) |
| | |
| | if result.stderr: |
| | output_lines.append(f"[stderr] {result.stderr}") |
| | |
| | output_lines.append("") |
| | output_lines.append(f"Exit code: {result.exit_code}") |
| | output_lines.append(f"Duration: {result.duration_ms:.0f}ms") |
| | |
| | output = "\n".join(output_lines) |
| | |
| | if result.success: |
| | return f"✅ Command executed successfully\n{output}", True |
| | else: |
| | return f"❌ Command failed\n{output}", False |
| | |
| | except Exception as e: |
| | logger.error(f"Terminal execution error: {e}") |
| | return f"❌ Error executing command: {str(e)}", False |
| |
|