BradMCPClient / app.py
ChrisSacrumCor's picture
Update app.py
cb2fd05 verified
import gradio as gr
import openai
import requests
import json
from typing import Dict, List, Any
import os
from datetime import datetime
class MCPClient:
def __init__(self, openai_api_key: str):
    """Set up the OpenAI handle, the MCP server registry and an empty chat log.

    Args:
        openai_api_key: Key handed to ``openai.OpenAI``.
    """
    self.openai_client = openai.OpenAI(api_key=openai_api_key)

    # Tool catalogue mirrored from the Linux server code; only the Linux
    # entry in the registry carries a "tools" list.
    linux_tools = [
        dict(
            name="user_management",
            description="Generate Linux user management commands for creating, deleting, and managing users and groups",
        ),
        dict(
            name="file_permissions",
            description="Generate file permission and ownership commands for chmod, chown, and permission discovery",
        ),
        dict(
            name="system_commands",
            description="Generate common Linux system administration commands for processes, disk, network, services, and firewall",
        ),
    ]

    # Registry of MCP servers, keyed by the short name used for lookups.
    registry = {}
    registry["terraform"] = dict(
        name="MCP Terraform",
        space_url="https://chrissacrumcor-mcp-terraform.hf.space",
        description="Terraform infrastructure management",
    )
    registry["linux"] = dict(
        name="MCP Linux",
        space_url="https://chrissacrumcor-mcp-linux.hf.space",
        description="Linux system operations",
        tools=linux_tools,
    )
    registry["cisco"] = dict(
        name="MCP Cisco",
        space_url="https://chrissacrumcor-mcp-cisco.hf.space",
        description="Cisco network management",
    )
    self.mcp_servers = registry

    self.conversation_history = []
def check_server_health(self, server_key: str) -> Dict[str, Any]:
    """Probe an MCP server and report whether it answers HTTP requests.

    The endpoints ``/health``, ``/`` and ``/docs`` are tried in that order
    and the first conclusive answer wins.

    Args:
        server_key: Key into ``self.mcp_servers``.

    Returns:
        A dict that always has ``success`` and ``status``:

        * HTTP 200 -> ``status="online"`` plus ``endpoint`` and ``response``.
        * HTTP 404/422 -> ``status="online_different_structure"`` plus
          ``endpoint`` (the host answered, just not on this route).
        * nothing answered -> ``status="offline"`` plus ``error``.
        * unknown key or unexpected failure -> ``status="error"`` plus
          ``error``.

        This method does not raise; failures are reported in the dict.
    """
    try:
        server_config = self.mcp_servers.get(server_key)
        if server_config is None:
            return {
                "success": False,
                "status": "error",
                "error": f"Unknown MCP server: {server_key!r}",
            }

        base_url = server_config["space_url"]
        for endpoint in ("/health", "/", "/docs"):
            try:
                response = requests.get(f"{base_url}{endpoint}", timeout=10)
            except requests.RequestException:
                # Network-level failure on this route; try the next one.
                continue

            if response.status_code == 200:
                if endpoint == "/docs":
                    payload = {"status": "docs_available"}
                else:
                    try:
                        payload = response.json()
                    except ValueError:
                        # A 200 with a non-JSON body (e.g. an HTML landing
                        # page) still proves the server is up, so do not
                        # let the decode error mark it as failed.
                        payload = {"status": "ok", "detail": "non-JSON response body"}
                return {
                    "success": True,
                    "status": "online",
                    "endpoint": endpoint,
                    "response": payload,
                }

            if response.status_code in (404, 422):
                return {
                    "success": True,
                    "status": "online_different_structure",
                    "endpoint": endpoint,
                }

        return {"success": False, "status": "offline", "error": "No responsive endpoints"}
    except Exception as e:
        # Boundary handler: callers expect a status dict, never an exception.
        return {"success": False, "status": "error", "error": str(e)}
def execute_linux_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Render a Markdown cheat-sheet of Linux commands for one of the known tools.

    Nothing is executed; the method only formats text built from
    ``arguments``.

    Args:
        tool_name: ``"user_management"``, ``"file_permissions"`` or
            ``"system_commands"``.
        arguments: Tool options. Keys read per tool:

            * user_management: ``action``, ``username``, ``groupname``
            * file_permissions: ``action``, ``path``, ``permissions``,
              ``owner``, ``recursive``
            * system_commands: ``category``, ``target``

            ``None`` is treated as an empty dict.

    Returns:
        Markdown text, or a line starting with ``❌`` that lists the valid
        choices when the tool, action or category is not recognised.
    """
    arguments = arguments or {}

    if tool_name == "user_management":
        action = arguments.get("action", "")
        username = arguments.get("username", "example_user")
        groupname = arguments.get("groupname", "example_group")

        if action == "add_user_to_group":
            return f"""**Add User to Group Commands:**
```bash
# Add user '{username}' to group '{groupname}'
sudo usermod -a -G {groupname} {username}
# Alternative methods:
sudo gpasswd -a {username} {groupname}
sudo adduser {username} {groupname} # Ubuntu/Debian
# Verify group membership:
groups {username}
id {username}
getent group {groupname}
```"""
        if action == "create_user":
            return f"""**Create User Commands:**
```bash
# Create new user '{username}'
sudo useradd -m -s /bin/bash {username}
sudo passwd {username}
# Create user with specific group:
sudo useradd -m -g {groupname} -s /bin/bash {username}
# Add to sudo group (Ubuntu/Debian):
sudo usermod -a -G sudo {username}
# Add to wheel group (RHEL/CentOS):
sudo usermod -a -G wheel {username}
# Verify user creation:
id {username}
getent passwd {username}
```"""
        if action == "delete_user":
            # The group removal line uses the requested group rather than
            # a literal placeholder that cannot be run as printed.
            return f"""**Delete User Commands:**
```bash
# Delete user '{username}'
sudo userdel {username}
# Delete user and home directory:
sudo userdel -r {username}
# Remove user from all groups first (if needed):
sudo gpasswd -d {username} {groupname}
# Verify user deletion:
getent passwd {username}
# Should return no results if deleted successfully
```"""
        if action == "list_groups":
            return """**List Groups and Users Commands:**
```bash
# List all groups:
cat /etc/group
getent group
# List groups for current user:
groups $USER
id $USER
# List groups for specific user:
groups username
id username
# List users in specific group:
getent group sudo
getent group wheel
# List all users:
cut -d: -f1 /etc/passwd
getent passwd
```"""
        return f"❌ Unknown user management action: {action}. Available: add_user_to_group, create_user, delete_user, list_groups"

    if tool_name == "file_permissions":
        action = arguments.get("action", "")
        path = arguments.get("path", "/path/to/file")
        permissions = arguments.get("permissions", "755")
        owner = arguments.get("owner", "username")
        recursive_flag = "-R " if arguments.get("recursive", False) else ""

        if action == "chmod":
            return f"""**Change File Permissions Commands:**
```bash
# Change permissions for {path}
chmod {recursive_flag}{permissions} {path}
# Common permission examples:
# 755 = rwxr-xr-x (executable for owner, readable for others)
# 644 = rw-r--r-- (readable/writable for owner, readable for others)
# 600 = rw------- (readable/writable for owner only)
# 777 = rwxrwxrwx (full permissions for all - use with caution!)
# Symbolic notation examples:
chmod {recursive_flag}u+x {path} # Add execute for owner
chmod {recursive_flag}g-w {path} # Remove write for group
chmod {recursive_flag}o-r {path} # Remove read for others
# Verify permissions:
ls -la {path}
stat {path}
```"""
        if action == "chown":
            return f"""**Change File Ownership Commands:**
```bash
# Change ownership for {path}
sudo chown {recursive_flag}{owner} {path}
# Change owner and group:
sudo chown {recursive_flag}{owner}:{owner} {path}
sudo chown {recursive_flag}{owner}:users {path}
# Change only group:
sudo chgrp {recursive_flag}{owner} {path}
# Verify ownership:
ls -la {path}
stat {path}
```"""
        if action == "find_permissions":
            # "-perm -002" matches any mode with the world-write bit set;
            # an exact "-perm 777" would miss modes such as 757 or 1777.
            return f"""**Find Files by Permissions:**
```bash
# Find files with specific permissions in {path}
find {path} -type f -perm {permissions}
find {path} -type f -perm -{permissions} # At least these permissions
# Find files by owner:
find {path} -user {owner}
find {path} -group {owner}
# Find files with dangerous permissions:
find {path} -type f -perm -002 # World-writable files
find {path} -type f -perm -004 # World-readable files
# Find SUID/SGID files:
find {path} -type f -perm -4000 # SUID files
find {path} -type f -perm -2000 # SGID files
# Find world-writable directories:
find {path} -type d -perm -002
```"""
        return f"❌ Unknown file permission action: {action}. Available: chmod, chown, find_permissions"

    if tool_name == "system_commands":
        category = arguments.get("category", "")
        target = arguments.get("target", "nginx")

        if category == "process":
            return f"""**Process Management Commands:**
```bash
# List and analyze processes:
ps aux # List all processes with details
ps -ef # Alternative process listing
pstree # Show process tree
top # Real-time process monitor
htop # Enhanced process monitor
# Find and manage specific processes:
ps aux | grep {target} # Find specific process
pgrep -f {target} # Get PID by name
pkill -f {target} # Kill process by name
killall {target} # Kill all processes by name
# Process control:
kill -15 PID # Graceful termination (SIGTERM)
kill -9 PID # Force kill (SIGKILL)
kill -HUP PID # Reload configuration (SIGHUP)
# Background processes:
nohup command & # Run command in background
screen -S session_name # Start screen session
tmux new -s session_name # Start tmux session
```"""
        if category == "service":
            return f"""**Service Management Commands (systemd):**
```bash
# Service status and control:
systemctl status {target} # Check service status
systemctl start {target} # Start service
systemctl stop {target} # Stop service
systemctl restart {target} # Restart service
systemctl reload {target} # Reload configuration
# Service persistence:
systemctl enable {target} # Enable at boot
systemctl disable {target} # Disable at boot
systemctl is-enabled {target} # Check if enabled
# Service discovery:
systemctl list-units # List all active services
systemctl list-units --failed # List failed services
systemctl list-unit-files # List all unit files
# Service logs:
journalctl -u {target} # View service logs
journalctl -u {target} -f # Follow service logs
journalctl -u {target} --since today # Today's logs
```"""
        if category == "network":
            return f"""**Network Diagnostic Commands:**
```bash
# Network interface information:
ip addr show # Show IP addresses
ip link show # Show network interfaces
ip route show # Show routing table
ip neigh show # Show ARP table
# Port and connection analysis:
ss -tuln # Show listening ports
ss -tulpn # Show ports with process names
netstat -tuln # Alternative port listing
netstat -rn # Show routing table
# Connectivity testing:
ping -c 4 {target} # Test connectivity
traceroute {target} # Trace network path
mtr {target} # Network diagnostic tool
nslookup {target} # DNS lookup
dig {target} # DNS information
# Network utilities:
wget -O- ifconfig.me # Get public IP
curl -s ifconfig.me # Alternative public IP
curl -s ipinfo.io # Detailed IP information
```"""
        if category == "disk":
            return """**Disk Usage and Management Commands:**
```bash
# Disk usage analysis:
df -h # Show disk usage (human readable)
df -i # Show inode usage
du -sh /* # Show directory sizes in root
du -sh /var/log/* | sort -hr # Sort by size
# Disk and filesystem information:
lsblk # List block devices
lsblk -f # Show filesystems
blkid # Show UUIDs and labels
fdisk -l # List disk partitions
parted -l # Alternative partition listing
# Mount management:
mount # Show mounted filesystems
findmnt # Tree view of mounts
mount /dev/sdb1 /mnt/backup # Mount filesystem
umount /mnt/backup # Unmount filesystem
# Cleanup commands:
apt autoremove # Remove unused packages (Debian/Ubuntu)
yum autoremove # Remove unused packages (RHEL/CentOS)
journalctl --disk-usage # Check journal disk usage
journalctl --vacuum-time=3d # Clean old journal entries
```"""
        if category == "firewall":
            # The save command runs the redirect inside a root shell: with
            # "sudo iptables-save > file" the calling shell opens the file
            # unprivileged and the write fails.
            return f"""**Firewall Management Commands:**
```bash
# UFW (Uncomplicated Firewall):
sudo ufw status verbose # Check UFW status (detailed)
sudo ufw enable # Enable UFW
sudo ufw disable # Disable UFW
# UFW rule management:
sudo ufw allow {target} # Allow port/service
sudo ufw deny {target} # Block port/service
sudo ufw allow ssh # Allow SSH service
sudo ufw allow 80/tcp # Allow HTTP
sudo ufw allow 443/tcp # Allow HTTPS
sudo ufw allow from 192.168.1.0/24 # Allow from subnet
# UFW advanced operations:
sudo ufw delete allow 80 # Remove rule
sudo ufw insert 1 allow 22 # Insert rule at position
sudo ufw logging on # Enable logging
# iptables (advanced):
sudo iptables -L # List iptables rules
sudo iptables -L -n # List with numeric output
sudo iptables -A INPUT -p tcp --dport 80 -j ACCEPT # Allow HTTP
sudo sh -c 'iptables-save > /etc/iptables/rules.v4' # Save rules
```"""
        return f"❌ Unknown system command category: {category}. Available: process, service, network, disk, firewall"

    return f"❌ Unknown tool: {tool_name}. Available: user_management, file_permissions, system_commands"
def execute_cisco_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Render a Markdown block of Cisco IOS commands for one of the known tools.

    Nothing is sent to a device; the method only formats text built from
    ``arguments``.

    Args:
        tool_name: One of ``"vlan_management"``,
            ``"interface_configuration"``, ``"routing_configuration"``,
            ``"security_configuration"`` or ``"troubleshooting"``.
        arguments: Tool options. Keys read per tool:

            * vlan_management: ``action``, ``vlan_id``, ``vlan_name``,
              ``interface``
            * interface_configuration: ``action``, ``interface``,
              ``vlan_id``, ``ip_address``, ``subnet_mask``, ``description``
            * routing_configuration: ``protocol``, ``action``, ``network``,
              ``area``, ``process_id``, ``next_hop``
            * security_configuration: ``feature``, ``action``,
              ``acl_number``, ``interface``, ``source_ip``,
              ``destination_ip``
            * troubleshooting: ``category``, ``target``, ``interface``

            ``None`` is treated as an empty dict.

    Returns:
        Markdown text. An unrecognised tool, action, protocol, feature or
        category yields a line starting with ``❌`` that lists the valid
        choices, so the result is always a string.
    """
    arguments = arguments or {}

    if tool_name == "vlan_management":
        action = arguments.get("action", "create_vlan")
        vlan_id = arguments.get("vlan_id", 100)
        vlan_name = arguments.get("vlan_name", f"VLAN_{vlan_id}")

        if action == "create_vlan":
            return f"""**Cisco VLAN Creation Commands:**
```cisco
! Create VLAN {vlan_id}
configure terminal
vlan {vlan_id}
name {vlan_name}
state active
exit
exit
! Verify VLAN creation:
show vlan brief
show vlan id {vlan_id}
```"""
        if action == "assign_vlan":
            interface = arguments.get("interface", "") or "GigabitEthernet0/1"
            return f"""**Assign Interface to VLAN Commands:**
```cisco
! Assign interface {interface} to VLAN {vlan_id}
configure terminal
interface {interface}
switchport mode access
switchport access vlan {vlan_id}
no shutdown
exit
exit
! Verify interface assignment:
show interfaces {interface} switchport
show vlan brief
```"""
        if action == "show_vlan":
            return f"""**VLAN Information Commands:**
```cisco
! VLAN Information Commands
show vlan brief
show vlan summary
show vlan id {vlan_id}
show interfaces status
show spanning-tree vlan brief
```"""
        if action == "delete_vlan":
            return f"""**Delete VLAN Commands:**
```cisco
! Delete VLAN {vlan_id}
configure terminal
no vlan {vlan_id}
exit
! Verify VLAN deletion:
show vlan brief
```"""
        return f"❌ Unknown VLAN management action: {action}. Available: create_vlan, assign_vlan, show_vlan, delete_vlan"

    if tool_name == "interface_configuration":
        action = arguments.get("action", "configure_access")
        interface = arguments.get("interface", "GigabitEthernet0/1")
        vlan_id = arguments.get("vlan_id", 1)
        description = arguments.get("description", "")
        # Rendered as an empty line when no description was supplied.
        description_line = f" description {description}" if description else ""

        if action == "configure_access":
            return f"""**Configure Access Port Commands:**
```cisco
! Configure {interface} as access port for VLAN {vlan_id}
configure terminal
interface {interface}
{description_line}
switchport mode access
switchport access vlan {vlan_id}
spanning-tree portfast
no shutdown
exit
exit
! Verify configuration:
show interfaces {interface} switchport
show interfaces {interface} status
```"""
        if action == "configure_trunk":
            # vlan_id 1 is the "not specified" default and means all VLANs.
            allowed_vlans = vlan_id if vlan_id != 1 else "all"
            # Encapsulation is set before trunk mode: a port whose
            # encapsulation is still negotiated rejects "switchport mode
            # trunk".
            return f"""**Configure Trunk Port Commands:**
```cisco
! Configure {interface} as trunk port
configure terminal
interface {interface}
{description_line}
switchport trunk encapsulation dot1q
switchport mode trunk
switchport trunk allowed vlan {allowed_vlans}
no shutdown
exit
exit
! Verify trunk configuration:
show interfaces {interface} trunk
show interfaces {interface} switchport
```"""
        if action == "set_ip":
            ip_address = arguments.get("ip_address", "") or "192.168.1.1"
            subnet_mask = arguments.get("subnet_mask", "255.255.255.0")
            return f"""**Configure IP Address Commands:**
```cisco
! Configure IP address on {interface}
configure terminal
interface {interface}
ip address {ip_address} {subnet_mask}
{description_line}
no shutdown
exit
exit
! Verify IP configuration:
show ip interface {interface}
show ip interface brief
```"""
        if action == "shutdown":
            return f"""**Shutdown Interface Commands:**
```cisco
! Shutdown interface {interface}
configure terminal
interface {interface}
shutdown
exit
exit
! Verify interface status:
show interfaces {interface} status
```"""
        if action == "no_shutdown":
            return f"""**Enable Interface Commands:**
```cisco
! Enable interface {interface}
configure terminal
interface {interface}
no shutdown
exit
exit
! Verify interface status:
show interfaces {interface} status
```"""
        return f"❌ Unknown interface configuration action: {action}. Available: configure_access, configure_trunk, set_ip, shutdown, no_shutdown"

    if tool_name == "routing_configuration":
        protocol = arguments.get("protocol", "ospf")
        action = arguments.get("action", "configure")
        network = arguments.get("network", "192.168.1.0 0.0.0.255")
        area = arguments.get("area", "0")
        process_id = arguments.get("process_id", 1)
        next_hop = arguments.get("next_hop", "192.168.1.1")

        if protocol == "ospf" and action == "configure":
            return f"""**OSPF Configuration Commands:**
```cisco
! Configure OSPF process {process_id}
configure terminal
router ospf {process_id}
network {network} area {area}
router-id 1.1.1.1
exit
exit
! Verify OSPF configuration:
show ip ospf
show ip ospf neighbor
show ip ospf database
```"""
        if protocol == "ospf" and action == "show":
            return """**OSPF Information Commands:**
```cisco
! OSPF Information Commands
show ip ospf
show ip ospf neighbor
show ip ospf database
show ip ospf interface
show ip route ospf
```"""
        if protocol == "static" and action == "configure":
            return f"""**Static Route Configuration:**
```cisco
! Configure static route to {network}
configure terminal
ip route {network} {next_hop}
exit
! Verify static route:
show ip route static
show ip route
```"""
        if protocol == "eigrp" and action == "configure":
            return f"""**EIGRP Configuration Commands:**
```cisco
! Configure EIGRP AS {process_id}
configure terminal
router eigrp {process_id}
network {network}
no auto-summary
exit
exit
! Verify EIGRP configuration:
show ip eigrp neighbors
show ip eigrp topology
```"""
        return f"❌ Unsupported routing request: protocol={protocol}, action={action}. Available: ospf (configure, show), static (configure), eigrp (configure)"

    if tool_name == "security_configuration":
        feature = arguments.get("feature", "ssh")
        action = arguments.get("action", "enable")
        acl_number = arguments.get("acl_number", 100)
        interface = arguments.get("interface", "GigabitEthernet0/1")

        if feature == "ssh" and action == "enable":
            # NOTE(review): the sample below ships a fixed example secret;
            # operators must replace it before applying the config.
            return """**SSH Configuration Commands:**
```cisco
! Enable SSH access
configure terminal
hostname CiscoRouter
ip domain-name company.local
crypto key generate rsa general-keys modulus 2048
ip ssh version 2
ip ssh time-out 60
ip ssh authentication-retries 3
! Configure user account
username admin privilege 15 secret cisco123
! Configure VTY lines
line vty 0 4
transport input ssh
login local
exit
exit
! Verify SSH configuration:
show ip ssh
show ssh
```"""
        if feature == "acl" and action == "configure":
            source_ip = arguments.get("source_ip", "192.168.1.0 0.0.0.255")
            destination_ip = arguments.get("destination_ip", "any")
            return f"""**Access Control List Configuration:**
```cisco
! Configure Access Control List {acl_number}
configure terminal
access-list {acl_number} permit ip {source_ip} {destination_ip}
access-list {acl_number} deny ip any any
! Apply ACL to interface {interface}
interface {interface}
ip access-group {acl_number} in
exit
exit
! Verify ACL:
show access-lists {acl_number}
show ip interface {interface}
```"""
        if feature == "port_security" and action == "enable":
            return f"""**Port Security Configuration:**
```cisco
! Enable port security on {interface}
configure terminal
interface {interface}
switchport mode access
switchport port-security
switchport port-security maximum 2
switchport port-security mac-address sticky
switchport port-security violation restrict
exit
exit
! Verify port security:
show port-security interface {interface}
show port-security address
```"""
        return f"❌ Unsupported security request: feature={feature}, action={action}. Available: ssh (enable), acl (configure), port_security (enable)"

    if tool_name == "troubleshooting":
        category = arguments.get("category", "connectivity")
        target = arguments.get("target", "8.8.8.8")
        interface = arguments.get("interface", "")

        if category == "connectivity":
            return f"""**Connectivity Troubleshooting Commands:**
```cisco
! Connectivity Troubleshooting to {target}
ping {target}
traceroute {target}
show ip route
show arp
show ip interface brief
! Extended ping test:
ping {target} repeat 100
ping {target} size 1500
```"""
        if category == "interface":
            # Scope every command to the named interface when one is given.
            show_interfaces = f"show interfaces {interface}" if interface else "show interfaces"
            show_controllers = f"show controllers {interface}" if interface else "show controllers"
            return f"""**Interface Troubleshooting Commands:**
```cisco
! Interface Troubleshooting for {interface or 'all interfaces'}
{show_interfaces}
{show_interfaces} status
{show_controllers}
show ip interface brief
! Interface statistics:
{show_interfaces} | include error
```"""
        if category == "routing":
            return f"""**Routing Troubleshooting Commands:**
```cisco
! Routing Troubleshooting
show ip route
show ip protocols
show ip ospf neighbor
show ip eigrp neighbors
show cdp neighbors
! Routing table analysis:
show ip route summary
show ip route {target if target else '0.0.0.0'}
```"""
        if category == "switching":
            return """**Switching Troubleshooting Commands:**
```cisco
! Switching Troubleshooting
show vlan brief
show interfaces status
show spanning-tree
show mac address-table
show cdp neighbors detail
! Port and VLAN status:
show interfaces trunk
show spanning-tree blockedports
```"""
        if category == "general":
            return """**General Device Information Commands:**
```cisco
! General Device Information
show version
show running-config
show startup-config
show processes cpu
show memory
show environment
show logging
! System status:
show clock
show users
show sessions
```"""
        return f"❌ Unknown troubleshooting category: {category}. Available: connectivity, interface, routing, switching, general"

    return f"❌ Unknown Cisco tool: {tool_name}. Available: vlan_management, interface_configuration, routing_configuration, security_configuration, troubleshooting"
def execute_terraform_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Render Markdown output for one of the known Terraform helper tools.

    Nothing is run against Terraform; the method only formats text built
    from ``arguments``.

    Args:
        tool_name: One of ``"generate_terraform_config"``,
            ``"validate_terraform_config"``, ``"get_deployment_workflow"``,
            ``"convert_to_module"``, ``"format_terraform_code"`` or
            ``"generate_terraform_docs"``.
        arguments: Tool options. Keys read per tool:

            * generate_terraform_config: ``resource_type``, ``provider``,
              ``name``, ``options`` (``cidr`` / ``instance_type``)
            * validate_terraform_config, format_terraform_code:
              ``config_content``
            * get_deployment_workflow: ``workflow_type``, ``backend_type``,
              ``environment``
            * convert_to_module, generate_terraform_docs: ``module_name``

            ``None`` is treated as an empty dict.

    Returns:
        Markdown text. An unrecognised tool or workflow type yields a line
        starting with ``❌`` that lists the valid choices, so the result is
        always a string.
    """
    arguments = arguments or {}

    if tool_name == "generate_terraform_config":
        resource_type = arguments.get("resource_type", "vpc")
        provider = arguments.get("provider", "aws")
        name = arguments.get("name", "example")
        # Tolerate an explicit null for "options".
        options = arguments.get("options") or {}

        if resource_type == "vpc" and provider == "aws":
            cidr = options.get("cidr", "10.0.0.0/16")
            # project_name and environment are declared below because the
            # provider block and the module name reference them; without
            # the declarations the generated file does not validate.
            return f"""**Generated AWS VPC Terraform Configuration:**
```hcl
# {name} VPC Infrastructure Configuration
terraform {{
required_version = ">= 1.0"
required_providers {{
aws = {{
source = "hashicorp/aws"
version = "~> 5.0"
}}
}}
}}
provider "aws" {{
region = var.aws_region
default_tags {{
tags = {{
Project = var.project_name
ManagedBy = "Terraform"
Environment = var.environment
}}
}}
}}
variable "aws_region" {{
description = "AWS region"
type = string
default = "us-west-2"
}}
variable "project_name" {{
description = "Project name used for default tags"
type = string
default = "{name}"
}}
variable "environment" {{
description = "Deployment environment name"
type = string
default = "dev"
}}
variable "vpc_cidr" {{
description = "CIDR block for VPC"
type = string
default = "{cidr}"
}}
# VPC Module
module "{name}_vpc" {{
source = "terraform-aws-modules/vpc/aws"
name = "${{var.environment}}-{name}-vpc"
cidr = var.vpc_cidr
azs = ["us-west-2a", "us-west-2b"]
public_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k)]
private_subnets = [for k, v in ["us-west-2a", "us-west-2b"] : cidrsubnet(var.vpc_cidr, 8, k + 10)]
enable_nat_gateway = true
enable_dns_hostnames = true
enable_dns_support = true
}}
# Outputs
output "vpc_id" {{
description = "ID of the VPC"
value = module.{name}_vpc.vpc_id
}}
```
**Next Steps:**
1. Save as `main.tf`
2. Run `terraform init`
3. Run `terraform plan`
4. Run `terraform apply`
"""
        if resource_type == "ec2" and provider == "aws":
            instance_type = options.get("instance_type", "t3.micro")
            return f"""**Generated AWS EC2 Terraform Configuration:**
```hcl
# {name} EC2 Instance Configuration
terraform {{
required_version = ">= 1.0"
required_providers {{
aws = {{
source = "hashicorp/aws"
version = "~> 5.0"
}}
}}
}}
variable "instance_type" {{
description = "EC2 instance type"
type = string
default = "{instance_type}"
}}
# Data sources
data "aws_ami" "app_ami" {{
most_recent = true
owners = ["amazon"]
filter {{
name = "name"
values = ["amzn2-ami-hvm-*-x86_64-gp2"]
}}
}}
# EC2 Instance Module
module "{name}_instance" {{
source = "terraform-aws-modules/ec2-instance/aws"
name = "{name}-instance"
instance_type = var.instance_type
ami = data.aws_ami.app_ami.id
create_iam_instance_profile = true
root_block_device = [{{
encrypted = true
volume_type = "gp3"
volume_size = 20
}}]
}}
output "instance_id" {{
value = module.{name}_instance.id
}}
```
"""
        if resource_type == "s3" and provider == "aws":
            return f"""**Generated AWS S3 Bucket Terraform Configuration:**
```hcl
# {name} S3 Bucket Configuration
terraform {{
required_providers {{
aws = {{
source = "hashicorp/aws"
version = "~> 5.0"
}}
}}
}}
# Random suffix for unique bucket name
resource "random_string" "bucket_suffix" {{
length = 8
special = false
upper = false
}}
# S3 Bucket Module
module "{name}_s3_bucket" {{
source = "terraform-aws-modules/s3-bucket/aws"
bucket = "{name}-bucket-${{random_string.bucket_suffix.result}}"
versioning = {{
enabled = true
}}
server_side_encryption_configuration = {{
rule = {{
apply_server_side_encryption_by_default = {{
sse_algorithm = "AES256"
}}
}}
}}
block_public_acls = true
block_public_policy = true
}}
output "bucket_name" {{
value = module.{name}_s3_bucket.s3_bucket_id
}}
```
"""
        if resource_type == "eks" and provider == "aws":
            return f"""**Generated AWS EKS Cluster Terraform Configuration:**
```hcl
# {name} EKS Cluster Configuration
terraform {{
required_providers {{
aws = {{
source = "hashicorp/aws"
version = "~> 5.0"
}}
}}
}}
# EKS Cluster Module
module "eks" {{
source = "terraform-aws-modules/eks/aws"
cluster_name = "{name}-eks-cluster"
cluster_version = "1.28"
vpc_id = module.vpc.vpc_id
subnet_ids = module.vpc.private_subnets
eks_managed_node_groups = {{
main = {{
name = "{name}-nodes"
instance_types = ["t3.medium"]
min_size = 1
max_size = 3
desired_size = 2
}}
}}
}}
```
"""
        return f"""**Terraform Configuration Generation:**
Resource type `{resource_type}` for provider `{provider}` is supported by the server.
**Available Resource Types:**
- vpc, ec2, s3, eks, rds (AWS)
- vnet, compute, aks (Azure)
- Custom configurations
**To generate full configuration, specify:**
- resource_type: {resource_type}
- provider: {provider}
- name: {name}
- options: Custom parameters
"""

    if tool_name == "validate_terraform_config":
        config_content = arguments.get("config_content", "")
        if not config_content:
            return """**Terraform Configuration Validation:**
❌ **No configuration provided**
Please provide Terraform configuration content to validate.
**Example usage:**
- Paste your .tf file contents
- Enable best practices checking
- Get syntax and style recommendations
"""
        # NOTE: this is a canned checklist; the content is only measured,
        # not parsed.
        return f"""**Terraform Configuration Validation Results:**
## ✅ Syntax Validation
- Configuration syntax appears valid
- Terraform blocks found
- Provider configuration detected
## 💡 Best Practices Review
- Consider adding resource tags
- Use remote state storage for production
- Add variable descriptions
- Include outputs for important resources
## 🔧 Recommendations
1. **Run terraform validate** - Official validation
2. **Run terraform fmt** - Format consistency
3. **Add validation rules** - Variable constraints
4. **Security scan** - Use tools like tfsec
**Configuration Length:** {len(config_content)} characters
"""

    if tool_name == "get_deployment_workflow":
        workflow_type = arguments.get("workflow_type", "basic")
        # str() keeps .upper()/.title() safe for non-string values.
        backend_type = str(arguments.get("backend_type", "local"))
        environment = str(arguments.get("environment", "development"))

        if workflow_type == "basic":
            return f"""**Basic Terraform Deployment Workflow:**
```bash
# Basic Terraform Workflow
terraform init
terraform validate
terraform fmt
terraform plan
terraform apply
terraform output
```
**Environment:** {environment.title()}
**Backend:** {backend_type.upper()}
**Steps Explained:**
1. **init** - Initialize working directory
2. **validate** - Check configuration syntax
3. **fmt** - Format code consistently
4. **plan** - Preview changes
5. **apply** - Create/update resources
6. **output** - Display output values
"""
        if workflow_type == "production":
            return f"""**Production Terraform Deployment Workflow:**
```bash
# Production Terraform Workflow with {backend_type.upper()} backend
# 1. Initialize with backend configuration
terraform init
# 2. Validate configuration
terraform validate
# 3. Format code
terraform fmt
# 4. Security scan (optional)
# tfsec .
# 5. Plan and save
terraform plan -out=tfplan
# 6. Review plan output carefully
terraform show tfplan
# 7. Apply saved plan
terraform apply tfplan
# 8. Verify outputs
terraform output
# 9. Clean up plan file
rm tfplan
```
## 🛡️ Production Security
- ✅ Use remote state storage
- ✅ Enable state locking
- ✅ Implement approval workflows
- ✅ Run security scans
- ✅ Monitor all changes
"""
        if workflow_type == "module_development":
            return f"""**Module Development Workflow:**
```bash
# Module Development Workflow
# 1. Initialize module
terraform init
# 2. Validate module
terraform validate
# 3. Format module code
terraform fmt -recursive
# 4. Generate documentation
# terraform-docs markdown . > README.md
# 5. Test module (in examples/ directory)
cd examples/basic
terraform init
terraform plan
# 6. Clean up test
terraform destroy
cd ../..
# 7. Tag version
git tag v1.0.0
git push --tags
```
**Module Structure:**
```
modules/{environment}/
├── main.tf
├── variables.tf
├── outputs.tf
├── versions.tf
└── examples/
```
"""
        return f"❌ Unknown workflow type: {workflow_type}. Available: basic, production, module_development"

    if tool_name == "convert_to_module":
        module_name = arguments.get("module_name", "terraform-module")
        return f"""**Convert to Terraform Module: {module_name}**
## 📦 Module Structure
```
modules/{module_name}/
├── main.tf # Main configuration
├── variables.tf # Input variables
├── outputs.tf # Output values
├── versions.tf # Provider requirements
├── README.md # Documentation
└── examples/
└── basic/
├── main.tf # Example usage
└── README.md # Example docs
```
## variables.tf
```hcl
variable "project_name" {{
description = "Name of the project"
type = string
}}
variable "environment" {{
description = "Environment name"
type = string
default = "dev"
validation {{
condition = contains(["dev", "staging", "prod"], var.environment)
error_message = "Environment must be dev, staging, or prod."
}}
}}
variable "tags" {{
description = "A map of tags to assign to resources"
type = map(string)
default = {{}}
}}
```
## outputs.tf
```hcl
output "{module_name}_id" {{
description = "ID of the main resource"
value = # Add appropriate resource reference
}}
```
## Example Usage
```hcl
module "{module_name}" {{
source = "./modules/{module_name}"
project_name = "my-project"
environment = "dev"
tags = {{
Team = "infrastructure"
}}
}}
```
"""

    if tool_name == "format_terraform_code":
        config_content = arguments.get("config_content", "")
        if not config_content:
            return """**Terraform Code Formatting:**
❌ **No configuration provided**
**Formatting features:**
- Normalize indentation (2 spaces)
- Consistent spacing
- Proper block alignment
- Standard Terraform formatting
**Usage:** Provide Terraform configuration content to format.
"""
        # Re-indent by bracket depth (2 spaces per level) and drop blank
        # lines. This is a simple approximation of `terraform fmt`, not a
        # parser: brackets inside comments or strings are counted too.
        openers, closers = "{[(", "}])"
        depth = 0
        formatted_lines = []
        for raw_line in config_content.splitlines():
            stripped = raw_line.strip()
            if not stripped:
                continue
            # A line that begins by closing a block sits at the outer level.
            level = depth - 1 if stripped[0] in closers else depth
            formatted_lines.append("  " * max(level, 0) + stripped)
            opened = sum(stripped.count(ch) for ch in openers)
            closed = sum(stripped.count(ch) for ch in closers)
            depth = max(depth + opened - closed, 0)
        formatted = "\n".join(formatted_lines)
        return f"""**Terraform Code Formatting Results:**
## ✨ Formatted Configuration
```hcl
{formatted}
```
## 📋 Formatting Applied
- ✅ Normalized indentation
- ✅ Consistent spacing
- ✅ Proper block alignment
- ✅ Standard Terraform style
## 🔧 Additional Tips
1. **Use `terraform fmt`** - Official formatter
2. **Configure editor** - Auto-formatting in IDE
3. **Pre-commit hooks** - Format before commits
4. **Consistent naming** - Use lowercase_with_underscores
"""

    if tool_name == "generate_terraform_docs":
        module_name = arguments.get("module_name", "terraform-module")
        # NOTE: the tables below are a fixed template; they are not derived
        # from any supplied configuration.
        return f"""**Terraform Documentation: {module_name}**
## 📋 Module Overview
This Terraform configuration manages infrastructure resources with automated documentation generation.
## 🏗️ Resources
| Resource Type | Description |
|---------------|-------------|
| `aws_instance` | EC2 compute instances |
| `aws_vpc` | Virtual Private Cloud |
| `aws_s3_bucket` | Object storage bucket |
## 📥 Input Variables
| Name | Description | Type | Default |
|------|-------------|------|---------|
| `project_name` | Name of the project | `string` | `null` |
| `environment` | Environment name | `string` | `"dev"` |
| `instance_type` | EC2 instance type | `string` | `"t3.micro"` |
## 📤 Outputs
| Name | Description |
|------|-------------|
| `vpc_id` | ID of the VPC |
| `instance_id` | ID of the EC2 instance |
## 🚀 Usage Example
```hcl
module "{module_name}" {{
source = "./modules/{module_name}"
project_name = "my-project"
environment = "production"
tags = {{
Team = "infrastructure"
Project = "my-project"
}}
}}
```
## 📋 Requirements
| Name | Version |
|------|---------|
| terraform | >= 1.0 |
| aws | >= 5.0 |
## 🔧 Setup Instructions
1. Clone or download this module
2. Configure variables in `terraform.tfvars`
3. Run `terraform init`
4. Run `terraform plan`
5. Run `terraform apply`
"""

    return f"❌ Unknown Terraform tool: {tool_name}. Available: generate_terraform_config, validate_terraform_config, get_deployment_workflow, convert_to_module, format_terraform_code, generate_terraform_docs"
def select_linux_tool_and_args(self, user_input: str) -> Dict[str, Any]:
    """Map a free-text request to a Linux tool name and argument dict.

    The lower-cased input is scanned for keywords as plain substrings and the
    first matching category wins, so short keywords such as "ls" or "ps" also
    match inside longer words. Arguments are fixed example values; nothing is
    parsed out of the request text.

    Args:
        user_input: Text typed by the user; ``None`` is treated as empty.

    Returns:
        ``{"tool": <name>, "args": {...}}``, or the ``"help"`` tool with empty
        args when no keyword matches.
    """
    # NOTE(review): the tool names returned here (list_files, read_file, ...) are
    # not the ones listed under mcp_servers["linux"]["tools"]; confirm they match
    # what execute_linux_tool accepts.
    text = (user_input or "").lower()

    def mentions(*keywords: str) -> bool:
        return any(keyword in text for keyword in keywords)

    # File operations
    if mentions("list", "ls", "show files", "directory"):
        detailed = "detailed" in text or "-l" in text
        return {"tool": "list_files", "args": {"path": ".", "detailed": detailed}}
    if mentions("read", "cat", "view", "show content"):
        return {"tool": "read_file", "args": {"filepath": "/etc/hostname"}}
    if mentions("write", "create file", "echo"):
        return {"tool": "write_file", "args": {"filepath": "/tmp/test.txt", "content": "Hello from MCP"}}

    # System information
    if mentions("system", "info", "uname", "system info"):
        return {"tool": "get_system_info", "args": {}}
    if mentions("processes", "ps", "running", "process list"):
        return {"tool": "list_processes", "args": {"filter": ""}}
    if mentions("memory", "mem", "ram", "memory usage"):
        return {"tool": "get_memory_usage", "args": {}}
    if mentions("disk", "storage", "df", "disk usage"):
        return {"tool": "get_disk_usage", "args": {}}

    # Network operations
    if mentions("network", "netstat", "connections"):
        return {"tool": "get_network_info", "args": {}}
    if mentions("ping", "connectivity", "test connection"):
        return {"tool": "ping_host", "args": {"host": "google.com", "count": 4}}

    service_request = mentions("service", "systemctl", "start service", "stop service")

    # Process management. A bare "stop" only means "kill a process" when the
    # request is not about a service; otherwise "stop service" could never
    # reach the service branch below.
    if mentions("kill", "terminate") or ("stop" in text and not service_request):
        return {"tool": "kill_process", "args": {"pid_or_name": "example_process"}}

    # Package management. The search phrases contain "package", so they must
    # be tested before the generic install keywords or they are unreachable.
    if mentions("search package", "find package"):
        return {"tool": "manage_packages", "args": {"operation": "search", "package": "python"}}
    if mentions("install", "package", "apt", "yum"):
        return {"tool": "manage_packages", "args": {"operation": "install", "package": "curl"}}

    # Service management
    if service_request:
        if "start" in text:
            action = "start"
        elif "stop" in text:
            action = "stop"
        else:
            # An explicit "status" request and the default are the same action.
            action = "status"
        return {"tool": "manage_services", "args": {"service": "nginx", "action": action}}

    # Log viewing
    if mentions("log", "logs", "syslog", "journal"):
        return {"tool": "view_logs", "args": {"log_type": "syslog", "lines": 50}}

    # Environment variables
    if mentions("env", "environment", "variables"):
        return {"tool": "get_environment", "args": {}}

    # Default - show help
    return {"tool": "help", "args": {}}
def select_cisco_tool_and_args(self, user_input: str) -> Dict[str, Any]:
    """Choose a Cisco tool and example arguments from a free-text request.

    Keywords are matched as substrings of the lower-cased input. Routes are
    tried in the order listed and the first one with a matching keyword is
    returned; when none match, the ``"help"`` tool is selected.
    """
    text = user_input.lower()

    # (trigger keywords, selection) in priority order. The table is rebuilt on
    # every call so each caller receives its own dict objects.
    routes = (
        # VLAN management
        (
            ["vlan", "vlan management", "create vlan", "vlan config"],
            {"tool": "vlan_management", "args": {"operation": "create", "vlan_id": 100, "name": "example_vlan"}},
        ),
        # Interface configuration
        (
            ["interface", "port", "configure interface", "interface config"],
            {"tool": "interface_configuration", "args": {"interface": "GigabitEthernet0/1", "operation": "configure"}},
        ),
        # Routing configuration
        (
            ["routing", "ospf", "bgp", "route", "routing config"],
            {"tool": "routing_configuration", "args": {"protocol": "ospf", "operation": "configure"}},
        ),
        # Security configuration
        (
            ["security", "acl", "access-list", "firewall", "security config"],
            {"tool": "security_configuration", "args": {"type": "acl", "operation": "create"}},
        ),
        # Troubleshooting
        (
            ["troubleshoot", "debug", "problem", "issue", "ping", "trace", "show", "status"],
            {"tool": "troubleshooting", "args": {"operation": "ping", "target": "8.8.8.8"}},
        ),
    )

    for triggers, selection in routes:
        for trigger in triggers:
            if trigger in text:
                return selection

    # Nothing matched: fall back to the help listing.
    return {"tool": "help", "args": {}}
def select_terraform_tool_and_args(self, user_input: str) -> Dict[str, Any]:
    """Map a free-text request to a Terraform tool name and argument dict.

    Keywords are matched as substrings of the lower-cased input and the first
    matching category wins. Arguments are fixed example/placeholder values;
    no configuration text is extracted from the request.

    Args:
        user_input: Text typed by the user; ``None`` is treated as empty.

    Returns:
        ``{"tool": <name>, "args": {...}}``, or the ``"help"`` tool with empty
        args when no keyword matches.
    """
    text = (user_input or "").lower()

    def mentions(*keywords: str) -> bool:
        return any(keyword in text for keyword in keywords)

    def generate_config(resource_type: str) -> Dict[str, Any]:
        return {
            "tool": "generate_terraform_config",
            "args": {"resource_type": resource_type, "provider": "aws", "name": "example"},
        }

    def generate_docs() -> Dict[str, Any]:
        return {
            "tool": "generate_terraform_docs",
            "args": {"config_content": "# Paste configuration to document", "module_name": "terraform-module"},
        }

    # Configuration generation keywords
    if mentions("generate", "create config", "terraform config", "infrastructure"):
        # "generate docs/documentation" asks for the docs tool; without this
        # check the word "generate" alone would send it to config generation.
        if mentions("documentation", "docs", "document", "readme"):
            return generate_docs()
        if "vpc" in text:
            return generate_config("vpc")
        if "ec2" in text or "instance" in text:
            return generate_config("ec2")
        if "s3" in text or "bucket" in text:
            return generate_config("s3")
        if "eks" in text or "kubernetes" in text:
            return generate_config("eks")
        # No recognised resource type: default to a VPC.
        return generate_config("vpc")

    # Validation keywords
    if mentions("validate", "check", "syntax", "validation"):
        return {
            "tool": "validate_terraform_config",
            "args": {"config_content": "# Paste your Terraform configuration here", "check_best_practices": True},
        }

    # Workflow keywords
    if mentions("workflow", "deployment", "deploy", "plan", "apply"):
        if "production" in text or "prod" in text:
            return {"tool": "get_deployment_workflow", "args": {"workflow_type": "production", "environment": "production"}}
        if "module" in text:
            return {"tool": "get_deployment_workflow", "args": {"workflow_type": "module_development"}}
        return {"tool": "get_deployment_workflow", "args": {"workflow_type": "basic", "environment": "development"}}

    # Module conversion keywords
    if mentions("module", "convert", "reusable"):
        return {
            "tool": "convert_to_module",
            "args": {"config_content": "# Paste configuration to convert", "module_name": "example-module"},
        }

    # Formatting keywords
    if mentions("format", "fmt", "formatting", "style"):
        return {"tool": "format_terraform_code", "args": {"config_content": "# Paste Terraform code to format"}}

    # Documentation keywords
    if mentions("documentation", "docs", "document", "readme"):
        return generate_docs()

    # Default - show help
    return {"tool": "help", "args": {}}
def get_available_tools(self, server_key: str) -> List[Dict[str, Any]]:
    """Return the tool list advertised by an MCP server.

    For ``"linux"`` the statically configured list in ``self.mcp_servers`` is
    returned without any network call. For every other key the server is
    queried through ``call_mcp_server(server_key, "tools/list", {})``.

    This is a best-effort lookup: any ordinary error while querying, or a
    response of an unexpected shape, yields an empty list. Interpreter-level
    signals (``KeyboardInterrupt``, ``SystemExit``) are not swallowed.

    Args:
        server_key: Key of the server in ``self.mcp_servers``.

    Returns:
        The tool list, or ``[]`` when it cannot be determined.
    """
    if server_key == "linux":
        # Return known tools for Linux server
        return self.mcp_servers["linux"]["tools"]

    # For other servers, try to call them (though they might not have HTTP endpoints)
    try:
        result = self.call_mcp_server(server_key, "tools/list", {})
    except Exception:
        # Deliberately best-effort; catching Exception (not a bare except)
        # lets Ctrl-C and interpreter shutdown propagate.
        return []

    if not isinstance(result, dict) or not result.get("success"):
        return []

    data = result.get("data")
    if isinstance(data, dict):
        return data.get("tools", data.get("result", []))
    if isinstance(data, list):
        return data
    return []
def call_mcp_server(self, server_key: str, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send one HTTP request to an MCP server and wrap the outcome in a dict.

    ``"tools/list"`` is sent as ``GET {space_url}/tools``; every other
    endpoint (including ``"tools/call"``) is sent as a JSON ``POST`` to
    ``{space_url}/{endpoint}``. All requests use a 30 second timeout.

    Args:
        server_key: Key of the target server in ``self.mcp_servers``.
        endpoint: Logical endpoint name, e.g. ``"tools/list"`` or ``"tools/call"``.
        payload: JSON body for POST requests; ignored for ``"tools/list"``.

    Returns:
        ``{"success": True, "data": <decoded JSON>}`` on HTTP 200, otherwise
        ``{"success": False, "error": <message>, "server": <name>}``. Request
        and decoding errors are reported in the dict rather than raised.
    """
    # Resolve the server before entering the try block: the handlers below
    # report the server name, so it has to be bound even when the key is bad.
    server_config = self.mcp_servers.get(server_key)
    if server_config is None:
        return {"success": False, "error": f"Unknown server: {server_key}", "server": server_key}
    server_name = server_config.get('name', server_key)

    try:
        base_url = server_config['space_url']
        # For FastAPI MCP servers, we need to use different endpoints
        if endpoint == "tools/list":
            response = requests.get(f"{base_url}/tools", timeout=30)
        else:
            # "tools/call" maps to /tools/call, which is what the generic
            # "/{endpoint}" form produces as well, so one POST path covers both.
            response = requests.post(
                f"{base_url}/{endpoint}",
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )

        if response.status_code == 200:
            return {"success": True, "data": response.json()}
        return {
            "success": False,
            "error": f"HTTP {response.status_code}: {response.text}",
            "server": server_name
        }
    except requests.exceptions.RequestException as e:
        return {"success": False, "error": f"Request failed: {str(e)}", "server": server_name}
    except Exception as e:
        return {"success": False, "error": f"Unexpected error: {str(e)}", "server": server_name}
def execute_tool(self, server_key: str, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Invoke ``tool_name`` on the given MCP server.

    Wraps the tool name and arguments in a ``{"name", "arguments"}`` body and
    forwards it to ``call_mcp_server`` on the ``"tools/call"`` endpoint,
    returning that call's result unchanged.
    """
    return self.call_mcp_server(
        server_key,
        "tools/call",
        {"name": tool_name, "arguments": arguments},
    )
def analyze_user_request(self, user_input: str) -> Dict[str, Any]:
    """Ask the OpenAI model which MCP server should handle ``user_input``.

    The model is instructed to answer with a JSON object. The reply is
    accepted when it parses to a dict whose ``recommended_server`` (compared
    case-insensitively) is a key of ``self.mcp_servers``; JSON embedded in
    surrounding text or markdown fences is also recognised. Otherwise the
    reply text is scanned for a server name, defaulting to ``"cisco"``.

    Args:
        user_input: The user's request text.

    Returns:
        A dict with ``recommended_server``, ``reasoning`` and
        ``suggested_action``; or, if the API call or reply handling raises,
        ``{"error": <message>, "recommended_server": "cisco"}``.
    """
    system_prompt = """
You are an intelligent assistant that helps users interact with MCP (Model Context Protocol) servers.
Available MCP servers (use these EXACT keys in your response):
1. terraform: Terraform infrastructure management
2. linux: Linux system operations
3. cisco: Cisco network management
IMPORTANT: For the "recommended_server" field, you MUST use one of these exact keys: "terraform", "linux", or "cisco"
Analyze the user's request and determine which MCP server would be most appropriate.
Respond in JSON format with:
{
"recommended_server": "terraform|linux|cisco",
"reasoning": "explanation of why this server was chosen",
"suggested_action": "what action to take"
}
"""
    try:
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_input}
            ],
            temperature=0.3
        )
        content = response.choices[0].message.content.strip()

        # Try the reply as-is first, then the outermost {...} span, which
        # recovers JSON wrapped in ```json fences or introductory prose.
        candidates = [content]
        start, end = content.find("{"), content.rfind("}")
        if 0 <= start < end:
            snippet = content[start:end + 1]
            if snippet != content:
                candidates.append(snippet)

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if not isinstance(parsed, dict):
                continue
            server = str(parsed.get("recommended_server", "")).strip().lower()
            if server in self.mcp_servers:
                # Normalise so callers can index self.mcp_servers directly.
                parsed["recommended_server"] = server
                return parsed

        # No usable JSON (or an unknown server key): extract the
        # recommendation from the raw text instead.
        content_lower = content.lower()
        if "cisco" in content_lower:
            recommended_server = "cisco"
        elif "linux" in content_lower:
            recommended_server = "linux"
        elif "terraform" in content_lower:
            recommended_server = "terraform"
        else:
            recommended_server = "cisco"  # Default for network questions
        return {
            "recommended_server": recommended_server,
            "reasoning": "Based on content analysis",
            "suggested_action": content
        }
    except Exception as e:
        return {
            "error": f"Failed to analyze request: {str(e)}",
            "recommended_server": "cisco"
        }
def generate_response(self, user_input: str, mcp_results: Dict[str, Any] = None) -> str:
    """Produce a natural-language reply with the OpenAI chat model.

    The prompt consists of a system message (with ``mcp_results`` appended as
    indented JSON when it is truthy), the last six entries of
    ``self.conversation_history``, and ``user_input`` as the final user turn.

    Returns:
        The model's reply text, or an ``"Error generating response: ..."``
        string if the API call raises.
    """
    context = f"\nMCP Server Results: {json.dumps(mcp_results, indent=2)}" if mcp_results else ""

    system_message = {
        "role": "system",
        "content": f"""
You are a helpful assistant that works with MCP servers for infrastructure management.
Provide clear, helpful responses based on the user's request and any MCP server results.
Available MCP Servers:
- Terraform: Infrastructure as Code management
- Linux: System administration and operations
- Cisco: Network device management
{context}
"""
    }

    # System prompt, then recent history (last 6 messages), then the new turn.
    messages = [
        system_message,
        *self.conversation_history[-6:],
        {"role": "user", "content": user_input},
    ]

    try:
        completion = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            temperature=0.7,
            max_tokens=1000
        )
        return completion.choices[0].message.content
    except Exception as exc:
        return f"Error generating response: {str(exc)}"
def process_request(self, user_input: str, selected_server: str = None) -> tuple[str, str]:
    """Process a user request end-to-end and return ``(reply, status)``.

    Steps: record the user turn, pick a server (via ``analyze_user_request``
    when ``selected_server`` is empty or ``"Auto-detect"``, otherwise by
    mapping the display name to its key), validate the key, run the health
    check, then dispatch:

    * ``linux`` / ``cisco``: choose a tool with the matching ``select_*``
      method and run it with ``execute_linux_tool`` / ``execute_cisco_tool``
      (or show the static help text when the selector returns ``"help"``).
    * any other server (Terraform today): fetch the Space URL and report the
      connection status.

    Every exit path appends the assistant reply to
    ``self.conversation_history``. Exceptions are caught and reported in the
    reply rather than raised.

    Args:
        user_input: The user's request text.
        selected_server: Server key or display name, ``"Auto-detect"``, or
            ``None`` for auto-detection.

    Returns:
        ``(reply_text, status_line)``; the status line starts with a
        ``[HH:MM:SS]`` timestamp.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    # Add to conversation history
    self.conversation_history.append({"role": "user", "content": user_input})

    def reply_with(text: str, status_line: str) -> tuple[str, str]:
        # Single exit helper so no path forgets to record the assistant turn.
        self.conversation_history.append({"role": "assistant", "content": text})
        return text, status_line

    try:
        auto_detected = not selected_server or selected_server == "Auto-detect"
        reasoning = ""
        if auto_detected:
            analysis = self.analyze_user_request(user_input)
            if "error" in analysis:
                return reply_with(f"❌ Analysis Error: {analysis['error']}", f"[{timestamp}] Analysis failed")
            selected_server = analysis.get("recommended_server", "terraform")
            if isinstance(selected_server, str):
                selected_server = selected_server.strip().lower()
            reasoning = analysis.get("reasoning", "No reasoning provided")
        else:
            # Map display names back to keys if needed
            server_mapping = {
                "MCP Terraform": "terraform",
                "MCP Linux": "linux",
                "MCP Cisco": "cisco",
                "Terraform": "terraform",
                "Linux": "linux",
                "Cisco": "cisco"
            }
            selected_server = server_mapping.get(selected_server, selected_server.lower())

        # Validate the key on BOTH paths: an auto-detected key comes from the
        # model and may not be one of the configured servers.
        if selected_server not in self.mcp_servers:
            return reply_with(
                f"❌ Invalid server selection: {selected_server}. Available: {', '.join(self.mcp_servers.keys())}",
                f"[{timestamp}] Invalid server",
            )

        server_name = self.mcp_servers[selected_server]['name']
        if auto_detected:
            status = f"[{timestamp}] Selected: {server_name} - {reasoning}"
        else:
            status = f"[{timestamp}] Using: {server_name}"

        # Check server health
        health_check = self.check_server_health(selected_server)
        if not health_check["success"]:
            return reply_with(
                f"⚠️ Cannot connect to {server_name} server: {health_check.get('error', 'Server offline')}",
                status + " - Server offline",
            )

        # Handle Linux server specifically (full functionality)
        if selected_server == "linux":
            tool_selection = self.select_linux_tool_and_args(user_input)
            if tool_selection["tool"] == "help":
                response = """**Available Linux MCP Tools:**
πŸ”§ **user_management**: Create, delete, and manage users and groups
πŸ“ **file_permissions**: Change file permissions and ownership (chmod, chown)
βš™οΈ **system_commands**: System administration commands (processes, services, network, disk, firewall)
**Example commands you can ask for:**
- "Show me user management commands"
- "How do I change file permissions?"
- "Give me process management commands"
- "Show network diagnostic commands"
- "Help with service management"
- "Configure firewall rules"
- "Check disk usage commands"
"""
            else:
                # Execute the selected Linux tool
                response = self.execute_linux_tool(tool_selection["tool"], tool_selection["args"])

        # Handle Cisco server specifically (full functionality)
        elif selected_server == "cisco":
            tool_selection = self.select_cisco_tool_and_args(user_input)
            if tool_selection["tool"] == "help":
                response = """**Available Cisco MCP Tools:**
🏷️ **vlan_management**: Create, delete, assign, and show VLANs
🌐 **interface_configuration**: Configure access/trunk ports, IP addresses, shutdown/enable
πŸ›£οΈ **routing_configuration**: Set up OSPF, EIGRP, static routes
πŸ”’ **security_configuration**: Configure SSH, ACLs, port security
πŸ”§ **troubleshooting**: Network connectivity, interface, routing, switching diagnostics
**Example commands you can ask for:**
- "Create a VLAN" or "Show VLAN information"
- "Configure interface as access port" or "Set up trunk port"
- "Configure OSPF routing" or "Set up static routes"
- "Enable SSH access" or "Configure ACL"
- "Troubleshoot connectivity" or "Check interface status"
"""
            else:
                # Execute the selected Cisco tool
                response = self.execute_cisco_tool(tool_selection["tool"], tool_selection["args"])

        # Handle other servers (Future expansion). Cisco is fully handled
        # above, so only the Terraform wording is needed in this branch.
        else:
            try:
                # Check if server provides any status info
                server_url = self.mcp_servers[selected_server]['space_url']
                status_response = requests.get(server_url, timeout=10)
                if status_response.status_code == 200:
                    try:
                        server_data = status_response.json()
                        tools_count = server_data.get('tools', 'unknown')
                        service_name = server_data.get('service', 'unknown')
                        response = f"""**Terraform MCP Server** βœ… Connected!
**Server Status**: Online
**Service**: {service_name}
**Available Tools**: {tools_count}
**MCP Endpoint**: {server_url}/mcp/sse
**What you can ask for with Terraform:**
- "Show me Terraform plan commands"
- "Help with infrastructure deployment"
- "Generate Terraform configuration"
- "Show resource management commands"
- "Help with Terraform state management"
**Note**: This server has {tools_count} tools available via MCP protocol.
Would you like me to help you with any specific Terraform task?
"""
                    except json.JSONDecodeError:
                        # Server responded but not with JSON
                        response = f"""**Terraform MCP Server** ⚠️ Connected but Limited Info
**Server Status**: Online (non-JSON response)
**MCP Endpoint**: {server_url}/mcp/sse
**Common Terraform operations I can help with:**
- Infrastructure planning and deployment
- Resource configuration and management
- State file management
- Module development
- Provider configuration
Would you like me to help with any specific Terraform task?
"""
                else:
                    # Server not responding properly
                    response = f"""**Terraform MCP Server** ❌ Connection Issue
**Server Status**: {health_check['status']}
**Error**: HTTP {status_response.status_code}
**MCP Endpoint**: {server_url}/mcp/sse
The server appears to be having issues. Please try again later or check if the server is running properly.
"""
            except Exception as e:
                # Connection failed
                response = f"""**Terraform MCP Server** ❌ Connection Failed
**Error**: {str(e)}
**MCP Endpoint**: {self.mcp_servers[selected_server]['space_url']}/mcp/sse
**Common Terraform operations (offline reference):**
- Plan: `terraform plan`
- Apply: `terraform apply`
- Destroy: `terraform destroy`
- Validate: `terraform validate`
Please check if the server is running and try again.
"""

        return reply_with(response, status + f" - {health_check['status']}")
    except Exception as e:
        return reply_with(f"❌ Error processing request: {str(e)}", f"[{timestamp}] Error occurred")
def create_gradio_interface():
"""Create the Gradio interface"""
# Try to get API key from environment (HuggingFace Secrets)
openai_api_key = os.getenv("OPEN_AI_API_KEY")
# Initialize MCP Client if API key is available
mcp_client = None
initial_status = ""
if openai_api_key:
try:
mcp_client = MCPClient(openai_api_key)
initial_status = "βœ… OpenAI client initialized automatically from HF Secrets!"
except Exception as e:
initial_status = f"❌ Failed to initialize with HF Secret: {str(e)}"
else:
initial_status = "⚠️ OpenAI API key not found in HF Secrets. Please enter manually."
def initialize_client(api_key):
nonlocal mcp_client
# Use provided key or fall back to environment
key_to_use = api_key.strip() if api_key and api_key.strip() else openai_api_key
if not key_to_use:
return "❌ Please provide your OpenAI API key", ""
try:
mcp_client = MCPClient(key_to_use)
return "βœ… OpenAI client initialized successfully!", ""
except Exception as e:
return f"❌ Failed to initialize: {str(e)}", ""
def process_message(message, server_choice, history):
    """Handle one chat submission and return ``(updated_history, "")``.

    Without an initialised client a hint row is added to a copy of the
    history. Blank messages leave the history untouched. Otherwise the
    message is processed by the client and ``[message, reply]`` is appended
    to ``history`` in place; a failure is shown as the reply text.
    """
    if not mcp_client:
        hint_row = ["❌ Error", "Please initialize the OpenAI client first using the button above."]
        return history + [hint_row], ""

    if not message.strip():
        return history, ""

    try:
        reply, _status = mcp_client.process_request(message, server_choice)
    except Exception as exc:
        reply = f"❌ Error: {str(exc)}"

    history.append([message, reply])
    return history, ""
def get_server_status():
if not mcp_client:
return "Please initialize the OpenAI client first."
status_info = "## MCP Servers Status\n\n"
for key, config in mcp_client.mcp_servers.items():
health_check = mcp_client.check_server_health(key)
if health_check["success"]:
if health_check["status"] == "online":
status_icon = "🟒"
status_text = "Online"
else:
status_icon = "🟑"
status_text = "Online (Different Structure)"
else:
status_icon = "πŸ”΄"
status_text = "Offline"
status_info += f"**{config['name']}**: {status_icon} {status_text}\n"
status_info += f"- URL: {config['space_url']}\n"
status_info += f"- Description: {config['description']}\n"
# Show tools for Linux server
if key == "linux" and "tools" in config:
status_info += f"- Available Tools: {len(config['tools'])}\n"
for tool in config['tools']:
status_info += f" - **{tool['name']}**: {tool['description']}\n"
# Show MCP endpoint info
status_info += f"- MCP Endpoint: {config['space_url']}/mcp/sse\n"
if health_check["success"] and "response" in health_check:
response_data = health_check["response"]
if isinstance(response_data, dict) and "service" in response_data:
status_info += f"- Service: {response_data.get('service', 'Unknown')}\n"
status_info += "\n"
return status_info
# Create Gradio interface
with gr.Blocks(title="MCP Client with OpenAI", theme=gr.themes.Soft()) as app:
gr.Markdown("# πŸ€– MCP Client with OpenAI Integration")
gr.Markdown("Connect to your Terraform, Linux, and Cisco MCP servers with AI-powered assistance")
gr.Markdown("πŸ’‘ **Tip**: Your OpenAI API key is automatically loaded from HuggingFace Secrets!")
with gr.Row():
with gr.Column(scale=3):
# API Key input (optional if already in HF Secrets)
api_key = gr.Textbox(
label="OpenAI API Key (Optional - will use HF Secret if available)",
type="password",
placeholder="Leave empty to use HF Secret OPEN_AI_API_KEY...",
value=""
)
init_btn = gr.Button("Initialize/Reinitialize Client", variant="primary")
init_status = gr.Textbox(
label="Initialization Status",
interactive=False,
value=initial_status
)
# Server selection
server_choice = gr.Dropdown(
choices=["Auto-detect", "linux", "terraform", "cisco"],
value="Auto-detect",
label="Select MCP Server"
)
# Chat interface
chatbot = gr.Chatbot(
label="Conversation",
height=400,
show_label=True
)
msg = gr.Textbox(
label="Message",
placeholder="Ask about infrastructure, Linux systems, or Cisco networks...",
lines=2
)
with gr.Row():
send_btn = gr.Button("Send", variant="primary")
clear_btn = gr.Button("Clear Chat")
with gr.Column(scale=1):
gr.Markdown("## πŸ“Š Server Status")
server_status = gr.Markdown("Initialize client to see server status")
refresh_btn = gr.Button("Refresh Status")
gr.Markdown("## πŸ’‘ Example Queries")
gr.Markdown("""
**Linux (Full Support):**
- "Show me user management commands"
- "How do I change file permissions?"
- "Give me process management commands"
- "Show network diagnostic commands"
- "Help with service management"
- "Configure firewall rules"
**Terraform:**
- "Show me terraform commands"
- "Plan infrastructure deployment"
- "Help with terraform apply"
**Cisco:**
- "Show interface configuration"
- "Configure VLAN commands"
- "Help with port configuration"
""")
# Event handlers
init_btn.click(
initialize_client,
inputs=[api_key],
outputs=[init_status, server_status]
)
send_btn.click(
process_message,
inputs=[msg, server_choice, chatbot],
outputs=[chatbot, msg]
)
msg.submit(
process_message,
inputs=[msg, server_choice, chatbot],
outputs=[chatbot, msg]
)
clear_btn.click(
lambda: [],
outputs=[chatbot]
)
refresh_btn.click(
get_server_status,
outputs=[server_status]
)
return app
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it with the options below.
    launch_options = {
        "server_name": "0.0.0.0",  # listen on all network interfaces
        "server_port": 7860,
        "share": True,
        "debug": True,
    }
    app = create_gradio_interface()
    app.launch(**launch_options)