Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import openai | |
| import requests | |
| import json | |
| from typing import Dict, List, Any | |
| import os | |
| from datetime import datetime | |
| class MCPClient: | |
| def __init__(self, openai_api_key: str): | |
| """Initialize the MCP Client with OpenAI integration""" | |
| self.openai_client = openai.OpenAI(api_key=openai_api_key) | |
| # MCP Server configurations | |
| self.mcp_servers = { | |
| "terraform": { | |
| "name": "MCP Terraform", | |
| "space_url": "https://chrissacrumcor-mcp-terraform.hf.space", | |
| "description": "Terraform infrastructure management" | |
| }, | |
| "linux": { | |
| "name": "MCP Linux", | |
| "space_url": "https://chrissacrumcor-mcp-linux.hf.space", | |
| "description": "Linux system operations", | |
| # Known tools from the Linux server code | |
| "tools": [ | |
| { | |
| "name": "user_management", | |
| "description": "Generate Linux user management commands for creating, deleting, and managing users and groups" | |
| }, | |
| { | |
| "name": "file_permissions", | |
| "description": "Generate file permission and ownership commands for chmod, chown, and permission discovery" | |
| }, | |
| { | |
| "name": "system_commands", | |
| "description": "Generate common Linux system administration commands for processes, disk, network, services, and firewall" | |
| } | |
| ] | |
| }, | |
| "cisco": { | |
| "name": "MCP Cisco", | |
| "space_url": "https://chrissacrumcor-mcp-cisco.hf.space", | |
| "description": "Cisco network management" | |
| } | |
| } | |
| self.conversation_history = [] | |
| def check_server_health(self, server_key: str) -> Dict[str, Any]: | |
| """Check if MCP server is responsive""" | |
| try: | |
| server_config = self.mcp_servers[server_key] | |
| # Try health endpoints | |
| health_endpoints = ["/health", "/", "/docs"] | |
| for endpoint in health_endpoints: | |
| try: | |
| url = f"{server_config['space_url']}{endpoint}" | |
| response = requests.get(url, timeout=10) | |
| if response.status_code == 200: | |
| return { | |
| "success": True, | |
| "status": "online", | |
| "endpoint": endpoint, | |
| "response": response.json() if endpoint in ["/health", "/"] else {"status": "docs_available"} | |
| } | |
| elif response.status_code in [404, 422]: | |
| return { | |
| "success": True, | |
| "status": "online_different_structure", | |
| "endpoint": endpoint | |
| } | |
| except requests.RequestException: | |
| continue | |
| return {"success": False, "status": "offline", "error": "No responsive endpoints"} | |
| except Exception as e: | |
| return {"success": False, "status": "error", "error": str(e)} | |
| def execute_linux_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: | |
| """Execute Linux MCP tools - recreated from server logic""" | |
| if tool_name == "user_management": | |
| action = arguments.get("action", "") | |
| username = arguments.get("username", "example_user") | |
| groupname = arguments.get("groupname", "example_group") | |
| if action == "add_user_to_group": | |
| return f"""**Add User to Group Commands:** | |
| ```bash | |
| # Add user '{username}' to group '{groupname}' | |
| sudo usermod -a -G {groupname} {username} | |
| # Alternative methods: | |
| sudo gpasswd -a {username} {groupname} | |
| sudo adduser {username} {groupname} # Ubuntu/Debian | |
| # Verify group membership: | |
| groups {username} | |
| id {username} | |
| getent group {groupname} | |
| ```""" | |
| elif action == "create_user": | |
| return f"""**Create User Commands:** | |
| ```bash | |
| # Create new user '{username}' | |
| sudo useradd -m -s /bin/bash {username} | |
| sudo passwd {username} | |
| # Create user with specific group: | |
| sudo useradd -m -g {groupname} -s /bin/bash {username} | |
| # Add to sudo group (Ubuntu/Debian): | |
| sudo usermod -a -G sudo {username} | |
| # Add to wheel group (RHEL/CentOS): | |
| sudo usermod -a -G wheel {username} | |
| # Verify user creation: | |
| id {username} | |
| getent passwd {username} | |
| ```""" | |
| elif action == "delete_user": | |
| return f"""**Delete User Commands:** | |
| ```bash | |
| # Delete user '{username}' | |
| sudo userdel {username} | |
| # Delete user and home directory: | |
| sudo userdel -r {username} | |
| # Remove user from all groups first (if needed): | |
| sudo gpasswd -d {username} groupname | |
| # Verify user deletion: | |
| getent passwd {username} | |
| # Should return no results if deleted successfully | |
| ```""" | |
| elif action == "list_groups": | |
| return """**List Groups and Users Commands:** | |
| ```bash | |
| # List all groups: | |
| cat /etc/group | |
| getent group | |
| # List groups for current user: | |
| groups $USER | |
| id $USER | |
| # List groups for specific user: | |
| groups username | |
| id username | |
| # List users in specific group: | |
| getent group sudo | |
| getent group wheel | |
| # List all users: | |
| cut -d: -f1 /etc/passwd | |
| getent passwd | |
| ```""" | |
| else: | |
| return f"β Unknown user management action: {action}. Available: add_user_to_group, create_user, delete_user, list_groups" | |
| elif tool_name == "file_permissions": | |
| action = arguments.get("action", "") | |
| path = arguments.get("path", "/path/to/file") | |
| permissions = arguments.get("permissions", "755") | |
| owner = arguments.get("owner", "username") | |
| recursive = arguments.get("recursive", False) | |
| recursive_flag = "-R " if recursive else "" | |
| if action == "chmod": | |
| return f"""**Change File Permissions Commands:** | |
| ```bash | |
| # Change permissions for {path} | |
| chmod {recursive_flag}{permissions} {path} | |
| # Common permission examples: | |
| # 755 = rwxr-xr-x (executable for owner, readable for others) | |
| # 644 = rw-r--r-- (readable/writable for owner, readable for others) | |
| # 600 = rw------- (readable/writable for owner only) | |
| # 777 = rwxrwxrwx (full permissions for all - use with caution!) | |
| # Symbolic notation examples: | |
| chmod {recursive_flag}u+x {path} # Add execute for owner | |
| chmod {recursive_flag}g-w {path} # Remove write for group | |
| chmod {recursive_flag}o-r {path} # Remove read for others | |
| # Verify permissions: | |
| ls -la {path} | |
| stat {path} | |
| ```""" | |
| elif action == "chown": | |
| return f"""**Change File Ownership Commands:** | |
| ```bash | |
| # Change ownership for {path} | |
| sudo chown {recursive_flag}{owner} {path} | |
| # Change owner and group: | |
| sudo chown {recursive_flag}{owner}:{owner} {path} | |
| sudo chown {recursive_flag}{owner}:users {path} | |
| # Change only group: | |
| sudo chgrp {recursive_flag}{owner} {path} | |
| # Verify ownership: | |
| ls -la {path} | |
| stat {path} | |
| ```""" | |
| elif action == "find_permissions": | |
| return f"""**Find Files by Permissions:** | |
| ```bash | |
| # Find files with specific permissions in {path} | |
| find {path} -type f -perm {permissions} | |
| find {path} -type f -perm -{permissions} # At least these permissions | |
| # Find files by owner: | |
| find {path} -user {owner} | |
| find {path} -group {owner} | |
| # Find files with dangerous permissions: | |
| find {path} -type f -perm -002 # World-writable files | |
| find {path} -type f -perm -004 # World-readable files | |
| # Find SUID/SGID files: | |
| find {path} -type f -perm -4000 # SUID files | |
| find {path} -type f -perm -2000 # SGID files | |
| # Find world-writable directories: | |
| find {path} -type d -perm 777 | |
| ```""" | |
| else: | |
| return f"β Unknown file permission action: {action}. Available: chmod, chown, find_permissions" | |
| elif tool_name == "system_commands": | |
| category = arguments.get("category", "") | |
| target = arguments.get("target", "nginx") | |
| if category == "process": | |
| return f"""**Process Management Commands:** | |
| ```bash | |
| # List and analyze processes: | |
| ps aux # List all processes with details | |
| ps -ef # Alternative process listing | |
| pstree # Show process tree | |
| top # Real-time process monitor | |
| htop # Enhanced process monitor | |
| # Find and manage specific processes: | |
| ps aux | grep {target} # Find specific process | |
| pgrep -f {target} # Get PID by name | |
| pkill -f {target} # Kill process by name | |
| killall {target} # Kill all processes by name | |
| # Process control: | |
| kill -15 PID # Graceful termination (SIGTERM) | |
| kill -9 PID # Force kill (SIGKILL) | |
| kill -HUP PID # Reload configuration (SIGHUP) | |
| # Background processes: | |
| nohup command & # Run command in background | |
| screen -S session_name # Start screen session | |
| tmux new -s session_name # Start tmux session | |
| ```""" | |
| elif category == "service": | |
| return f"""**Service Management Commands (systemd):** | |
| ```bash | |
| # Service status and control: | |
| systemctl status {target} # Check service status | |
| systemctl start {target} # Start service | |
| systemctl stop {target} # Stop service | |
| systemctl restart {target} # Restart service | |
| systemctl reload {target} # Reload configuration | |
| # Service persistence: | |
| systemctl enable {target} # Enable at boot | |
| systemctl disable {target} # Disable at boot | |
| systemctl is-enabled {target} # Check if enabled | |
| # Service discovery: | |
| systemctl list-units # List all active services | |
| systemctl list-units --failed # List failed services | |
| systemctl list-unit-files # List all unit files | |
| # Service logs: | |
| journalctl -u {target} # View service logs | |
| journalctl -u {target} -f # Follow service logs | |
| journalctl -u {target} --since today # Today's logs | |
| ```""" | |
| elif category == "network": | |
| return f"""**Network Diagnostic Commands:** | |
| ```bash | |
| # Network interface information: | |
| ip addr show # Show IP addresses | |
| ip link show # Show network interfaces | |
| ip route show # Show routing table | |
| ip neigh show # Show ARP table | |
| # Port and connection analysis: | |
| ss -tuln # Show listening ports | |
| ss -tulpn # Show ports with process names | |
| netstat -tuln # Alternative port listing | |
| netstat -rn # Show routing table | |
| # Connectivity testing: | |
| ping -c 4 {target} # Test connectivity | |
| traceroute {target} # Trace network path | |
| mtr {target} # Network diagnostic tool | |
| nslookup {target} # DNS lookup | |
| dig {target} # DNS information | |
| # Network utilities: | |
| wget -O- ifconfig.me # Get public IP | |
| curl -s ifconfig.me # Alternative public IP | |
| curl -s ipinfo.io # Detailed IP information | |
| ```""" | |
| elif category == "disk": | |
| return f"""**Disk Usage and Management Commands:** | |
| ```bash | |
| # Disk usage analysis: | |
| df -h # Show disk usage (human readable) | |
| df -i # Show inode usage | |
| du -sh /* # Show directory sizes in root | |
| du -sh /var/log/* | sort -hr # Sort by size | |
| # Disk and filesystem information: | |
| lsblk # List block devices | |
| lsblk -f # Show filesystems | |
| blkid # Show UUIDs and labels | |
| fdisk -l # List disk partitions | |
| parted -l # Alternative partition listing | |
| # Mount management: | |
| mount # Show mounted filesystems | |
| findmnt # Tree view of mounts | |
| mount /dev/sdb1 /mnt/backup # Mount filesystem | |
| umount /mnt/backup # Unmount filesystem | |
| # Cleanup commands: | |
| apt autoremove # Remove unused packages (Debian/Ubuntu) | |
| yum autoremove # Remove unused packages (RHEL/CentOS) | |
| journalctl --disk-usage # Check journal disk usage | |
| journalctl --vacuum-time=3d # Clean old journal entries | |
| ```""" | |
| elif category == "firewall": | |
| return f"""**Firewall Management Commands:** | |
| ```bash | |
| # UFW (Uncomplicated Firewall): | |
| sudo ufw status verbose # Check UFW status (detailed) | |
| sudo ufw enable # Enable UFW | |
| sudo ufw disable # Disable UFW | |
| # UFW rule management: | |
| sudo ufw allow {target} # Allow port/service | |
| sudo ufw deny {target} # Block port/service | |
| sudo ufw allow ssh # Allow SSH service | |
| sudo ufw allow 80/tcp # Allow HTTP | |
| sudo ufw allow 443/tcp # Allow HTTPS | |
| sudo ufw allow from 192.168.1.0/24 # Allow from subnet | |
| # UFW advanced operations: | |
| sudo ufw delete allow 80 # Remove rule | |
| sudo ufw insert 1 allow 22 # Insert rule at position | |
| sudo ufw logging on # Enable logging | |
| # iptables (advanced): | |
| sudo iptables -L # List iptables rules | |
| sudo iptables -L -n # List with numeric output | |
| sudo iptables -A INPUT -p tcp --dport 80 -j ACCEPT # Allow HTTP | |
| sudo iptables-save > /etc/iptables/rules.v4 # Save rules | |
| ```""" | |
| else: | |
| return f"β Unknown system command category: {category}. Available: process, service, network, disk, firewall" | |
| else: | |
| return f"β Unknown tool: {tool_name}. Available: user_management, file_permissions, system_commands" | |
| def execute_cisco_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: | |
| """Execute Cisco MCP tools - based on actual server code""" | |
| if tool_name == "vlan_management": | |
| action = arguments.get("action", "create_vlan") | |
| vlan_id = arguments.get("vlan_id", 100) | |
| vlan_name = arguments.get("vlan_name", f"VLAN_{vlan_id}") | |
| interface = arguments.get("interface", "") | |
| if action == "create_vlan": | |
| return f"""**Cisco VLAN Creation Commands:** | |
| ```cisco | |
| ! Create VLAN {vlan_id} | |
| configure terminal | |
| vlan {vlan_id} | |
| name {vlan_name} | |
| state active | |
| exit | |
| exit | |
| ! Verify VLAN creation: | |
| show vlan brief | |
| show vlan id {vlan_id} | |
| ```""" | |
| elif action == "assign_vlan": | |
| interface = interface or "GigabitEthernet0/1" | |
| return f"""**Assign Interface to VLAN Commands:** | |
| ```cisco | |
| ! Assign interface {interface} to VLAN {vlan_id} | |
| configure terminal | |
| interface {interface} | |
| switchport mode access | |
| switchport access vlan {vlan_id} | |
| no shutdown | |
| exit | |
| exit | |
| ! Verify interface assignment: | |
| show interfaces {interface} switchport | |
| show vlan brief | |
| ```""" | |
| elif action == "show_vlan": | |
| return f"""**VLAN Information Commands:** | |
| ```cisco | |
| ! VLAN Information Commands | |
| show vlan brief | |
| show vlan summary | |
| show vlan id {vlan_id} | |
| show interfaces status | |
| show spanning-tree vlan brief | |
| ```""" | |
| elif action == "delete_vlan": | |
| return f"""**Delete VLAN Commands:** | |
| ```cisco | |
| ! Delete VLAN {vlan_id} | |
| configure terminal | |
| no vlan {vlan_id} | |
| exit | |
| ! Verify VLAN deletion: | |
| show vlan brief | |
| ```""" | |
| elif tool_name == "interface_configuration": | |
| action = arguments.get("action", "configure_access") | |
| interface = arguments.get("interface", "GigabitEthernet0/1") | |
| vlan_id = arguments.get("vlan_id", 1) | |
| ip_address = arguments.get("ip_address", "") | |
| description = arguments.get("description", "") | |
| if action == "configure_access": | |
| return f"""**Configure Access Port Commands:** | |
| ```cisco | |
| ! Configure {interface} as access port for VLAN {vlan_id} | |
| configure terminal | |
| interface {interface} | |
| {f' description {description}' if description else ''} | |
| switchport mode access | |
| switchport access vlan {vlan_id} | |
| spanning-tree portfast | |
| no shutdown | |
| exit | |
| exit | |
| ! Verify configuration: | |
| show interfaces {interface} switchport | |
| show interfaces {interface} status | |
| ```""" | |
| elif action == "configure_trunk": | |
| return f"""**Configure Trunk Port Commands:** | |
| ```cisco | |
| ! Configure {interface} as trunk port | |
| configure terminal | |
| interface {interface} | |
| {f' description {description}' if description else ''} | |
| switchport mode trunk | |
| switchport trunk encapsulation dot1q | |
| switchport trunk allowed vlan {vlan_id if vlan_id != 1 else 'all'} | |
| no shutdown | |
| exit | |
| exit | |
| ! Verify trunk configuration: | |
| show interfaces {interface} trunk | |
| show interfaces {interface} switchport | |
| ```""" | |
| elif action == "set_ip": | |
| if not ip_address: | |
| ip_address = "192.168.1.1" | |
| subnet_mask = arguments.get("subnet_mask", "255.255.255.0") | |
| return f"""**Configure IP Address Commands:** | |
| ```cisco | |
| ! Configure IP address on {interface} | |
| configure terminal | |
| interface {interface} | |
| ip address {ip_address} {subnet_mask} | |
| {f' description {description}' if description else ''} | |
| no shutdown | |
| exit | |
| exit | |
| ! Verify IP configuration: | |
| show ip interface {interface} | |
| show ip interface brief | |
| ```""" | |
| elif action == "shutdown": | |
| return f"""**Shutdown Interface Commands:** | |
| ```cisco | |
| ! Shutdown interface {interface} | |
| configure terminal | |
| interface {interface} | |
| shutdown | |
| exit | |
| exit | |
| ! Verify interface status: | |
| show interfaces {interface} status | |
| ```""" | |
| elif action == "no_shutdown": | |
| return f"""**Enable Interface Commands:** | |
| ```cisco | |
| ! Enable interface {interface} | |
| configure terminal | |
| interface {interface} | |
| no shutdown | |
| exit | |
| exit | |
| ! Verify interface status: | |
| show interfaces {interface} status | |
| ```""" | |
| elif tool_name == "routing_configuration": | |
| protocol = arguments.get("protocol", "ospf") | |
| action = arguments.get("action", "configure") | |
| network = arguments.get("network", "192.168.1.0 0.0.0.255") | |
| area = arguments.get("area", "0") | |
| process_id = arguments.get("process_id", 1) | |
| next_hop = arguments.get("next_hop", "192.168.1.1") | |
| if protocol == "ospf": | |
| if action == "configure": | |
| return f"""**OSPF Configuration Commands:** | |
| ```cisco | |
| ! Configure OSPF process {process_id} | |
| configure terminal | |
| router ospf {process_id} | |
| network {network} area {area} | |
| router-id 1.1.1.1 | |
| exit | |
| exit | |
| ! Verify OSPF configuration: | |
| show ip ospf | |
| show ip ospf neighbor | |
| show ip ospf database | |
| ```""" | |
| elif action == "show": | |
| return f"""**OSPF Information Commands:** | |
| ```cisco | |
| ! OSPF Information Commands | |
| show ip ospf | |
| show ip ospf neighbor | |
| show ip ospf database | |
| show ip ospf interface | |
| show ip route ospf | |
| ```""" | |
| elif protocol == "static": | |
| if action == "configure": | |
| return f"""**Static Route Configuration:** | |
| ```cisco | |
| ! Configure static route to {network} | |
| configure terminal | |
| ip route {network} {next_hop} | |
| exit | |
| ! Verify static route: | |
| show ip route static | |
| show ip route | |
| ```""" | |
| elif protocol == "eigrp": | |
| if action == "configure": | |
| return f"""**EIGRP Configuration Commands:** | |
| ```cisco | |
| ! Configure EIGRP AS {process_id} | |
| configure terminal | |
| router eigrp {process_id} | |
| network {network} | |
| no auto-summary | |
| exit | |
| exit | |
| ! Verify EIGRP configuration: | |
| show ip eigrp neighbors | |
| show ip eigrp topology | |
| ```""" | |
| elif tool_name == "security_configuration": | |
| feature = arguments.get("feature", "ssh") | |
| action = arguments.get("action", "enable") | |
| acl_number = arguments.get("acl_number", 100) | |
| interface = arguments.get("interface", "GigabitEthernet0/1") | |
| if feature == "ssh" and action == "enable": | |
| return f"""**SSH Configuration Commands:** | |
| ```cisco | |
| ! Enable SSH access | |
| configure terminal | |
| hostname CiscoRouter | |
| ip domain-name company.local | |
| crypto key generate rsa general-keys modulus 2048 | |
| ip ssh version 2 | |
| ip ssh time-out 60 | |
| ip ssh authentication-retries 3 | |
| ! Configure user account | |
| username admin privilege 15 secret cisco123 | |
| ! Configure VTY lines | |
| line vty 0 4 | |
| transport input ssh | |
| login local | |
| exit | |
| exit | |
| ! Verify SSH configuration: | |
| show ip ssh | |
| show ssh | |
| ```""" | |
| elif feature == "acl" and action == "configure": | |
| source_ip = arguments.get("source_ip", "192.168.1.0 0.0.0.255") | |
| destination_ip = arguments.get("destination_ip", "any") | |
| return f"""**Access Control List Configuration:** | |
| ```cisco | |
| ! Configure Access Control List {acl_number} | |
| configure terminal | |
| access-list {acl_number} permit ip {source_ip} {destination_ip} | |
| access-list {acl_number} deny ip any any | |
| ! Apply ACL to interface {interface} | |
| interface {interface} | |
| ip access-group {acl_number} in | |
| exit | |
| exit | |
| ! Verify ACL: | |
| show access-lists {acl_number} | |
| show ip interface {interface} | |
| ```""" | |
| elif feature == "port_security" and action == "enable": | |
| return f"""**Port Security Configuration:** | |
| ```cisco | |
| ! Enable port security on {interface} | |
| configure terminal | |
| interface {interface} | |
| switchport mode access | |
| switchport port-security | |
| switchport port-security maximum 2 | |
| switchport port-security mac-address sticky | |
| switchport port-security violation restrict | |
| exit | |
| exit | |
| ! Verify port security: | |
| show port-security interface {interface} | |
| show port-security address | |
| ```""" | |
| elif tool_name == "troubleshooting": | |
| category = arguments.get("category", "connectivity") | |
| target = arguments.get("target", "8.8.8.8") | |
| interface = arguments.get("interface", "") | |
| if category == "connectivity": | |
| return f"""**Connectivity Troubleshooting Commands:** | |
| ```cisco | |
| ! Connectivity Troubleshooting to {target} | |
| ping {target} | |
| traceroute {target} | |
| show ip route | |
| show arp | |
| show ip interface brief | |
| ! Extended ping test: | |
| ping {target} repeat 100 | |
| ping {target} size 1500 | |
| ```""" | |
| elif category == "interface": | |
| return f"""**Interface Troubleshooting Commands:** | |
| ```cisco | |
| ! Interface Troubleshooting for {interface or 'all interfaces'} | |
| {f'show interfaces {interface}' if interface else 'show interfaces'} | |
| {f'show interfaces {interface} status' if interface else 'show interfaces status'} | |
| {f'show controllers {interface}' if interface else 'show controllers'} | |
| show ip interface brief | |
| ! Interface statistics: | |
| {f'show interfaces {interface} | include error' if interface else 'show interfaces | include error'} | |
| ```""" | |
| elif category == "routing": | |
| return f"""**Routing Troubleshooting Commands:** | |
| ```cisco | |
| ! Routing Troubleshooting | |
| show ip route | |
| show ip protocols | |
| show ip ospf neighbor | |
| show ip eigrp neighbors | |
| show cdp neighbors | |
| ! Routing table analysis: | |
| show ip route summary | |
| show ip route {target if target else '0.0.0.0'} | |
| ```""" | |
| elif category == "switching": | |
| return f"""**Switching Troubleshooting Commands:** | |
| ```cisco | |
| ! Switching Troubleshooting | |
| show vlan brief | |
| show interfaces status | |
| show spanning-tree | |
| show mac address-table | |
| show cdp neighbors detail | |
| ! Port and VLAN status: | |
| show interfaces trunk | |
| show spanning-tree blockedports | |
| ```""" | |
| elif category == "general": | |
| return f"""**General Device Information Commands:** | |
| ```cisco | |
| ! General Device Information | |
| show version | |
| show running-config | |
| show startup-config | |
| show processes cpu | |
| show memory | |
| show environment | |
| show logging | |
| ! System status: | |
| show clock | |
| show users | |
| show sessions | |
| ```""" | |
| else: | |
| return f"β Unknown Cisco tool: {tool_name}. Available: vlan_management, interface_configuration, routing_configuration, security_configuration, troubleshooting" | |
| def execute_terraform_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: | |
| """Execute Terraform MCP tools - based on actual server code""" | |
| if tool_name == "generate_terraform_config": | |
| resource_type = arguments.get("resource_type", "vpc") | |
| provider = arguments.get("provider", "aws") | |
| name = arguments.get("name", "example") | |
| options = arguments.get("options", {}) | |
| # Generate different responses based on resource type | |
| if resource_type == "vpc" and provider == "aws": | |
| cidr = options.get("cidr", "10.0.0.0/16") | |
| return f"""**Generated AWS VPC Terraform Configuration:** | |
| ```hcl | |
| # {name} VPC Infrastructure Configuration | |
| terraform {{ | |
| required_version = ">= 1.0" | |
| required_providers {{ | |
| aws = {{ | |
| source = "hashicorp/aws" | |
| version = "~> 5.0" | |
| }} | |
| }} | |
| }} | |
| provider "aws" {{ | |
| region = var.aws_region | |
| default_tags {{ | |
| tags = {{ | |
| Project = var.project_name | |
| ManagedBy = "Terraform" | |
| Environment = var.environment | |
| }} | |
| }} | |
| }} | |
| variable "aws_region" {{ | |
| description = "AWS region" | |
| type = string | |
| default = "us-west-2" | |
| }} | |
| variable "vpc_cidr" {{ | |
| description = "CIDR block for VPC" | |
| type = string | |
| default = "{cidr}" | |
| }} | |
| # VPC Module | |
| module "{name}_vpc" {{ | |
| source = "terraform-aws-modules/vpc/aws" | |
| name = "${{var.environment}}-{name}-vpc" | |
| cidr = var.vpc_cidr | |
| azs = ["us-west-2a", "us-west-2b"] | |
| public_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k)] | |
| private_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k + 10)] | |
| enable_nat_gateway = true | |
| enable_dns_hostnames = true | |
| enable_dns_support = true | |
| }} | |
| # Outputs | |
| output "vpc_id" {{ | |
| description = "ID of the VPC" | |
| value = module.{name}_vpc.vpc_id | |
| }} | |
| ``` | |
| **Next Steps:** | |
| 1. Save as `main.tf` | |
| 2. Run `terraform init` | |
| 3. Run `terraform plan` | |
| 4. Run `terraform apply` | |
| """ | |
| elif resource_type == "ec2" and provider == "aws": | |
| instance_type = options.get("instance_type", "t3.micro") | |
| return f"""**Generated AWS EC2 Terraform Configuration:** | |
| ```hcl | |
| # {name} EC2 Instance Configuration | |
| terraform {{ | |
| required_version = ">= 1.0" | |
| required_providers {{ | |
| aws = {{ | |
| source = "hashicorp/aws" | |
| version = "~> 5.0" | |
| }} | |
| }} | |
| }} | |
| variable "instance_type" {{ | |
| description = "EC2 instance type" | |
| type = string | |
| default = "{instance_type}" | |
| }} | |
| # Data sources | |
| data "aws_ami" "app_ami" {{ | |
| most_recent = true | |
| owners = ["amazon"] | |
| filter {{ | |
| name = "name" | |
| values = ["amzn2-ami-hvm-*-x86_64-gp2"] | |
| }} | |
| }} | |
| # EC2 Instance Module | |
| module "{name}_instance" {{ | |
| source = "terraform-aws-modules/ec2-instance/aws" | |
| name = "{name}-instance" | |
| instance_type = var.instance_type | |
| ami = data.aws_ami.app_ami.id | |
| create_iam_instance_profile = true | |
| root_block_device = [{{ | |
| encrypted = true | |
| volume_type = "gp3" | |
| volume_size = 20 | |
| }}] | |
| }} | |
| output "instance_id" {{ | |
| value = module.{name}_instance.id | |
| }} | |
| ``` | |
| """ | |
| elif resource_type == "s3" and provider == "aws": | |
| return f"""**Generated AWS S3 Bucket Terraform Configuration:** | |
| ```hcl | |
| # {name} S3 Bucket Configuration | |
| terraform {{ | |
| required_providers {{ | |
| aws = {{ | |
| source = "hashicorp/aws" | |
| version = "~> 5.0" | |
| }} | |
| }} | |
| }} | |
| # Random suffix for unique bucket name | |
| resource "random_string" "bucket_suffix" {{ | |
| length = 8 | |
| special = false | |
| upper = false | |
| }} | |
| # S3 Bucket Module | |
| module "{name}_s3_bucket" {{ | |
| source = "terraform-aws-modules/s3-bucket/aws" | |
| bucket = "{name}-bucket-${{random_string.bucket_suffix.result}}" | |
| versioning = {{ | |
| enabled = true | |
| }} | |
| server_side_encryption_configuration = {{ | |
| rule = {{ | |
| apply_server_side_encryption_by_default = {{ | |
| sse_algorithm = "AES256" | |
| }} | |
| }} | |
| }} | |
| block_public_acls = true | |
| block_public_policy = true | |
| }} | |
| output "bucket_name" {{ | |
| value = module.{name}_s3_bucket.s3_bucket_id | |
| }} | |
| ``` | |
| """ | |
| elif resource_type == "eks" and provider == "aws": | |
| return f"""**Generated AWS EKS Cluster Terraform Configuration:** | |
| ```hcl | |
| # {name} EKS Cluster Configuration | |
| terraform {{ | |
| required_providers {{ | |
| aws = {{ | |
| source = "hashicorp/aws" | |
| version = "~> 5.0" | |
| }} | |
| }} | |
| }} | |
| # EKS Cluster Module | |
| module "eks" {{ | |
| source = "terraform-aws-modules/eks/aws" | |
| cluster_name = "{name}-eks-cluster" | |
| cluster_version = "1.28" | |
| vpc_id = module.vpc.vpc_id | |
| subnet_ids = module.vpc.private_subnets | |
| eks_managed_node_groups = {{ | |
| main = {{ | |
| name = "{name}-nodes" | |
| instance_types = ["t3.medium"] | |
| min_size = 1 | |
| max_size = 3 | |
| desired_size = 2 | |
| }} | |
| }} | |
| }} | |
| ``` | |
| """ | |
| else: | |
| return f"""**Terraform Configuration Generation:** | |
| Resource type `{resource_type}` for provider `{provider}` is supported by the server. | |
| **Available Resource Types:** | |
| - vpc, ec2, s3, eks, rds (AWS) | |
| - vnet, compute, aks (Azure) | |
| - Custom configurations | |
| **To generate full configuration, specify:** | |
| - resource_type: {resource_type} | |
| - provider: {provider} | |
| - name: {name} | |
| - options: Custom parameters | |
| """ | |
| elif tool_name == "validate_terraform_config": | |
| config_content = arguments.get("config_content", "") | |
| if not config_content: | |
| return """**Terraform Configuration Validation:** | |
| β **No configuration provided** | |
| Please provide Terraform configuration content to validate. | |
| **Example usage:** | |
| - Paste your .tf file contents | |
| - Enable best practices checking | |
| - Get syntax and style recommendations | |
| """ | |
| return f"""**Terraform Configuration Validation Results:** | |
| ## β Syntax Validation | |
| - Configuration syntax appears valid | |
| - Terraform blocks found | |
| - Provider configuration detected | |
| ## π‘ Best Practices Review | |
| - Consider adding resource tags | |
| - Use remote state storage for production | |
| - Add variable descriptions | |
| - Include outputs for important resources | |
| ## π§ Recommendations | |
| 1. **Run terraform validate** - Official validation | |
| 2. **Run terraform fmt** - Format consistency | |
| 3. **Add validation rules** - Variable constraints | |
| 4. **Security scan** - Use tools like tfsec | |
| **Configuration Length:** {len(config_content)} characters | |
| """ | |
| elif tool_name == "get_deployment_workflow": | |
| workflow_type = arguments.get("workflow_type", "basic") | |
| backend_type = arguments.get("backend_type", "local") | |
| environment = arguments.get("environment", "development") | |
| if workflow_type == "basic": | |
| return f"""**Basic Terraform Deployment Workflow:** | |
| ```bash | |
| # Basic Terraform Workflow | |
| terraform init | |
| terraform validate | |
| terraform fmt | |
| terraform plan | |
| terraform apply | |
| terraform output | |
| ``` | |
| **Environment:** {environment.title()} | |
| **Backend:** {backend_type.upper()} | |
| **Steps Explained:** | |
| 1. **init** - Initialize working directory | |
| 2. **validate** - Check configuration syntax | |
| 3. **fmt** - Format code consistently | |
| 4. **plan** - Preview changes | |
| 5. **apply** - Create/update resources | |
| 6. **output** - Display output values | |
| """ | |
| elif workflow_type == "production": | |
| return f"""**Production Terraform Deployment Workflow:** | |
| ```bash | |
| # Production Terraform Workflow with {backend_type.upper()} backend | |
| # 1. Initialize with backend configuration | |
| terraform init | |
| # 2. Validate configuration | |
| terraform validate | |
| # 3. Format code | |
| terraform fmt | |
| # 4. Security scan (optional) | |
| # tfsec . | |
| # 5. Plan and save | |
| terraform plan -out=tfplan | |
| # 6. Review plan output carefully | |
| terraform show tfplan | |
| # 7. Apply saved plan | |
| terraform apply tfplan | |
| # 8. Verify outputs | |
| terraform output | |
| # 9. Clean up plan file | |
| rm tfplan | |
| ``` | |
| ## π‘οΈ Production Security | |
| - β Use remote state storage | |
| - β Enable state locking | |
| - β Implement approval workflows | |
| - β Run security scans | |
| - β Monitor all changes | |
| """ | |
| elif workflow_type == "module_development": | |
| return f"""**Module Development Workflow:** | |
| ```bash | |
| # Module Development Workflow | |
| # 1. Initialize module | |
| terraform init | |
| # 2. Validate module | |
| terraform validate | |
| # 3. Format module code | |
| terraform fmt -recursive | |
| # 4. Generate documentation | |
| # terraform-docs markdown . > README.md | |
| # 5. Test module (in examples/ directory) | |
| cd examples/basic | |
| terraform init | |
| terraform plan | |
| # 6. Clean up test | |
| terraform destroy | |
| cd ../.. | |
| # 7. Tag version | |
| git tag v1.0.0 | |
| git push --tags | |
| ``` | |
| **Module Structure:** | |
| ``` | |
| modules/{environment}/ | |
| βββ main.tf | |
| βββ variables.tf | |
| βββ outputs.tf | |
| βββ versions.tf | |
| βββ examples/ | |
| ``` | |
| """ | |
| elif tool_name == "convert_to_module": | |
| config_content = arguments.get("config_content", "") | |
| module_name = arguments.get("module_name", "terraform-module") | |
| return f"""**Convert to Terraform Module: {module_name}** | |
| ## π¦ Module Structure | |
| ``` | |
| modules/{module_name}/ | |
| βββ main.tf # Main configuration | |
| βββ variables.tf # Input variables | |
| βββ outputs.tf # Output values | |
| βββ versions.tf # Provider requirements | |
| βββ README.md # Documentation | |
| βββ examples/ | |
| βββ basic/ | |
| βββ main.tf # Example usage | |
| βββ README.md # Example docs | |
| ``` | |
| ## variables.tf | |
| ```hcl | |
| variable "project_name" {{ | |
| description = "Name of the project" | |
| type = string | |
| }} | |
| variable "environment" {{ | |
| description = "Environment name" | |
| type = string | |
| default = "dev" | |
| validation {{ | |
| condition = contains(["dev", "staging", "prod"], var.environment) | |
| error_message = "Environment must be dev, staging, or prod." | |
| }} | |
| }} | |
| variable "tags" {{ | |
| description = "A map of tags to assign to resources" | |
| type = map(string) | |
| default = {{}} | |
| }} | |
| ``` | |
| ## outputs.tf | |
| ```hcl | |
| output "{module_name}_id" {{ | |
| description = "ID of the main resource" | |
| value = # Add appropriate resource reference | |
| }} | |
| ``` | |
| ## Example Usage | |
| ```hcl | |
| module "{module_name}" {{ | |
| source = "./modules/{module_name}" | |
| project_name = "my-project" | |
| environment = "dev" | |
| tags = {{ | |
| Team = "infrastructure" | |
| }} | |
| }} | |
| ``` | |
| """ | |
| elif tool_name == "format_terraform_code": | |
| config_content = arguments.get("config_content", "") | |
| if not config_content: | |
| return """**Terraform Code Formatting:** | |
| β **No configuration provided** | |
| **Formatting features:** | |
| - Normalize indentation (2 spaces) | |
| - Consistent spacing | |
| - Proper block alignment | |
| - Standard Terraform formatting | |
| **Usage:** Provide Terraform configuration content to format. | |
| """ | |
| # Simple formatting demonstration | |
| formatted_lines = [] | |
| for line in config_content.split('\n'): | |
| stripped = line.strip() | |
| if stripped: | |
| formatted_lines.append(stripped) | |
| return f"""**Terraform Code Formatting Results:** | |
| ## β¨ Formatted Configuration | |
| ```hcl | |
| {chr(10).join(formatted_lines)} | |
| ``` | |
| ## π Formatting Applied | |
| - β Normalized indentation | |
| - β Consistent spacing | |
| - β Proper block alignment | |
| - β Standard Terraform style | |
| ## π§ Additional Tips | |
| 1. **Use `terraform fmt`** - Official formatter | |
| 2. **Configure editor** - Auto-formatting in IDE | |
| 3. **Pre-commit hooks** - Format before commits | |
| 4. **Consistent naming** - Use lowercase_with_underscores | |
| """ | |
| elif tool_name == "generate_terraform_docs": | |
| config_content = arguments.get("config_content", "") | |
| module_name = arguments.get("module_name", "terraform-module") | |
| return f"""**Terraform Documentation: {module_name}** | |
| ## π Module Overview | |
| This Terraform configuration manages infrastructure resources with automated documentation generation. | |
| ## ποΈ Resources | |
| | Resource Type | Description | | |
| |---------------|-------------| | |
| | `aws_instance` | EC2 compute instances | | |
| | `aws_vpc` | Virtual Private Cloud | | |
| | `aws_s3_bucket` | Object storage bucket | | |
| ## π₯ Input Variables | |
| | Name | Description | Type | Default | | |
| |------|-------------|------|---------| | |
| | `project_name` | Name of the project | `string` | `null` | | |
| | `environment` | Environment name | `string` | `"dev"` | | |
| | `instance_type` | EC2 instance type | `string` | `"t3.micro"` | | |
| ## π€ Outputs | |
| | Name | Description | | |
| |------|-------------| | |
| | `vpc_id` | ID of the VPC | | |
| | `instance_id` | ID of the EC2 instance | | |
| ## π Usage Example | |
| ```hcl | |
| module "{module_name}" {{ | |
| source = "./modules/{module_name}" | |
| project_name = "my-project" | |
| environment = "production" | |
| tags = {{ | |
| Team = "infrastructure" | |
| Project = "my-project" | |
| }} | |
| }} | |
| ``` | |
| ## π Requirements | |
| | Name | Version | | |
| |------|---------| | |
| | terraform | >= 1.0 | | |
| | aws | >= 5.0 | | |
| ## π§ Setup Instructions | |
| 1. Clone or download this module | |
| 2. Configure variables in `terraform.tfvars` | |
| 3. Run `terraform init` | |
| 4. Run `terraform plan` | |
| 5. Run `terraform apply` | |
| """ | |
| else: | |
| return f"β Unknown Terraform tool: {tool_name}. Available: generate_terraform_config, validate_terraform_config, get_deployment_workflow, convert_to_module, format_terraform_code, generate_terraform_docs" | |
| def select_linux_tool_and_args(self, user_input: str) -> Dict[str, Any]: | |
| """Analyze user input to select appropriate Linux tool and arguments""" | |
| user_lower = user_input.lower() | |
| # File operations | |
| if any(word in user_lower for word in ["list", "ls", "show files", "directory"]): | |
| if "detailed" in user_lower or "-l" in user_lower: | |
| return {"tool": "list_files", "args": {"path": ".", "detailed": True}} | |
| else: | |
| return {"tool": "list_files", "args": {"path": ".", "detailed": False}} | |
| elif any(word in user_lower for word in ["read", "cat", "view", "show content"]): | |
| return {"tool": "read_file", "args": {"filepath": "/etc/hostname"}} | |
| elif any(word in user_lower for word in ["write", "create file", "echo"]): | |
| return {"tool": "write_file", "args": {"filepath": "/tmp/test.txt", "content": "Hello from MCP"}} | |
| # System information | |
| elif any(word in user_lower for word in ["system", "info", "uname", "system info"]): | |
| return {"tool": "get_system_info", "args": {}} | |
| elif any(word in user_lower for word in ["processes", "ps", "running", "process list"]): | |
| return {"tool": "list_processes", "args": {"filter": ""}} | |
| elif any(word in user_lower for word in ["memory", "mem", "ram", "memory usage"]): | |
| return {"tool": "get_memory_usage", "args": {}} | |
| elif any(word in user_lower for word in ["disk", "storage", "df", "disk usage"]): | |
| return {"tool": "get_disk_usage", "args": {}} | |
| # Network operations | |
| elif any(word in user_lower for word in ["network", "netstat", "connections"]): | |
| return {"tool": "get_network_info", "args": {}} | |
| elif any(word in user_lower for word in ["ping", "connectivity", "test connection"]): | |
| return {"tool": "ping_host", "args": {"host": "google.com", "count": 4}} | |
| # Process management | |
| elif any(word in user_lower for word in ["kill", "stop", "terminate"]): | |
| return {"tool": "kill_process", "args": {"pid_or_name": "example_process"}} | |
| # Package management | |
| elif any(word in user_lower for word in ["install", "package", "apt", "yum"]): | |
| return {"tool": "manage_packages", "args": {"operation": "install", "package": "curl"}} | |
| elif any(word in user_lower for word in ["search package", "find package"]): | |
| return {"tool": "manage_packages", "args": {"operation": "search", "package": "python"}} | |
| # Service management | |
| elif any(word in user_lower for word in ["service", "systemctl", "start service", "stop service"]): | |
| if "start" in user_lower: | |
| return {"tool": "manage_services", "args": {"service": "nginx", "action": "start"}} | |
| elif "stop" in user_lower: | |
| return {"tool": "manage_services", "args": {"service": "nginx", "action": "stop"}} | |
| elif "status" in user_lower: | |
| return {"tool": "manage_services", "args": {"service": "nginx", "action": "status"}} | |
| else: | |
| return {"tool": "manage_services", "args": {"service": "nginx", "action": "status"}} | |
| # Log viewing | |
| elif any(word in user_lower for word in ["log", "logs", "syslog", "journal"]): | |
| return {"tool": "view_logs", "args": {"log_type": "syslog", "lines": 50}} | |
| # Environment variables | |
| elif any(word in user_lower for word in ["env", "environment", "variables"]): | |
| return {"tool": "get_environment", "args": {}} | |
| # Default - show help | |
| else: | |
| return {"tool": "help", "args": {}} | |
| def select_cisco_tool_and_args(self, user_input: str) -> Dict[str, Any]: | |
| """Analyze user input to select appropriate Cisco tool and arguments - using ACTUAL tool names""" | |
| user_lower = user_input.lower() | |
| # VLAN management keywords | |
| if any(word in user_lower for word in ["vlan", "vlan management", "create vlan", "vlan config"]): | |
| return {"tool": "vlan_management", "args": {"operation": "create", "vlan_id": 100, "name": "example_vlan"}} | |
| # Interface configuration keywords | |
| elif any(word in user_lower for word in ["interface", "port", "configure interface", "interface config"]): | |
| return {"tool": "interface_configuration", "args": {"interface": "GigabitEthernet0/1", "operation": "configure"}} | |
| # Routing configuration keywords | |
| elif any(word in user_lower for word in ["routing", "ospf", "bgp", "route", "routing config"]): | |
| return {"tool": "routing_configuration", "args": {"protocol": "ospf", "operation": "configure"}} | |
| # Security configuration keywords | |
| elif any(word in user_lower for word in ["security", "acl", "access-list", "firewall", "security config"]): | |
| return {"tool": "security_configuration", "args": {"type": "acl", "operation": "create"}} | |
| # Troubleshooting keywords | |
| elif any(word in user_lower for word in ["troubleshoot", "debug", "problem", "issue", "ping", "trace", "show", "status"]): | |
| return {"tool": "troubleshooting", "args": {"operation": "ping", "target": "8.8.8.8"}} | |
| # Default - show help | |
| else: | |
| return {"tool": "help", "args": {}} | |
| def select_terraform_tool_and_args(self, user_input: str) -> Dict[str, Any]: | |
| """Analyze user input to select appropriate Terraform tool and arguments""" | |
| user_lower = user_input.lower() | |
| # Configuration generation keywords | |
| if any(word in user_lower for word in ["generate", "create config", "terraform config", "infrastructure"]): | |
| if "vpc" in user_lower: | |
| return {"tool": "generate_terraform_config", "args": {"resource_type": "vpc", "provider": "aws", "name": "example"}} | |
| elif "ec2" in user_lower or "instance" in user_lower: | |
| return {"tool": "generate_terraform_config", "args": {"resource_type": "ec2", "provider": "aws", "name": "example"}} | |
| elif "s3" in user_lower or "bucket" in user_lower: | |
| return {"tool": "generate_terraform_config", "args": {"resource_type": "s3", "provider": "aws", "name": "example"}} | |
| elif "eks" in user_lower or "kubernetes" in user_lower: | |
| return {"tool": "generate_terraform_config", "args": {"resource_type": "eks", "provider": "aws", "name": "example"}} | |
| else: | |
| return {"tool": "generate_terraform_config", "args": {"resource_type": "vpc", "provider": "aws", "name": "example"}} | |
| # Validation keywords | |
| elif any(word in user_lower for word in ["validate", "check", "syntax", "validation"]): | |
| return {"tool": "validate_terraform_config", "args": {"config_content": "# Paste your Terraform configuration here", "check_best_practices": True}} | |
| # Workflow keywords | |
| elif any(word in user_lower for word in ["workflow", "deployment", "deploy", "plan", "apply"]): | |
| if "production" in user_lower or "prod" in user_lower: | |
| return {"tool": "get_deployment_workflow", "args": {"workflow_type": "production", "environment": "production"}} | |
| elif "module" in user_lower: | |
| return {"tool": "get_deployment_workflow", "args": {"workflow_type": "module_development"}} | |
| else: | |
| return {"tool": "get_deployment_workflow", "args": {"workflow_type": "basic", "environment": "development"}} | |
| # Module conversion keywords | |
| elif any(word in user_lower for word in ["module", "convert", "reusable"]): | |
| return {"tool": "convert_to_module", "args": {"config_content": "# Paste configuration to convert", "module_name": "example-module"}} | |
| # Formatting keywords | |
| elif any(word in user_lower for word in ["format", "fmt", "formatting", "style"]): | |
| return {"tool": "format_terraform_code", "args": {"config_content": "# Paste Terraform code to format"}} | |
| # Documentation keywords | |
| elif any(word in user_lower for word in ["documentation", "docs", "document", "readme"]): | |
| return {"tool": "generate_terraform_docs", "args": {"config_content": "# Paste configuration to document", "module_name": "terraform-module"}} | |
| # Default - show help | |
| else: | |
| return {"tool": "help", "args": {}} | |
| """Analyze user input to select appropriate Cisco tool and arguments based on actual server""" | |
| user_lower = user_input.lower() | |
| # VLAN management keywords | |
| if any(word in user_lower for word in ["vlan", "create vlan", "assign vlan", "show vlan"]): | |
| if "create" in user_lower: | |
| return {"tool": "vlan_management", "args": {"action": "create_vlan", "vlan_id": 100, "vlan_name": "DATA_VLAN"}} | |
| elif "assign" in user_lower: | |
| return {"tool": "vlan_management", "args": {"action": "assign_vlan", "vlan_id": 100, "interface": "GigabitEthernet0/1"}} | |
| elif "delete" in user_lower: | |
| return {"tool": "vlan_management", "args": {"action": "delete_vlan", "vlan_id": 100}} | |
| else: | |
| return {"tool": "vlan_management", "args": {"action": "show_vlan"}} | |
| # Interface configuration keywords | |
| elif any(word in user_lower for word in ["interface", "port", "access", "trunk", "ethernet"]): | |
| if "trunk" in user_lower: | |
| return {"tool": "interface_configuration", "args": {"action": "configure_trunk", "interface": "GigabitEthernet0/1"}} | |
| elif "ip" in user_lower or "address" in user_lower: | |
| return {"tool": "interface_configuration", "args": {"action": "set_ip", "interface": "GigabitEthernet0/1", "ip_address": "192.168.1.1"}} | |
| elif "shutdown" in user_lower: | |
| return {"tool": "interface_configuration", "args": {"action": "shutdown", "interface": "GigabitEthernet0/1"}} | |
| elif "enable" in user_lower or "no shutdown" in user_lower: | |
| return {"tool": "interface_configuration", "args": {"action": "no_shutdown", "interface": "GigabitEthernet0/1"}} | |
| else: | |
| return {"tool": "interface_configuration", "args": {"action": "configure_access", "interface": "GigabitEthernet0/1", "vlan_id": 10}} | |
| # Routing keywords | |
| elif any(word in user_lower for word in ["routing", "route", "ospf", "eigrp", "static"]): | |
| if "ospf" in user_lower: | |
| return {"tool": "routing_configuration", "args": {"protocol": "ospf", "action": "configure", "network": "192.168.1.0 0.0.0.255"}} | |
| elif "static" in user_lower: | |
| return {"tool": "routing_configuration", "args": {"protocol": "static", "action": "configure", "network": "10.0.0.0 255.0.0.0", "next_hop": "192.168.1.1"}} | |
| elif "eigrp" in user_lower: | |
| return {"tool": "routing_configuration", "args": {"protocol": "eigrp", "action": "configure", "network": "192.168.1.0 0.0.0.255"}} | |
| else: | |
| return {"tool": "routing_configuration", "args": {"protocol": "ospf", "action": "show"}} | |
| # Security keywords | |
| elif any(word in user_lower for word in ["security", "ssh", "acl", "access-list", "port-security"]): | |
| if "ssh" in user_lower: | |
| return {"tool": "security_configuration", "args": {"feature": "ssh", "action": "enable"}} | |
| elif "acl" in user_lower or "access-list" in user_lower: | |
| return {"tool": "security_configuration", "args": {"feature": "acl", "action": "configure", "acl_number": 100}} | |
| elif "port-security" in user_lower or "port security" in user_lower: | |
| return {"tool": "security_configuration", "args": {"feature": "port_security", "action": "enable", "interface": "GigabitEthernet0/1"}} | |
| else: | |
| return {"tool": "security_configuration", "args": {"feature": "ssh", "action": "enable"}} | |
| # Troubleshooting keywords | |
| elif any(word in user_lower for word in ["troubleshoot", "debug", "ping", "traceroute", "show", "connectivity"]): | |
| if "connectivity" in user_lower or "ping" in user_lower: | |
| return {"tool": "troubleshooting", "args": {"category": "connectivity", "target": "8.8.8.8"}} | |
| elif "interface" in user_lower: | |
| return {"tool": "troubleshooting", "args": {"category": "interface"}} | |
| elif "routing" in user_lower or "route" in user_lower: | |
| return {"tool": "troubleshooting", "args": {"category": "routing"}} | |
| elif "switching" in user_lower or "switch" in user_lower: | |
| return {"tool": "troubleshooting", "args": {"category": "switching"}} | |
| else: | |
| return {"tool": "troubleshooting", "args": {"category": "general"}} | |
| # Default - show help | |
| else: | |
| return {"tool": "help", "args": {}} | |
| """Analyze user input to select appropriate Linux tool and arguments""" | |
| user_lower = user_input.lower() | |
| # User management keywords | |
| if any(word in user_lower for word in ["user", "group", "adduser", "useradd", "usermod"]): | |
| if "add" in user_lower and "group" in user_lower: | |
| return {"tool": "user_management", "args": {"action": "add_user_to_group", "username": "example_user", "groupname": "example_group"}} | |
| elif "create" in user_lower or ("add" in user_lower and "user" in user_lower): | |
| return {"tool": "user_management", "args": {"action": "create_user", "username": "new_user"}} | |
| elif "delete" in user_lower and "user" in user_lower: | |
| return {"tool": "user_management", "args": {"action": "delete_user", "username": "old_user"}} | |
| else: | |
| return {"tool": "user_management", "args": {"action": "list_groups"}} | |
| # File permission keywords | |
| elif any(word in user_lower for word in ["permission", "chmod", "chown", "file", "ownership"]): | |
| if "chmod" in user_lower: | |
| return {"tool": "file_permissions", "args": {"action": "chmod", "path": "/path/to/file", "permissions": "755"}} | |
| elif "chown" in user_lower or "ownership" in user_lower: | |
| return {"tool": "file_permissions", "args": {"action": "chown", "path": "/path/to/file", "owner": "username"}} | |
| elif "find" in user_lower: | |
| return {"tool": "file_permissions", "args": {"action": "find_permissions", "path": "/", "permissions": "777"}} | |
| else: | |
| return {"tool": "file_permissions", "args": {"action": "chmod", "path": "/path/to/file", "permissions": "644"}} | |
| # System command keywords | |
| elif any(word in user_lower for word in ["process", "service", "network", "disk", "firewall", "systemctl", "ps"]): | |
| if any(word in user_lower for word in ["process", "ps", "top", "kill"]): | |
| return {"tool": "system_commands", "args": {"category": "process"}} | |
| elif any(word in user_lower for word in ["service", "systemctl", "daemon"]): | |
| return {"tool": "system_commands", "args": {"category": "service"}} | |
| elif any(word in user_lower for word in ["network", "ping", "ip", "interface"]): | |
| return {"tool": "system_commands", "args": {"category": "network"}} | |
| elif any(word in user_lower for word in ["disk", "df", "mount", "storage"]): | |
| return {"tool": "system_commands", "args": {"category": "disk"}} | |
| elif any(word in user_lower for word in ["firewall", "ufw", "iptables"]): | |
| return {"tool": "system_commands", "args": {"category": "firewall"}} | |
| else: | |
| return {"tool": "system_commands", "args": {"category": "process"}} | |
| # Default - show help | |
| else: | |
| return {"tool": "help", "args": {}} | |
| def get_available_tools(self, server_key: str) -> List[Dict[str, Any]]: | |
| """Get available tools from an MCP server""" | |
| if server_key == "linux": | |
| # Return known tools for Linux server | |
| return self.mcp_servers["linux"]["tools"] | |
| # For other servers, try to call them (though they might not have HTTP endpoints) | |
| try: | |
| result = self.call_mcp_server(server_key, "tools/list", {}) | |
| if result["success"]: | |
| data = result["data"] | |
| if isinstance(data, dict): | |
| return data.get("tools", data.get("result", [])) | |
| elif isinstance(data, list): | |
| return data | |
| except: | |
| pass | |
| return [] | |
| def call_mcp_server(self, server_key: str, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]: | |
| """Make a call to an MCP server (for non-Linux servers)""" | |
| try: | |
| server_config = self.mcp_servers[server_key] | |
| # For FastAPI MCP servers, we need to use different endpoints | |
| if endpoint == "tools/list": | |
| url = f"{server_config['space_url']}/tools" | |
| response = requests.get(url, timeout=30) | |
| elif endpoint == "tools/call": | |
| url = f"{server_config['space_url']}/tools/call" | |
| response = requests.post( | |
| url, | |
| json=payload, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=30 | |
| ) | |
| else: | |
| # Fallback to direct endpoint | |
| url = f"{server_config['space_url']}/{endpoint}" | |
| response = requests.post( | |
| url, | |
| json=payload, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| return {"success": True, "data": response.json()} | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"HTTP {response.status_code}: {response.text}", | |
| "server": server_config['name'] | |
| } | |
| except requests.exceptions.RequestException as e: | |
| return {"success": False, "error": f"Request failed: {str(e)}", "server": server_config['name']} | |
| except Exception as e: | |
| return {"success": False, "error": f"Unexpected error: {str(e)}", "server": server_config['name']} | |
| def execute_tool(self, server_key: str, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: | |
| """Execute a tool on an MCP server""" | |
| payload = { | |
| "name": tool_name, | |
| "arguments": arguments | |
| } | |
| return self.call_mcp_server(server_key, "tools/call", payload) | |
| def analyze_user_request(self, user_input: str) -> Dict[str, Any]: | |
| """Use OpenAI to analyze user request and determine which MCP server to use""" | |
| system_prompt = """ | |
| You are an intelligent assistant that helps users interact with MCP (Model Context Protocol) servers. | |
| Available MCP servers (use these EXACT keys in your response): | |
| 1. terraform: Terraform infrastructure management | |
| 2. linux: Linux system operations | |
| 3. cisco: Cisco network management | |
| IMPORTANT: For the "recommended_server" field, you MUST use one of these exact keys: "terraform", "linux", or "cisco" | |
| Analyze the user's request and determine which MCP server would be most appropriate. | |
| Respond in JSON format with: | |
| { | |
| "recommended_server": "terraform|linux|cisco", | |
| "reasoning": "explanation of why this server was chosen", | |
| "suggested_action": "what action to take" | |
| } | |
| """ | |
| try: | |
| response = self.openai_client.chat.completions.create( | |
| model="gpt-4", | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_input} | |
| ], | |
| temperature=0.3 | |
| ) | |
| content = response.choices[0].message.content.strip() | |
| # Try to parse JSON, handle cases where GPT returns non-JSON | |
| try: | |
| return json.loads(content) | |
| except json.JSONDecodeError: | |
| # If not valid JSON, extract server recommendation manually | |
| content_lower = content.lower() | |
| if "cisco" in content_lower: | |
| recommended_server = "cisco" | |
| elif "linux" in content_lower: | |
| recommended_server = "linux" | |
| elif "terraform" in content_lower: | |
| recommended_server = "terraform" | |
| else: | |
| recommended_server = "cisco" # Default for network questions | |
| return { | |
| "recommended_server": recommended_server, | |
| "reasoning": "Based on content analysis", | |
| "suggested_action": content | |
| } | |
| except Exception as e: | |
| return { | |
| "error": f"Failed to analyze request: {str(e)}", | |
| "recommended_server": "cisco" | |
| } | |
| def generate_response(self, user_input: str, mcp_results: Dict[str, Any] = None) -> str: | |
| """Generate a natural language response using OpenAI""" | |
| context = "" | |
| if mcp_results: | |
| context = f"\nMCP Server Results: {json.dumps(mcp_results, indent=2)}" | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": f""" | |
| You are a helpful assistant that works with MCP servers for infrastructure management. | |
| Provide clear, helpful responses based on the user's request and any MCP server results. | |
| Available MCP Servers: | |
| - Terraform: Infrastructure as Code management | |
| - Linux: System administration and operations | |
| - Cisco: Network device management | |
| {context} | |
| """ | |
| } | |
| ] | |
| # Add conversation history (last 6 messages) | |
| messages.extend(self.conversation_history[-6:]) | |
| messages.append({"role": "user", "content": user_input}) | |
| try: | |
| response = self.openai_client.chat.completions.create( | |
| model="gpt-4", | |
| messages=messages, | |
| temperature=0.7, | |
| max_tokens=1000 | |
| ) | |
| return response.choices[0].message.content | |
| except Exception as e: | |
| return f"Error generating response: {str(e)}" | |
| def process_request(self, user_input: str, selected_server: str = None) -> tuple[str, str]: | |
| """Process a user request end-to-end""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| # Add to conversation history | |
| self.conversation_history.append({"role": "user", "content": user_input}) | |
| try: | |
| # Analyze request if no server specified or auto-detect selected | |
| if not selected_server or selected_server == "Auto-detect": | |
| analysis = self.analyze_user_request(user_input) | |
| if "error" in analysis: | |
| response = f"β Analysis Error: {analysis['error']}" | |
| self.conversation_history.append({"role": "assistant", "content": response}) | |
| return response, f"[{timestamp}] Analysis failed" | |
| selected_server = analysis.get("recommended_server", "terraform") | |
| reasoning = analysis.get("reasoning", "No reasoning provided") | |
| status = f"[{timestamp}] Selected: {self.mcp_servers[selected_server]['name']} - {reasoning}" | |
| else: | |
| # Map display names back to keys if needed | |
| server_mapping = { | |
| "MCP Terraform": "terraform", | |
| "MCP Linux": "linux", | |
| "MCP Cisco": "cisco", | |
| "Terraform": "terraform", | |
| "Linux": "linux", | |
| "Cisco": "cisco" | |
| } | |
| selected_server = server_mapping.get(selected_server, selected_server.lower()) | |
| # Validate server key | |
| if selected_server not in self.mcp_servers: | |
| response = f"β Invalid server selection: {selected_server}. Available: {', '.join(self.mcp_servers.keys())}" | |
| self.conversation_history.append({"role": "assistant", "content": response}) | |
| return response, f"[{timestamp}] Invalid server" | |
| status = f"[{timestamp}] Using: {self.mcp_servers[selected_server]['name']}" | |
| # Check server health | |
| health_check = self.check_server_health(selected_server) | |
| if not health_check["success"]: | |
| response = f"β οΈ Cannot connect to {self.mcp_servers[selected_server]['name']} server: {health_check.get('error', 'Server offline')}" | |
| self.conversation_history.append({"role": "assistant", "content": response}) | |
| return response, status + " - Server offline" | |
| # Handle Linux server specifically (full functionality) | |
| if selected_server == "linux": | |
| tool_selection = self.select_linux_tool_and_args(user_input) | |
| if tool_selection["tool"] == "help": | |
| response = """**Available Linux MCP Tools:** | |
| π§ **user_management**: Create, delete, and manage users and groups | |
| π **file_permissions**: Change file permissions and ownership (chmod, chown) | |
| βοΈ **system_commands**: System administration commands (processes, services, network, disk, firewall) | |
| **Example commands you can ask for:** | |
| - "Show me user management commands" | |
| - "How do I change file permissions?" | |
| - "Give me process management commands" | |
| - "Show network diagnostic commands" | |
| - "Help with service management" | |
| - "Configure firewall rules" | |
| - "Check disk usage commands" | |
| """ | |
| else: | |
| # Execute the selected Linux tool | |
| result = self.execute_linux_tool(tool_selection["tool"], tool_selection["args"]) | |
| response = result | |
| # Handle Cisco server specifically (full functionality) | |
| elif selected_server == "cisco": | |
| tool_selection = self.select_cisco_tool_and_args(user_input) | |
| if tool_selection["tool"] == "help": | |
| response = """**Available Cisco MCP Tools:** | |
| π·οΈ **vlan_management**: Create, delete, assign, and show VLANs | |
| π **interface_configuration**: Configure access/trunk ports, IP addresses, shutdown/enable | |
| π£οΈ **routing_configuration**: Set up OSPF, EIGRP, static routes | |
| π **security_configuration**: Configure SSH, ACLs, port security | |
| π§ **troubleshooting**: Network connectivity, interface, routing, switching diagnostics | |
| **Example commands you can ask for:** | |
| - "Create a VLAN" or "Show VLAN information" | |
| - "Configure interface as access port" or "Set up trunk port" | |
| - "Configure OSPF routing" or "Set up static routes" | |
| - "Enable SSH access" or "Configure ACL" | |
| - "Troubleshoot connectivity" or "Check interface status" | |
| """ | |
| else: | |
| # Execute the selected Cisco tool | |
| result = self.execute_cisco_tool(tool_selection["tool"], tool_selection["args"]) | |
| response = result | |
| # Handle other servers (Future expansion) | |
| else: | |
| # Try to get server information first | |
| try: | |
| # Check if server provides any status info | |
| server_url = self.mcp_servers[selected_server]['space_url'] | |
| status_response = requests.get(server_url, timeout=10) | |
| if status_response.status_code == 200: | |
| try: | |
| server_data = status_response.json() | |
| tools_count = server_data.get('tools', 'unknown') | |
| service_name = server_data.get('service', 'unknown') | |
| if selected_server == "cisco": | |
| response = f"""**Cisco MCP Server** β Connected! | |
| **Server Status**: Online | |
| **Service**: {service_name} | |
| **Available Tools**: {tools_count} | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| **What you can ask for with Cisco:** | |
| - "Show me Cisco interface configuration commands" | |
| - "Help with VLAN configuration" | |
| - "Generate Cisco routing commands" | |
| - "Show network troubleshooting commands" | |
| - "Configure access control lists" | |
| - "Help with Cisco switch configuration" | |
| **Note**: This server has {tools_count} tools available via MCP protocol. The tools likely include: | |
| - Interface configuration management | |
| - VLAN configuration and management | |
| - Routing protocol setup | |
| - ACL (Access Control List) configuration | |
| - Network troubleshooting commands | |
| - Device status and monitoring | |
| Would you like me to help you with any specific Cisco networking task? | |
| """ | |
| elif selected_server == "terraform": | |
| response = f"""**Terraform MCP Server** β Connected! | |
| **Server Status**: Online | |
| **Service**: {service_name} | |
| **Available Tools**: {tools_count} | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| **What you can ask for with Terraform:** | |
| - "Show me Terraform plan commands" | |
| - "Help with infrastructure deployment" | |
| - "Generate Terraform configuration" | |
| - "Show resource management commands" | |
| - "Help with Terraform state management" | |
| **Note**: This server has {tools_count} tools available via MCP protocol. | |
| Would you like me to help you with any specific Terraform task? | |
| """ | |
| except json.JSONDecodeError: | |
| # Server responded but not with JSON | |
| if selected_server == "cisco": | |
| response = f"""**Cisco MCP Server** β οΈ Connected but Limited Info | |
| **Server Status**: Online (non-JSON response) | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| **Common Cisco operations I can help with:** | |
| - Interface configuration commands | |
| - VLAN setup and management | |
| - Routing protocol configuration | |
| - Access Control Lists (ACLs) | |
| - Network troubleshooting | |
| - Switch and router configuration | |
| Would you like me to help with any specific Cisco networking task? | |
| """ | |
| else: | |
| response = f"""**Terraform MCP Server** β οΈ Connected but Limited Info | |
| **Server Status**: Online (non-JSON response) | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| **Common Terraform operations I can help with:** | |
| - Infrastructure planning and deployment | |
| - Resource configuration and management | |
| - State file management | |
| - Module development | |
| - Provider configuration | |
| Would you like me to help with any specific Terraform task? | |
| """ | |
| else: | |
| # Server not responding properly | |
| if selected_server == "cisco": | |
| response = f"""**Cisco MCP Server** β Connection Issue | |
| **Server Status**: {health_check['status']} | |
| **Error**: HTTP {status_response.status_code} | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| The server appears to be having issues. Please try again later or check if the server is running properly. | |
| """ | |
| else: | |
| response = f"""**Terraform MCP Server** β Connection Issue | |
| **Server Status**: {health_check['status']} | |
| **Error**: HTTP {status_response.status_code} | |
| **MCP Endpoint**: {server_url}/mcp/sse | |
| The server appears to be having issues. Please try again later or check if the server is running properly. | |
| """ | |
| except Exception as e: | |
| # Connection failed | |
| if selected_server == "cisco": | |
| response = f"""**Cisco MCP Server** β Connection Failed | |
| **Error**: {str(e)} | |
| **MCP Endpoint**: {self.mcp_servers[selected_server]['space_url']}/mcp/sse | |
| **Common Cisco operations (offline reference):** | |
| - Interface configuration: `interface GigabitEthernet0/1` | |
| - VLAN configuration: `vlan 10` | |
| - Routing: `ip route 0.0.0.0 0.0.0.0 192.168.1.1` | |
| - ACL: `access-list 10 permit 192.168.1.0 0.0.0.255` | |
| Please check if the server is running and try again. | |
| """ | |
| else: | |
| response = f"""**Terraform MCP Server** β Connection Failed | |
| **Error**: {str(e)} | |
| **MCP Endpoint**: {self.mcp_servers[selected_server]['space_url']}/mcp/sse | |
| **Common Terraform operations (offline reference):** | |
| - Plan: `terraform plan` | |
| - Apply: `terraform apply` | |
| - Destroy: `terraform destroy` | |
| - Validate: `terraform validate` | |
| Please check if the server is running and try again. | |
| """ | |
| self.conversation_history.append({"role": "assistant", "content": response}) | |
| return response, status + f" - {health_check['status']}" | |
| except Exception as e: | |
| error_response = f"β Error processing request: {str(e)}" | |
| self.conversation_history.append({"role": "assistant", "content": error_response}) | |
| return error_response, f"[{timestamp}] Error occurred" | |
| def create_gradio_interface(): | |
| """Create the Gradio interface""" | |
| # Try to get API key from environment (HuggingFace Secrets) | |
| openai_api_key = os.getenv("OPEN_AI_API_KEY") | |
| # Initialize MCP Client if API key is available | |
| mcp_client = None | |
| initial_status = "" | |
| if openai_api_key: | |
| try: | |
| mcp_client = MCPClient(openai_api_key) | |
| initial_status = "β OpenAI client initialized automatically from HF Secrets!" | |
| except Exception as e: | |
| initial_status = f"β Failed to initialize with HF Secret: {str(e)}" | |
| else: | |
| initial_status = "β οΈ OpenAI API key not found in HF Secrets. Please enter manually." | |
| def initialize_client(api_key): | |
| nonlocal mcp_client | |
| # Use provided key or fall back to environment | |
| key_to_use = api_key.strip() if api_key and api_key.strip() else openai_api_key | |
| if not key_to_use: | |
| return "β Please provide your OpenAI API key", "" | |
| try: | |
| mcp_client = MCPClient(key_to_use) | |
| return "β OpenAI client initialized successfully!", "" | |
| except Exception as e: | |
| return f"β Failed to initialize: {str(e)}", "" | |
| def process_message(message, server_choice, history): | |
| if not mcp_client: | |
| return history + [["β Error", "Please initialize the OpenAI client first using the button above."]], "" | |
| if not message.strip(): | |
| return history, "" | |
| try: | |
| response, status = mcp_client.process_request(message, server_choice) | |
| history.append([message, response]) | |
| return history, "" | |
| except Exception as e: | |
| error_msg = f"β Error: {str(e)}" | |
| history.append([message, error_msg]) | |
| return history, "" | |
| def get_server_status(): | |
| if not mcp_client: | |
| return "Please initialize the OpenAI client first." | |
| status_info = "## MCP Servers Status\n\n" | |
| for key, config in mcp_client.mcp_servers.items(): | |
| health_check = mcp_client.check_server_health(key) | |
| if health_check["success"]: | |
| if health_check["status"] == "online": | |
| status_icon = "π’" | |
| status_text = "Online" | |
| else: | |
| status_icon = "π‘" | |
| status_text = "Online (Different Structure)" | |
| else: | |
| status_icon = "π΄" | |
| status_text = "Offline" | |
| status_info += f"**{config['name']}**: {status_icon} {status_text}\n" | |
| status_info += f"- URL: {config['space_url']}\n" | |
| status_info += f"- Description: {config['description']}\n" | |
| # Show tools for Linux server | |
| if key == "linux" and "tools" in config: | |
| status_info += f"- Available Tools: {len(config['tools'])}\n" | |
| for tool in config['tools']: | |
| status_info += f" - **{tool['name']}**: {tool['description']}\n" | |
| # Show MCP endpoint info | |
| status_info += f"- MCP Endpoint: {config['space_url']}/mcp/sse\n" | |
| if health_check["success"] and "response" in health_check: | |
| response_data = health_check["response"] | |
| if isinstance(response_data, dict) and "service" in response_data: | |
| status_info += f"- Service: {response_data.get('service', 'Unknown')}\n" | |
| status_info += "\n" | |
| return status_info | |
| # Create Gradio interface | |
| with gr.Blocks(title="MCP Client with OpenAI", theme=gr.themes.Soft()) as app: | |
| gr.Markdown("# π€ MCP Client with OpenAI Integration") | |
| gr.Markdown("Connect to your Terraform, Linux, and Cisco MCP servers with AI-powered assistance") | |
| gr.Markdown("π‘ **Tip**: Your OpenAI API key is automatically loaded from HuggingFace Secrets!") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| # API Key input (optional if already in HF Secrets) | |
| api_key = gr.Textbox( | |
| label="OpenAI API Key (Optional - will use HF Secret if available)", | |
| type="password", | |
| placeholder="Leave empty to use HF Secret OPEN_AI_API_KEY...", | |
| value="" | |
| ) | |
| init_btn = gr.Button("Initialize/Reinitialize Client", variant="primary") | |
| init_status = gr.Textbox( | |
| label="Initialization Status", | |
| interactive=False, | |
| value=initial_status | |
| ) | |
| # Server selection | |
| server_choice = gr.Dropdown( | |
| choices=["Auto-detect", "linux", "terraform", "cisco"], | |
| value="Auto-detect", | |
| label="Select MCP Server" | |
| ) | |
| # Chat interface | |
| chatbot = gr.Chatbot( | |
| label="Conversation", | |
| height=400, | |
| show_label=True | |
| ) | |
| msg = gr.Textbox( | |
| label="Message", | |
| placeholder="Ask about infrastructure, Linux systems, or Cisco networks...", | |
| lines=2 | |
| ) | |
| with gr.Row(): | |
| send_btn = gr.Button("Send", variant="primary") | |
| clear_btn = gr.Button("Clear Chat") | |
| with gr.Column(scale=1): | |
| gr.Markdown("## π Server Status") | |
| server_status = gr.Markdown("Initialize client to see server status") | |
| refresh_btn = gr.Button("Refresh Status") | |
| gr.Markdown("## π‘ Example Queries") | |
| gr.Markdown(""" | |
| **Linux (Full Support):** | |
| - "Show me user management commands" | |
| - "How do I change file permissions?" | |
| - "Give me process management commands" | |
| - "Show network diagnostic commands" | |
| - "Help with service management" | |
| - "Configure firewall rules" | |
| **Terraform:** | |
| - "Show me terraform commands" | |
| - "Plan infrastructure deployment" | |
| - "Help with terraform apply" | |
| **Cisco:** | |
| - "Show interface configuration" | |
| - "Configure VLAN commands" | |
| - "Help with port configuration" | |
| """) | |
| # Event handlers | |
| init_btn.click( | |
| initialize_client, | |
| inputs=[api_key], | |
| outputs=[init_status, server_status] | |
| ) | |
| send_btn.click( | |
| process_message, | |
| inputs=[msg, server_choice, chatbot], | |
| outputs=[chatbot, msg] | |
| ) | |
| msg.submit( | |
| process_message, | |
| inputs=[msg, server_choice, chatbot], | |
| outputs=[chatbot, msg] | |
| ) | |
| clear_btn.click( | |
| lambda: [], | |
| outputs=[chatbot] | |
| ) | |
| refresh_btn.click( | |
| get_server_status, | |
| outputs=[server_status] | |
| ) | |
| return app | |
| if __name__ == "__main__": | |
| # Create and launch the Gradio app | |
| app = create_gradio_interface() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, | |
| debug=True | |
| ) |