Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import uvicorn | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import StreamingResponse | |
| import json | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent | |
| # Create FastAPI app | |
| app = FastAPI(title="Linux MCP Server") | |
| # Create MCP server instance | |
| mcp_app = Server("linux-mcp") | |
| # ============================================================================ | |
| # LINUX MCP HANDLER FUNCTIONS | |
| # ============================================================================ | |
| def handle_user_management(args): | |
| """Generate Linux user management commands""" | |
| action = args.get("action") | |
| username = args.get("username", "") | |
| groupname = args.get("groupname", "") | |
| use_sudo = args.get("sudo", True) | |
| sudo_prefix = "sudo " if use_sudo else "" | |
| commands = [] | |
| if action == "add_user_to_group": | |
| if not username or not groupname: | |
| return [TextContent(type="text", text="β Username and groupname required for add_user_to_group")] | |
| commands = [ | |
| f"# Add user '{username}' to group '{groupname}'", | |
| f"{sudo_prefix}usermod -a -G {groupname} {username}", | |
| "", | |
| "# Alternative methods:", | |
| f"{sudo_prefix}gpasswd -a {username} {groupname}", | |
| f"{sudo_prefix}adduser {username} {groupname} # Ubuntu/Debian", | |
| "", | |
| "# Verify group membership:", | |
| f"groups {username}", | |
| f"id {username}", | |
| f"getent group {groupname}" | |
| ] | |
| elif action == "create_user": | |
| if not username: | |
| return [TextContent(type="text", text="β Username required for create_user")] | |
| commands = [ | |
| f"# Create new user '{username}'", | |
| f"{sudo_prefix}useradd -m -s /bin/bash {username}", | |
| f"{sudo_prefix}passwd {username}", | |
| "", | |
| "# Create user with specific group:", | |
| f"{sudo_prefix}useradd -m -g {groupname if groupname else 'users'} -s /bin/bash {username}", | |
| "", | |
| "# Add to sudo group (Ubuntu/Debian):", | |
| f"{sudo_prefix}usermod -a -G sudo {username}", | |
| "", | |
| "# Add to wheel group (RHEL/CentOS):", | |
| f"{sudo_prefix}usermod -a -G wheel {username}", | |
| "", | |
| "# Verify user creation:", | |
| f"id {username}", | |
| f"getent passwd {username}" | |
| ] | |
| elif action == "delete_user": | |
| if not username: | |
| return [TextContent(type="text", text="β Username required for delete_user")] | |
| commands = [ | |
| f"# Delete user '{username}'", | |
| f"{sudo_prefix}userdel {username}", | |
| "", | |
| "# Delete user and home directory:", | |
| f"{sudo_prefix}userdel -r {username}", | |
| "", | |
| "# Remove user from all groups:", | |
| f"{sudo_prefix}gpasswd -d {username} groupname", | |
| "", | |
| "# Verify user deletion:", | |
| f"getent passwd {username}", | |
| "# Should return no results if deleted successfully" | |
| ] | |
| elif action == "list_groups": | |
| commands = [ | |
| "# List all groups:", | |
| "cat /etc/group", | |
| "getent group", | |
| "", | |
| "# List groups for specific user:", | |
| f"groups {username}" if username else "groups $USER", | |
| f"id {username}" if username else "id $USER", | |
| "", | |
| "# List users in specific group:", | |
| f"getent group {groupname}" if groupname else "getent group sudo", | |
| "", | |
| "# List all users:", | |
| "cut -d: -f1 /etc/passwd", | |
| "getent passwd" | |
| ] | |
| result = "\n".join(commands) | |
| return [TextContent(type="text", text=result)] | |
| def handle_file_permissions(args): | |
| """Generate file permission and ownership commands""" | |
| action = args.get("action") | |
| path = args.get("path") | |
| permissions = args.get("permissions", "755") | |
| owner = args.get("owner", "") | |
| recursive = args.get("recursive", False) | |
| if not path: | |
| return [TextContent(type="text", text="β Path is required")] | |
| commands = [] | |
| recursive_flag = "-R " if recursive else "" | |
| if action == "chmod": | |
| commands = [ | |
| f"# Change permissions for {path}", | |
| f"chmod {recursive_flag}{permissions} {path}", | |
| "", | |
| "# Common permission examples:", | |
| "# 755 = rwxr-xr-x (executable for owner, readable for others)", | |
| "# 644 = rw-r--r-- (readable/writable for owner, readable for others)", | |
| "# 600 = rw------- (readable/writable for owner only)", | |
| "# 777 = rwxrwxrwx (full permissions for all - use with caution!)", | |
| "", | |
| "# Symbolic notation examples:", | |
| f"chmod {recursive_flag}u+x {path} # Add execute for owner", | |
| f"chmod {recursive_flag}g-w {path} # Remove write for group", | |
| f"chmod {recursive_flag}o-r {path} # Remove read for others", | |
| "", | |
| "# Verify permissions:", | |
| f"ls -la {path}", | |
| f"stat {path}" | |
| ] | |
| elif action == "chown": | |
| if not owner: | |
| return [TextContent(type="text", text="β Owner is required for chown")] | |
| commands = [ | |
| f"# Change ownership for {path}", | |
| f"sudo chown {recursive_flag}{owner} {path}", | |
| "", | |
| "# Change owner and group:", | |
| f"sudo chown {recursive_flag}{owner}:{owner} {path}", | |
| f"sudo chown {recursive_flag}{owner}:users {path}", | |
| "", | |
| "# Change only group:", | |
| f"sudo chgrp {recursive_flag}{owner} {path}", | |
| "", | |
| "# Verify ownership:", | |
| f"ls -la {path}", | |
| f"stat {path}" | |
| ] | |
| elif action == "find_permissions": | |
| commands = [ | |
| f"# Find files with specific permissions in {path}", | |
| f"find {path} -type f -perm {permissions}", | |
| f"find {path} -type f -perm -{permissions} # At least these permissions", | |
| "", | |
| "# Find files by owner:", | |
| f"find {path} -user {owner}" if owner else f"find {path} -user $USER", | |
| f"find {path} -group {owner}" if owner else f"find {path} -group users", | |
| "", | |
| "# Find files with dangerous permissions:", | |
| f"find {path} -type f -perm -002 # World-writable files", | |
| f"find {path} -type f -perm -004 # World-readable files", | |
| f"find {path} -type f -perm -006 # World-readable and writable", | |
| "", | |
| "# Find SUID/SGID files:", | |
| f"find {path} -type f -perm -4000 # SUID files", | |
| f"find {path} -type f -perm -2000 # SGID files", | |
| "", | |
| "# Find directories with specific permissions:", | |
| f"find {path} -type d -perm 777 # World-writable directories" | |
| ] | |
| result = "\n".join(commands) | |
| return [TextContent(type="text", text=result)] | |
| def handle_system_commands(args): | |
| """Generate Linux system administration commands""" | |
| category = args.get("category") | |
| action = args.get("action", "") | |
| target = args.get("target", "") | |
| commands = [] | |
| if category == "process": | |
| commands = [ | |
| "# Process Management Commands", | |
| "ps aux # List all processes with details", | |
| "ps -ef # Alternative process listing", | |
| "pstree # Show process tree", | |
| "", | |
| "# Find and manage specific processes:", | |
| f"ps aux | grep {target if target else 'nginx'} # Find specific process", | |
| f"pgrep -f {target if target else 'nginx'} # Get PID by name", | |
| f"pkill -f {target if target else 'nginx'} # Kill process by name", | |
| f"killall {target if target else 'nginx'} # Kill all processes by name", | |
| "", | |
| "# Process control:", | |
| "kill -15 PID # Graceful termination (SIGTERM)", | |
| "kill -9 PID # Force kill (SIGKILL)", | |
| "kill -HUP PID # Reload configuration (SIGHUP)", | |
| "", | |
| "# Background processes:", | |
| "nohup command & # Run command in background", | |
| "screen -S session_name # Start screen session", | |
| "tmux new -s session_name # Start tmux session", | |
| "", | |
| "# Process monitoring:", | |
| "top # Real-time process monitor", | |
| "htop # Enhanced process monitor", | |
| "jobs # List background jobs", | |
| "bg # Resume job in background", | |
| "fg # Bring job to foreground" | |
| ] | |
| elif category == "disk": | |
| commands = [ | |
| "# Disk Usage and Management Commands", | |
| "df -h # Show disk usage (human readable)", | |
| "df -i # Show inode usage", | |
| "du -sh /* # Show directory sizes in root", | |
| f"du -sh {target if target else '/var/log'}/* | sort -hr # Sort by size", | |
| "", | |
| "# Disk analysis:", | |
| "lsblk # List block devices", | |
| "lsblk -f # Show filesystems", | |
| "blkid # Show UUIDs and labels", | |
| "fdisk -l # List disk partitions", | |
| "parted -l # Alternative partition listing", | |
| "", | |
| "# Mount management:", | |
| "mount # Show mounted filesystems", | |
| "findmnt # Tree view of mounts", | |
| "findmnt -D # Show duplicate mounts", | |
| f"mount /dev/sdb1 /mnt/{target if target else 'backup'} # Mount filesystem", | |
| f"umount /mnt/{target if target else 'backup'} # Unmount filesystem", | |
| "", | |
| "# Cleanup commands:", | |
| "apt autoremove # Remove unused packages (Debian/Ubuntu)", | |
| "yum autoremove # Remove unused packages (RHEL/CentOS)", | |
| "journalctl --disk-usage # Check journal disk usage", | |
| "journalctl --vacuum-time=3d # Clean old journal entries" | |
| ] | |
| elif category == "network": | |
| commands = [ | |
| "# Network Configuration and Diagnostics", | |
| "ip addr show # Show IP addresses", | |
| "ip link show # Show network interfaces", | |
| "ip route show # Show routing table", | |
| "ip neigh show # Show ARP table", | |
| "", | |
| "# Port and connection analysis:", | |
| "ss -tuln # Show listening ports", | |
| "ss -tulpn # Show ports with process names", | |
| "netstat -tuln # Alternative port listing", | |
| "netstat -rn # Show routing table", | |
| "", | |
| "# Connectivity testing:", | |
| f"ping -c 4 {target if target else 'google.com'} # Test connectivity", | |
| f"traceroute {target if target else 'google.com'} # Trace network path", | |
| f"mtr {target if target else 'google.com'} # Network diagnostic tool", | |
| f"nslookup {target if target else 'google.com'} # DNS lookup", | |
| f"dig {target if target else 'google.com'} # DNS information", | |
| "", | |
| "# Network configuration:", | |
| "wget -O- ifconfig.me # Get public IP", | |
| "curl -s ifconfig.me # Alternative public IP", | |
| "curl -s ipinfo.io # Detailed IP information", | |
| "", | |
| "# Interface management:", | |
| "sudo ip link set eth0 up # Bring interface up", | |
| "sudo ip link set eth0 down # Bring interface down", | |
| "sudo dhclient eth0 # Request DHCP lease" | |
| ] | |
| elif category == "service": | |
| commands = [ | |
| "# Service Management (systemd)", | |
| f"systemctl status {target if target else 'nginx'} # Check service status", | |
| f"systemctl start {target if target else 'nginx'} # Start service", | |
| f"systemctl stop {target if target else 'nginx'} # Stop service", | |
| f"systemctl restart {target if target else 'nginx'} # Restart service", | |
| f"systemctl reload {target if target else 'nginx'} # Reload configuration", | |
| "", | |
| "# Service persistence:", | |
| f"systemctl enable {target if target else 'nginx'} # Enable at boot", | |
| f"systemctl disable {target if target else 'nginx'} # Disable at boot", | |
| f"systemctl is-enabled {target if target else 'nginx'} # Check if enabled", | |
| "", | |
| "# Service discovery:", | |
| "systemctl list-units # List all active services", | |
| "systemctl list-units --failed # List failed services", | |
| "systemctl list-unit-files # List all unit files", | |
| "", | |
| "# Logs and troubleshooting:", | |
| f"journalctl -u {target if target else 'nginx'} # View service logs", | |
| f"journalctl -u {target if target else 'nginx'} -f # Follow service logs", | |
| f"journalctl -u {target if target else 'nginx'} --since today # Today's logs", | |
| "", | |
| "# Legacy service management (SysV):", | |
| f"service {target if target else 'nginx'} status # Check status (legacy)", | |
| f"service {target if target else 'nginx'} start # Start service (legacy)", | |
| f"/etc/init.d/{target if target else 'nginx'} status # Direct init script" | |
| ] | |
| elif category == "firewall": | |
| commands = [ | |
| "# Firewall Management", | |
| "sudo ufw status verbose # Check UFW status (detailed)", | |
| "sudo ufw --version # Check UFW version", | |
| "", | |
| "# UFW basic operations:", | |
| "sudo ufw enable # Enable UFW", | |
| "sudo ufw disable # Disable UFW", | |
| "sudo ufw reset # Reset UFW rules", | |
| "", | |
| "# UFW rule management:", | |
| f"sudo ufw allow {target if target else '22'} # Allow port/service", | |
| f"sudo ufw deny {target if target else '23'} # Block port/service", | |
| "sudo ufw allow ssh # Allow SSH service", | |
| "sudo ufw allow 80/tcp # Allow HTTP", | |
| "sudo ufw allow 443/tcp # Allow HTTPS", | |
| "sudo ufw allow from 192.168.1.0/24 # Allow from subnet", | |
| "", | |
| "# UFW advanced rules:", | |
| "sudo ufw delete allow 80 # Remove rule", | |
| "sudo ufw insert 1 allow 22 # Insert rule at position", | |
| "sudo ufw logging on # Enable logging", | |
| "", | |
| "# iptables (advanced):", | |
| "sudo iptables -L # List iptables rules", | |
| "sudo iptables -L -n # List with numeric output", | |
| "sudo iptables -A INPUT -p tcp --dport 80 -j ACCEPT # Allow HTTP", | |
| "sudo iptables -A INPUT -p tcp --dport 22 -j DROP # Block SSH", | |
| "", | |
| "# Save iptables rules:", | |
| "sudo iptables-save > /etc/iptables/rules.v4 # Save IPv4 rules", | |
| "sudo ip6tables-save > /etc/iptables/rules.v6 # Save IPv6 rules" | |
| ] | |
| result = "\n".join(commands) | |
| return [TextContent(type="text", text=result)] | |
| # ============================================================================ | |
| # MCP SERVER CONFIGURATION | |
| # ============================================================================ | |
| async def list_tools(): | |
| """List all available Linux system administration tools""" | |
| return [ | |
| Tool( | |
| name="user_management", | |
| description="Generate Linux user management commands for creating, deleting, and managing users and groups", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "action": { | |
| "type": "string", | |
| "enum": ["add_user_to_group", "create_user", "delete_user", "list_groups"], | |
| "description": "User management action to perform" | |
| }, | |
| "username": { | |
| "type": "string", | |
| "description": "Username for the operation" | |
| }, | |
| "groupname": { | |
| "type": "string", | |
| "description": "Group name for the operation" | |
| }, | |
| "sudo": { | |
| "type": "boolean", | |
| "default": True, | |
| "description": "Whether to use sudo for privileged operations" | |
| } | |
| }, | |
| "required": ["action"] | |
| } | |
| ), | |
| Tool( | |
| name="file_permissions", | |
| description="Generate file permission and ownership commands for chmod, chown, and permission discovery", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "action": { | |
| "type": "string", | |
| "enum": ["chmod", "chown", "find_permissions"], | |
| "description": "File permission action to perform" | |
| }, | |
| "path": { | |
| "type": "string", | |
| "description": "File or directory path" | |
| }, | |
| "permissions": { | |
| "type": "string", | |
| "description": "Permission mode (e.g., 755, 644, u+x)" | |
| }, | |
| "owner": { | |
| "type": "string", | |
| "description": "Owner username for chown operations" | |
| }, | |
| "recursive": { | |
| "type": "boolean", | |
| "default": False, | |
| "description": "Apply operation recursively" | |
| } | |
| }, | |
| "required": ["action", "path"] | |
| } | |
| ), | |
| Tool( | |
| name="system_commands", | |
| description="Generate common Linux system administration commands for processes, disk, network, services, and firewall", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "category": { | |
| "type": "string", | |
| "enum": ["process", "disk", "network", "service", "firewall"], | |
| "description": "Category of system commands" | |
| }, | |
| "action": { | |
| "type": "string", | |
| "description": "Specific action within the category" | |
| }, | |
| "target": { | |
| "type": "string", | |
| "description": "Target for the operation (service name, hostname, etc.)" | |
| } | |
| }, | |
| "required": ["category"] | |
| } | |
| ) | |
| ] | |
| async def call_tool(name: str, arguments: dict): | |
| """Handle tool calls""" | |
| try: | |
| if name == "user_management": | |
| return handle_user_management(arguments) | |
| elif name == "file_permissions": | |
| return handle_file_permissions(arguments) | |
| elif name == "system_commands": | |
| return handle_system_commands(arguments) | |
| else: | |
| return [TextContent(type="text", text=f"Unknown tool: {name}")] | |
| except Exception as e: | |
| return [TextContent(type="text", text=f"Error: {str(e)}")] | |
| # ============================================================================ | |
| # FASTAPI WEB ENDPOINTS | |
| # ============================================================================ | |
| async def root(): | |
| """Status endpoint""" | |
| return { | |
| "service": "linux-mcp-server", | |
| "status": "running", | |
| "mcp_endpoint": "/mcp/sse", | |
| "tools": 3, | |
| "categories": ["user_management", "file_permissions", "system_commands"] | |
| } | |
| async def health(): | |
| """Health check""" | |
| return {"status": "healthy", "service": "linux-mcp-server"} | |
| async def mcp_sse_endpoint(request: Request): | |
| """MCP SSE endpoint for agent connections""" | |
| async def event_stream(): | |
| try: | |
| while True: | |
| if await request.is_disconnected(): | |
| break | |
| yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" | |
| await asyncio.sleep(30) | |
| except asyncio.CancelledError: | |
| pass | |
| return StreamingResponse( | |
| event_stream(), | |
| media_type="text/event-stream", | |
| headers={ | |
| "Cache-Control": "no-cache", | |
| "Connection": "keep-alive", | |
| "Access-Control-Allow-Origin": "*", | |
| } | |
| ) | |
| def main(): | |
| """Main entry point""" | |
| port = int(os.getenv("PORT", 7860)) | |
| print(f"π Starting Linux MCP Server on port {port}") | |
| print(f"π Status: http://localhost:{port}/") | |
| print(f"π MCP SSE: http://localhost:{port}/mcp/sse") | |
| print(f"π οΈ Tools: User Management, File Permissions, System Commands") | |
| uvicorn.run(app, host="0.0.0.0", port=port, log_level="info") | |
| if __name__ == "__main__": | |
| main() |