import os import asyncio import uvicorn from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse import json from mcp.server import Server from mcp.types import Tool, TextContent # Create FastAPI app app = FastAPI(title="Cisco MCP Server") # Create MCP server instance mcp_app = Server("cisco-mcp") # ============================================================================ # CISCO MCP HANDLER FUNCTIONS # ============================================================================ def handle_vlan_management(args): """Generate VLAN management commands""" action = args.get("action") vlan_id = args.get("vlan_id", 100) vlan_name = args.get("vlan_name", f"VLAN_{vlan_id}") interface = args.get("interface", "") description = args.get("description", "") commands = [] if action == "create_vlan": commands = [ f"! Create VLAN {vlan_id}", "configure terminal", f"vlan {vlan_id}", f" name {vlan_name}", f" state active", ] if description: commands.append(f" description {description}") commands.extend([ "exit", "exit", "", "! Verify VLAN creation:", "show vlan brief", f"show vlan id {vlan_id}" ]) elif action == "delete_vlan": commands = [ f"! Delete VLAN {vlan_id}", "configure terminal", f"no vlan {vlan_id}", "exit", "", "! Verify VLAN deletion:", "show vlan brief" ] elif action == "assign_vlan": if not interface: return [TextContent(type="text", text="❌ Interface required for VLAN assignment")] commands = [ f"! Assign interface {interface} to VLAN {vlan_id}", "configure terminal", f"interface {interface}", " switchport mode access", f" switchport access vlan {vlan_id}", " no shutdown", "exit", "exit", "", "! Verify interface assignment:", f"show interfaces {interface} switchport", "show vlan brief" ] elif action == "show_vlan": commands = [ "! VLAN Information Commands", "show vlan brief", "show vlan summary", f"show vlan id {vlan_id}" if vlan_id else "show vlan", "show interfaces status", "show spanning-tree vlan brief" ] result = "\n".join([cmd for cmd in commands if cmd]) return [TextContent(type="text", text=result)] def handle_interface_configuration(args): """Generate interface configuration commands""" action = args.get("action") interface = args.get("interface") vlan_id = args.get("vlan_id", 1) ip_address = args.get("ip_address", "") subnet_mask = args.get("subnet_mask", "255.255.255.0") description = args.get("description", "") if not interface: return [TextContent(type="text", text="❌ Interface name is required")] commands = [] if action == "configure_access": commands = [ f"! Configure {interface} as access port for VLAN {vlan_id}", "configure terminal", f"interface {interface}", ] if description: commands.append(f" description {description}") commands.extend([ " switchport mode access", f" switchport access vlan {vlan_id}", " spanning-tree portfast", " no shutdown", "exit", "exit", "", "! Verify configuration:", f"show interfaces {interface} switchport", f"show interfaces {interface} status" ]) elif action == "configure_trunk": commands = [ f"! Configure {interface} as trunk port", "configure terminal", f"interface {interface}", ] if description: commands.append(f" description {description}") commands.extend([ " switchport mode trunk", " switchport trunk encapsulation dot1q", f" switchport trunk allowed vlan {vlan_id}" if vlan_id != 1 else " switchport trunk allowed vlan all", " no shutdown", "exit", "exit", "", "! Verify trunk configuration:", f"show interfaces {interface} trunk", f"show interfaces {interface} switchport" ]) elif action == "set_ip": if not ip_address: return [TextContent(type="text", text="❌ IP address is required")] commands = [ f"! Configure IP address on {interface}", "configure terminal", f"interface {interface}", f" ip address {ip_address} {subnet_mask}", ] if description: commands.append(f" description {description}") commands.extend([ " no shutdown", "exit", "exit", "", "! Verify IP configuration:", f"show ip interface {interface}", "show ip interface brief" ]) elif action in ["shutdown", "no_shutdown"]: cmd = "shutdown" if action == "shutdown" else "no shutdown" status = "Shutdown" if action == "shutdown" else "Enable" commands = [ f"! {status} interface {interface}", "configure terminal", f"interface {interface}", f" {cmd}", "exit", "exit", "", "! Verify interface status:", f"show interfaces {interface} status" ] result = "\n".join([cmd for cmd in commands if cmd]) return [TextContent(type="text", text=result)] def handle_routing_configuration(args): """Generate routing configuration commands""" protocol = args.get("protocol") action = args.get("action") network = args.get("network", "") area = args.get("area", "0") next_hop = args.get("next_hop", "") process_id = args.get("process_id", 1) commands = [] if protocol == "ospf": if action == "configure": commands = [ f"! Configure OSPF process {process_id}", "configure terminal", f"router ospf {process_id}", ] if network: commands.append(f" network {network} area {area}") commands.extend([ " router-id 1.1.1.1", "exit", "exit", "", "! Verify OSPF configuration:", "show ip ospf", "show ip ospf neighbor", "show ip ospf database" ]) elif action == "show": commands = [ "! OSPF Information Commands", "show ip ospf", "show ip ospf neighbor", "show ip ospf database", "show ip ospf interface", "show ip route ospf" ] elif protocol == "static": if action == "configure": if not network or not next_hop: return [TextContent(type="text", text="❌ Network and next_hop required for static routes")] commands = [ f"! Configure static route to {network}", "configure terminal", f"ip route {network} {next_hop}", "exit", "", "! Verify static route:", "show ip route static", "show ip route" ] elif protocol == "eigrp": if action == "configure": commands = [ f"! Configure EIGRP AS {process_id}", "configure terminal", f"router eigrp {process_id}", ] if network: commands.append(f" network {network}") commands.extend([ " no auto-summary", "exit", "exit", "", "! Verify EIGRP configuration:", "show ip eigrp neighbors", "show ip eigrp topology" ]) result = "\n".join([cmd for cmd in commands if cmd]) return [TextContent(type="text", text=result)] def handle_security_configuration(args): """Generate security configuration commands""" feature = args.get("feature") action = args.get("action") acl_number = args.get("acl_number", 100) source_ip = args.get("source_ip", "") destination_ip = args.get("destination_ip", "") port = args.get("port", 22) interface = args.get("interface", "") commands = [] if feature == "ssh": if action == "enable": commands = [ "! Enable SSH access", "configure terminal", "hostname CiscoRouter", "ip domain-name company.local", "crypto key generate rsa general-keys modulus 2048", "ip ssh version 2", "ip ssh time-out 60", "ip ssh authentication-retries 3", "", "! Configure user account", "username admin privilege 15 secret cisco123", "", "! Configure VTY lines", "line vty 0 4", " transport input ssh", " login local", "exit", "exit", "", "! Verify SSH configuration:", "show ip ssh", "show ssh" ] elif feature == "acl": if action == "configure": commands = [ f"! Configure Access Control List {acl_number}", "configure terminal", ] if source_ip and destination_ip: commands.append(f"access-list {acl_number} permit ip {source_ip} {destination_ip}") commands.extend([ f"access-list {acl_number} deny ip any any", "", f"! Apply ACL to interface {interface if interface else '[interface-name]'}", f"interface {interface if interface else '[interface-name]'}", f" ip access-group {acl_number} in", "exit", "exit", "", "! Verify ACL:", f"show access-lists {acl_number}", f"show ip interface {interface if interface else '[interface]'}" ]) elif feature == "port_security": if action == "enable": commands = [ f"! Enable port security on {interface if interface else '[interface-name]'}", "configure terminal", f"interface {interface if interface else '[interface-name]'}", " switchport mode access", " switchport port-security", " switchport port-security maximum 2", " switchport port-security mac-address sticky", " switchport port-security violation restrict", "exit", "exit", "", "! Verify port security:", f"show port-security interface {interface if interface else '[interface]'}", "show port-security address" ] result = "\n".join([cmd for cmd in commands if cmd]) return [TextContent(type="text", text=result)] def handle_troubleshooting(args): """Generate troubleshooting commands""" category = args.get("category") target = args.get("target", "8.8.8.8") interface = args.get("interface", "") commands = [] if category == "connectivity": commands = [ f"! Connectivity Troubleshooting to {target}", f"ping {target}", f"traceroute {target}", "show ip route", "show arp", "show ip interface brief", "", "! Extended ping test:", f"ping {target} repeat 100", f"ping {target} size 1500" ] elif category == "interface": commands = [ f"! Interface Troubleshooting for {interface if interface else 'all interfaces'}", f"show interfaces {interface}" if interface else "show interfaces", f"show interfaces {interface} status" if interface else "show interfaces status", f"show controllers {interface}" if interface else "show controllers", "show ip interface brief", "", "! Interface statistics:", f"show interfaces {interface} | include error" if interface else "show interfaces | include error" ] elif category == "routing": commands = [ "! Routing Troubleshooting", "show ip route", "show ip protocols", "show ip ospf neighbor", "show ip eigrp neighbors", "show cdp neighbors", "", "! Routing table analysis:", "show ip route summary", f"show ip route {target}" if target else "show ip route 0.0.0.0" ] elif category == "switching": commands = [ "! Switching Troubleshooting", "show vlan brief", "show interfaces status", "show spanning-tree", "show mac address-table", "show cdp neighbors detail", "", "! Port and VLAN status:", "show interfaces trunk", "show spanning-tree blockedports" ] elif category == "general": commands = [ "! General Device Information", "show version", "show running-config", "show startup-config", "show processes cpu", "show memory", "show environment", "show logging", "", "! System status:", "show clock", "show users", "show sessions" ] result = "\n".join(commands) return [TextContent(type="text", text=result)] # ============================================================================ # MCP SERVER CONFIGURATION # ============================================================================ @mcp_app.list_tools() async def list_tools(): """List all available Cisco networking tools""" return [ Tool( name="vlan_management", description="Generate Cisco VLAN configuration commands", inputSchema={ "type": "object", "properties": { "action": {"type": "string", "enum": ["create_vlan", "delete_vlan", "assign_vlan", "show_vlan"]}, "vlan_id": {"type": "integer", "minimum": 1, "maximum": 4094}, "vlan_name": {"type": "string"}, "interface": {"type": "string"}, "description": {"type": "string"} }, "required": ["action"] } ), Tool( name="interface_configuration", description="Generate Cisco interface configuration commands", inputSchema={ "type": "object", "properties": { "action": {"type": "string", "enum": ["configure_access", "configure_trunk", "set_ip", "shutdown", "no_shutdown"]}, "interface": {"type": "string"}, "vlan_id": {"type": "integer"}, "ip_address": {"type": "string"}, "subnet_mask": {"type": "string"}, "description": {"type": "string"} }, "required": ["action", "interface"] } ), Tool( name="routing_configuration", description="Generate Cisco routing protocol commands", inputSchema={ "type": "object", "properties": { "protocol": {"type": "string", "enum": ["ospf", "eigrp", "static", "rip"]}, "action": {"type": "string", "enum": ["configure", "remove", "show"]}, "network": {"type": "string"}, "area": {"type": "string"}, "next_hop": {"type": "string"}, "process_id": {"type": "integer"} }, "required": ["protocol", "action"] } ), Tool( name="security_configuration", description="Generate Cisco security commands", inputSchema={ "type": "object", "properties": { "feature": {"type": "string", "enum": ["ssh", "acl", "port_security", "dhcp_snooping"]}, "action": {"type": "string", "enum": ["enable", "disable", "configure", "show"]}, "acl_number": {"type": "integer"}, "source_ip": {"type": "string"}, "destination_ip": {"type": "string"}, "interface": {"type": "string"} }, "required": ["feature", "action"] } ), Tool( name="troubleshooting", description="Generate Cisco troubleshooting commands", inputSchema={ "type": "object", "properties": { "category": {"type": "string", "enum": ["connectivity", "interface", "routing", "switching", "general"]}, "target": {"type": "string"}, "interface": {"type": "string"} }, "required": ["category"] } ) ] @mcp_app.call_tool() async def call_tool(name: str, arguments: dict): """Handle tool calls""" try: if name == "vlan_management": return handle_vlan_management(arguments) elif name == "interface_configuration": return handle_interface_configuration(arguments) elif name == "routing_configuration": return handle_routing_configuration(arguments) elif name == "security_configuration": return handle_security_configuration(arguments) elif name == "troubleshooting": return handle_troubleshooting(arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: return [TextContent(type="text", text=f"Error: {str(e)}")] # ============================================================================ # FASTAPI WEB ENDPOINTS # ============================================================================ @app.get("/") async def root(): """Status endpoint""" return { "service": "cisco-mcp-server", "status": "running", "mcp_endpoint": "/mcp/sse", "tools": 5 } @app.get("/health") async def health(): """Health check""" return {"status": "healthy"} @app.get("/mcp/sse") async def mcp_sse_endpoint(request: Request): """MCP SSE endpoint for agent connections""" async def event_stream(): try: # This would be replaced with actual MCP SSE transport # For now, just keep connection alive while True: # Check if client disconnected if await request.is_disconnected(): break # Send keepalive yield f"data: {json.dumps({'type': 'keepalive'})}\n\n" await asyncio.sleep(30) except asyncio.CancelledError: pass return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", } ) def main(): """Main entry point""" port = int(os.getenv("PORT", 7860)) print(f"🚀 Starting Cisco MCP Server on port {port}") print(f"📊 Status: http://localhost:{port}/") print(f"🔗 MCP SSE: http://localhost:{port}/mcp/sse") uvicorn.run(app, host="0.0.0.0", port=port, log_level="info") if __name__ == "__main__": main()