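# Spaces: Build error
# NOTE(review): the line above looks like page-status text captured together
# with the source rather than program code -- confirm and remove if so.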
import asyncio
import logging
import os
import platform
import shutil
import stat
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
# Configure logging: install a root handler at INFO level as an import-time
# side effect, and create the module-level logger used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SystemUtils:
    """Run commands and perform file operations under simple security limits.

    File operations are restricted to paths located inside ``allowed_dirs``.
    Commands are restricted to ``allowed_commands`` when that list is
    non-empty.  The public coroutines report their outcome as a dict whose
    ``"status"`` key is ``"success"`` or ``"error"``.
    """

    # read_file refuses files larger than this many bytes (10MB).
    _MAX_READ_BYTES = 10 * 1024 * 1024
    # Seconds to wait for a killed process to be reaped before giving up.
    _KILL_GRACE_SECONDS = 5

    def __init__(self, allowed_dirs: Optional[List[str]] = None,
                 allowed_commands: Optional[List[str]] = None):
        """
        Initialize SystemUtils with security constraints

        Args:
            allowed_dirs: List of directories where file operations are allowed.
                When omitted or empty, the current working directory and the
                user's home directory are used.
            allowed_commands: List of allowed shell commands (if None or empty,
                all commands are allowed - NOT RECOMMENDED)
        """
        # Default allowed directories (current working directory and home directory)
        self.allowed_dirs = allowed_dirs or [
            os.getcwd(),
            os.path.expanduser("~"),
        ]
        # Default allowed commands (empty list means all commands are allowed - BE CAREFUL!)
        self.allowed_commands = allowed_commands or []
        # Platform-specific settings
        self.is_windows = platform.system() == 'Windows'
        # When True, list-form commands are joined and run through the shell.
        self.shell = self.is_windows
        # For tracking command history
        self.command_history = []

    def _is_path_allowed(self, path: str) -> bool:
        """Return True when *path* lies inside one of the allowed directories.

        Both sides are resolved with ``os.path.realpath`` and compared by whole
        path components, so ``/base/data_private`` is not accepted just because
        ``/base/data`` is allowed, and a symlink pointing outside the allowed
        tree is rejected.
        """
        target = os.path.realpath(path)
        for allowed in self.allowed_dirs:
            root = os.path.realpath(allowed)
            try:
                common = os.path.commonpath([root, target])
            except ValueError:
                # Raised e.g. for paths on different Windows drives.
                continue
            if os.path.normcase(common) == os.path.normcase(root):
                return True
        return False

    def _is_command_allowed(self, cmd_str: str) -> bool:
        """Return True when *cmd_str* is permitted by ``allowed_commands``.

        An entry matches when the command equals it or starts with it followed
        by whitespace, so ``"ls"`` permits ``"ls -la"`` but not ``"lsof"``.
        Blank entries are ignored.

        NOTE: string commands are still interpreted by the shell, so shell
        operators placed after an allowed prefix are not blocked by this check.
        """
        if not self.allowed_commands:
            return True
        candidate = cmd_str.strip()
        for allowed in self.allowed_commands:
            prefix = allowed.strip()
            if not prefix:
                continue
            if candidate == prefix:
                return True
            # candidate is strictly longer than prefix here, so indexing is safe.
            if candidate.startswith(prefix) and candidate[len(prefix)].isspace():
                return True
        return False

    async def _kill_process(self, process) -> None:
        """Kill *process* and wait (bounded) for it to exit so it is reaped."""
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        try:
            await asyncio.wait_for(process.wait(), timeout=self._KILL_GRACE_SECONDS)
        except asyncio.TimeoutError:
            logger.warning("Process %s did not exit after being killed", process.pid)

    async def execute_command(self, command: Union[str, List[str]], cwd: Optional[str] = None,
                              timeout: int = 30) -> Dict[str, Any]:
        """
        Execute a command asynchronously

        A string command is run through the shell.  A list command is run
        without a shell (arguments passed verbatim) unless ``self.shell`` is
        True, in which case it is joined with spaces and run through the shell.

        Args:
            command: Command to execute (string or list of args)
            cwd: Working directory for the command (defaults to the current
                working directory)
            timeout: Maximum time to wait for command completion (seconds)

        Returns:
            Dict containing command results or error information
        """
        if not command:
            return {"status": "error", "message": "No command provided"}

        # Convert command to string for logging, history and the policy check
        if isinstance(command, (list, tuple)):
            argv = [str(arg) for arg in command]
            cmd_str = ' '.join(argv)
        else:
            argv = None
            cmd_str = command

        work_dir = cwd or os.getcwd()

        logger.info("Executing command: %s", cmd_str)

        # Recorded before the policy check, so rejected commands are kept too.
        self.command_history.append({
            'command': cmd_str,
            'time': asyncio.get_running_loop().time(),
            'cwd': work_dir
        })

        if not self._is_command_allowed(cmd_str):
            return {
                "status": "error",
                "message": "Command not allowed by security policy",
                "command": cmd_str
            }

        try:
            if argv is not None and not self.shell:
                process = await asyncio.create_subprocess_exec(
                    *argv,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=work_dir
                )
            else:
                # create_subprocess_shell always uses a shell; it rejects
                # shell=False, so no ``shell`` keyword is passed.
                process = await asyncio.create_subprocess_shell(
                    cmd_str,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=work_dir
                )

            # Wait for the process to complete with timeout
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                await self._kill_process(process)
                return {
                    "status": "error",
                    "message": f"Command timed out after {timeout} seconds",
                    "command": cmd_str
                }

            return_code = process.returncode
            # errors="replace" keeps the output when it is not valid UTF-8.
            stdout_str = stdout.decode(errors="replace").strip()
            stderr_str = stderr.decode(errors="replace").strip()
            return {
                "status": "success" if return_code == 0 else "error",
                "return_code": return_code,
                "stdout": stdout_str,
                "stderr": stderr_str,
                "command": cmd_str
            }
        except Exception as e:
            logger.error("Error executing command: %s", e)
            return {
                "status": "error",
                "message": str(e),
                "command": cmd_str
            }

    async def list_directory(self, path: str = ".") -> Dict[str, Any]:
        """List contents of a directory

        Args:
            path: Directory to list

        Returns:
            On success a dict with ``path`` and ``contents`` (one dict per
            entry with name, path, type, size, created, modified and mode).
            Entries whose metadata cannot be read are logged and skipped.
        """
        try:
            abs_path = os.path.abspath(path)
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Access to {abs_path} is not allowed by security policy"
                }
            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"Path does not exist: {abs_path}"
                }
            if not os.path.isdir(abs_path):
                return {
                    "status": "error",
                    "message": f"Path is not a directory: {abs_path}"
                }

            contents = []
            for entry in os.scandir(abs_path):
                try:
                    stat_info = entry.stat()
                    if entry.is_file():
                        entry_type = 'file'
                    elif entry.is_dir():
                        entry_type = 'directory'
                    else:
                        # Neither a regular file nor a directory.
                        entry_type = 'other'
                    contents.append({
                        'name': entry.name,
                        'path': entry.path,
                        'type': entry_type,
                        'size': stat_info.st_size,
                        'created': stat_info.st_ctime,
                        'modified': stat_info.st_mtime,
                        'mode': stat_info.st_mode
                    })
                except OSError as e:
                    logger.warning("Error accessing %s: %s", entry.path, e)
            return {
                "status": "success",
                "path": abs_path,
                "contents": contents
            }
        except Exception as e:
            logger.error("Error listing directory %s: %s", path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def read_file(self, file_path: str) -> Dict[str, Any]:
        """Read the contents of a file

        The file is decoded as UTF-8 with undecodable bytes dropped.  Files
        larger than 10MB are refused.

        Args:
            file_path: Path of the file to read

        Returns:
            On success a dict with ``path``, ``content`` and ``size``.
        """
        try:
            abs_path = os.path.abspath(file_path)
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Access to {abs_path} is not allowed by security policy"
                }
            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"File does not exist: {abs_path}"
                }
            if not os.path.isfile(abs_path):
                return {
                    "status": "error",
                    "message": f"Path is not a file: {abs_path}"
                }

            # Check file size before reading (limit to 10MB)
            file_size = os.path.getsize(abs_path)
            if file_size > self._MAX_READ_BYTES:
                return {
                    "status": "error",
                    "message": f"File is too large ({file_size} bytes). Maximum allowed size is 10MB."
                }

            with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return {
                "status": "success",
                "path": abs_path,
                "content": content,
                "size": file_size
            }
        except Exception as e:
            logger.error("Error reading file %s: %s", file_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def write_file(self, file_path: str, content: str,
                         mode: str = 'w', append: bool = False) -> Dict[str, Any]:
        """
        Write content to a file

        Missing parent directories are created.

        Args:
            file_path: Path to the file
            content: Content to write
            mode: File open mode ('w' for write, 'x' for exclusive creation, etc.)
            append: If True, append to the file instead of overwriting
                (takes precedence over ``mode``)

        Returns:
            On success a dict with ``path`` and ``bytes_written`` (size of the
            content encoded as UTF-8).
        """
        try:
            abs_path = os.path.abspath(file_path)
            parent_dir = os.path.dirname(abs_path)
            # Check the parent and the file itself: the file may be a symlink
            # that leads outside the allowed directories.
            if not (self._is_path_allowed(parent_dir) and self._is_path_allowed(abs_path)):
                return {
                    "status": "error",
                    "message": f"Writing to {abs_path} is not allowed by security policy"
                }

            # The mode actually used; only exclusive creation must refuse an
            # existing file.
            open_mode = 'a' if append else mode
            if 'x' in open_mode and os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"File already exists: {abs_path}"
                }

            # Ensure parent directory exists
            os.makedirs(parent_dir, exist_ok=True)
            with open(abs_path, open_mode, encoding='utf-8') as f:
                f.write(content)
            return {
                "status": "success",
                "path": abs_path,
                "bytes_written": len(content.encode('utf-8'))
            }
        except Exception as e:
            logger.error("Error writing to file %s: %s", file_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def create_directory(self, dir_path: str, parents: bool = True,
                               exist_ok: bool = True) -> Dict[str, Any]:
        """Create a directory

        Args:
            dir_path: Directory to create
            parents: If True, create missing parent directories as well; if
                False, a missing parent is reported as an error
            exist_ok: If False, an already existing directory is an error

        Returns:
            On success a dict with the absolute ``path``.
        """
        try:
            abs_path = os.path.abspath(dir_path)
            # Check if parent directory is in allowed directories
            parent_dir = os.path.dirname(abs_path)
            if not self._is_path_allowed(parent_dir):
                return {
                    "status": "error",
                    "message": f"Creating directory in {parent_dir} is not allowed by security policy"
                }
            Path(abs_path).mkdir(parents=parents, exist_ok=exist_ok)
            return {
                "status": "success",
                "path": abs_path
            }
        except Exception as e:
            logger.error("Error creating directory %s: %s", dir_path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    async def delete_path(self, path: str, recursive: bool = False) -> Dict[str, Any]:
        """
        Delete a file or directory

        Args:
            path: Path to delete
            recursive: If True, delete directory and all its contents;
                otherwise only an empty directory can be removed

        Returns:
            On success a dict with ``path`` and ``deleted`` set to True.
        """
        try:
            abs_path = os.path.abspath(path)
            if not self._is_path_allowed(abs_path):
                return {
                    "status": "error",
                    "message": f"Deleting {abs_path} is not allowed by security policy"
                }
            if not os.path.exists(abs_path):
                return {
                    "status": "error",
                    "message": f"Path does not exist: {abs_path}"
                }

            if os.path.isfile(abs_path):
                os.remove(abs_path)
            elif os.path.isdir(abs_path):
                if recursive:
                    shutil.rmtree(abs_path)
                else:
                    os.rmdir(abs_path)
            else:
                return {
                    "status": "error",
                    "message": f"Path is neither a file nor a directory: {abs_path}"
                }
            return {
                "status": "success",
                "path": abs_path,
                "deleted": True
            }
        except Exception as e:
            logger.error("Error deleting %s: %s", path, e)
            return {
                "status": "error",
                "message": str(e)
            }

    def get_command_history(self) -> List[Dict[str, Any]]:
        """Get command execution history

        Returns the internal list itself (not a copy); each entry holds the
        command string, the event-loop time it was submitted and its cwd.
        """
        return self.command_history
# Example usage
async def example_usage():
    """Demonstrate SystemUtils: list, create, write, read, run a command, clean up.

    Creates ``./test_dir`` with a ``test.txt`` file inside it and removes the
    directory again before returning, even if one of the demo steps raises.
    """
    # Initialize with default settings (restricted to current directory and home)
    utils = SystemUtils()

    # List current directory
    print("Current directory:")
    result = await utils.list_directory(".")
    if result["status"] == "success":
        for item in result["contents"][:5]:  # Show first 5 items
            print(f"- {item['name']} ({item['type']}, {item['size']} bytes)")

    test_dir = "./test_dir"
    test_file = f"{test_dir}/test.txt"
    try:
        # Create a test directory and write a test file into it
        await utils.create_directory(test_dir)
        await utils.write_file(test_file, "Hello, World!")

        # Read the test file
        print("\nTest file content:")
        result = await utils.read_file(test_file)
        if result["status"] == "success":
            print(result["content"])

        # Execute a command
        print("\nSystem information:")
        result = await utils.execute_command("uname -a" if not utils.is_windows else "ver")
        if result["status"] == "success":
            print(result["stdout"])
    finally:
        # Clean up the scratch directory no matter how the demo ended
        await utils.delete_path(test_dir, recursive=True)
# Run the demo only when this file is executed as a script.  asyncio is
# already imported at the top of the file, so no local re-import is needed.
if __name__ == "__main__":
    asyncio.run(example_usage())