Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import uvicorn | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import StreamingResponse | |
| import json | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent | |
# FastAPI application object; main() hands this to uvicorn.run().
app = FastAPI(title="Cisco MCP Server")
# MCP server instance, created with the name "cisco-mcp".
mcp_app = Server("cisco-mcp")
| # ============================================================================ | |
| # CISCO MCP HANDLER FUNCTIONS | |
| # ============================================================================ | |
def handle_vlan_management(args: dict) -> "list[TextContent]":
    """Generate a Cisco CLI script for a VLAN management task.

    Args:
        args: Tool arguments. Keys read:
            action: "create_vlan", "delete_vlan", "assign_vlan" or
                "show_vlan".
            vlan_id: VLAN number (default 100).
            vlan_name: VLAN name (default "VLAN_<vlan_id>").
            interface: Interface name; required for "assign_vlan".
            description: Optional text, used by "create_vlan" only.

    Returns:
        A one-element list with a TextContent whose text is the
        newline-joined script, or an "Error: ..." message when the request
        cannot be served.
    """
    action = args.get("action")
    vlan_id = args.get("vlan_id", 100)
    vlan_name = args.get("vlan_name", f"VLAN_{vlan_id}")
    interface = args.get("interface", "")
    description = args.get("description", "")

    if action == "create_vlan":
        commands = [
            f"! Create VLAN {vlan_id}",
            "configure terminal",
            f"vlan {vlan_id}",
            f" name {vlan_name}",
            " state active",
        ]
        if description:
            commands.append(f" description {description}")
        commands.extend([
            "exit",
            "exit",
            "",
            "! Verify VLAN creation:",
            "show vlan brief",
            f"show vlan id {vlan_id}",
        ])
    elif action == "delete_vlan":
        commands = [
            f"! Delete VLAN {vlan_id}",
            "configure terminal",
            f"no vlan {vlan_id}",
            "exit",
            "",
            "! Verify VLAN deletion:",
            "show vlan brief",
        ]
    elif action == "assign_vlan":
        if not interface:
            return [TextContent(
                type="text",
                text="Error: Interface required for VLAN assignment",
            )]
        commands = [
            f"! Assign interface {interface} to VLAN {vlan_id}",
            "configure terminal",
            f"interface {interface}",
            " switchport mode access",
            f" switchport access vlan {vlan_id}",
            " no shutdown",
            "exit",
            "exit",
            "",
            "! Verify interface assignment:",
            f"show interfaces {interface} switchport",
            "show vlan brief",
        ]
    elif action == "show_vlan":
        commands = [
            "! VLAN Information Commands",
            "show vlan brief",
            "show vlan summary",
            # A falsy vlan_id (e.g. 0) falls back to the unfiltered listing.
            f"show vlan id {vlan_id}" if vlan_id else "show vlan",
            "show interfaces status",
            "show spanning-tree vlan brief",
        ]
    else:
        # Previously an unknown or missing action produced empty text.
        return [TextContent(
            type="text",
            text=f"Error: Unsupported VLAN action: {action!r}",
        )]

    # The "" entries only separate sections in the lists above; they are
    # dropped from the emitted script.
    result = "\n".join(cmd for cmd in commands if cmd)
    return [TextContent(type="text", text=result)]
def handle_interface_configuration(args: dict) -> "list[TextContent]":
    """Generate a Cisco CLI script that configures one interface.

    Args:
        args: Tool arguments. Keys read:
            action: "configure_access", "configure_trunk", "set_ip",
                "shutdown" or "no_shutdown".
            interface: Interface name (required).
            vlan_id: Access VLAN (default 1) or the single VLAN allowed on
                a trunk (all VLANs when omitted).
            ip_address: Address for "set_ip" (required for that action).
            subnet_mask: Mask for "set_ip" (default "255.255.255.0").
            description: Optional interface description.

    Returns:
        A one-element list with a TextContent whose text is the
        newline-joined script, or an "Error: ..." message when the request
        cannot be served.
    """
    action = args.get("action")
    interface = args.get("interface")
    # None means "not supplied"; each action picks its own fallback so an
    # explicit VLAN 1 is not mistaken for a missing value.
    vlan_id = args.get("vlan_id")
    ip_address = args.get("ip_address", "")
    subnet_mask = args.get("subnet_mask", "255.255.255.0")
    description = args.get("description", "")

    if not interface:
        return [TextContent(type="text", text="Error: Interface name is required")]

    if action == "configure_access":
        access_vlan = 1 if vlan_id is None else vlan_id
        commands = [
            f"! Configure {interface} as access port for VLAN {access_vlan}",
            "configure terminal",
            f"interface {interface}",
        ]
        if description:
            commands.append(f" description {description}")
        commands.extend([
            " switchport mode access",
            f" switchport access vlan {access_vlan}",
            " spanning-tree portfast",
            " no shutdown",
            "exit",
            "exit",
            "",
            "! Verify configuration:",
            f"show interfaces {interface} switchport",
            f"show interfaces {interface} status",
        ])
    elif action == "configure_trunk":
        allowed_vlans = "all" if vlan_id is None else vlan_id
        commands = [
            f"! Configure {interface} as trunk port",
            "configure terminal",
            f"interface {interface}",
        ]
        if description:
            commands.append(f" description {description}")
        commands.extend([
            # Encapsulation must be fixed before the mode: switches that
            # support more than one trunk encapsulation reject
            # "switchport mode trunk" while encapsulation is negotiated.
            " switchport trunk encapsulation dot1q",
            " switchport mode trunk",
            f" switchport trunk allowed vlan {allowed_vlans}",
            " no shutdown",
            "exit",
            "exit",
            "",
            "! Verify trunk configuration:",
            f"show interfaces {interface} trunk",
            f"show interfaces {interface} switchport",
        ])
    elif action == "set_ip":
        if not ip_address:
            return [TextContent(type="text", text="Error: IP address is required")]
        commands = [
            f"! Configure IP address on {interface}",
            "configure terminal",
            f"interface {interface}",
            f" ip address {ip_address} {subnet_mask}",
        ]
        if description:
            commands.append(f" description {description}")
        commands.extend([
            " no shutdown",
            "exit",
            "exit",
            "",
            "! Verify IP configuration:",
            f"show ip interface {interface}",
            "show ip interface brief",
        ])
    elif action in ("shutdown", "no_shutdown"):
        disabling = action == "shutdown"
        state_cmd = "shutdown" if disabling else "no shutdown"
        label = "Shutdown" if disabling else "Enable"
        commands = [
            f"! {label} interface {interface}",
            "configure terminal",
            f"interface {interface}",
            f" {state_cmd}",
            "exit",
            "exit",
            "",
            "! Verify interface status:",
            f"show interfaces {interface} status",
        ]
    else:
        # Previously an unknown or missing action produced empty text.
        return [TextContent(
            type="text",
            text=f"Error: Unsupported interface action: {action!r}",
        )]

    # The "" entries only separate sections in the lists above; they are
    # dropped from the emitted script.
    result = "\n".join(cmd for cmd in commands if cmd)
    return [TextContent(type="text", text=result)]
def handle_routing_configuration(args: dict) -> "list[TextContent]":
    """Generate a Cisco CLI script for OSPF, EIGRP or static routing.

    Supported combinations: ospf/configure, ospf/show, static/configure
    and eigrp/configure. Anything else yields an error message.

    Args:
        args: Tool arguments. Keys read:
            protocol: "ospf", "static" or "eigrp".
            action: "configure" or (OSPF only) "show".
            network: Network statement text; for static routes the
                destination prefix and mask (required there).
            area: OSPF area (default "0").
            next_hop: Static-route next hop (required for static routes).
            process_id: OSPF process ID / EIGRP AS number (default 1).
            router_id: Optional OSPF router ID. When omitted no
                "router-id" line is emitted, so the device selects its own.

    Returns:
        A one-element list with a TextContent whose text is the
        newline-joined script, or an "Error: ..." message when the request
        cannot be served.
    """
    protocol = args.get("protocol")
    action = args.get("action")
    network = args.get("network", "")
    area = args.get("area", "0")
    next_hop = args.get("next_hop", "")
    process_id = args.get("process_id", 1)
    router_id = args.get("router_id", "")

    if protocol == "ospf" and action == "configure":
        commands = [
            f"! Configure OSPF process {process_id}",
            "configure terminal",
            f"router ospf {process_id}",
        ]
        # A fixed router ID would be identical on every router this script
        # is applied to, so it is emitted only when the caller supplies one.
        if router_id:
            commands.append(f" router-id {router_id}")
        if network:
            commands.append(f" network {network} area {area}")
        commands.extend([
            "exit",
            "exit",
            "",
            "! Verify OSPF configuration:",
            "show ip ospf",
            "show ip ospf neighbor",
            "show ip ospf database",
        ])
    elif protocol == "ospf" and action == "show":
        commands = [
            "! OSPF Information Commands",
            "show ip ospf",
            "show ip ospf neighbor",
            "show ip ospf database",
            "show ip ospf interface",
            "show ip route ospf",
        ]
    elif protocol == "static" and action == "configure":
        if not network or not next_hop:
            return [TextContent(
                type="text",
                text="Error: Network and next_hop required for static routes",
            )]
        commands = [
            f"! Configure static route to {network}",
            "configure terminal",
            f"ip route {network} {next_hop}",
            "exit",
            "",
            "! Verify static route:",
            "show ip route static",
            "show ip route",
        ]
    elif protocol == "eigrp" and action == "configure":
        commands = [
            f"! Configure EIGRP AS {process_id}",
            "configure terminal",
            f"router eigrp {process_id}",
        ]
        if network:
            commands.append(f" network {network}")
        commands.extend([
            " no auto-summary",
            "exit",
            "exit",
            "",
            "! Verify EIGRP configuration:",
            "show ip eigrp neighbors",
            "show ip eigrp topology",
        ])
    else:
        # Previously unsupported combinations produced empty text.
        return [TextContent(
            type="text",
            text=(
                "Error: Unsupported routing request: "
                f"protocol={protocol!r}, action={action!r}"
            ),
        )]

    # The "" entries only separate sections in the lists above; they are
    # dropped from the emitted script.
    result = "\n".join(cmd for cmd in commands if cmd)
    return [TextContent(type="text", text=result)]
def handle_security_configuration(args: dict) -> "list[TextContent]":
    """Generate a Cisco CLI script for SSH, ACL or port-security setup.

    Supported combinations: ssh/enable, acl/configure and
    port_security/enable. Anything else yields an error message.

    Args:
        args: Tool arguments. Keys read:
            feature: "ssh", "acl" or "port_security".
            action: "enable" (ssh, port_security) or "configure" (acl).
            acl_number: ACL number (default 100).
            source_ip: ACL source specification (required for acl).
            destination_ip: ACL destination specification (required for
                acl).
            interface: Target interface; a bracketed placeholder is
                emitted when omitted.
            hostname: Hostname set by the SSH script
                (default "CiscoRouter").
            domain_name: Domain name set by the SSH script
                (default "company.local").
            username: Local account created by the SSH script
                (default "admin").
            password: Secret for that account. When omitted a
                "[strong-password]" placeholder is emitted rather than a
                fixed, guessable secret.

    Returns:
        A one-element list with a TextContent whose text is the
        newline-joined script, or an "Error: ..." message when the request
        cannot be served.
    """
    feature = args.get("feature")
    action = args.get("action")
    acl_number = args.get("acl_number", 100)
    source_ip = args.get("source_ip", "")
    destination_ip = args.get("destination_ip", "")
    interface = args.get("interface", "")

    # Placeholders used when no interface is supplied (the configuration
    # and verification sections use different wording).
    iface_cfg = interface if interface else "[interface-name]"
    iface_show = interface if interface else "[interface]"

    if feature == "ssh" and action == "enable":
        hostname = args.get("hostname", "CiscoRouter")
        domain_name = args.get("domain_name", "company.local")
        username = args.get("username", "admin")
        password = args.get("password") or "[strong-password]"
        commands = [
            "! Enable SSH access",
            "configure terminal",
            f"hostname {hostname}",
            f"ip domain-name {domain_name}",
            "crypto key generate rsa general-keys modulus 2048",
            "ip ssh version 2",
            "ip ssh time-out 60",
            "ip ssh authentication-retries 3",
            "",
            "! Configure user account",
            f"username {username} privilege 15 secret {password}",
            "",
            "! Configure VTY lines",
            "line vty 0 4",
            " transport input ssh",
            " login local",
            "exit",
            "exit",
            "",
            "! Verify SSH configuration:",
            "show ip ssh",
            "show ssh",
        ]
    elif feature == "acl" and action == "configure":
        # Without a permit entry the script would apply a bare deny-all ACL
        # inbound on the interface, so both endpoints are mandatory.
        if not source_ip or not destination_ip:
            return [TextContent(
                type="text",
                text=(
                    "Error: source_ip and destination_ip required "
                    "for ACL configuration"
                ),
            )]
        commands = [
            f"! Configure Access Control List {acl_number}",
            "configure terminal",
            f"access-list {acl_number} permit ip {source_ip} {destination_ip}",
            f"access-list {acl_number} deny ip any any",
            "",
            f"! Apply ACL to interface {iface_cfg}",
            f"interface {iface_cfg}",
            f" ip access-group {acl_number} in",
            "exit",
            "exit",
            "",
            "! Verify ACL:",
            f"show access-lists {acl_number}",
            f"show ip interface {iface_show}",
        ]
    elif feature == "port_security" and action == "enable":
        commands = [
            f"! Enable port security on {iface_cfg}",
            "configure terminal",
            f"interface {iface_cfg}",
            " switchport mode access",
            " switchport port-security",
            " switchport port-security maximum 2",
            " switchport port-security mac-address sticky",
            " switchport port-security violation restrict",
            "exit",
            "exit",
            "",
            "! Verify port security:",
            f"show port-security interface {iface_show}",
            "show port-security address",
        ]
    else:
        # Previously unsupported combinations produced empty text.
        return [TextContent(
            type="text",
            text=(
                "Error: Unsupported security request: "
                f"feature={feature!r}, action={action!r}"
            ),
        )]

    # The "" entries only separate sections in the lists above; they are
    # dropped from the emitted script.
    result = "\n".join(cmd for cmd in commands if cmd)
    return [TextContent(type="text", text=result)]
def handle_troubleshooting(args: dict) -> "list[TextContent]":
    """Generate a list of Cisco CLI diagnostic commands.

    Args:
        args: Tool arguments. Keys read:
            category: "connectivity", "interface", "routing", "switching"
                or "general".
            target: Address used by the connectivity and routing checks
                (default "8.8.8.8").
            interface: Optional interface name that narrows the
                "interface" category; all interfaces when omitted.

    Returns:
        A one-element list with a TextContent whose text is the
        newline-joined command list (blank separator lines are kept), or
        an "Error: ..." message for an unknown category.
    """
    category = args.get("category")
    target = args.get("target", "8.8.8.8")
    interface = args.get("interface", "")

    if category == "connectivity":
        commands = [
            f"! Connectivity Troubleshooting to {target}",
            f"ping {target}",
            f"traceroute {target}",
            "show ip route",
            "show arp",
            "show ip interface brief",
            "",
            "! Extended ping test:",
            f"ping {target} repeat 100",
            f"ping {target} size 1500",
        ]
    elif category == "interface":
        if interface:
            heading = f"! Interface Troubleshooting for {interface}"
            scope = f" {interface}"
        else:
            heading = "! Interface Troubleshooting for all interfaces"
            scope = ""
        commands = [
            heading,
            f"show interfaces{scope}",
            f"show interfaces{scope} status",
            f"show controllers{scope}",
            "show ip interface brief",
            "",
            "! Interface statistics:",
            f"show interfaces{scope} | include error",
        ]
    elif category == "routing":
        commands = [
            "! Routing Troubleshooting",
            "show ip route",
            "show ip protocols",
            "show ip ospf neighbor",
            "show ip eigrp neighbors",
            "show cdp neighbors",
            "",
            "! Routing table analysis:",
            "show ip route summary",
            # An explicitly empty target falls back to the default route.
            f"show ip route {target}" if target else "show ip route 0.0.0.0",
        ]
    elif category == "switching":
        commands = [
            "! Switching Troubleshooting",
            "show vlan brief",
            "show interfaces status",
            "show spanning-tree",
            "show mac address-table",
            "show cdp neighbors detail",
            "",
            "! Port and VLAN status:",
            "show interfaces trunk",
            "show spanning-tree blockedports",
        ]
    elif category == "general":
        commands = [
            "! General Device Information",
            "show version",
            "show running-config",
            "show startup-config",
            "show processes cpu",
            "show memory",
            "show environment",
            "show logging",
            "",
            "! System status:",
            "show clock",
            "show users",
            "show sessions",
        ]
    else:
        # Previously an unknown or missing category produced empty text.
        return [TextContent(
            type="text",
            text=f"Error: Unsupported troubleshooting category: {category!r}",
        )]

    return [TextContent(type="text", text="\n".join(commands))]
| # ============================================================================ | |
| # MCP SERVER CONFIGURATION | |
| # ============================================================================ | |
# Registered on the MCP server so clients can discover the tools; without
# the decorator mcp_app exposes no tool list.
@mcp_app.list_tools()
async def list_tools():
    """Return the Tool definitions for the five Cisco command generators.

    The enums list only the values the handlers in this file implement, so
    a schema-valid request never yields an empty response.

    Returns:
        A list of Tool objects: vlan_management, interface_configuration,
        routing_configuration, security_configuration and troubleshooting.
    """
    # VLAN ID bounds shared by every schema that accepts a VLAN number.
    vlan_id_schema = {"type": "integer", "minimum": 1, "maximum": 4094}

    return [
        Tool(
            name="vlan_management",
            description="Generate Cisco VLAN configuration commands",
            inputSchema={
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["create_vlan", "delete_vlan", "assign_vlan", "show_vlan"],
                    },
                    "vlan_id": dict(vlan_id_schema),
                    "vlan_name": {"type": "string"},
                    "interface": {"type": "string"},
                    "description": {"type": "string"},
                },
                "required": ["action"],
            },
        ),
        Tool(
            name="interface_configuration",
            description="Generate Cisco interface configuration commands",
            inputSchema={
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": [
                            "configure_access",
                            "configure_trunk",
                            "set_ip",
                            "shutdown",
                            "no_shutdown",
                        ],
                    },
                    "interface": {"type": "string"},
                    "vlan_id": dict(vlan_id_schema),
                    "ip_address": {"type": "string"},
                    "subnet_mask": {"type": "string"},
                    "description": {"type": "string"},
                },
                "required": ["action", "interface"],
            },
        ),
        Tool(
            name="routing_configuration",
            description="Generate Cisco routing protocol commands",
            inputSchema={
                "type": "object",
                "properties": {
                    # "rip" and "remove" were advertised but never handled.
                    "protocol": {"type": "string", "enum": ["ospf", "eigrp", "static"]},
                    # "show" is implemented for OSPF only.
                    "action": {"type": "string", "enum": ["configure", "show"]},
                    "network": {"type": "string"},
                    "area": {"type": "string"},
                    "next_hop": {"type": "string"},
                    "process_id": {"type": "integer"},
                },
                "required": ["protocol", "action"],
            },
        ),
        Tool(
            name="security_configuration",
            description="Generate Cisco security commands",
            inputSchema={
                "type": "object",
                "properties": {
                    # "dhcp_snooping", "disable" and "show" were advertised
                    # but never handled.
                    "feature": {"type": "string", "enum": ["ssh", "acl", "port_security"]},
                    # "enable" applies to ssh and port_security,
                    # "configure" to acl.
                    "action": {"type": "string", "enum": ["enable", "configure"]},
                    "acl_number": {"type": "integer"},
                    "source_ip": {"type": "string"},
                    "destination_ip": {"type": "string"},
                    "interface": {"type": "string"},
                },
                "required": ["feature", "action"],
            },
        ),
        Tool(
            name="troubleshooting",
            description="Generate Cisco troubleshooting commands",
            inputSchema={
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["connectivity", "interface", "routing", "switching", "general"],
                    },
                    "target": {"type": "string"},
                    "interface": {"type": "string"},
                },
                "required": ["category"],
            },
        ),
    ]
# Registered on the MCP server so tool invocations reach this dispatcher;
# without the decorator mcp_app has no tool-call handler.
@mcp_app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Dispatch an MCP tool call to the matching command generator.

    Args:
        name: Tool name as advertised by list_tools().
        arguments: Tool arguments; a missing (None) value is treated as an
            empty dict.

    Returns:
        The handler's list of TextContent. An unknown tool name or an
        exception inside a handler is reported as a single TextContent
        message rather than raised.
    """
    handlers = {
        "vlan_management": handle_vlan_management,
        "interface_configuration": handle_interface_configuration,
        "routing_configuration": handle_routing_configuration,
        "security_configuration": handle_security_configuration,
        "troubleshooting": handle_troubleshooting,
    }
    handler = handlers.get(name)
    if handler is None:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]

    try:
        # Handlers call args.get(...), so None must become an empty dict.
        return handler(arguments or {})
    except Exception as e:
        # Boundary catch: report the failure to the client as text.
        return [TextContent(type="text", text=f"Error: {str(e)}")]
| # ============================================================================ | |
| # FASTAPI WEB ENDPOINTS | |
| # ============================================================================ | |
# Route registration: without it the served app has no "/" endpoint, even
# though main() prints it as the status URL.
@app.get("/")
async def root():
    """Return a static service-status document.

    Returns:
        A dict with the service name, a "running" status, the MCP SSE
        path and the number of tools.
    """
    return {
        "service": "cisco-mcp-server",
        "status": "running",
        "mcp_endpoint": "/mcp/sse",
        "tools": 5,
    }
# Route registration: without it the function is never reachable over HTTP.
# NOTE(review): the path "/health" is chosen by convention; nothing else in
# this file names it - confirm against whatever probes this service.
@app.get("/health")
async def health():
    """Return a constant health-check payload: {"status": "healthy"}."""
    return {"status": "healthy"}
# Route registration: root() and main() both advertise "/mcp/sse", but
# without this decorator the path is not served.
@app.get("/mcp/sse")
async def mcp_sse_endpoint(request: Request):
    """Serve a Server-Sent Events stream of keepalive messages.

    This is a placeholder: it does not speak the MCP SSE transport yet, it
    only holds the connection open and emits a keepalive event every
    30 seconds until the client disconnects.

    Args:
        request: Incoming request, polled for client disconnects.

    Returns:
        A StreamingResponse with media type "text/event-stream".
    """
    async def event_stream():
        try:
            # This would be replaced with the actual MCP SSE transport.
            # For now, just keep the connection alive.
            while not await request.is_disconnected():
                yield f"data: {json.dumps({'type': 'keepalive'})}\n\n"
                await asyncio.sleep(30)
        except asyncio.CancelledError:
            # Cancellation while sleeping just ends the stream.
            pass

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
        },
    )
def main():
    """Start the uvicorn server for the FastAPI app.

    The listening port is read from the PORT environment variable and
    defaults to 7860. The server binds to all interfaces (0.0.0.0).
    """
    # os.getenv returns a string when the variable is set, so the default
    # is a string too and int() is the single conversion point.
    port = int(os.getenv("PORT", "7860"))
    # The startup messages previously began with mis-decoded emoji bytes;
    # plain ASCII text is used instead.
    print(f"Starting Cisco MCP Server on port {port}")
    print(f"Status: http://localhost:{port}/")
    print(f"MCP SSE: http://localhost:{port}/mcp/sse")
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")


if __name__ == "__main__":
    main()