"""Linux MCP server.

Exposes three MCP tools that *generate* (never execute) Linux administration
command cheat-sheets, plus a small FastAPI app with status, health and an SSE
keepalive endpoint.
"""

import asyncio
import json
import os
import shlex

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from mcp.server import Server
from mcp.types import TextContent, Tool

# Create FastAPI app
app = FastAPI(title="Linux MCP Server")

# Create MCP server instance
mcp_app = Server("linux-mcp")


# ============================================================================
# SHARED HELPERS
# ============================================================================

def _q(value) -> str:
    """Return *value* quoted so it is a single, literal shell word.

    Plain values such as ``alice`` or ``/var/log`` come back unchanged; anything
    containing whitespace or shell metacharacters is single-quoted.
    """
    return shlex.quote(str(value))


def _comment_text(value) -> str:
    """Collapse *value* onto one line so it is safe inside a ``#`` comment.

    Quoting does not help inside a comment: an embedded newline would end the
    comment and start a new command line, so all whitespace runs are collapsed
    to single spaces instead.
    """
    return " ".join(str(value).split())


def _text(message: str) -> list:
    """Wrap *message* in the single-element ``TextContent`` list tools return."""
    return [TextContent(type="text", text=message)]


# ============================================================================
# LINUX MCP HANDLER FUNCTIONS
# ============================================================================

def handle_user_management(args: dict) -> list:
    """Generate Linux user management commands.

    Args:
        args: Tool arguments. Reads ``action`` (one of ``add_user_to_group``,
            ``create_user``, ``delete_user``, ``list_groups``), ``username``,
            ``groupname`` and ``sudo`` (default ``True``; prefixes privileged
            commands with ``sudo``).

    Returns:
        A one-element list of ``TextContent`` holding either the generated
        commands (newline-joined) or a ``❌`` error message when a required
        argument is missing or the action is unknown.
    """
    args = args or {}
    action = args.get("action")
    username = args.get("username", "")
    groupname = args.get("groupname", "")
    sudo_prefix = "sudo " if args.get("sudo", True) else ""

    # Shell-quoted forms for use in command lines; empty when not supplied.
    user = _q(username) if username else ""
    group = _q(groupname) if groupname else ""

    if action == "add_user_to_group":
        if not username or not groupname:
            return _text("❌ Username and groupname required for add_user_to_group")
        commands = [
            f"# Add user '{_comment_text(username)}' to group '{_comment_text(groupname)}'",
            f"{sudo_prefix}usermod -a -G {group} {user}",
            "",
            "# Alternative methods:",
            f"{sudo_prefix}gpasswd -a {user} {group}",
            f"{sudo_prefix}adduser {user} {group} # Ubuntu/Debian",
            "",
            "# Verify group membership:",
            f"groups {user}",
            f"id {user}",
            f"getent group {group}",
        ]
    elif action == "create_user":
        if not username:
            return _text("❌ Username required for create_user")
        primary_group = group if groupname else "users"
        commands = [
            f"# Create new user '{_comment_text(username)}'",
            f"{sudo_prefix}useradd -m -s /bin/bash {user}",
            f"{sudo_prefix}passwd {user}",
            "",
            "# Create user with specific group:",
            f"{sudo_prefix}useradd -m -g {primary_group} -s /bin/bash {user}",
            "",
            "# Add to sudo group (Ubuntu/Debian):",
            f"{sudo_prefix}usermod -a -G sudo {user}",
            "",
            "# Add to wheel group (RHEL/CentOS):",
            f"{sudo_prefix}usermod -a -G wheel {user}",
            "",
            "# Verify user creation:",
            f"id {user}",
            f"getent passwd {user}",
        ]
    elif action == "delete_user":
        if not username:
            return _text("❌ Username required for delete_user")
        # Use the caller's group when given; otherwise keep a placeholder the
        # reader is expected to substitute.
        target_group = group if groupname else "groupname"
        commands = [
            f"# Delete user '{_comment_text(username)}'",
            f"{sudo_prefix}userdel {user}",
            "",
            "# Delete user and home directory:",
            f"{sudo_prefix}userdel -r {user}",
            "",
            "# Remove user from a specific group:",
            f"{sudo_prefix}gpasswd -d {user} {target_group}",
            "",
            "# Verify user deletion:",
            f"getent passwd {user}",
            "# Should return no results if deleted successfully",
        ]
    elif action == "list_groups":
        commands = [
            "# List all groups:",
            "cat /etc/group",
            "getent group",
            "",
            "# List groups for specific user:",
            f"groups {user}" if username else "groups $USER",
            f"id {user}" if username else "id $USER",
            "",
            "# List users in specific group:",
            f"getent group {group}" if groupname else "getent group sudo",
            "",
            "# List all users:",
            "cut -d: -f1 /etc/passwd",
            "getent passwd",
        ]
    else:
        return _text(f"❌ Unknown action: {action}")

    return _text("\n".join(commands))


def handle_file_permissions(args: dict) -> list:
    """Generate file permission and ownership commands.

    Args:
        args: Tool arguments. Reads ``action`` (``chmod``, ``chown`` or
            ``find_permissions``), ``path`` (required), ``permissions``
            (default ``"755"``), ``owner`` and ``recursive`` (default
            ``False``; adds ``-R`` to chmod/chown/chgrp).

    Returns:
        A one-element list of ``TextContent`` holding either the generated
        commands or a ``❌`` error message when ``path`` (or ``owner`` for
        ``chown``) is missing or the action is unknown.
    """
    args = args or {}
    action = args.get("action")
    path = args.get("path")
    permissions = args.get("permissions") or "755"
    owner = args.get("owner", "")
    recursive_flag = "-R " if args.get("recursive", False) else ""

    if not path:
        return _text("❌ Path is required")

    # Shell-quoted forms for command lines; `label` is for comment lines only.
    target = _q(path)
    mode = _q(permissions)
    who = _q(owner) if owner else ""
    label = _comment_text(path)

    if action == "chmod":
        commands = [
            f"# Change permissions for {label}",
            f"chmod {recursive_flag}{mode} {target}",
            "",
            "# Common permission examples:",
            "# 755 = rwxr-xr-x (executable for owner, readable for others)",
            "# 644 = rw-r--r-- (readable/writable for owner, readable for others)",
            "# 600 = rw------- (readable/writable for owner only)",
            "# 777 = rwxrwxrwx (full permissions for all - use with caution!)",
            "",
            "# Symbolic notation examples:",
            f"chmod {recursive_flag}u+x {target} # Add execute for owner",
            f"chmod {recursive_flag}g-w {target} # Remove write for group",
            f"chmod {recursive_flag}o-r {target} # Remove read for others",
            "",
            "# Verify permissions:",
            f"ls -la {target}",
            f"stat {target}",
        ]
    elif action == "chown":
        if not owner:
            return _text("❌ Owner is required for chown")
        commands = [
            f"# Change ownership for {label}",
            f"sudo chown {recursive_flag}{who} {target}",
            "",
            "# Change owner and group:",
            f"sudo chown {recursive_flag}{who}:{who} {target}",
            f"sudo chown {recursive_flag}{who}:users {target}",
            "",
            "# Change only group:",
            f"sudo chgrp {recursive_flag}{who} {target}",
            "",
            "# Verify ownership:",
            f"ls -la {target}",
            f"stat {target}",
        ]
    elif action == "find_permissions":
        commands = [
            f"# Find files with specific permissions in {label}",
            f"find {target} -type f -perm {mode}",
            f"find {target} -type f -perm -{mode} # At least these permissions",
            "",
            "# Find files by owner:",
            f"find {target} -user {who}" if owner else f"find {target} -user $USER",
            f"find {target} -group {who}" if owner else f"find {target} -group users",
            "",
            "# Find files with dangerous permissions:",
            f"find {target} -type f -perm -002 # World-writable files",
            f"find {target} -type f -perm -004 # World-readable files",
            f"find {target} -type f -perm -006 # World-readable and writable",
            "",
            "# Find SUID/SGID files:",
            f"find {target} -type f -perm -4000 # SUID files",
            f"find {target} -type f -perm -2000 # SGID files",
            "",
            "# Find directories with specific permissions:",
            f"find {target} -type d -perm 777 # World-writable directories",
        ]
    else:
        return _text(f"❌ Unknown action: {action}")

    return _text("\n".join(commands))


def _target_or(target, default: str) -> str:
    """Return the shell-quoted *target*, or *default* when *target* is falsy."""
    return _q(target) if target else default


def _process_commands(target) -> list:
    """Build the process-management cheat-sheet (default process: nginx)."""
    proc = _target_or(target, "nginx")
    return [
        "# Process Management Commands",
        "ps aux # List all processes with details",
        "ps -ef # Alternative process listing",
        "pstree # Show process tree",
        "",
        "# Find and manage specific processes:",
        f"ps aux | grep {proc} # Find specific process",
        f"pgrep -f {proc} # Get PID by name",
        f"pkill -f {proc} # Kill process by name",
        f"killall {proc} # Kill all processes by name",
        "",
        "# Process control:",
        "kill -15 PID # Graceful termination (SIGTERM)",
        "kill -9 PID # Force kill (SIGKILL)",
        "kill -HUP PID # Reload configuration (SIGHUP)",
        "",
        "# Background processes:",
        "nohup command & # Run command in background",
        "screen -S session_name # Start screen session",
        "tmux new -s session_name # Start tmux session",
        "",
        "# Process monitoring:",
        "top # Real-time process monitor",
        "htop # Enhanced process monitor",
        "jobs # List background jobs",
        "bg # Resume job in background",
        "fg # Bring job to foreground",
    ]


def _disk_commands(target) -> list:
    """Build the disk cheat-sheet (defaults: /var/log for du, backup for mounts)."""
    directory = _target_or(target, "/var/log")
    mount_name = _target_or(target, "backup")
    return [
        "# Disk Usage and Management Commands",
        "df -h # Show disk usage (human readable)",
        "df -i # Show inode usage",
        "du -sh /* # Show directory sizes in root",
        f"du -sh {directory}/* | sort -hr # Sort by size",
        "",
        "# Disk analysis:",
        "lsblk # List block devices",
        "lsblk -f # Show filesystems",
        "blkid # Show UUIDs and labels",
        "fdisk -l # List disk partitions",
        "parted -l # Alternative partition listing",
        "",
        "# Mount management:",
        "mount # Show mounted filesystems",
        "findmnt # Tree view of mounts",
        # `-D` is `--df`; it does not list duplicate mounts.
        "findmnt -D # Show df-style usage of mounted filesystems",
        f"mount /dev/sdb1 /mnt/{mount_name} # Mount filesystem",
        f"umount /mnt/{mount_name} # Unmount filesystem",
        "",
        "# Cleanup commands:",
        "apt autoremove # Remove unused packages (Debian/Ubuntu)",
        "yum autoremove # Remove unused packages (RHEL/CentOS)",
        "journalctl --disk-usage # Check journal disk usage",
        "journalctl --vacuum-time=3d # Clean old journal entries",
    ]


def _network_commands(target) -> list:
    """Build the network cheat-sheet (default host: google.com)."""
    host = _target_or(target, "google.com")
    return [
        "# Network Configuration and Diagnostics",
        "ip addr show # Show IP addresses",
        "ip link show # Show network interfaces",
        "ip route show # Show routing table",
        "ip neigh show # Show ARP table",
        "",
        "# Port and connection analysis:",
        "ss -tuln # Show listening ports",
        "ss -tulpn # Show ports with process names",
        "netstat -tuln # Alternative port listing",
        "netstat -rn # Show routing table",
        "",
        "# Connectivity testing:",
        f"ping -c 4 {host} # Test connectivity",
        f"traceroute {host} # Trace network path",
        f"mtr {host} # Network diagnostic tool",
        f"nslookup {host} # DNS lookup",
        f"dig {host} # DNS information",
        "",
        "# Network configuration:",
        "wget -O- ifconfig.me # Get public IP",
        "curl -s ifconfig.me # Alternative public IP",
        "curl -s ipinfo.io # Detailed IP information",
        "",
        "# Interface management:",
        "sudo ip link set eth0 up # Bring interface up",
        "sudo ip link set eth0 down # Bring interface down",
        "sudo dhclient eth0 # Request DHCP lease",
    ]


def _service_commands(target) -> list:
    """Build the service-management cheat-sheet (default service: nginx)."""
    svc = _target_or(target, "nginx")
    return [
        "# Service Management (systemd)",
        f"systemctl status {svc} # Check service status",
        f"systemctl start {svc} # Start service",
        f"systemctl stop {svc} # Stop service",
        f"systemctl restart {svc} # Restart service",
        f"systemctl reload {svc} # Reload configuration",
        "",
        "# Service persistence:",
        f"systemctl enable {svc} # Enable at boot",
        f"systemctl disable {svc} # Disable at boot",
        f"systemctl is-enabled {svc} # Check if enabled",
        "",
        "# Service discovery:",
        "systemctl list-units # List all active services",
        "systemctl list-units --failed # List failed services",
        "systemctl list-unit-files # List all unit files",
        "",
        "# Logs and troubleshooting:",
        f"journalctl -u {svc} # View service logs",
        f"journalctl -u {svc} -f # Follow service logs",
        f"journalctl -u {svc} --since today # Today's logs",
        "",
        "# Legacy service management (SysV):",
        f"service {svc} status # Check status (legacy)",
        f"service {svc} start # Start service (legacy)",
        f"/etc/init.d/{svc} status # Direct init script",
    ]


def _firewall_commands(target) -> list:
    """Build the firewall cheat-sheet (defaults: allow 22, deny 23)."""
    allow_rule = _target_or(target, "22")
    deny_rule = _target_or(target, "23")
    return [
        "# Firewall Management",
        "sudo ufw status verbose # Check UFW status (detailed)",
        "sudo ufw --version # Check UFW version",
        "",
        "# UFW basic operations:",
        "sudo ufw enable # Enable UFW",
        "sudo ufw disable # Disable UFW",
        "sudo ufw reset # Reset UFW rules",
        "",
        "# UFW rule management:",
        f"sudo ufw allow {allow_rule} # Allow port/service",
        f"sudo ufw deny {deny_rule} # Block port/service",
        "sudo ufw allow ssh # Allow SSH service",
        "sudo ufw allow 80/tcp # Allow HTTP",
        "sudo ufw allow 443/tcp # Allow HTTPS",
        "sudo ufw allow from 192.168.1.0/24 # Allow from subnet",
        "",
        "# UFW advanced rules:",
        "sudo ufw delete allow 80 # Remove rule",
        "sudo ufw insert 1 allow 22 # Insert rule at position",
        "sudo ufw logging on # Enable logging",
        "",
        "# iptables (advanced):",
        "sudo iptables -L # List iptables rules",
        "sudo iptables -L -n # List with numeric output",
        "sudo iptables -A INPUT -p tcp --dport 80 -j ACCEPT # Allow HTTP",
        "sudo iptables -A INPUT -p tcp --dport 22 -j DROP # Block SSH",
        "",
        "# Save iptables rules:",
        # The redirect must run inside the root shell: with a bare
        # `sudo cmd > file` the *calling* shell opens the file unprivileged.
        "sudo sh -c 'iptables-save > /etc/iptables/rules.v4' # Save IPv4 rules",
        "sudo sh -c 'ip6tables-save > /etc/iptables/rules.v6' # Save IPv6 rules",
    ]


# Maps the `category` argument of the system_commands tool to its builder.
_SYSTEM_COMMAND_BUILDERS = {
    "process": _process_commands,
    "disk": _disk_commands,
    "network": _network_commands,
    "service": _service_commands,
    "firewall": _firewall_commands,
}


def handle_system_commands(args: dict) -> list:
    """Generate Linux system administration commands.

    Args:
        args: Tool arguments. Reads ``category`` (``process``, ``disk``,
            ``network``, ``service`` or ``firewall``) and the optional
            ``target`` that is substituted into the category's commands
            (each category has its own default when it is omitted).
            ``action`` is accepted by the tool schema but is not used.

    Returns:
        A one-element list of ``TextContent`` holding the generated commands,
        or a ``❌`` error message when the category is unknown.
    """
    args = args or {}
    category = args.get("category")
    build = _SYSTEM_COMMAND_BUILDERS.get(category)
    if build is None:
        return _text(f"❌ Unknown category: {category}")
    return _text("\n".join(build(args.get("target", ""))))


# ============================================================================
# MCP SERVER CONFIGURATION
# ============================================================================

# Maps MCP tool names to the handler that serves them.
_TOOL_HANDLERS = {
    "user_management": handle_user_management,
    "file_permissions": handle_file_permissions,
    "system_commands": handle_system_commands,
}


@mcp_app.list_tools()
async def list_tools():
    """List all available Linux system administration tools."""
    return [
        Tool(
            name="user_management",
            description="Generate Linux user management commands for creating, deleting, and managing users and groups",
            inputSchema={
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["add_user_to_group", "create_user", "delete_user", "list_groups"],
                        "description": "User management action to perform",
                    },
                    "username": {
                        "type": "string",
                        "description": "Username for the operation",
                    },
                    "groupname": {
                        "type": "string",
                        "description": "Group name for the operation",
                    },
                    "sudo": {
                        "type": "boolean",
                        "default": True,
                        "description": "Whether to use sudo for privileged operations",
                    },
                },
                "required": ["action"],
            },
        ),
        Tool(
            name="file_permissions",
            description="Generate file permission and ownership commands for chmod, chown, and permission discovery",
            inputSchema={
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["chmod", "chown", "find_permissions"],
                        "description": "File permission action to perform",
                    },
                    "path": {
                        "type": "string",
                        "description": "File or directory path",
                    },
                    "permissions": {
                        "type": "string",
                        "description": "Permission mode (e.g., 755, 644, u+x)",
                    },
                    "owner": {
                        "type": "string",
                        "description": "Owner username for chown operations",
                    },
                    "recursive": {
                        "type": "boolean",
                        "default": False,
                        "description": "Apply operation recursively",
                    },
                },
                "required": ["action", "path"],
            },
        ),
        Tool(
            name="system_commands",
            description="Generate common Linux system administration commands for processes, disk, network, services, and firewall",
            inputSchema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["process", "disk", "network", "service", "firewall"],
                        "description": "Category of system commands",
                    },
                    "action": {
                        "type": "string",
                        "description": "Specific action within the category",
                    },
                    "target": {
                        "type": "string",
                        "description": "Target for the operation (service name, hostname, etc.)",
                    },
                },
                "required": ["category"],
            },
        ),
    ]


@mcp_app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Dispatch an MCP tool call to its handler.

    Args:
        name: Tool name; must be a key of ``_TOOL_HANDLERS``.
        arguments: Tool arguments; ``None`` is treated as an empty dict.

    Returns:
        The handler's ``TextContent`` list, or a single ``TextContent``
        describing the unknown tool or the exception the handler raised.
    """
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Unknown tool: {name}")
    try:
        return handler(arguments or {})
    except Exception as e:
        # Tool boundary: report the failure to the caller as text rather
        # than letting it propagate out of the MCP handler.
        return _text(f"Error: {str(e)}")


# ============================================================================
# FASTAPI WEB ENDPOINTS
# ============================================================================

@app.get("/")
async def root():
    """Status endpoint: service name, state, SSE path and registered tools."""
    return {
        "service": "linux-mcp-server",
        "status": "running",
        "mcp_endpoint": "/mcp/sse",
        "tools": len(_TOOL_HANDLERS),
        "categories": list(_TOOL_HANDLERS),
    }


@app.get("/health")
async def health():
    """Health check."""
    return {"status": "healthy", "service": "linux-mcp-server"}


@app.get("/mcp/sse")
async def mcp_sse_endpoint(request: Request):
    """MCP SSE endpoint for agent connections.

    Streams a ``keepalive`` event every 30 seconds until the client
    disconnects.

    NOTE(review): this stream only emits keepalives; nothing in this module
    connects ``mcp_app`` to it, so tool calls are not served over this
    endpoint as written -- confirm whether a transport is wired up elsewhere.
    """
    keepalive = f"data: {json.dumps({'type': 'keepalive'})}\n\n"

    async def event_stream():
        try:
            while True:
                if await request.is_disconnected():
                    break
                yield keepalive
                await asyncio.sleep(30)
        except asyncio.CancelledError:
            # The server cancels the generator on shutdown/disconnect;
            # end the stream quietly.
            pass

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
        },
    )


def main():
    """Main entry point: run the FastAPI app on ``$PORT`` (default 7860)."""
    port = int(os.getenv("PORT", "7860"))
    print(f"🚀 Starting Linux MCP Server on port {port}")
    print(f"📊 Status: http://localhost:{port}/")
    print(f"🔗 MCP SSE: http://localhost:{port}/mcp/sse")
    print("🛠️ Tools: User Management, File Permissions, System Commands")
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")


if __name__ == "__main__":
    main()