import gradio as gr import openai import requests import json from typing import Dict, List, Any import os from datetime import datetime class MCPClient: def __init__(self, openai_api_key: str): """Initialize the MCP Client with OpenAI integration""" self.openai_client = openai.OpenAI(api_key=openai_api_key) # MCP Server configurations self.mcp_servers = { "terraform": { "name": "MCP Terraform", "space_url": "https://chrissacrumcor-mcp-terraform.hf.space", "description": "Terraform infrastructure management" }, "linux": { "name": "MCP Linux", "space_url": "https://chrissacrumcor-mcp-linux.hf.space", "description": "Linux system operations", # Known tools from the Linux server code "tools": [ { "name": "user_management", "description": "Generate Linux user management commands for creating, deleting, and managing users and groups" }, { "name": "file_permissions", "description": "Generate file permission and ownership commands for chmod, chown, and permission discovery" }, { "name": "system_commands", "description": "Generate common Linux system administration commands for processes, disk, network, services, and firewall" } ] }, "cisco": { "name": "MCP Cisco", "space_url": "https://chrissacrumcor-mcp-cisco.hf.space", "description": "Cisco network management" } } self.conversation_history = [] def check_server_health(self, server_key: str) -> Dict[str, Any]: """Check if MCP server is responsive""" try: server_config = self.mcp_servers[server_key] # Try health endpoints health_endpoints = ["/health", "/", "/docs"] for endpoint in health_endpoints: try: url = f"{server_config['space_url']}{endpoint}" response = requests.get(url, timeout=10) if response.status_code == 200: return { "success": True, "status": "online", "endpoint": endpoint, "response": response.json() if endpoint in ["/health", "/"] else {"status": "docs_available"} } elif response.status_code in [404, 422]: return { "success": True, "status": "online_different_structure", "endpoint": endpoint } except requests.RequestException: continue return {"success": False, "status": "offline", "error": "No responsive endpoints"} except Exception as e: return {"success": False, "status": "error", "error": str(e)} def execute_linux_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Execute Linux MCP tools - recreated from server logic""" if tool_name == "user_management": action = arguments.get("action", "") username = arguments.get("username", "example_user") groupname = arguments.get("groupname", "example_group") if action == "add_user_to_group": return f"""**Add User to Group Commands:** ```bash # Add user '{username}' to group '{groupname}' sudo usermod -a -G {groupname} {username} # Alternative methods: sudo gpasswd -a {username} {groupname} sudo adduser {username} {groupname} # Ubuntu/Debian # Verify group membership: groups {username} id {username} getent group {groupname} ```""" elif action == "create_user": return f"""**Create User Commands:** ```bash # Create new user '{username}' sudo useradd -m -s /bin/bash {username} sudo passwd {username} # Create user with specific group: sudo useradd -m -g {groupname} -s /bin/bash {username} # Add to sudo group (Ubuntu/Debian): sudo usermod -a -G sudo {username} # Add to wheel group (RHEL/CentOS): sudo usermod -a -G wheel {username} # Verify user creation: id {username} getent passwd {username} ```""" elif action == "delete_user": return f"""**Delete User Commands:** ```bash # Delete user '{username}' sudo userdel {username} # Delete user and home directory: sudo userdel -r {username} # Remove user from all groups first (if needed): sudo gpasswd -d {username} groupname # Verify user deletion: getent passwd {username} # Should return no results if deleted successfully ```""" elif action == "list_groups": return """**List Groups and Users Commands:** ```bash # List all groups: cat /etc/group getent group # List groups for current user: groups $USER id $USER # List groups for specific user: groups username id username # List users in specific group: getent group sudo getent group wheel # List all users: cut -d: -f1 /etc/passwd getent passwd ```""" else: return f"❌ Unknown user management action: {action}. Available: add_user_to_group, create_user, delete_user, list_groups" elif tool_name == "file_permissions": action = arguments.get("action", "") path = arguments.get("path", "/path/to/file") permissions = arguments.get("permissions", "755") owner = arguments.get("owner", "username") recursive = arguments.get("recursive", False) recursive_flag = "-R " if recursive else "" if action == "chmod": return f"""**Change File Permissions Commands:** ```bash # Change permissions for {path} chmod {recursive_flag}{permissions} {path} # Common permission examples: # 755 = rwxr-xr-x (executable for owner, readable for others) # 644 = rw-r--r-- (readable/writable for owner, readable for others) # 600 = rw------- (readable/writable for owner only) # 777 = rwxrwxrwx (full permissions for all - use with caution!) # Symbolic notation examples: chmod {recursive_flag}u+x {path} # Add execute for owner chmod {recursive_flag}g-w {path} # Remove write for group chmod {recursive_flag}o-r {path} # Remove read for others # Verify permissions: ls -la {path} stat {path} ```""" elif action == "chown": return f"""**Change File Ownership Commands:** ```bash # Change ownership for {path} sudo chown {recursive_flag}{owner} {path} # Change owner and group: sudo chown {recursive_flag}{owner}:{owner} {path} sudo chown {recursive_flag}{owner}:users {path} # Change only group: sudo chgrp {recursive_flag}{owner} {path} # Verify ownership: ls -la {path} stat {path} ```""" elif action == "find_permissions": return f"""**Find Files by Permissions:** ```bash # Find files with specific permissions in {path} find {path} -type f -perm {permissions} find {path} -type f -perm -{permissions} # At least these permissions # Find files by owner: find {path} -user {owner} find {path} -group {owner} # Find files with dangerous permissions: find {path} -type f -perm -002 # World-writable files find {path} -type f -perm -004 # World-readable files # Find SUID/SGID files: find {path} -type f -perm -4000 # SUID files find {path} -type f -perm -2000 # SGID files # Find world-writable directories: find {path} -type d -perm 777 ```""" else: return f"❌ Unknown file permission action: {action}. Available: chmod, chown, find_permissions" elif tool_name == "system_commands": category = arguments.get("category", "") target = arguments.get("target", "nginx") if category == "process": return f"""**Process Management Commands:** ```bash # List and analyze processes: ps aux # List all processes with details ps -ef # Alternative process listing pstree # Show process tree top # Real-time process monitor htop # Enhanced process monitor # Find and manage specific processes: ps aux | grep {target} # Find specific process pgrep -f {target} # Get PID by name pkill -f {target} # Kill process by name killall {target} # Kill all processes by name # Process control: kill -15 PID # Graceful termination (SIGTERM) kill -9 PID # Force kill (SIGKILL) kill -HUP PID # Reload configuration (SIGHUP) # Background processes: nohup command & # Run command in background screen -S session_name # Start screen session tmux new -s session_name # Start tmux session ```""" elif category == "service": return f"""**Service Management Commands (systemd):** ```bash # Service status and control: systemctl status {target} # Check service status systemctl start {target} # Start service systemctl stop {target} # Stop service systemctl restart {target} # Restart service systemctl reload {target} # Reload configuration # Service persistence: systemctl enable {target} # Enable at boot systemctl disable {target} # Disable at boot systemctl is-enabled {target} # Check if enabled # Service discovery: systemctl list-units # List all active services systemctl list-units --failed # List failed services systemctl list-unit-files # List all unit files # Service logs: journalctl -u {target} # View service logs journalctl -u {target} -f # Follow service logs journalctl -u {target} --since today # Today's logs ```""" elif category == "network": return f"""**Network Diagnostic Commands:** ```bash # Network interface information: ip addr show # Show IP addresses ip link show # Show network interfaces ip route show # Show routing table ip neigh show # Show ARP table # Port and connection analysis: ss -tuln # Show listening ports ss -tulpn # Show ports with process names netstat -tuln # Alternative port listing netstat -rn # Show routing table # Connectivity testing: ping -c 4 {target} # Test connectivity traceroute {target} # Trace network path mtr {target} # Network diagnostic tool nslookup {target} # DNS lookup dig {target} # DNS information # Network utilities: wget -O- ifconfig.me # Get public IP curl -s ifconfig.me # Alternative public IP curl -s ipinfo.io # Detailed IP information ```""" elif category == "disk": return f"""**Disk Usage and Management Commands:** ```bash # Disk usage analysis: df -h # Show disk usage (human readable) df -i # Show inode usage du -sh /* # Show directory sizes in root du -sh /var/log/* | sort -hr # Sort by size # Disk and filesystem information: lsblk # List block devices lsblk -f # Show filesystems blkid # Show UUIDs and labels fdisk -l # List disk partitions parted -l # Alternative partition listing # Mount management: mount # Show mounted filesystems findmnt # Tree view of mounts mount /dev/sdb1 /mnt/backup # Mount filesystem umount /mnt/backup # Unmount filesystem # Cleanup commands: apt autoremove # Remove unused packages (Debian/Ubuntu) yum autoremove # Remove unused packages (RHEL/CentOS) journalctl --disk-usage # Check journal disk usage journalctl --vacuum-time=3d # Clean old journal entries ```""" elif category == "firewall": return f"""**Firewall Management Commands:** ```bash # UFW (Uncomplicated Firewall): sudo ufw status verbose # Check UFW status (detailed) sudo ufw enable # Enable UFW sudo ufw disable # Disable UFW # UFW rule management: sudo ufw allow {target} # Allow port/service sudo ufw deny {target} # Block port/service sudo ufw allow ssh # Allow SSH service sudo ufw allow 80/tcp # Allow HTTP sudo ufw allow 443/tcp # Allow HTTPS sudo ufw allow from 192.168.1.0/24 # Allow from subnet # UFW advanced operations: sudo ufw delete allow 80 # Remove rule sudo ufw insert 1 allow 22 # Insert rule at position sudo ufw logging on # Enable logging # iptables (advanced): sudo iptables -L # List iptables rules sudo iptables -L -n # List with numeric output sudo iptables -A INPUT -p tcp --dport 80 -j ACCEPT # Allow HTTP sudo iptables-save > /etc/iptables/rules.v4 # Save rules ```""" else: return f"❌ Unknown system command category: {category}. Available: process, service, network, disk, firewall" else: return f"❌ Unknown tool: {tool_name}. Available: user_management, file_permissions, system_commands" def execute_cisco_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Execute Cisco MCP tools - based on actual server code""" if tool_name == "vlan_management": action = arguments.get("action", "create_vlan") vlan_id = arguments.get("vlan_id", 100) vlan_name = arguments.get("vlan_name", f"VLAN_{vlan_id}") interface = arguments.get("interface", "") if action == "create_vlan": return f"""**Cisco VLAN Creation Commands:** ```cisco ! Create VLAN {vlan_id} configure terminal vlan {vlan_id} name {vlan_name} state active exit exit ! Verify VLAN creation: show vlan brief show vlan id {vlan_id} ```""" elif action == "assign_vlan": interface = interface or "GigabitEthernet0/1" return f"""**Assign Interface to VLAN Commands:** ```cisco ! Assign interface {interface} to VLAN {vlan_id} configure terminal interface {interface} switchport mode access switchport access vlan {vlan_id} no shutdown exit exit ! Verify interface assignment: show interfaces {interface} switchport show vlan brief ```""" elif action == "show_vlan": return f"""**VLAN Information Commands:** ```cisco ! VLAN Information Commands show vlan brief show vlan summary show vlan id {vlan_id} show interfaces status show spanning-tree vlan brief ```""" elif action == "delete_vlan": return f"""**Delete VLAN Commands:** ```cisco ! Delete VLAN {vlan_id} configure terminal no vlan {vlan_id} exit ! Verify VLAN deletion: show vlan brief ```""" elif tool_name == "interface_configuration": action = arguments.get("action", "configure_access") interface = arguments.get("interface", "GigabitEthernet0/1") vlan_id = arguments.get("vlan_id", 1) ip_address = arguments.get("ip_address", "") description = arguments.get("description", "") if action == "configure_access": return f"""**Configure Access Port Commands:** ```cisco ! Configure {interface} as access port for VLAN {vlan_id} configure terminal interface {interface} {f' description {description}' if description else ''} switchport mode access switchport access vlan {vlan_id} spanning-tree portfast no shutdown exit exit ! Verify configuration: show interfaces {interface} switchport show interfaces {interface} status ```""" elif action == "configure_trunk": return f"""**Configure Trunk Port Commands:** ```cisco ! Configure {interface} as trunk port configure terminal interface {interface} {f' description {description}' if description else ''} switchport mode trunk switchport trunk encapsulation dot1q switchport trunk allowed vlan {vlan_id if vlan_id != 1 else 'all'} no shutdown exit exit ! Verify trunk configuration: show interfaces {interface} trunk show interfaces {interface} switchport ```""" elif action == "set_ip": if not ip_address: ip_address = "192.168.1.1" subnet_mask = arguments.get("subnet_mask", "255.255.255.0") return f"""**Configure IP Address Commands:** ```cisco ! Configure IP address on {interface} configure terminal interface {interface} ip address {ip_address} {subnet_mask} {f' description {description}' if description else ''} no shutdown exit exit ! Verify IP configuration: show ip interface {interface} show ip interface brief ```""" elif action == "shutdown": return f"""**Shutdown Interface Commands:** ```cisco ! Shutdown interface {interface} configure terminal interface {interface} shutdown exit exit ! Verify interface status: show interfaces {interface} status ```""" elif action == "no_shutdown": return f"""**Enable Interface Commands:** ```cisco ! Enable interface {interface} configure terminal interface {interface} no shutdown exit exit ! Verify interface status: show interfaces {interface} status ```""" elif tool_name == "routing_configuration": protocol = arguments.get("protocol", "ospf") action = arguments.get("action", "configure") network = arguments.get("network", "192.168.1.0 0.0.0.255") area = arguments.get("area", "0") process_id = arguments.get("process_id", 1) next_hop = arguments.get("next_hop", "192.168.1.1") if protocol == "ospf": if action == "configure": return f"""**OSPF Configuration Commands:** ```cisco ! Configure OSPF process {process_id} configure terminal router ospf {process_id} network {network} area {area} router-id 1.1.1.1 exit exit ! Verify OSPF configuration: show ip ospf show ip ospf neighbor show ip ospf database ```""" elif action == "show": return f"""**OSPF Information Commands:** ```cisco ! OSPF Information Commands show ip ospf show ip ospf neighbor show ip ospf database show ip ospf interface show ip route ospf ```""" elif protocol == "static": if action == "configure": return f"""**Static Route Configuration:** ```cisco ! Configure static route to {network} configure terminal ip route {network} {next_hop} exit ! Verify static route: show ip route static show ip route ```""" elif protocol == "eigrp": if action == "configure": return f"""**EIGRP Configuration Commands:** ```cisco ! Configure EIGRP AS {process_id} configure terminal router eigrp {process_id} network {network} no auto-summary exit exit ! Verify EIGRP configuration: show ip eigrp neighbors show ip eigrp topology ```""" elif tool_name == "security_configuration": feature = arguments.get("feature", "ssh") action = arguments.get("action", "enable") acl_number = arguments.get("acl_number", 100) interface = arguments.get("interface", "GigabitEthernet0/1") if feature == "ssh" and action == "enable": return f"""**SSH Configuration Commands:** ```cisco ! Enable SSH access configure terminal hostname CiscoRouter ip domain-name company.local crypto key generate rsa general-keys modulus 2048 ip ssh version 2 ip ssh time-out 60 ip ssh authentication-retries 3 ! Configure user account username admin privilege 15 secret cisco123 ! Configure VTY lines line vty 0 4 transport input ssh login local exit exit ! Verify SSH configuration: show ip ssh show ssh ```""" elif feature == "acl" and action == "configure": source_ip = arguments.get("source_ip", "192.168.1.0 0.0.0.255") destination_ip = arguments.get("destination_ip", "any") return f"""**Access Control List Configuration:** ```cisco ! Configure Access Control List {acl_number} configure terminal access-list {acl_number} permit ip {source_ip} {destination_ip} access-list {acl_number} deny ip any any ! Apply ACL to interface {interface} interface {interface} ip access-group {acl_number} in exit exit ! Verify ACL: show access-lists {acl_number} show ip interface {interface} ```""" elif feature == "port_security" and action == "enable": return f"""**Port Security Configuration:** ```cisco ! Enable port security on {interface} configure terminal interface {interface} switchport mode access switchport port-security switchport port-security maximum 2 switchport port-security mac-address sticky switchport port-security violation restrict exit exit ! Verify port security: show port-security interface {interface} show port-security address ```""" elif tool_name == "troubleshooting": category = arguments.get("category", "connectivity") target = arguments.get("target", "8.8.8.8") interface = arguments.get("interface", "") if category == "connectivity": return f"""**Connectivity Troubleshooting Commands:** ```cisco ! Connectivity Troubleshooting to {target} ping {target} traceroute {target} show ip route show arp show ip interface brief ! Extended ping test: ping {target} repeat 100 ping {target} size 1500 ```""" elif category == "interface": return f"""**Interface Troubleshooting Commands:** ```cisco ! Interface Troubleshooting for {interface or 'all interfaces'} {f'show interfaces {interface}' if interface else 'show interfaces'} {f'show interfaces {interface} status' if interface else 'show interfaces status'} {f'show controllers {interface}' if interface else 'show controllers'} show ip interface brief ! Interface statistics: {f'show interfaces {interface} | include error' if interface else 'show interfaces | include error'} ```""" elif category == "routing": return f"""**Routing Troubleshooting Commands:** ```cisco ! Routing Troubleshooting show ip route show ip protocols show ip ospf neighbor show ip eigrp neighbors show cdp neighbors ! Routing table analysis: show ip route summary show ip route {target if target else '0.0.0.0'} ```""" elif category == "switching": return f"""**Switching Troubleshooting Commands:** ```cisco ! Switching Troubleshooting show vlan brief show interfaces status show spanning-tree show mac address-table show cdp neighbors detail ! Port and VLAN status: show interfaces trunk show spanning-tree blockedports ```""" elif category == "general": return f"""**General Device Information Commands:** ```cisco ! General Device Information show version show running-config show startup-config show processes cpu show memory show environment show logging ! System status: show clock show users show sessions ```""" else: return f"❌ Unknown Cisco tool: {tool_name}. Available: vlan_management, interface_configuration, routing_configuration, security_configuration, troubleshooting" def execute_terraform_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Execute Terraform MCP tools - based on actual server code""" if tool_name == "generate_terraform_config": resource_type = arguments.get("resource_type", "vpc") provider = arguments.get("provider", "aws") name = arguments.get("name", "example") options = arguments.get("options", {}) # Generate different responses based on resource type if resource_type == "vpc" and provider == "aws": cidr = options.get("cidr", "10.0.0.0/16") return f"""**Generated AWS VPC Terraform Configuration:** ```hcl # {name} VPC Infrastructure Configuration terraform {{ required_version = ">= 1.0" required_providers {{ aws = {{ source = "hashicorp/aws" version = "~> 5.0" }} }} }} provider "aws" {{ region = var.aws_region default_tags {{ tags = {{ Project = var.project_name ManagedBy = "Terraform" Environment = var.environment }} }} }} variable "aws_region" {{ description = "AWS region" type = string default = "us-west-2" }} variable "vpc_cidr" {{ description = "CIDR block for VPC" type = string default = "{cidr}" }} # VPC Module module "{name}_vpc" {{ source = "terraform-aws-modules/vpc/aws" name = "${{var.environment}}-{name}-vpc" cidr = var.vpc_cidr azs = ["us-west-2a", "us-west-2b"] public_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k)] private_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k + 10)] enable_nat_gateway = true enable_dns_hostnames = true enable_dns_support = true }} # Outputs output "vpc_id" {{ description = "ID of the VPC" value = module.{name}_vpc.vpc_id }} ``` **Next Steps:** 1. Save as `main.tf` 2. Run `terraform init` 3. Run `terraform plan` 4. Run `terraform apply` """ elif resource_type == "ec2" and provider == "aws": instance_type = options.get("instance_type", "t3.micro") return f"""**Generated AWS EC2 Terraform Configuration:** ```hcl # {name} EC2 Instance Configuration terraform {{ required_version = ">= 1.0" required_providers {{ aws = {{ source = "hashicorp/aws" version = "~> 5.0" }} }} }} variable "instance_type" {{ description = "EC2 instance type" type = string default = "{instance_type}" }} # Data sources data "aws_ami" "app_ami" {{ most_recent = true owners = ["amazon"] filter {{ name = "name" values = ["amzn2-ami-hvm-*-x86_64-gp2"] }} }} # EC2 Instance Module module "{name}_instance" {{ source = "terraform-aws-modules/ec2-instance/aws" name = "{name}-instance" instance_type = var.instance_type ami = data.aws_ami.app_ami.id create_iam_instance_profile = true root_block_device = [{{ encrypted = true volume_type = "gp3" volume_size = 20 }}] }} output "instance_id" {{ value = module.{name}_instance.id }} ``` """ elif resource_type == "s3" and provider == "aws": return f"""**Generated AWS S3 Bucket Terraform Configuration:** ```hcl # {name} S3 Bucket Configuration terraform {{ required_providers {{ aws = {{ source = "hashicorp/aws" version = "~> 5.0" }} }} }} # Random suffix for unique bucket name resource "random_string" "bucket_suffix" {{ length = 8 special = false upper = false }} # S3 Bucket Module module "{name}_s3_bucket" {{ source = "terraform-aws-modules/s3-bucket/aws" bucket = "{name}-bucket-${{random_string.bucket_suffix.result}}" versioning = {{ enabled = true }} server_side_encryption_configuration = {{ rule = {{ apply_server_side_encryption_by_default = {{ sse_algorithm = "AES256" }} }} }} block_public_acls = true block_public_policy = true }} output "bucket_name" {{ value = module.{name}_s3_bucket.s3_bucket_id }} ``` """ elif resource_type == "eks" and provider == "aws": return f"""**Generated AWS EKS Cluster Terraform Configuration:** ```hcl # {name} EKS Cluster Configuration terraform {{ required_providers {{ aws = {{ source = "hashicorp/aws" version = "~> 5.0" }} }} }} # EKS Cluster Module module "eks" {{ source = "terraform-aws-modules/eks/aws" cluster_name = "{name}-eks-cluster" cluster_version = "1.28" vpc_id = module.vpc.vpc_id subnet_ids = module.vpc.private_subnets eks_managed_node_groups = {{ main = {{ name = "{name}-nodes" instance_types = ["t3.medium"] min_size = 1 max_size = 3 desired_size = 2 }} }} }} ``` """ else: return f"""**Terraform Configuration Generation:** Resource type `{resource_type}` for provider `{provider}` is supported by the server. **Available Resource Types:** - vpc, ec2, s3, eks, rds (AWS) - vnet, compute, aks (Azure) - Custom configurations **To generate full configuration, specify:** - resource_type: {resource_type} - provider: {provider} - name: {name} - options: Custom parameters """ elif tool_name == "validate_terraform_config": config_content = arguments.get("config_content", "") if not config_content: return """**Terraform Configuration Validation:** ❌ **No configuration provided** Please provide Terraform configuration content to validate. **Example usage:** - Paste your .tf file contents - Enable best practices checking - Get syntax and style recommendations """ return f"""**Terraform Configuration Validation Results:** ## ✅ Syntax Validation - Configuration syntax appears valid - Terraform blocks found - Provider configuration detected ## 💡 Best Practices Review - Consider adding resource tags - Use remote state storage for production - Add variable descriptions - Include outputs for important resources ## 🔧 Recommendations 1. **Run terraform validate** - Official validation 2. **Run terraform fmt** - Format consistency 3. **Add validation rules** - Variable constraints 4. **Security scan** - Use tools like tfsec **Configuration Length:** {len(config_content)} characters """ elif tool_name == "get_deployment_workflow": workflow_type = arguments.get("workflow_type", "basic") backend_type = arguments.get("backend_type", "local") environment = arguments.get("environment", "development") if workflow_type == "basic": return f"""**Basic Terraform Deployment Workflow:** ```bash # Basic Terraform Workflow terraform init terraform validate terraform fmt terraform plan terraform apply terraform output ``` **Environment:** {environment.title()} **Backend:** {backend_type.upper()} **Steps Explained:** 1. **init** - Initialize working directory 2. **validate** - Check configuration syntax 3. **fmt** - Format code consistently 4. **plan** - Preview changes 5. **apply** - Create/update resources 6. **output** - Display output values """ elif workflow_type == "production": return f"""**Production Terraform Deployment Workflow:** ```bash # Production Terraform Workflow with {backend_type.upper()} backend # 1. Initialize with backend configuration terraform init # 2. Validate configuration terraform validate # 3. Format code terraform fmt # 4. Security scan (optional) # tfsec . # 5. Plan and save terraform plan -out=tfplan # 6. Review plan output carefully terraform show tfplan # 7. Apply saved plan terraform apply tfplan # 8. Verify outputs terraform output # 9. Clean up plan file rm tfplan ``` ## 🛡️ Production Security - ✅ Use remote state storage - ✅ Enable state locking - ✅ Implement approval workflows - ✅ Run security scans - ✅ Monitor all changes """ elif workflow_type == "module_development": return f"""**Module Development Workflow:** ```bash # Module Development Workflow # 1. Initialize module terraform init # 2. Validate module terraform validate # 3. Format module code terraform fmt -recursive # 4. Generate documentation # terraform-docs markdown . > README.md # 5. Test module (in examples/ directory) cd examples/basic terraform init terraform plan # 6. Clean up test terraform destroy cd ../.. # 7. Tag version git tag v1.0.0 git push --tags ``` **Module Structure:** ``` modules/{environment}/ ├── main.tf ├── variables.tf ├── outputs.tf ├── versions.tf └── examples/ ``` """ elif tool_name == "convert_to_module": config_content = arguments.get("config_content", "") module_name = arguments.get("module_name", "terraform-module") return f"""**Convert to Terraform Module: {module_name}** ## 📦 Module Structure ``` modules/{module_name}/ ├── main.tf # Main configuration ├── variables.tf # Input variables ├── outputs.tf # Output values ├── versions.tf # Provider requirements ├── README.md # Documentation └── examples/ └── basic/ ├── main.tf # Example usage └── README.md # Example docs ``` ## variables.tf ```hcl variable "project_name" {{ description = "Name of the project" type = string }} variable "environment" {{ description = "Environment name" type = string default = "dev" validation {{ condition = contains(["dev", "staging", "prod"], var.environment) error_message = "Environment must be dev, staging, or prod." }} }} variable "tags" {{ description = "A map of tags to assign to resources" type = map(string) default = {{}} }} ``` ## outputs.tf ```hcl output "{module_name}_id" {{ description = "ID of the main resource" value = # Add appropriate resource reference }} ``` ## Example Usage ```hcl module "{module_name}" {{ source = "./modules/{module_name}" project_name = "my-project" environment = "dev" tags = {{ Team = "infrastructure" }} }} ``` """ elif tool_name == "format_terraform_code": config_content = arguments.get("config_content", "") if not config_content: return """**Terraform Code Formatting:** ❌ **No configuration provided** **Formatting features:** - Normalize indentation (2 spaces) - Consistent spacing - Proper block alignment - Standard Terraform formatting **Usage:** Provide Terraform configuration content to format. """ # Simple formatting demonstration formatted_lines = [] for line in config_content.split('\n'): stripped = line.strip() if stripped: formatted_lines.append(stripped) return f"""**Terraform Code Formatting Results:** ## ✨ Formatted Configuration ```hcl {chr(10).join(formatted_lines)} ``` ## 📋 Formatting Applied - ✅ Normalized indentation - ✅ Consistent spacing - ✅ Proper block alignment - ✅ Standard Terraform style ## 🔧 Additional Tips 1. **Use `terraform fmt`** - Official formatter 2. **Configure editor** - Auto-formatting in IDE 3. **Pre-commit hooks** - Format before commits 4. **Consistent naming** - Use lowercase_with_underscores """ elif tool_name == "generate_terraform_docs": config_content = arguments.get("config_content", "") module_name = arguments.get("module_name", "terraform-module") return f"""**Terraform Documentation: {module_name}** ## 📋 Module Overview This Terraform configuration manages infrastructure resources with automated documentation generation. ## 🏗️ Resources | Resource Type | Description | |---------------|-------------| | `aws_instance` | EC2 compute instances | | `aws_vpc` | Virtual Private Cloud | | `aws_s3_bucket` | Object storage bucket | ## 📥 Input Variables | Name | Description | Type | Default | |------|-------------|------|---------| | `project_name` | Name of the project | `string` | `null` | | `environment` | Environment name | `string` | `"dev"` | | `instance_type` | EC2 instance type | `string` | `"t3.micro"` | ## 📤 Outputs | Name | Description | |------|-------------| | `vpc_id` | ID of the VPC | | `instance_id` | ID of the EC2 instance | ## 🚀 Usage Example ```hcl module "{module_name}" {{ source = "./modules/{module_name}" project_name = "my-project" environment = "production" tags = {{ Team = "infrastructure" Project = "my-project" }} }} ``` ## 📋 Requirements | Name | Version | |------|---------| | terraform | >= 1.0 | | aws | >= 5.0 | ## 🔧 Setup Instructions 1. Clone or download this module 2. Configure variables in `terraform.tfvars` 3. Run `terraform init` 4. Run `terraform plan` 5. Run `terraform apply` """ else: return f"❌ Unknown Terraform tool: {tool_name}. Available: generate_terraform_config, validate_terraform_config, get_deployment_workflow, convert_to_module, format_terraform_code, generate_terraform_docs" def select_linux_tool_and_args(self, user_input: str) -> Dict[str, Any]: """Analyze user input to select appropriate Linux tool and arguments""" user_lower = user_input.lower() # File operations if any(word in user_lower for word in ["list", "ls", "show files", "directory"]): if "detailed" in user_lower or "-l" in user_lower: return {"tool": "list_files", "args": {"path": ".", "detailed": True}} else: return {"tool": "list_files", "args": {"path": ".", "detailed": False}} elif any(word in user_lower for word in ["read", "cat", "view", "show content"]): return {"tool": "read_file", "args": {"filepath": "/etc/hostname"}} elif any(word in user_lower for word in ["write", "create file", "echo"]): return {"tool": "write_file", "args": {"filepath": "/tmp/test.txt", "content": "Hello from MCP"}} # System information elif any(word in user_lower for word in ["system", "info", "uname", "system info"]): return {"tool": "get_system_info", "args": {}} elif any(word in user_lower for word in ["processes", "ps", "running", "process list"]): return {"tool": "list_processes", "args": {"filter": ""}} elif any(word in user_lower for word in ["memory", "mem", "ram", "memory usage"]): return {"tool": "get_memory_usage", "args": {}} elif any(word in user_lower for word in ["disk", "storage", "df", "disk usage"]): return {"tool": "get_disk_usage", "args": {}} # Network operations elif any(word in user_lower for word in ["network", "netstat", "connections"]): return {"tool": "get_network_info", "args": {}} elif any(word in user_lower for word in ["ping", "connectivity", "test connection"]): return {"tool": "ping_host", "args": {"host": "google.com", "count": 4}} # Process management elif any(word in user_lower for word in ["kill", "stop", "terminate"]): return {"tool": "kill_process", "args": {"pid_or_name": "example_process"}} # Package management elif any(word in user_lower for word in ["install", "package", "apt", "yum"]): return {"tool": "manage_packages", "args": {"operation": "install", "package": "curl"}} elif any(word in user_lower for word in ["search package", "find package"]): return {"tool": "manage_packages", "args": {"operation": "search", "package": "python"}} # Service management elif any(word in user_lower for word in ["service", "systemctl", "start service", "stop service"]): if "start" in user_lower: return {"tool": "manage_services", "args": {"service": "nginx", "action": "start"}} elif "stop" in user_lower: return {"tool": "manage_services", "args": {"service": "nginx", "action": "stop"}} elif "status" in user_lower: return {"tool": "manage_services", "args": {"service": "nginx", "action": "status"}} else: return {"tool": "manage_services", "args": {"service": "nginx", "action": "status"}} # Log viewing elif any(word in user_lower for word in ["log", "logs", "syslog", "journal"]): return {"tool": "view_logs", "args": {"log_type": "syslog", "lines": 50}} # Environment variables elif any(word in user_lower for word in ["env", "environment", "variables"]): return {"tool": "get_environment", "args": {}} # Default - show help else: return {"tool": "help", "args": {}} def select_cisco_tool_and_args(self, user_input: str) -> Dict[str, Any]: """Analyze user input to select appropriate Cisco tool and arguments - using ACTUAL tool names""" user_lower = user_input.lower() # VLAN management keywords if any(word in user_lower for word in ["vlan", "vlan management", "create vlan", "vlan config"]): return {"tool": "vlan_management", "args": {"operation": "create", "vlan_id": 100, "name": "example_vlan"}} # Interface configuration keywords elif any(word in user_lower for word in ["interface", "port", "configure interface", "interface config"]): return {"tool": "interface_configuration", "args": {"interface": "GigabitEthernet0/1", "operation": "configure"}} # Routing configuration keywords elif any(word in user_lower for word in ["routing", "ospf", "bgp", "route", "routing config"]): return {"tool": "routing_configuration", "args": {"protocol": "ospf", "operation": "configure"}} # Security configuration keywords elif any(word in user_lower for word in ["security", "acl", "access-list", "firewall", "security config"]): return {"tool": "security_configuration", "args": {"type": "acl", "operation": "create"}} # Troubleshooting keywords elif any(word in user_lower for word in ["troubleshoot", "debug", "problem", "issue", "ping", "trace", "show", "status"]): return {"tool": "troubleshooting", "args": {"operation": "ping", "target": "8.8.8.8"}} # Default - show help else: return {"tool": "help", "args": {}} def select_terraform_tool_and_args(self, user_input: str) -> Dict[str, Any]: """Analyze user input to select appropriate Terraform tool and arguments""" user_lower = user_input.lower() # Configuration generation keywords if any(word in user_lower for word in ["generate", "create config", "terraform config", "infrastructure"]): if "vpc" in user_lower: return {"tool": "generate_terraform_config", "args": {"resource_type": "vpc", "provider": "aws", "name": "example"}} elif "ec2" in user_lower or "instance" in user_lower: return {"tool": "generate_terraform_config", "args": {"resource_type": "ec2", "provider": "aws", "name": "example"}} elif "s3" in user_lower or "bucket" in user_lower: return {"tool": "generate_terraform_config", "args": {"resource_type": "s3", "provider": "aws", "name": "example"}} elif "eks" in user_lower or "kubernetes" in user_lower: return {"tool": "generate_terraform_config", "args": {"resource_type": "eks", "provider": "aws", "name": "example"}} else: return {"tool": "generate_terraform_config", "args": {"resource_type": "vpc", "provider": "aws", "name": "example"}} # Validation keywords elif any(word in user_lower for word in ["validate", "check", "syntax", "validation"]): return {"tool": "validate_terraform_config", "args": {"config_content": "# Paste your Terraform configuration here", "check_best_practices": True}} # Workflow keywords elif any(word in user_lower for word in ["workflow", "deployment", "deploy", "plan", "apply"]): if "production" in user_lower or "prod" in user_lower: return {"tool": "get_deployment_workflow", "args": {"workflow_type": "production", "environment": "production"}} elif "module" in user_lower: return {"tool": "get_deployment_workflow", "args": {"workflow_type": "module_development"}} else: return {"tool": "get_deployment_workflow", "args": {"workflow_type": "basic", "environment": "development"}} # Module conversion keywords elif any(word in user_lower for word in ["module", "convert", "reusable"]): return {"tool": "convert_to_module", "args": {"config_content": "# Paste configuration to convert", "module_name": "example-module"}} # Formatting keywords elif any(word in user_lower for word in ["format", "fmt", "formatting", "style"]): return {"tool": "format_terraform_code", "args": {"config_content": "# Paste Terraform code to format"}} # Documentation keywords elif any(word in user_lower for word in ["documentation", "docs", "document", "readme"]): return {"tool": "generate_terraform_docs", "args": {"config_content": "# Paste configuration to document", "module_name": "terraform-module"}} # Default - show help else: return {"tool": "help", "args": {}} """Analyze user input to select appropriate Cisco tool and arguments based on actual server""" user_lower = user_input.lower() # VLAN management keywords if any(word in user_lower for word in ["vlan", "create vlan", "assign vlan", "show vlan"]): if "create" in user_lower: return {"tool": "vlan_management", "args": {"action": "create_vlan", "vlan_id": 100, "vlan_name": "DATA_VLAN"}} elif "assign" in user_lower: return {"tool": "vlan_management", "args": {"action": "assign_vlan", "vlan_id": 100, "interface": "GigabitEthernet0/1"}} elif "delete" in user_lower: return {"tool": "vlan_management", "args": {"action": "delete_vlan", "vlan_id": 100}} else: return {"tool": "vlan_management", "args": {"action": "show_vlan"}} # Interface configuration keywords elif any(word in user_lower for word in ["interface", "port", "access", "trunk", "ethernet"]): if "trunk" in user_lower: return {"tool": "interface_configuration", "args": {"action": "configure_trunk", "interface": "GigabitEthernet0/1"}} elif "ip" in user_lower or "address" in user_lower: return {"tool": "interface_configuration", "args": {"action": "set_ip", "interface": "GigabitEthernet0/1", "ip_address": "192.168.1.1"}} elif "shutdown" in user_lower: return {"tool": "interface_configuration", "args": {"action": "shutdown", "interface": "GigabitEthernet0/1"}} elif "enable" in user_lower or "no shutdown" in user_lower: return {"tool": "interface_configuration", "args": {"action": "no_shutdown", "interface": "GigabitEthernet0/1"}} else: return {"tool": "interface_configuration", "args": {"action": "configure_access", "interface": "GigabitEthernet0/1", "vlan_id": 10}} # Routing keywords elif any(word in user_lower for word in ["routing", "route", "ospf", "eigrp", "static"]): if "ospf" in user_lower: return {"tool": "routing_configuration", "args": {"protocol": "ospf", "action": "configure", "network": "192.168.1.0 0.0.0.255"}} elif "static" in user_lower: return {"tool": "routing_configuration", "args": {"protocol": "static", "action": "configure", "network": "10.0.0.0 255.0.0.0", "next_hop": "192.168.1.1"}} elif "eigrp" in user_lower: return {"tool": "routing_configuration", "args": {"protocol": "eigrp", "action": "configure", "network": "192.168.1.0 0.0.0.255"}} else: return {"tool": "routing_configuration", "args": {"protocol": "ospf", "action": "show"}} # Security keywords elif any(word in user_lower for word in ["security", "ssh", "acl", "access-list", "port-security"]): if "ssh" in user_lower: return {"tool": "security_configuration", "args": {"feature": "ssh", "action": "enable"}} elif "acl" in user_lower or "access-list" in user_lower: return {"tool": "security_configuration", "args": {"feature": "acl", "action": "configure", "acl_number": 100}} elif "port-security" in user_lower or "port security" in user_lower: return {"tool": "security_configuration", "args": {"feature": "port_security", "action": "enable", "interface": "GigabitEthernet0/1"}} else: return {"tool": "security_configuration", "args": {"feature": "ssh", "action": "enable"}} # Troubleshooting keywords elif any(word in user_lower for word in ["troubleshoot", "debug", "ping", "traceroute", "show", "connectivity"]): if "connectivity" in user_lower or "ping" in user_lower: return {"tool": "troubleshooting", "args": {"category": "connectivity", "target": "8.8.8.8"}} elif "interface" in user_lower: return {"tool": "troubleshooting", "args": {"category": "interface"}} elif "routing" in user_lower or "route" in user_lower: return {"tool": "troubleshooting", "args": {"category": "routing"}} elif "switching" in user_lower or "switch" in user_lower: return {"tool": "troubleshooting", "args": {"category": "switching"}} else: return {"tool": "troubleshooting", "args": {"category": "general"}} # Default - show help else: return {"tool": "help", "args": {}} """Analyze user input to select appropriate Linux tool and arguments""" user_lower = user_input.lower() # User management keywords if any(word in user_lower for word in ["user", "group", "adduser", "useradd", "usermod"]): if "add" in user_lower and "group" in user_lower: return {"tool": "user_management", "args": {"action": "add_user_to_group", "username": "example_user", "groupname": "example_group"}} elif "create" in user_lower or ("add" in user_lower and "user" in user_lower): return {"tool": "user_management", "args": {"action": "create_user", "username": "new_user"}} elif "delete" in user_lower and "user" in user_lower: return {"tool": "user_management", "args": {"action": "delete_user", "username": "old_user"}} else: return {"tool": "user_management", "args": {"action": "list_groups"}} # File permission keywords elif any(word in user_lower for word in ["permission", "chmod", "chown", "file", "ownership"]): if "chmod" in user_lower: return {"tool": "file_permissions", "args": {"action": "chmod", "path": "/path/to/file", "permissions": "755"}} elif "chown" in user_lower or "ownership" in user_lower: return {"tool": "file_permissions", "args": {"action": "chown", "path": "/path/to/file", "owner": "username"}} elif "find" in user_lower: return {"tool": "file_permissions", "args": {"action": "find_permissions", "path": "/", "permissions": "777"}} else: return {"tool": "file_permissions", "args": {"action": "chmod", "path": "/path/to/file", "permissions": "644"}} # System command keywords elif any(word in user_lower for word in ["process", "service", "network", "disk", "firewall", "systemctl", "ps"]): if any(word in user_lower for word in ["process", "ps", "top", "kill"]): return {"tool": "system_commands", "args": {"category": "process"}} elif any(word in user_lower for word in ["service", "systemctl", "daemon"]): return {"tool": "system_commands", "args": {"category": "service"}} elif any(word in user_lower for word in ["network", "ping", "ip", "interface"]): return {"tool": "system_commands", "args": {"category": "network"}} elif any(word in user_lower for word in ["disk", "df", "mount", "storage"]): return {"tool": "system_commands", "args": {"category": "disk"}} elif any(word in user_lower for word in ["firewall", "ufw", "iptables"]): return {"tool": "system_commands", "args": {"category": "firewall"}} else: return {"tool": "system_commands", "args": {"category": "process"}} # Default - show help else: return {"tool": "help", "args": {}} def get_available_tools(self, server_key: str) -> List[Dict[str, Any]]: """Get available tools from an MCP server""" if server_key == "linux": # Return known tools for Linux server return self.mcp_servers["linux"]["tools"] # For other servers, try to call them (though they might not have HTTP endpoints) try: result = self.call_mcp_server(server_key, "tools/list", {}) if result["success"]: data = result["data"] if isinstance(data, dict): return data.get("tools", data.get("result", [])) elif isinstance(data, list): return data except: pass return [] def call_mcp_server(self, server_key: str, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]: """Make a call to an MCP server (for non-Linux servers)""" try: server_config = self.mcp_servers[server_key] # For FastAPI MCP servers, we need to use different endpoints if endpoint == "tools/list": url = f"{server_config['space_url']}/tools" response = requests.get(url, timeout=30) elif endpoint == "tools/call": url = f"{server_config['space_url']}/tools/call" response = requests.post( url, json=payload, headers={"Content-Type": "application/json"}, timeout=30 ) else: # Fallback to direct endpoint url = f"{server_config['space_url']}/{endpoint}" response = requests.post( url, json=payload, headers={"Content-Type": "application/json"}, timeout=30 ) if response.status_code == 200: return {"success": True, "data": response.json()} else: return { "success": False, "error": f"HTTP {response.status_code}: {response.text}", "server": server_config['name'] } except requests.exceptions.RequestException as e: return {"success": False, "error": f"Request failed: {str(e)}", "server": server_config['name']} except Exception as e: return {"success": False, "error": f"Unexpected error: {str(e)}", "server": server_config['name']} def execute_tool(self, server_key: str, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute a tool on an MCP server""" payload = { "name": tool_name, "arguments": arguments } return self.call_mcp_server(server_key, "tools/call", payload) def analyze_user_request(self, user_input: str) -> Dict[str, Any]: """Use OpenAI to analyze user request and determine which MCP server to use""" system_prompt = """ You are an intelligent assistant that helps users interact with MCP (Model Context Protocol) servers. Available MCP servers (use these EXACT keys in your response): 1. terraform: Terraform infrastructure management 2. linux: Linux system operations 3. cisco: Cisco network management IMPORTANT: For the "recommended_server" field, you MUST use one of these exact keys: "terraform", "linux", or "cisco" Analyze the user's request and determine which MCP server would be most appropriate. Respond in JSON format with: { "recommended_server": "terraform|linux|cisco", "reasoning": "explanation of why this server was chosen", "suggested_action": "what action to take" } """ try: response = self.openai_client.chat.completions.create( model="gpt-4", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ], temperature=0.3 ) content = response.choices[0].message.content.strip() # Try to parse JSON, handle cases where GPT returns non-JSON try: return json.loads(content) except json.JSONDecodeError: # If not valid JSON, extract server recommendation manually content_lower = content.lower() if "cisco" in content_lower: recommended_server = "cisco" elif "linux" in content_lower: recommended_server = "linux" elif "terraform" in content_lower: recommended_server = "terraform" else: recommended_server = "cisco" # Default for network questions return { "recommended_server": recommended_server, "reasoning": "Based on content analysis", "suggested_action": content } except Exception as e: return { "error": f"Failed to analyze request: {str(e)}", "recommended_server": "cisco" } def generate_response(self, user_input: str, mcp_results: Dict[str, Any] = None) -> str: """Generate a natural language response using OpenAI""" context = "" if mcp_results: context = f"\nMCP Server Results: {json.dumps(mcp_results, indent=2)}" messages = [ { "role": "system", "content": f""" You are a helpful assistant that works with MCP servers for infrastructure management. Provide clear, helpful responses based on the user's request and any MCP server results. Available MCP Servers: - Terraform: Infrastructure as Code management - Linux: System administration and operations - Cisco: Network device management {context} """ } ] # Add conversation history (last 6 messages) messages.extend(self.conversation_history[-6:]) messages.append({"role": "user", "content": user_input}) try: response = self.openai_client.chat.completions.create( model="gpt-4", messages=messages, temperature=0.7, max_tokens=1000 ) return response.choices[0].message.content except Exception as e: return f"Error generating response: {str(e)}" def process_request(self, user_input: str, selected_server: str = None) -> tuple[str, str]: """Process a user request end-to-end""" timestamp = datetime.now().strftime("%H:%M:%S") # Add to conversation history self.conversation_history.append({"role": "user", "content": user_input}) try: # Analyze request if no server specified or auto-detect selected if not selected_server or selected_server == "Auto-detect": analysis = self.analyze_user_request(user_input) if "error" in analysis: response = f"❌ Analysis Error: {analysis['error']}" self.conversation_history.append({"role": "assistant", "content": response}) return response, f"[{timestamp}] Analysis failed" selected_server = analysis.get("recommended_server", "terraform") reasoning = analysis.get("reasoning", "No reasoning provided") status = f"[{timestamp}] Selected: {self.mcp_servers[selected_server]['name']} - {reasoning}" else: # Map display names back to keys if needed server_mapping = { "MCP Terraform": "terraform", "MCP Linux": "linux", "MCP Cisco": "cisco", "Terraform": "terraform", "Linux": "linux", "Cisco": "cisco" } selected_server = server_mapping.get(selected_server, selected_server.lower()) # Validate server key if selected_server not in self.mcp_servers: response = f"❌ Invalid server selection: {selected_server}. Available: {', '.join(self.mcp_servers.keys())}" self.conversation_history.append({"role": "assistant", "content": response}) return response, f"[{timestamp}] Invalid server" status = f"[{timestamp}] Using: {self.mcp_servers[selected_server]['name']}" # Check server health health_check = self.check_server_health(selected_server) if not health_check["success"]: response = f"⚠️ Cannot connect to {self.mcp_servers[selected_server]['name']} server: {health_check.get('error', 'Server offline')}" self.conversation_history.append({"role": "assistant", "content": response}) return response, status + " - Server offline" # Handle Linux server specifically (full functionality) if selected_server == "linux": tool_selection = self.select_linux_tool_and_args(user_input) if tool_selection["tool"] == "help": response = """**Available Linux MCP Tools:** 🔧 **user_management**: Create, delete, and manage users and groups 📁 **file_permissions**: Change file permissions and ownership (chmod, chown) ⚙️ **system_commands**: System administration commands (processes, services, network, disk, firewall) **Example commands you can ask for:** - "Show me user management commands" - "How do I change file permissions?" - "Give me process management commands" - "Show network diagnostic commands" - "Help with service management" - "Configure firewall rules" - "Check disk usage commands" """ else: # Execute the selected Linux tool result = self.execute_linux_tool(tool_selection["tool"], tool_selection["args"]) response = result # Handle Cisco server specifically (full functionality) elif selected_server == "cisco": tool_selection = self.select_cisco_tool_and_args(user_input) if tool_selection["tool"] == "help": response = """**Available Cisco MCP Tools:** 🏷️ **vlan_management**: Create, delete, assign, and show VLANs 🌐 **interface_configuration**: Configure access/trunk ports, IP addresses, shutdown/enable 🛣️ **routing_configuration**: Set up OSPF, EIGRP, static routes 🔒 **security_configuration**: Configure SSH, ACLs, port security 🔧 **troubleshooting**: Network connectivity, interface, routing, switching diagnostics **Example commands you can ask for:** - "Create a VLAN" or "Show VLAN information" - "Configure interface as access port" or "Set up trunk port" - "Configure OSPF routing" or "Set up static routes" - "Enable SSH access" or "Configure ACL" - "Troubleshoot connectivity" or "Check interface status" """ else: # Execute the selected Cisco tool result = self.execute_cisco_tool(tool_selection["tool"], tool_selection["args"]) response = result # Handle other servers (Future expansion) else: # Try to get server information first try: # Check if server provides any status info server_url = self.mcp_servers[selected_server]['space_url'] status_response = requests.get(server_url, timeout=10) if status_response.status_code == 200: try: server_data = status_response.json() tools_count = server_data.get('tools', 'unknown') service_name = server_data.get('service', 'unknown') if selected_server == "cisco": response = f"""**Cisco MCP Server** ✅ Connected! **Server Status**: Online **Service**: {service_name} **Available Tools**: {tools_count} **MCP Endpoint**: {server_url}/mcp/sse **What you can ask for with Cisco:** - "Show me Cisco interface configuration commands" - "Help with VLAN configuration" - "Generate Cisco routing commands" - "Show network troubleshooting commands" - "Configure access control lists" - "Help with Cisco switch configuration" **Note**: This server has {tools_count} tools available via MCP protocol. The tools likely include: - Interface configuration management - VLAN configuration and management - Routing protocol setup - ACL (Access Control List) configuration - Network troubleshooting commands - Device status and monitoring Would you like me to help you with any specific Cisco networking task? """ elif selected_server == "terraform": response = f"""**Terraform MCP Server** ✅ Connected! **Server Status**: Online **Service**: {service_name} **Available Tools**: {tools_count} **MCP Endpoint**: {server_url}/mcp/sse **What you can ask for with Terraform:** - "Show me Terraform plan commands" - "Help with infrastructure deployment" - "Generate Terraform configuration" - "Show resource management commands" - "Help with Terraform state management" **Note**: This server has {tools_count} tools available via MCP protocol. Would you like me to help you with any specific Terraform task? """ except json.JSONDecodeError: # Server responded but not with JSON if selected_server == "cisco": response = f"""**Cisco MCP Server** ⚠️ Connected but Limited Info **Server Status**: Online (non-JSON response) **MCP Endpoint**: {server_url}/mcp/sse **Common Cisco operations I can help with:** - Interface configuration commands - VLAN setup and management - Routing protocol configuration - Access Control Lists (ACLs) - Network troubleshooting - Switch and router configuration Would you like me to help with any specific Cisco networking task? """ else: response = f"""**Terraform MCP Server** ⚠️ Connected but Limited Info **Server Status**: Online (non-JSON response) **MCP Endpoint**: {server_url}/mcp/sse **Common Terraform operations I can help with:** - Infrastructure planning and deployment - Resource configuration and management - State file management - Module development - Provider configuration Would you like me to help with any specific Terraform task? """ else: # Server not responding properly if selected_server == "cisco": response = f"""**Cisco MCP Server** ❌ Connection Issue **Server Status**: {health_check['status']} **Error**: HTTP {status_response.status_code} **MCP Endpoint**: {server_url}/mcp/sse The server appears to be having issues. Please try again later or check if the server is running properly. """ else: response = f"""**Terraform MCP Server** ❌ Connection Issue **Server Status**: {health_check['status']} **Error**: HTTP {status_response.status_code} **MCP Endpoint**: {server_url}/mcp/sse The server appears to be having issues. Please try again later or check if the server is running properly. """ except Exception as e: # Connection failed if selected_server == "cisco": response = f"""**Cisco MCP Server** ❌ Connection Failed **Error**: {str(e)} **MCP Endpoint**: {self.mcp_servers[selected_server]['space_url']}/mcp/sse **Common Cisco operations (offline reference):** - Interface configuration: `interface GigabitEthernet0/1` - VLAN configuration: `vlan 10` - Routing: `ip route 0.0.0.0 0.0.0.0 192.168.1.1` - ACL: `access-list 10 permit 192.168.1.0 0.0.0.255` Please check if the server is running and try again. """ else: response = f"""**Terraform MCP Server** ❌ Connection Failed **Error**: {str(e)} **MCP Endpoint**: {self.mcp_servers[selected_server]['space_url']}/mcp/sse **Common Terraform operations (offline reference):** - Plan: `terraform plan` - Apply: `terraform apply` - Destroy: `terraform destroy` - Validate: `terraform validate` Please check if the server is running and try again. """ self.conversation_history.append({"role": "assistant", "content": response}) return response, status + f" - {health_check['status']}" except Exception as e: error_response = f"❌ Error processing request: {str(e)}" self.conversation_history.append({"role": "assistant", "content": error_response}) return error_response, f"[{timestamp}] Error occurred" def create_gradio_interface(): """Create the Gradio interface""" # Try to get API key from environment (HuggingFace Secrets) openai_api_key = os.getenv("OPEN_AI_API_KEY") # Initialize MCP Client if API key is available mcp_client = None initial_status = "" if openai_api_key: try: mcp_client = MCPClient(openai_api_key) initial_status = "✅ OpenAI client initialized automatically from HF Secrets!" except Exception as e: initial_status = f"❌ Failed to initialize with HF Secret: {str(e)}" else: initial_status = "⚠️ OpenAI API key not found in HF Secrets. Please enter manually." def initialize_client(api_key): nonlocal mcp_client # Use provided key or fall back to environment key_to_use = api_key.strip() if api_key and api_key.strip() else openai_api_key if not key_to_use: return "❌ Please provide your OpenAI API key", "" try: mcp_client = MCPClient(key_to_use) return "✅ OpenAI client initialized successfully!", "" except Exception as e: return f"❌ Failed to initialize: {str(e)}", "" def process_message(message, server_choice, history): if not mcp_client: return history + [["❌ Error", "Please initialize the OpenAI client first using the button above."]], "" if not message.strip(): return history, "" try: response, status = mcp_client.process_request(message, server_choice) history.append([message, response]) return history, "" except Exception as e: error_msg = f"❌ Error: {str(e)}" history.append([message, error_msg]) return history, "" def get_server_status(): if not mcp_client: return "Please initialize the OpenAI client first." status_info = "## MCP Servers Status\n\n" for key, config in mcp_client.mcp_servers.items(): health_check = mcp_client.check_server_health(key) if health_check["success"]: if health_check["status"] == "online": status_icon = "🟢" status_text = "Online" else: status_icon = "🟡" status_text = "Online (Different Structure)" else: status_icon = "🔴" status_text = "Offline" status_info += f"**{config['name']}**: {status_icon} {status_text}\n" status_info += f"- URL: {config['space_url']}\n" status_info += f"- Description: {config['description']}\n" # Show tools for Linux server if key == "linux" and "tools" in config: status_info += f"- Available Tools: {len(config['tools'])}\n" for tool in config['tools']: status_info += f" - **{tool['name']}**: {tool['description']}\n" # Show MCP endpoint info status_info += f"- MCP Endpoint: {config['space_url']}/mcp/sse\n" if health_check["success"] and "response" in health_check: response_data = health_check["response"] if isinstance(response_data, dict) and "service" in response_data: status_info += f"- Service: {response_data.get('service', 'Unknown')}\n" status_info += "\n" return status_info # Create Gradio interface with gr.Blocks(title="MCP Client with OpenAI", theme=gr.themes.Soft()) as app: gr.Markdown("# 🤖 MCP Client with OpenAI Integration") gr.Markdown("Connect to your Terraform, Linux, and Cisco MCP servers with AI-powered assistance") gr.Markdown("💡 **Tip**: Your OpenAI API key is automatically loaded from HuggingFace Secrets!") with gr.Row(): with gr.Column(scale=3): # API Key input (optional if already in HF Secrets) api_key = gr.Textbox( label="OpenAI API Key (Optional - will use HF Secret if available)", type="password", placeholder="Leave empty to use HF Secret OPEN_AI_API_KEY...", value="" ) init_btn = gr.Button("Initialize/Reinitialize Client", variant="primary") init_status = gr.Textbox( label="Initialization Status", interactive=False, value=initial_status ) # Server selection server_choice = gr.Dropdown( choices=["Auto-detect", "linux", "terraform", "cisco"], value="Auto-detect", label="Select MCP Server" ) # Chat interface chatbot = gr.Chatbot( label="Conversation", height=400, show_label=True ) msg = gr.Textbox( label="Message", placeholder="Ask about infrastructure, Linux systems, or Cisco networks...", lines=2 ) with gr.Row(): send_btn = gr.Button("Send", variant="primary") clear_btn = gr.Button("Clear Chat") with gr.Column(scale=1): gr.Markdown("## 📊 Server Status") server_status = gr.Markdown("Initialize client to see server status") refresh_btn = gr.Button("Refresh Status") gr.Markdown("## 💡 Example Queries") gr.Markdown(""" **Linux (Full Support):** - "Show me user management commands" - "How do I change file permissions?" - "Give me process management commands" - "Show network diagnostic commands" - "Help with service management" - "Configure firewall rules" **Terraform:** - "Show me terraform commands" - "Plan infrastructure deployment" - "Help with terraform apply" **Cisco:** - "Show interface configuration" - "Configure VLAN commands" - "Help with port configuration" """) # Event handlers init_btn.click( initialize_client, inputs=[api_key], outputs=[init_status, server_status] ) send_btn.click( process_message, inputs=[msg, server_choice, chatbot], outputs=[chatbot, msg] ) msg.submit( process_message, inputs=[msg, server_choice, chatbot], outputs=[chatbot, msg] ) clear_btn.click( lambda: [], outputs=[chatbot] ) refresh_btn.click( get_server_status, outputs=[server_status] ) return app if __name__ == "__main__": # Create and launch the Gradio app app = create_gradio_interface() app.launch( server_name="0.0.0.0", server_port=7860, share=True, debug=True )