# ollama-api-test / system_utils.py
# YussefGAFeer's picture
# Create system_utils.py
# 035cf1a verified
import asyncio
import logging
import os
import platform
import shlex
import shutil
import stat
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any
# Configure logging
# NOTE: basicConfig() is called at import time, so merely importing this module
# touches the process-wide root logger configuration.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by SystemUtils for command and error reporting.
logger = logging.getLogger(__name__)
class SystemUtils:
    """Run shell commands and basic file operations under simple policy checks.

    File operations are restricted to ``allowed_dirs``; commands are optionally
    restricted to ``allowed_commands``. The public coroutines report their
    outcome as a dict with a ``"status"`` key (``"success"`` or ``"error"``);
    failures raised while performing the operation are caught and returned as
    ``{"status": "error", "message": ...}``.

    The file helpers use ordinary blocking calls; they are coroutines only so
    that the whole API can be awaited uniformly.
    """

    # Largest file read_file() will load into memory (10MB).
    MAX_READ_BYTES = 10 * 1024 * 1024

    # Substrings that let one shell command line chain, substitute or redirect
    # into something other than the command it starts with.
    _SHELL_CONTROL_TOKENS = (";", "&", "|", "`", "$(", ">", "<", "\n", "\r")

    def __init__(self, allowed_dirs: Optional[List[str]] = None,
                 allowed_commands: Optional[List[str]] = None):
        """
        Initialize SystemUtils with security constraints

        Args:
            allowed_dirs: List of directories where file operations are allowed.
                Defaults to the current working directory and the home directory.
            allowed_commands: List of allowed shell commands (if None or empty,
                all commands are allowed - NOT RECOMMENDED)
        """
        # Default allowed directories (current working directory and home directory)
        self.allowed_dirs = allowed_dirs or [
            os.getcwd(),
            os.path.expanduser("~"),
        ]
        # Empty list means all commands are allowed - BE CAREFUL!
        self.allowed_commands = allowed_commands or []
        # Platform-specific settings
        self.is_windows = platform.system() == 'Windows'
        # Kept only for backward compatibility: execute_command() no longer
        # forwards it, because asyncio.create_subprocess_shell() rejects
        # shell=False with ValueError("shell must be True").
        self.shell = self.is_windows
        # One dict per execute_command() call: 'command', 'time', 'cwd'.
        self.command_history: List[Dict[str, Any]] = []

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _error(message: str, **extra: Any) -> Dict[str, Any]:
        """Build the standard error result dict, with optional extra keys."""
        result: Dict[str, Any] = {"status": "error", "message": message}
        result.update(extra)
        return result

    @staticmethod
    def _canonical(path: str) -> str:
        """Return *path* absolute, symlink-resolved and case-normalized."""
        return os.path.normcase(os.path.realpath(path))

    def _is_path_allowed(self, path: str) -> bool:
        """Return True if *path* is an allowed directory or lies inside one.

        Paths are compared component-wise after resolving symlinks, so a
        sibling such as ``/data/allowed_evil`` no longer passes for the allowed
        directory ``/data/allowed`` the way a plain ``startswith`` test did.
        """
        target = self._canonical(path)
        for allowed in self.allowed_dirs:
            root = self._canonical(allowed)
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # commonpath() raises when the paths are on different drives.
                continue
        return False

    def _is_allowed_root(self, path: str) -> bool:
        """Return True if *path* is itself one of the allowed directories."""
        target = self._canonical(path)
        return any(self._canonical(allowed) == target for allowed in self.allowed_dirs)

    def _format_command(self, command: Union[str, List[str]]) -> str:
        """Return the shell command line for *command*.

        Strings are passed through untouched. Argument sequences are quoted
        per platform so each element stays a single argument.
        """
        if not isinstance(command, (list, tuple)):
            return command
        args = [str(arg) for arg in command]
        if self.is_windows:
            return subprocess.list2cmdline(args)
        return ' '.join(shlex.quote(arg) for arg in args)

    def _policy_violation(self, cmd_str: str, is_argv: bool) -> Optional[str]:
        """Return an error message if the allow-list forbids *cmd_str*, else None."""
        if not self.allowed_commands:
            return None
        candidate = cmd_str.strip()
        # POSIX-quoted argument lists cannot smuggle operators past the shell.
        # Raw strings can, and list2cmdline() does not escape cmd.exe
        # metacharacters, so both of those are screened.
        quoted_safely = is_argv and not self.is_windows
        if not quoted_safely and any(
            token in candidate for token in self._SHELL_CONTROL_TOKENS
        ):
            return ("Shell control operators are not allowed when a command "
                    "allow-list is active")
        for allowed in self.allowed_commands:
            prefix = allowed.strip()
            if not prefix:
                continue
            if candidate == prefix:
                return None
            # Require a word boundary so "ls" does not admit "lsblk".
            if candidate.startswith(prefix) and candidate[len(prefix)].isspace():
                return None
        return "Command not allowed by security policy"

    # ----------------------------------------------------------------- commands

    async def execute_command(self, command: Union[str, List[str]], cwd: str = None,
                              timeout: int = 30) -> Dict[str, Any]:
        """
        Execute a shell command asynchronously

        Args:
            command: Command to execute (string or list of args). A list is
                quoted so that each element is passed as one argument.
            cwd: Working directory for the command (defaults to os.getcwd())
            timeout: Maximum time to wait for command completion (seconds)

        Returns:
            Dict containing command results or error information. On completion
            it has "status", "return_code", "stdout", "stderr" and "command";
            "status" is "success" only when the return code is 0.
        """
        if not command:
            return self._error("No command provided")

        is_argv = isinstance(command, (list, tuple))
        # Convert command to string for logging, history and execution
        cmd_str = self._format_command(command)
        work_dir = cwd or os.getcwd()

        logger.info("Executing command: %s", cmd_str)
        # Every request is recorded, including ones the policy then rejects.
        self.command_history.append({
            'command': cmd_str,
            'time': asyncio.get_running_loop().time(),
            'cwd': work_dir
        })

        violation = self._policy_violation(cmd_str, is_argv)
        if violation is not None:
            return self._error(violation, command=cmd_str)

        try:
            process = await asyncio.create_subprocess_shell(
                cmd_str,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=work_dir
            )
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited between the timeout and the kill.
                    pass
                # Reap the child so it does not linger as a zombie.
                await process.wait()
                return self._error(
                    f"Command timed out after {timeout} seconds",
                    command=cmd_str
                )

            return_code = process.returncode
            # errors='replace': non-UTF-8 output must not turn a finished
            # command into a decoding failure.
            stdout_str = stdout.decode(errors='replace').strip()
            stderr_str = stderr.decode(errors='replace').strip()
            return {
                "status": "success" if return_code == 0 else "error",
                "return_code": return_code,
                "stdout": stdout_str,
                "stderr": stderr_str,
                "command": cmd_str
            }
        except Exception as e:
            logger.error("Error executing command: %s", e)
            return self._error(str(e), command=cmd_str)

    # -------------------------------------------------------------------- files

    async def list_directory(self, path: str = ".") -> Dict[str, Any]:
        """List contents of a directory

        Returns a dict with "path" and "contents"; each entry carries name,
        path, type ('file', 'directory' or 'other'), size, created, modified
        and mode. Entries whose stat() fails are logged and skipped.
        """
        try:
            abs_path = os.path.abspath(path)
            if not self._is_path_allowed(abs_path):
                return self._error(f"Access to {abs_path} is not allowed by security policy")
            if not os.path.exists(abs_path):
                return self._error(f"Path does not exist: {abs_path}")
            if not os.path.isdir(abs_path):
                return self._error(f"Path is not a directory: {abs_path}")

            contents = []
            # The context manager closes the directory handle promptly.
            with os.scandir(abs_path) as entries:
                for entry in entries:
                    try:
                        stat_info = entry.stat()
                        if entry.is_dir():
                            entry_type = 'directory'
                        elif entry.is_file():
                            entry_type = 'file'
                        else:
                            # Sockets, FIFOs, devices: neither file nor directory.
                            entry_type = 'other'
                        contents.append({
                            'name': entry.name,
                            'path': entry.path,
                            'type': entry_type,
                            'size': stat_info.st_size,
                            'created': stat_info.st_ctime,
                            'modified': stat_info.st_mtime,
                            'mode': stat_info.st_mode
                        })
                    except OSError as e:
                        logger.warning("Error accessing %s: %s", entry.path, e)
            return {
                "status": "success",
                "path": abs_path,
                "contents": contents
            }
        except Exception as e:
            logger.error("Error listing directory %s: %s", path, e)
            return self._error(str(e))

    async def read_file(self, file_path: str) -> Dict[str, Any]:
        """Read the contents of a file

        The file is decoded as UTF-8 with undecodable bytes dropped. Files
        larger than MAX_READ_BYTES are rejected. Returns "path", "content" and
        "size" on success.
        """
        try:
            abs_path = os.path.abspath(file_path)
            if not self._is_path_allowed(abs_path):
                return self._error(f"Access to {abs_path} is not allowed by security policy")
            if not os.path.exists(abs_path):
                return self._error(f"File does not exist: {abs_path}")
            if not os.path.isfile(abs_path):
                return self._error(f"Path is not a file: {abs_path}")

            # Check file size before reading (limit to 10MB)
            file_size = os.path.getsize(abs_path)
            if file_size > self.MAX_READ_BYTES:
                return self._error(
                    f"File is too large ({file_size} bytes). Maximum allowed size is 10MB."
                )

            with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return {
                "status": "success",
                "path": abs_path,
                "content": content,
                "size": file_size
            }
        except Exception as e:
            logger.error("Error reading file %s: %s", file_path, e)
            return self._error(str(e))

    async def write_file(self, file_path: str, content: str,
                         mode: str = 'w', append: bool = False) -> Dict[str, Any]:
        """
        Write content to a file

        Args:
            file_path: Path to the file
            content: Content to write
            mode: File open mode ('w' for write, 'x' for exclusive creation, etc.)
            append: If True, append to the file instead of overwriting
                (this takes precedence over ``mode`` when opening the file)

        Returns:
            Dict with "path" and "bytes_written" (UTF-8 encoded length) on
            success. Missing parent directories are created.
        """
        try:
            abs_path = os.path.abspath(file_path)
            parent_dir = os.path.dirname(abs_path)
            # Checking the resolved target also covers a symlink inside an
            # allowed directory that points outside it.
            if not self._is_path_allowed(abs_path):
                return self._error(f"Writing to {abs_path} is not allowed by security policy")
            # Exclusive creation must not clobber an existing file
            if os.path.exists(abs_path) and 'x' in mode:
                return self._error(f"File already exists: {abs_path}")

            os.makedirs(parent_dir, exist_ok=True)
            with open(abs_path, 'a' if append else mode, encoding='utf-8') as f:
                f.write(content)
            return {
                "status": "success",
                "path": abs_path,
                "bytes_written": len(content.encode('utf-8'))
            }
        except Exception as e:
            logger.error("Error writing to file %s: %s", file_path, e)
            return self._error(str(e))

    async def create_directory(self, dir_path: str, parents: bool = True,
                               exist_ok: bool = True) -> Dict[str, Any]:
        """Create a directory

        Args:
            dir_path: Directory to create
            parents: If True, create missing parent directories as well; if
                False, the parent must already exist
            exist_ok: If True, an already existing directory is not an error
        """
        try:
            abs_path = os.path.abspath(dir_path)
            parent_dir = os.path.dirname(abs_path)
            if not self._is_path_allowed(abs_path):
                return self._error(
                    f"Creating directory in {parent_dir} is not allowed by security policy"
                )

            if parents:
                os.makedirs(abs_path, exist_ok=exist_ok)
            else:
                try:
                    os.mkdir(abs_path)
                except FileExistsError:
                    # Same rule as makedirs(): only an existing *directory*
                    # is tolerated, and only when exist_ok is set.
                    if not (exist_ok and os.path.isdir(abs_path)):
                        raise
            return {
                "status": "success",
                "path": abs_path
            }
        except Exception as e:
            logger.error("Error creating directory %s: %s", dir_path, e)
            return self._error(str(e))

    async def delete_path(self, path: str, recursive: bool = False) -> Dict[str, Any]:
        """
        Delete a file or directory

        Args:
            path: Path to delete
            recursive: If True, delete directory and all its contents;
                otherwise a directory must be empty

        An allowed root directory itself is never deleted, only things in it.
        """
        try:
            abs_path = os.path.abspath(path)
            if not self._is_path_allowed(abs_path):
                return self._error(f"Deleting {abs_path} is not allowed by security policy")
            # Removing a sandbox root (e.g. the home directory) would wipe
            # everything the policy is meant to contain.
            if self._is_allowed_root(abs_path):
                return self._error(f"Refusing to delete allowed root directory: {abs_path}")
            if not os.path.exists(abs_path):
                return self._error(f"Path does not exist: {abs_path}")

            if os.path.isfile(abs_path):
                os.remove(abs_path)
            elif os.path.isdir(abs_path):
                if recursive:
                    shutil.rmtree(abs_path)
                else:
                    os.rmdir(abs_path)
            else:
                return self._error(f"Path is neither a file nor a directory: {abs_path}")
            return {
                "status": "success",
                "path": abs_path,
                "deleted": True
            }
        except Exception as e:
            logger.error("Error deleting %s: %s", path, e)
            return self._error(str(e))

    def get_command_history(self) -> List[Dict[str, Any]]:
        """Get command execution history (the live list, oldest entry first)"""
        return self.command_history
# Example usage
async def example_usage():
    """Demonstrate SystemUtils: list, create, write, read, run a command, clean up."""
    # Default settings restrict file access to the current and home directories.
    utils = SystemUtils()

    # Preview at most the first five entries of the current directory.
    print("Current directory:")
    listing = await utils.list_directory(".")
    if listing["status"] == "success":
        preview = listing["contents"][:5]
        for entry in preview:
            print(f"- {entry['name']} ({entry['type']}, {entry['size']} bytes)")

    # Scratch directory and file used by the rest of the demo.
    scratch_dir = "./test_dir"
    await utils.create_directory(scratch_dir)

    scratch_file = f"{scratch_dir}/test.txt"
    await utils.write_file(scratch_file, "Hello, World!")

    # Read the file back and show what was stored.
    print("\nTest file content:")
    read_back = await utils.read_file(scratch_file)
    if read_back["status"] == "success":
        print(read_back["content"])

    # Pick the platform's own "describe this system" command.
    print("\nSystem information:")
    info_command = "ver" if utils.is_windows else "uname -a"
    sys_info = await utils.execute_command(info_command)
    if sys_info["status"] == "success":
        print(sys_info["stdout"])

    # Remove the scratch directory together with the file inside it.
    await utils.delete_path(scratch_dir, recursive=True)
if __name__ == "__main__":
    # Run the demo only when executed as a script; asyncio is already
    # imported at module level, so no local re-import is needed.
    asyncio.run(example_usage())