"""Policy-constrained helpers for running commands and touching the filesystem.

The module exposes :class:`SystemUtils`, whose coroutine methods never raise
for operational failures; each one returns a dict with a ``"status"`` key set
to ``"success"`` or ``"error"`` (plus a ``"message"`` on error).
"""

import asyncio
import logging
import os
import platform
import shlex
import shutil
import stat
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SystemUtils:
    """Run commands and perform file operations under an allow-list policy.

    File operations are restricted to ``allowed_dirs``; commands are
    restricted to ``allowed_commands`` when that list is non-empty.
    """

    def __init__(
        self,
        allowed_dirs: Optional[List[str]] = None,
        allowed_commands: Optional[List[str]] = None,
    ):
        """
        Initialize SystemUtils with security constraints

        Args:
            allowed_dirs: List of directories where file operations are allowed.
                Defaults to the current working directory and the home directory.
            allowed_commands: List of allowed commands. Each entry is a command
                prefix matched on whole tokens (``"git status"`` allows
                ``git status -s`` but not ``git statusx``). If None or empty,
                all commands are allowed - NOT RECOMMENDED.
        """
        # Default allowed directories (current working directory and home directory)
        self.allowed_dirs = allowed_dirs or [
            os.getcwd(),
            os.path.expanduser("~"),
        ]

        # Default allowed commands (empty list means all commands are allowed - BE CAREFUL!)
        self.allowed_commands = allowed_commands or []

        # Platform-specific settings
        self.is_windows = platform.system() == 'Windows'
        # Kept for backward compatibility only. It is no longer passed to
        # asyncio: create_subprocess_shell() rejects shell=False with
        # ValueError, which made every command fail on non-Windows hosts.
        self.shell = self.is_windows

        # For tracking command history
        self.command_history: List[Dict[str, Any]] = []

    # ------------------------------------------------------------------
    # Policy helpers
    # ------------------------------------------------------------------
    def _is_path_allowed(self, path: str) -> bool:
        """Return True if *path* is one of the allowed directories or inside one.

        Both sides are resolved with ``os.path.realpath`` so symlinks and
        ``..`` segments cannot escape, and containment is decided with
        ``os.path.commonpath`` so that ``/home/user2`` is not accepted merely
        because it shares a string prefix with ``/home/user``.
        """
        candidate = os.path.normcase(os.path.realpath(path))
        for allowed in self.allowed_dirs:
            if not allowed:
                continue
            root = os.path.normcase(os.path.realpath(allowed))
            try:
                if os.path.commonpath([candidate, root]) == root:
                    return True
            except ValueError:
                # Raised for paths on different drives (Windows); not contained.
                continue
        return False

    def _tokenize(self, text: str) -> List[str]:
        """Split a command string into arguments.

        Raises:
            ValueError: If the string has unbalanced quotes.
        """
        tokens = shlex.split(text, posix=not self.is_windows)
        if self.is_windows:
            # Non-POSIX mode keeps the surrounding quotes on each token.
            tokens = [
                tok[1:-1]
                if len(tok) >= 2 and tok[0] == tok[-1] and tok[0] in "\"'"
                else tok
                for tok in tokens
            ]
        return tokens

    def _is_command_allowed(self, argv: List[str]) -> bool:
        """Return True if *argv* starts with the tokens of an allowed entry."""
        for allowed in self.allowed_commands:
            try:
                prefix = self._tokenize(allowed)
            except ValueError:
                continue
            if prefix and argv[:len(prefix)] == prefix:
                return True
        return False

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    async def execute_command(
        self,
        command: Union[str, List[str]],
        cwd: str = None,
        timeout: int = 30,
    ) -> Dict[str, Any]:
        """
        Execute a command asynchronously

        A list is executed directly (no shell), one element per argument.
        A string is executed through the system shell, unless an allow-list
        is configured: then it is tokenised and executed without a shell so
        that shell operators (``;``, ``|``, ``&&`` ...) cannot chain commands
        that are not on the allow-list.

        Args:
            command: Command to execute (string or list of args)
            cwd: Working directory for the command
            timeout: Maximum time to wait for command completion (seconds)

        Returns:
            Dict containing command results or error information
        """
        if not command:
            return {"status": "error", "message": "No command provided"}

        if isinstance(command, (list, tuple)):
            argv: Optional[List[str]] = [str(part) for part in command]
            # String form is used for logging, history and the result dict
            cmd_str = ' '.join(argv)
        else:
            argv = None
            cmd_str = command

        # Log the command
        logger.info("Executing command: %s", cmd_str)

        # Set working directory
        work_dir = cwd or os.getcwd()

        # Add to command history (attempts are recorded even when rejected)
        self.command_history.append({
            'command': cmd_str,
            'time': asyncio.get_running_loop().time(),
            'cwd': work_dir,
        })

        # Check if command is allowed
        if self.allowed_commands:
            if argv is None:
                try:
                    argv = self._tokenize(cmd_str)
                except ValueError:
                    # Unbalanced quotes: cannot be parsed safely, so reject
                    argv = []
            if not argv or not self._is_command_allowed(argv):
                return {
                    "status": "error",
                    "message": "Command not allowed by security policy",
                    "command": cmd_str,
                }

        try:
            # Create subprocess
            if argv is not None:
                process = await asyncio.create_subprocess_exec(
                    *argv,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=work_dir,
                )
            else:
                process = await asyncio.create_subprocess_shell(
                    cmd_str,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=work_dir,
                )

            # Wait for the process to complete with timeout
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout,
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # already exited between the timeout and the kill
                # Reap the child so it does not linger as a zombie
                await process.wait()
                return {
                    "status": "error",
                    "message": f"Command timed out after {timeout} seconds",
                    "command": cmd_str,
                }

            # Get return code and output; undecodable bytes are replaced
            # rather than turning a finished command into an error
            return_code = process.returncode
            stdout_str = stdout.decode(errors='replace').strip()
            stderr_str = stderr.decode(errors='replace').strip()

            return {
                "status": "success" if return_code == 0 else "error",
                "return_code": return_code,
                "stdout": stdout_str,
                "stderr": stderr_str,
                "command": cmd_str,
            }

        except Exception as e:
            logger.error("Error executing command: %s", e)
            return {
                "status": "error",
                "message": str(e),
                "command": cmd_str,
            }

    # ------------------------------------------------------------------
    # Filesystem operations
    # ------------------------------------------------------------------
    async def list_directory(self, path: str = ".") -> Dict[str, Any]:
        """List contents of a directory

        Returns:
            On success a dict with ``path`` and ``contents``; each entry has
            ``name``, ``path``, ``type`` ('file', 'directory' or 'other'),
            ``size``, ``created`` (``st_ctime``), ``modified`` and ``mode``.
            Entries that cannot be stat'ed are skipped with a warning.
        """
        try:
            abs_path = os.path.abspath(path)

            # Check if path is in allowed directories
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Access to {abs_path} is not allowed by security policy"
                }

            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"Path does not exist: {abs_path}"
                }

            if not os.path.isdir(abs_path):
                return {
                    "status": "error",
                    "message": f"Path is not a directory: {abs_path}"
                }

            # Get directory contents
            contents = []
            with os.scandir(abs_path) as entries:
                for entry in entries:
                    try:
                        stat_info = entry.stat()
                        if entry.is_file():
                            entry_type = 'file'
                        elif entry.is_dir():
                            entry_type = 'directory'
                        else:
                            # fifo, socket, device node, ...
                            entry_type = 'other'
                        contents.append({
                            'name': entry.name,
                            'path': entry.path,
                            'type': entry_type,
                            'size': stat_info.st_size,
                            'created': stat_info.st_ctime,
                            'modified': stat_info.st_mtime,
                            'mode': stat_info.st_mode,
                        })
                    except OSError as e:
                        logger.warning("Error accessing %s: %s", entry.path, e)

            return {
                "status": "success",
                "path": abs_path,
                "contents": contents
            }

        except Exception as e:
            logger.error("Error listing directory %s: %s", path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def read_file(self, file_path: str) -> Dict[str, Any]:
        """Read the contents of a file

        The file is decoded as UTF-8 with undecodable bytes dropped, and
        files larger than 10MB are refused.
        """
        try:
            abs_path = os.path.abspath(file_path)

            # Check if path is in allowed directories
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Access to {abs_path} is not allowed by security policy"
                }

            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"File does not exist: {abs_path}"
                }

            if not os.path.isfile(abs_path):
                return {
                    "status": "error",
                    "message": f"Path is not a file: {abs_path}"
                }

            # Check file size before reading (limit to 10MB)
            file_size = os.path.getsize(abs_path)
            if file_size > 10 * 1024 * 1024:  # 10MB
                return {
                    "status": "error",
                    "message": f"File is too large ({file_size} bytes). Maximum allowed size is 10MB."
                }

            # Read file content
            with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            return {
                "status": "success",
                "path": abs_path,
                "content": content,
                "size": file_size
            }

        except Exception as e:
            logger.error("Error reading file %s: %s", file_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def write_file(
        self,
        file_path: str,
        content: str,
        mode: str = 'w',
        append: bool = False,
    ) -> Dict[str, Any]:
        """
        Write content to a file

        Missing parent directories are created.

        Args:
            file_path: Path to the file
            content: Content to write
            mode: File open mode ('w' for write, 'x' for exclusive creation, etc.)
            append: If True, append to the file instead of overwriting;
                ``mode`` is ignored in that case
        """
        try:
            abs_path = os.path.abspath(file_path)

            # Check the (symlink-resolved) target is in allowed directories
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Writing to {abs_path} is not allowed by security policy"
                }

            # Exclusive creation only applies when we are not appending
            if not append and 'x' in mode and os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"File already exists: {abs_path}"
                }

            # Ensure parent directory exists
            os.makedirs(os.path.dirname(abs_path), exist_ok=True)

            # Write content to file
            with open(abs_path, 'a' if append else mode, encoding='utf-8') as f:
                f.write(content)

            return {
                "status": "success",
                "path": abs_path,
                "bytes_written": len(content.encode('utf-8'))
            }

        except Exception as e:
            logger.error("Error writing to file %s: %s", file_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def create_directory(
        self,
        dir_path: str,
        parents: bool = True,
        exist_ok: bool = True,
    ) -> Dict[str, Any]:
        """Create a directory

        Args:
            dir_path: Directory to create
            parents: If True, create missing parent directories as well;
                if False, the parent must already exist
            exist_ok: If True, an already existing directory is not an error
        """
        try:
            abs_path = os.path.abspath(dir_path)
            parent_dir = os.path.dirname(abs_path)

            # Check the (symlink-resolved) target is in allowed directories
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Creating directory in {parent_dir} is not allowed by security policy"
                }

            # Create directory
            if parents:
                os.makedirs(abs_path, exist_ok=exist_ok)
            else:
                try:
                    os.mkdir(abs_path)
                except FileExistsError:
                    # Mirror os.makedirs: only an existing *directory* is ok
                    if not (exist_ok and os.path.isdir(abs_path)):
                        raise

            return {
                "status": "success",
                "path": abs_path
            }

        except Exception as e:
            logger.error("Error creating directory %s: %s", dir_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def delete_path(self, path: str, recursive: bool = False) -> Dict[str, Any]:
        """
        Delete a file or directory

        Args:
            path: Path to delete
            recursive: If True, delete directory and all its contents;
                if False, only an empty directory can be removed
        """
        try:
            abs_path = os.path.abspath(path)

            # Check if path is in allowed directories
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Deleting {abs_path} is not allowed by security policy"
                }

            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"Path does not exist: {abs_path}"
                }

            # Delete file or directory
            if os.path.isfile(abs_path):
                os.remove(abs_path)
            elif os.path.isdir(abs_path):
                if recursive:
                    shutil.rmtree(abs_path)
                else:
                    os.rmdir(abs_path)
            else:
                return {
                    "status": "error",
                    "message": f"Path is neither a file nor a directory: {abs_path}"
                }

            return {
                "status": "success",
                "path": abs_path,
                "deleted": True
            }

        except Exception as e:
            logger.error("Error deleting %s: %s", path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    def get_command_history(self) -> List[Dict[str, Any]]:
        """Get command execution history"""
        return self.command_history


# Example usage
async def example_usage():
    """Demonstrate listing, creating, writing, reading, executing and deleting."""
    # Initialize with default settings (restricted to current directory and home)
    utils = SystemUtils()

    # List current directory
    print("Current directory:")
    result = await utils.list_directory(".")
    if result["status"] == "success":
        for item in result["contents"][:5]:  # Show first 5 items
            print(f"- {item['name']} ({item['type']}, {item['size']} bytes)")

    # Create a test directory
    test_dir = "./test_dir"
    await utils.create_directory(test_dir)

    # Write a test file
    test_file = f"{test_dir}/test.txt"
    await utils.write_file(test_file, "Hello, World!")

    # Read the test file
    print("\nTest file content:")
    result = await utils.read_file(test_file)
    if result["status"] == "success":
        print(result["content"])

    # Execute a command
    print("\nSystem information:")
    result = await utils.execute_command("uname -a" if not utils.is_windows else "ver")
    if result["status"] == "success":
        print(result["stdout"])

    # Clean up
    await utils.delete_path(test_dir, recursive=True)


if __name__ == "__main__":
    asyncio.run(example_usage())