import platform
import re
import subprocess
from typing import Dict, Any, List, Optional

from .tool import Tool, Toolkit
from .storage_handler import FileStorageHandler
from ..core.logging import logger


class CMDBase:
    """
    Base class for command execution with permission checking and cross-platform support.

    Each command is first screened by a heuristic danger check, then the user
    is asked on stdin to approve it, and only then is it passed to the
    platform shell via ``subprocess.run``.
    """

    # High-risk command patterns. They are matched against the lower-cased
    # command, so the check is case-insensitive.
    _DANGEROUS_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            # Destructive file, disk and power operations
            r"\brm\s+-rf\b",
            r"\bdel\s+/[sq]\b",
            r"\bformat\b",
            r"\bdd\b",
            r"\bshutdown\b",
            r"\breboot\b",
            r"\binit\s+[06]\b",
            # Network tools
            r"\bnetcat\b",
            r"\bnc\b",
            r"\bssh\b",
            r"\bscp\b",
            # Forced process termination
            r"\bkill\s+-9\b",
            r"\btaskkill\s+/f\b",
            # Package installation
            r"\bapt\s+install\b",
            r"\byum\s+install\b",
            r"\bbrew\s+install\b",
            r"\bchoco\s+install\b",
            # User management
            r"\buseradd\b",
            r"\buserdel\b",
            r"\bpasswd\b",
            # Mounting and permission/ownership changes
            r"\bmount\b",
            r"\bumount\b",
            r"\bchmod\s+777\b",
            r"\bchown\s+root\b",
        )
    )

    # ``sudo`` / ``runas`` at the start of the command (leading whitespace
    # allowed) or right after a shell separator, so that "  sudo x" and
    # "a && sudo x" are caught as well as "sudo x".
    _ELEVATION_RE = re.compile(r"(?:^|[;&|`]|\$\()\s*(?:sudo|runas)\s")

    # POSIX paths are compared case-sensitively; Windows paths are compared
    # case-insensitively because the command may spell them in any case.
    _POSIX_SYSTEM_DIRS = ("/etc/", "/usr/", "/var/", "/bin/", "/sbin/")
    _WINDOWS_SYSTEM_DIRS = ("C:\\Windows\\", "C:\\Program Files\\")

    def __init__(self, default_shell: Optional[str] = None,
                 storage_handler: Optional["FileStorageHandler"] = None):
        """
        Initialize CMDBase with system detection and shell configuration.

        Args:
            default_shell: Override default shell detection
            storage_handler: Storage handler for file operations
        """
        self.system = platform.system().lower()
        self.default_shell = default_shell or self._detect_default_shell()
        self.permission_cache = {}
        self.storage_handler = storage_handler

    def _detect_default_shell(self) -> str:
        """Detect the default shell for the current system ("cmd" on Windows, "bash" otherwise)."""
        return "cmd" if self.system == "windows" else "bash"

    def _is_dangerous_command(self, command: str) -> Dict[str, Any]:
        """
        Check if a command is potentially dangerous.

        Checks run in this order and the first hit wins:
        1. known dangerous patterns (risk "high"),
        2. privilege elevation via ``sudo`` / ``runas`` (risk "high"),
        3. references to system directories (risk "medium").

        Args:
            command: The command to check

        Returns:
            Dictionary with "is_dangerous" and "risk_level", plus "reason"
            when the command is flagged.
        """
        command_lower = command.lower()

        for regex in self._DANGEROUS_PATTERNS:
            if regex.search(command_lower):
                return {
                    "is_dangerous": True,
                    "reason": f"Command matches dangerous pattern: {regex.pattern}",
                    "risk_level": "high"
                }

        if self._ELEVATION_RE.search(command_lower):
            return {
                "is_dangerous": True,
                "reason": "Command requires elevated privileges",
                "risk_level": "high"
            }

        for sys_dir in self._POSIX_SYSTEM_DIRS:
            if sys_dir in command:
                return self._system_dir_hit(sys_dir)
        for sys_dir in self._WINDOWS_SYSTEM_DIRS:
            if sys_dir.lower() in command_lower:
                return self._system_dir_hit(sys_dir)

        return {"is_dangerous": False, "risk_level": "low"}

    @staticmethod
    def _system_dir_hit(sys_dir: str) -> Dict[str, Any]:
        """Build the medium-risk assessment for a command that mentions ``sys_dir``."""
        return {
            "is_dangerous": True,
            "reason": f"Command operates on system directory: {sys_dir}",
            "risk_level": "medium"
        }

    def _request_permission(self, command: str, danger_assessment: Dict[str, Any]) -> bool:
        """
        Request permission from user to execute command.

        Prints the command and its danger assessment, then reads one line
        from stdin. Only "y" / "yes" (any case) grants permission; any other
        answer, Ctrl-C, or a closed stdin denies it.

        Args:
            command: The command to execute
            danger_assessment: Assessment of command danger

        Returns:
            True if permission granted, False otherwise
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print("🔒 PERMISSION REQUEST")
        print(banner)
        print(f"Command: {command}")
        print(f"System: {self.system}")
        print(f"Shell: {self.default_shell}")

        if danger_assessment["is_dangerous"]:
            print(f"⚠️ WARNING: {danger_assessment['reason']}")
            print(f"Risk Level: {danger_assessment['risk_level'].upper()}")
        else:
            print("✅ Command appears safe")

        print("\nDo you want to execute this command?")
        print("Options:")
        print(" y/Y - Yes, execute the command")
        print(" n/N - No, do not execute")
        print(" [reason] - No, with explanation")
        print(" [empty] - No, without explanation")

        try:
            answer = input("\nYour response: ").strip()
        except KeyboardInterrupt:
            print("\n❌ Permission request cancelled by user.")
            return False
        except EOFError:
            # stdin is closed or exhausted: nobody can approve, so fail closed.
            print("\n❌ No input available. Permission denied.")
            return False

        normalized = answer.lower()
        if normalized in ("y", "yes"):
            print("✅ Permission granted. Executing command...")
            return True
        if normalized in ("n", "no", ""):
            print("❌ Permission denied.")
            return False

        # Any other text is treated as a denial with an explanation; echo it
        # back exactly as the user typed it.
        print(f"❌ Permission denied. Reason: {answer}")
        return False

    def _build_shell_args(self, command: str) -> List[str]:
        """Return the argv list that runs ``command`` in the configured shell."""
        if self.system == "windows":
            if self.default_shell == "cmd":
                return ["cmd", "/c", command]
            return ["powershell", "-Command", command]
        return [self.default_shell, "-c", command]

    @staticmethod
    def _failure_result(command: str, error: str) -> Dict[str, Any]:
        """Build the result dictionary returned when a command was not run or failed to run."""
        return {
            "success": False,
            "error": error,
            "command": command,
            "stdout": "",
            "stderr": "",
            "return_code": None
        }

    def execute_command(self, command: str, timeout: int = 30, cwd: Optional[str] = None) -> Dict[str, Any]:
        """
        Execute a command with permission checking.

        The user is always asked for permission, whether or not the command
        was flagged as dangerous. Errors are reported through the returned
        dictionary rather than raised.

        Args:
            command: The command to execute
            timeout: Command timeout in seconds
            cwd: Working directory for command execution

        Returns:
            Dictionary with execution results. "success" is True only when the
            process exits with return code 0. On denial, timeout, or any
            other exception, "success" is False and "error" holds the reason.
        """
        try:
            danger_assessment = self._is_dangerous_command(command)

            if not self._request_permission(command, danger_assessment):
                return self._failure_result(command, "Permission denied by user")

            cmd_args = self._build_shell_args(command)

            logger.info(f"Executing command: {command}")

            # The shell is invoked explicitly through argv, so shell=False.
            result = subprocess.run(
                cmd_args,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=cwd,
                shell=False
            )

            result_dict = {
                "success": result.returncode == 0,
                "command": command,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "return_code": result.returncode,
                "system": self.system,
                "shell": self.default_shell
            }

            # Report which storage handler this instance is configured with.
            if self.storage_handler:
                result_dict["storage_handler"] = type(self.storage_handler).__name__
                result_dict["storage_base_path"] = str(self.storage_handler.base_path)

            return result_dict

        except subprocess.TimeoutExpired:
            return self._failure_result(command, f"Command timed out after {timeout} seconds")
        except Exception as e:
            return self._failure_result(command, str(e))
class ExecuteCommandTool(Tool):
    """Tool that forwards a command to a ``CMDBase`` instance and logs the outcome."""

    name: str = "execute_command"
    description: str = (
        "Execute a command line operation with permission checking and cross-platform support. "
        "Can handle all command line operations including directory creation, file listing, "
        "system info, and more."
    )
    inputs: Dict[str, Dict[str, str]] = {
        "command": {
            "type": "string",
            "description": (
                "The command to execute "
                "(e.g., 'ls -la', 'dir', 'mkdir test', 'pwd', 'whoami', 'date', etc.)"
            ),
        },
        "timeout": {
            "type": "integer",
            "description": "Command timeout in seconds (default: 30)",
        },
        "working_directory": {
            "type": "string",
            "description": "Working directory for command execution (optional)",
        },
    }
    required: Optional[List[str]] = ["command"]

    def __init__(self, cmd_base: CMDBase = None):
        super().__init__()
        # Fall back to a fresh CMDBase when the caller does not supply one.
        self.cmd_base = cmd_base if cmd_base else CMDBase()

    def __call__(self, command: str, timeout: int = 30, working_directory: str = None) -> Dict[str, Any]:
        """
        Run ``command`` through the command base and return its result dictionary.

        Args:
            command: The command to execute
            timeout: Command timeout in seconds
            working_directory: Working directory for command execution

        Returns:
            The dictionary produced by ``CMDBase.execute_command``. If that
            call raises, a dictionary with "success" False, the error text,
            and the command is returned instead.
        """
        try:
            outcome = self.cmd_base.execute_command(
                command=command,
                timeout=timeout,
                cwd=working_directory,
            )
        except Exception as exc:
            message = str(exc)
            logger.error(f"Error in execute_command tool: {message}")
            return {"success": False, "error": message, "command": command}

        if not outcome["success"]:
            failure = outcome.get('error', 'Unknown error')
            logger.error(f"Failed to execute command {command}: {failure}")
        else:
            logger.info(f"Successfully executed command: {command}")
        return outcome
class CMDToolkit(Toolkit):
    """
    Command line toolkit that provides safe command execution with permission checking
    and cross-platform support. Supports Linux, macOS, and Windows.
    """

    def __init__(self, name: str = "CMDToolkit", default_shell: str = None, storage_handler: FileStorageHandler = None):
        """
        Build the toolkit around a single command base shared by its tools.

        Args:
            name: Name of the toolkit
            default_shell: Override default shell detection
            storage_handler: Storage handler for file operations; when omitted,
                a LocalStorageHandler rooted at "./workplace/cmd" is created
        """
        handler = storage_handler
        if handler is None:
            # Imported here, only when the default handler is actually needed.
            from .storage_handler import LocalStorageHandler
            handler = LocalStorageHandler(base_path="./workplace/cmd")

        shared_base = CMDBase(default_shell=default_shell, storage_handler=handler)

        # The parent initializer runs before any attributes are set on self.
        super().__init__(name=name, tools=[ExecuteCommandTool(cmd_base=shared_base)])
        self.cmd_base = shared_base
        self.storage_handler = handler