linux-mcp-server / tools /linux_tools.py
ChrisSacrumCor's picture
Update tools/linux_tools.py
0add29f verified
"""
Linux administration tools for Gradio interface.
Standard Python functions (no @gr.tool decorators).
"""
import subprocess
import os
import pwd
import grp
import shlex
from typing import List, Optional
from shared.config import config
import logging
logger = logging.getLogger(__name__)
# Sensitive Tools (require Socratic dialogue)
def add_user(username: str, groups: Optional[List[str]] = None,
create_home: bool = True, shell: str = "/bin/bash") -> dict:
"""
Add a new system user with proper validation.
This is a SENSITIVE operation requiring careful consideration.
"""
# Validate username
if not _validate_username(username):
return {
"success": False,
"error": "Invalid username format",
"details": "Username must be lowercase, alphanumeric, and start with letter",
"tool_type": "sensitive_operation"
}
# Check if user already exists
try:
pwd.getpwnam(username)
return {
"success": False,
"error": f"User '{username}' already exists",
"tool_type": "sensitive_operation"
}
except KeyError:
pass # User doesn't exist, which is what we want
# Build useradd command
cmd = ["useradd"]
if create_home:
cmd.append("-m")
if shell:
cmd.extend(["-s", shell])
if groups:
# Validate groups exist
valid_groups = []
for group in groups:
try:
grp.getgrnam(group)
valid_groups.append(group)
except KeyError:
logger.warning(f"Group '{group}' does not exist, skipping")
if valid_groups:
cmd.extend(["-G", ",".join(valid_groups)])
cmd.append(username)
try:
logger.info(f"Adding user: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=config.security.command_timeout,
check=False
)
if result.returncode == 0:
return {
"success": True,
"message": f"User '{username}' created successfully",
"details": {
"username": username,
"groups": groups or [],
"home_created": create_home,
"shell": shell
},
"tool_type": "sensitive_operation"
}
else:
return {
"success": False,
"error": f"Failed to create user: {result.stderr}",
"exit_code": result.returncode,
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error creating user: {str(e)}",
"tool_type": "sensitive_operation"
}
def create_file(filepath: str, content: str, permissions: str = "644",
owner: Optional[str] = None) -> dict:
"""
Create a new file with specified content and permissions.
This is a SENSITIVE operation that modifies the filesystem.
"""
# Validate file path
if not _validate_filepath(filepath):
return {
"success": False,
"error": "Invalid file path - must be absolute and safe",
"tool_type": "sensitive_operation"
}
# Check if file already exists
if os.path.exists(filepath):
return {
"success": False,
"error": f"File '{filepath}' already exists",
"tool_type": "sensitive_operation"
}
try:
# Create directory if needed
directory = os.path.dirname(filepath)
if directory and not os.path.exists(directory):
os.makedirs(directory, mode=0o755)
# Write file
with open(filepath, 'w') as f:
f.write(content)
# Set permissions
os.chmod(filepath, int(permissions, 8))
# Change owner if specified
if owner:
try:
user_info = pwd.getpwnam(owner)
os.chown(filepath, user_info.pw_uid, user_info.pw_gid)
except KeyError:
logger.warning(f"User '{owner}' not found, keeping current owner")
return {
"success": True,
"message": f"File '{filepath}' created successfully",
"details": {
"path": filepath,
"permissions": permissions,
"owner": owner,
"size": len(content)
},
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error creating file: {str(e)}",
"tool_type": "sensitive_operation"
}
def change_permission(path: str, permissions: str, recursive: bool = False,
owner: Optional[str] = None) -> dict:
"""
Change file or directory permissions.
This is a SENSITIVE operation that affects system security.
"""
if not os.path.exists(path):
return {
"success": False,
"error": f"Path '{path}' does not exist",
"tool_type": "sensitive_operation"
}
try:
# Change permissions
perm_octal = int(permissions, 8)
if recursive and os.path.isdir(path):
# Recursively change permissions
for root, dirs, files in os.walk(path):
os.chmod(root, perm_octal)
for file in files:
os.chmod(os.path.join(root, file), perm_octal)
else:
os.chmod(path, perm_octal)
# Change owner if specified
if owner:
try:
user_info = pwd.getpwnam(owner)
if recursive and os.path.isdir(path):
for root, dirs, files in os.walk(path):
os.chown(root, user_info.pw_uid, user_info.pw_gid)
for file in files:
os.chown(os.path.join(root, file), user_info.pw_uid, user_info.pw_gid)
else:
os.chown(path, user_info.pw_uid, user_info.pw_gid)
except KeyError:
return {
"success": False,
"error": f"User '{owner}' not found",
"tool_type": "sensitive_operation"
}
return {
"success": True,
"message": f"Permissions changed for '{path}'",
"details": {
"path": path,
"permissions": permissions,
"recursive": recursive,
"owner": owner
},
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error changing permissions: {str(e)}",
"tool_type": "sensitive_operation"
}
def run_safe_command(command: str, args: Optional[List[str]] = None,
working_directory: Optional[str] = None) -> dict:
"""
Execute safe, whitelisted system commands for information gathering.
These are READ-ONLY or low-risk operations.
"""
if command not in config.security.safe_commands:
return {
"success": False,
"error": f"Command '{command}' not in whitelist",
"whitelist": config.security.safe_commands,
"tool_type": "safe_command"
}
# Build command
cmd_parts = [command]
if args:
# Validate and sanitize arguments
sanitized_args = [shlex.quote(arg) for arg in args if arg]
cmd_parts.extend(sanitized_args)
# Set working directory
if not working_directory:
working_directory = "/home"
try:
logger.info(f"Executing safe command: {' '.join(cmd_parts)} in {working_directory}")
result = subprocess.run(
cmd_parts,
cwd=working_directory,
capture_output=True,
text=True,
timeout=config.security.command_timeout,
check=False # Don't raise exception on non-zero exit
)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr,
"exit_code": result.returncode,
"command": ' '.join(cmd_parts),
"tool_type": "safe_command"
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Command timed out after {config.security.command_timeout} seconds",
"exit_code": -1,
"tool_type": "safe_command"
}
except Exception as e:
return {
"success": False,
"error": f"Execution error: {str(e)}",
"exit_code": -1,
"tool_type": "safe_command"
}
# Helper functions
def _validate_username(username: str) -> bool:
"""Validate username format"""
if not username or not isinstance(username, str):
return False
if len(username) > 32: # Typical Linux limit
return False
if not username[0].islower() or not username[0].isalpha():
return False
return all(c.islower() or c.isdigit() or c in '_-' for c in username)
def _validate_filepath(filepath: str) -> bool:
"""Basic file path validation"""
if not filepath or not isinstance(filepath, str):
return False
if not os.path.isabs(filepath):
return False
# Prevent path traversal
if '..' in filepath:
return False
# Prevent access to sensitive system files
forbidden_paths = ['/etc/passwd', '/etc/shadow', '/etc/sudoers']
if filepath in forbidden_paths:
return False
return True
"""
Add a new system user with proper validation.
This is a SENSITIVE operation requiring careful consideration.
Args:
username: Username (lowercase, alphanumeric, starts with letter)
groups: Additional groups for the user (optional)
create_home: Create home directory for user
shell: Default shell for user
Returns:
Dictionary with success status and details
"""
# Validate username
if not _validate_username(username):
return {
"success": False,
"error": "Invalid username format",
"details": "Username must be lowercase, alphanumeric, and start with letter",
"tool_type": "sensitive_operation"
}
# Check if user already exists
try:
pwd.getpwnam(username)
return {
"success": False,
"error": f"User '{username}' already exists",
"tool_type": "sensitive_operation"
}
except KeyError:
pass # User doesn't exist, which is what we want
# Build useradd command
cmd = ["useradd"]
if create_home:
cmd.append("-m")
if shell:
cmd.extend(["-s", shell])
if groups:
# Validate groups exist
valid_groups = []
for group in groups:
try:
grp.getgrnam(group)
valid_groups.append(group)
except KeyError:
logger.warning(f"Group '{group}' does not exist, skipping")
if valid_groups:
cmd.extend(["-G", ",".join(valid_groups)])
cmd.append(username)
try:
logger.info(f"Adding user: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=config.security.command_timeout,
check=False
)
if result.returncode == 0:
return {
"success": True,
"message": f"User '{username}' created successfully",
"details": {
"username": username,
"groups": groups or [],
"home_created": create_home,
"shell": shell
},
"tool_type": "sensitive_operation"
}
else:
return {
"success": False,
"error": f"Failed to create user: {result.stderr}",
"exit_code": result.returncode,
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error creating user: {str(e)}",
"tool_type": "sensitive_operation"
}
@gr.tool
def create_file(filepath: str, content: str, permissions: str = "644",
owner: Optional[str] = None) -> dict:
"""
Create a new file with specified content and permissions.
This is a SENSITIVE operation that modifies the filesystem.
Args:
filepath: Full path where file should be created
content: Content to write to the file
permissions: File permissions in octal notation (e.g., 644, 755)
owner: File owner (optional, defaults to current user)
Returns:
Dictionary with success status and details
"""
# Validate file path
if not _validate_filepath(filepath):
return {
"success": False,
"error": "Invalid file path - must be absolute and safe",
"tool_type": "sensitive_operation"
}
# Check if file already exists
if os.path.exists(filepath):
return {
"success": False,
"error": f"File '{filepath}' already exists",
"tool_type": "sensitive_operation"
}
try:
# Create directory if needed
directory = os.path.dirname(filepath)
if directory and not os.path.exists(directory):
os.makedirs(directory, mode=0o755)
# Write file
with open(filepath, 'w') as f:
f.write(content)
# Set permissions
os.chmod(filepath, int(permissions, 8))
# Change owner if specified
if owner:
try:
user_info = pwd.getpwnam(owner)
os.chown(filepath, user_info.pw_uid, user_info.pw_gid)
except KeyError:
logger.warning(f"User '{owner}' not found, keeping current owner")
return {
"success": True,
"message": f"File '{filepath}' created successfully",
"details": {
"path": filepath,
"permissions": permissions,
"owner": owner,
"size": len(content)
},
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error creating file: {str(e)}",
"tool_type": "sensitive_operation"
}
@gr.tool
def change_permission(path: str, permissions: str, recursive: bool = False,
owner: Optional[str] = None) -> dict:
"""
Change file or directory permissions.
This is a SENSITIVE operation that affects system security.
Args:
path: Path to file or directory
permissions: New permissions in octal notation (e.g., 755, 644)
recursive: Apply permissions recursively (for directories)
owner: Change owner as well (optional)
Returns:
Dictionary with success status and details
"""
if not os.path.exists(path):
return {
"success": False,
"error": f"Path '{path}' does not exist",
"tool_type": "sensitive_operation"
}
try:
# Change permissions
perm_octal = int(permissions, 8)
if recursive and os.path.isdir(path):
# Recursively change permissions
for root, dirs, files in os.walk(path):
os.chmod(root, perm_octal)
for file in files:
os.chmod(os.path.join(root, file), perm_octal)
else:
os.chmod(path, perm_octal)
# Change owner if specified
if owner:
try:
user_info = pwd.getpwnam(owner)
if recursive and os.path.isdir(path):
for root, dirs, files in os.walk(path):
os.chown(root, user_info.pw_uid, user_info.pw_gid)
for file in files:
os.chown(os.path.join(root, file), user_info.pw_uid, user_info.pw_gid)
else:
os.chown(path, user_info.pw_uid, user_info.pw_gid)
except KeyError:
return {
"success": False,
"error": f"User '{owner}' not found",
"tool_type": "sensitive_operation"
}
return {
"success": True,
"message": f"Permissions changed for '{path}'",
"details": {
"path": path,
"permissions": permissions,
"recursive": recursive,
"owner": owner
},
"tool_type": "sensitive_operation"
}
except Exception as e:
return {
"success": False,
"error": f"Error changing permissions: {str(e)}",
"tool_type": "sensitive_operation"
}
# Safe Command Tool (direct execution)
@gr.tool
def run_safe_command(command: str, args: Optional[List[str]] = None,
working_directory: Optional[str] = None) -> dict:
"""
Execute safe, whitelisted system commands for information gathering.
These are READ-ONLY or low-risk operations.
Args:
command: Command to execute (must be from approved whitelist)
args: Command arguments (optional)
working_directory: Directory to run command in (optional)
Returns:
Dictionary with command output and status
"""
if command not in config.security.safe_commands:
return {
"success": False,
"error": f"Command '{command}' not in whitelist",
"whitelist": config.security.safe_commands,
"tool_type": "safe_command"
}
# Build command
cmd_parts = [command]
if args:
# Validate and sanitize arguments
sanitized_args = [shlex.quote(arg) for arg in args if arg]
cmd_parts.extend(sanitized_args)
# Set working directory
if not working_directory:
working_directory = "/home"
try:
logger.info(f"Executing safe command: {' '.join(cmd_parts)} in {working_directory}")
result = subprocess.run(
cmd_parts,
cwd=working_directory,
capture_output=True,
text=True,
timeout=config.security.command_timeout,
check=False # Don't raise exception on non-zero exit
)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr,
"exit_code": result.returncode,
"command": ' '.join(cmd_parts),
"tool_type": "safe_command"
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": f"Command timed out after {config.security.command_timeout} seconds",
"exit_code": -1,
"tool_type": "safe_command"
}
except Exception as e:
return {
"success": False,
"error": f"Execution error: {str(e)}",
"exit_code": -1,
"tool_type": "safe_command"
}
# Helper functions
def _validate_username(username: str) -> bool:
"""Validate username format"""
if not username or not isinstance(username, str):
return False
if len(username) > 32: # Typical Linux limit
return False
if not username[0].islower() or not username[0].isalpha():
return False
return all(c.islower() or c.isdigit() or c in '_-' for c in username)
def _validate_filepath(filepath: str) -> bool:
"""Basic file path validation"""
if not filepath or not isinstance(filepath, str):
return False
if not os.path.isabs(filepath):
return False
# Prevent path traversal
if '..' in filepath:
return False
# Prevent access to sensitive system files
forbidden_paths = ['/etc/passwd', '/etc/shadow', '/etc/sudoers']
if filepath in forbidden_paths:
return False
return True