mcp-Cisco / app.py
ChrisSacrumCor's picture
Create app.py
e4bf787 verified
import os
import asyncio
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
from mcp.server import Server
from mcp.types import Tool, TextContent
# Create FastAPI app
app = FastAPI(title="Cisco MCP Server")
# Create MCP server instance
mcp_app = Server("cisco-mcp")
# ============================================================================
# CISCO MCP HANDLER FUNCTIONS
# ============================================================================
def handle_vlan_management(args):
"""Generate VLAN management commands"""
action = args.get("action")
vlan_id = args.get("vlan_id", 100)
vlan_name = args.get("vlan_name", f"VLAN_{vlan_id}")
interface = args.get("interface", "")
description = args.get("description", "")
commands = []
if action == "create_vlan":
commands = [
f"! Create VLAN {vlan_id}",
"configure terminal",
f"vlan {vlan_id}",
f" name {vlan_name}",
f" state active",
]
if description:
commands.append(f" description {description}")
commands.extend([
"exit",
"exit",
"",
"! Verify VLAN creation:",
"show vlan brief",
f"show vlan id {vlan_id}"
])
elif action == "delete_vlan":
commands = [
f"! Delete VLAN {vlan_id}",
"configure terminal",
f"no vlan {vlan_id}",
"exit",
"",
"! Verify VLAN deletion:",
"show vlan brief"
]
elif action == "assign_vlan":
if not interface:
return [TextContent(type="text", text="❌ Interface required for VLAN assignment")]
commands = [
f"! Assign interface {interface} to VLAN {vlan_id}",
"configure terminal",
f"interface {interface}",
" switchport mode access",
f" switchport access vlan {vlan_id}",
" no shutdown",
"exit",
"exit",
"",
"! Verify interface assignment:",
f"show interfaces {interface} switchport",
"show vlan brief"
]
elif action == "show_vlan":
commands = [
"! VLAN Information Commands",
"show vlan brief",
"show vlan summary",
f"show vlan id {vlan_id}" if vlan_id else "show vlan",
"show interfaces status",
"show spanning-tree vlan brief"
]
result = "\n".join([cmd for cmd in commands if cmd])
return [TextContent(type="text", text=result)]
def handle_interface_configuration(args):
"""Generate interface configuration commands"""
action = args.get("action")
interface = args.get("interface")
vlan_id = args.get("vlan_id", 1)
ip_address = args.get("ip_address", "")
subnet_mask = args.get("subnet_mask", "255.255.255.0")
description = args.get("description", "")
if not interface:
return [TextContent(type="text", text="❌ Interface name is required")]
commands = []
if action == "configure_access":
commands = [
f"! Configure {interface} as access port for VLAN {vlan_id}",
"configure terminal",
f"interface {interface}",
]
if description:
commands.append(f" description {description}")
commands.extend([
" switchport mode access",
f" switchport access vlan {vlan_id}",
" spanning-tree portfast",
" no shutdown",
"exit",
"exit",
"",
"! Verify configuration:",
f"show interfaces {interface} switchport",
f"show interfaces {interface} status"
])
elif action == "configure_trunk":
commands = [
f"! Configure {interface} as trunk port",
"configure terminal",
f"interface {interface}",
]
if description:
commands.append(f" description {description}")
commands.extend([
" switchport mode trunk",
" switchport trunk encapsulation dot1q",
f" switchport trunk allowed vlan {vlan_id}" if vlan_id != 1 else " switchport trunk allowed vlan all",
" no shutdown",
"exit",
"exit",
"",
"! Verify trunk configuration:",
f"show interfaces {interface} trunk",
f"show interfaces {interface} switchport"
])
elif action == "set_ip":
if not ip_address:
return [TextContent(type="text", text="❌ IP address is required")]
commands = [
f"! Configure IP address on {interface}",
"configure terminal",
f"interface {interface}",
f" ip address {ip_address} {subnet_mask}",
]
if description:
commands.append(f" description {description}")
commands.extend([
" no shutdown",
"exit",
"exit",
"",
"! Verify IP configuration:",
f"show ip interface {interface}",
"show ip interface brief"
])
elif action in ["shutdown", "no_shutdown"]:
cmd = "shutdown" if action == "shutdown" else "no shutdown"
status = "Shutdown" if action == "shutdown" else "Enable"
commands = [
f"! {status} interface {interface}",
"configure terminal",
f"interface {interface}",
f" {cmd}",
"exit",
"exit",
"",
"! Verify interface status:",
f"show interfaces {interface} status"
]
result = "\n".join([cmd for cmd in commands if cmd])
return [TextContent(type="text", text=result)]
def handle_routing_configuration(args):
"""Generate routing configuration commands"""
protocol = args.get("protocol")
action = args.get("action")
network = args.get("network", "")
area = args.get("area", "0")
next_hop = args.get("next_hop", "")
process_id = args.get("process_id", 1)
commands = []
if protocol == "ospf":
if action == "configure":
commands = [
f"! Configure OSPF process {process_id}",
"configure terminal",
f"router ospf {process_id}",
]
if network:
commands.append(f" network {network} area {area}")
commands.extend([
" router-id 1.1.1.1",
"exit",
"exit",
"",
"! Verify OSPF configuration:",
"show ip ospf",
"show ip ospf neighbor",
"show ip ospf database"
])
elif action == "show":
commands = [
"! OSPF Information Commands",
"show ip ospf",
"show ip ospf neighbor",
"show ip ospf database",
"show ip ospf interface",
"show ip route ospf"
]
elif protocol == "static":
if action == "configure":
if not network or not next_hop:
return [TextContent(type="text", text="❌ Network and next_hop required for static routes")]
commands = [
f"! Configure static route to {network}",
"configure terminal",
f"ip route {network} {next_hop}",
"exit",
"",
"! Verify static route:",
"show ip route static",
"show ip route"
]
elif protocol == "eigrp":
if action == "configure":
commands = [
f"! Configure EIGRP AS {process_id}",
"configure terminal",
f"router eigrp {process_id}",
]
if network:
commands.append(f" network {network}")
commands.extend([
" no auto-summary",
"exit",
"exit",
"",
"! Verify EIGRP configuration:",
"show ip eigrp neighbors",
"show ip eigrp topology"
])
result = "\n".join([cmd for cmd in commands if cmd])
return [TextContent(type="text", text=result)]
def handle_security_configuration(args):
"""Generate security configuration commands"""
feature = args.get("feature")
action = args.get("action")
acl_number = args.get("acl_number", 100)
source_ip = args.get("source_ip", "")
destination_ip = args.get("destination_ip", "")
port = args.get("port", 22)
interface = args.get("interface", "")
commands = []
if feature == "ssh":
if action == "enable":
commands = [
"! Enable SSH access",
"configure terminal",
"hostname CiscoRouter",
"ip domain-name company.local",
"crypto key generate rsa general-keys modulus 2048",
"ip ssh version 2",
"ip ssh time-out 60",
"ip ssh authentication-retries 3",
"",
"! Configure user account",
"username admin privilege 15 secret cisco123",
"",
"! Configure VTY lines",
"line vty 0 4",
" transport input ssh",
" login local",
"exit",
"exit",
"",
"! Verify SSH configuration:",
"show ip ssh",
"show ssh"
]
elif feature == "acl":
if action == "configure":
commands = [
f"! Configure Access Control List {acl_number}",
"configure terminal",
]
if source_ip and destination_ip:
commands.append(f"access-list {acl_number} permit ip {source_ip} {destination_ip}")
commands.extend([
f"access-list {acl_number} deny ip any any",
"",
f"! Apply ACL to interface {interface if interface else '[interface-name]'}",
f"interface {interface if interface else '[interface-name]'}",
f" ip access-group {acl_number} in",
"exit",
"exit",
"",
"! Verify ACL:",
f"show access-lists {acl_number}",
f"show ip interface {interface if interface else '[interface]'}"
])
elif feature == "port_security":
if action == "enable":
commands = [
f"! Enable port security on {interface if interface else '[interface-name]'}",
"configure terminal",
f"interface {interface if interface else '[interface-name]'}",
" switchport mode access",
" switchport port-security",
" switchport port-security maximum 2",
" switchport port-security mac-address sticky",
" switchport port-security violation restrict",
"exit",
"exit",
"",
"! Verify port security:",
f"show port-security interface {interface if interface else '[interface]'}",
"show port-security address"
]
result = "\n".join([cmd for cmd in commands if cmd])
return [TextContent(type="text", text=result)]
def handle_troubleshooting(args):
"""Generate troubleshooting commands"""
category = args.get("category")
target = args.get("target", "8.8.8.8")
interface = args.get("interface", "")
commands = []
if category == "connectivity":
commands = [
f"! Connectivity Troubleshooting to {target}",
f"ping {target}",
f"traceroute {target}",
"show ip route",
"show arp",
"show ip interface brief",
"",
"! Extended ping test:",
f"ping {target} repeat 100",
f"ping {target} size 1500"
]
elif category == "interface":
commands = [
f"! Interface Troubleshooting for {interface if interface else 'all interfaces'}",
f"show interfaces {interface}" if interface else "show interfaces",
f"show interfaces {interface} status" if interface else "show interfaces status",
f"show controllers {interface}" if interface else "show controllers",
"show ip interface brief",
"",
"! Interface statistics:",
f"show interfaces {interface} | include error" if interface else "show interfaces | include error"
]
elif category == "routing":
commands = [
"! Routing Troubleshooting",
"show ip route",
"show ip protocols",
"show ip ospf neighbor",
"show ip eigrp neighbors",
"show cdp neighbors",
"",
"! Routing table analysis:",
"show ip route summary",
f"show ip route {target}" if target else "show ip route 0.0.0.0"
]
elif category == "switching":
commands = [
"! Switching Troubleshooting",
"show vlan brief",
"show interfaces status",
"show spanning-tree",
"show mac address-table",
"show cdp neighbors detail",
"",
"! Port and VLAN status:",
"show interfaces trunk",
"show spanning-tree blockedports"
]
elif category == "general":
commands = [
"! General Device Information",
"show version",
"show running-config",
"show startup-config",
"show processes cpu",
"show memory",
"show environment",
"show logging",
"",
"! System status:",
"show clock",
"show users",
"show sessions"
]
result = "\n".join(commands)
return [TextContent(type="text", text=result)]
# ============================================================================
# MCP SERVER CONFIGURATION
# ============================================================================
@mcp_app.list_tools()
async def list_tools():
"""List all available Cisco networking tools"""
return [
Tool(
name="vlan_management",
description="Generate Cisco VLAN configuration commands",
inputSchema={
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["create_vlan", "delete_vlan", "assign_vlan", "show_vlan"]},
"vlan_id": {"type": "integer", "minimum": 1, "maximum": 4094},
"vlan_name": {"type": "string"},
"interface": {"type": "string"},
"description": {"type": "string"}
},
"required": ["action"]
}
),
Tool(
name="interface_configuration",
description="Generate Cisco interface configuration commands",
inputSchema={
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["configure_access", "configure_trunk", "set_ip", "shutdown", "no_shutdown"]},
"interface": {"type": "string"},
"vlan_id": {"type": "integer"},
"ip_address": {"type": "string"},
"subnet_mask": {"type": "string"},
"description": {"type": "string"}
},
"required": ["action", "interface"]
}
),
Tool(
name="routing_configuration",
description="Generate Cisco routing protocol commands",
inputSchema={
"type": "object",
"properties": {
"protocol": {"type": "string", "enum": ["ospf", "eigrp", "static", "rip"]},
"action": {"type": "string", "enum": ["configure", "remove", "show"]},
"network": {"type": "string"},
"area": {"type": "string"},
"next_hop": {"type": "string"},
"process_id": {"type": "integer"}
},
"required": ["protocol", "action"]
}
),
Tool(
name="security_configuration",
description="Generate Cisco security commands",
inputSchema={
"type": "object",
"properties": {
"feature": {"type": "string", "enum": ["ssh", "acl", "port_security", "dhcp_snooping"]},
"action": {"type": "string", "enum": ["enable", "disable", "configure", "show"]},
"acl_number": {"type": "integer"},
"source_ip": {"type": "string"},
"destination_ip": {"type": "string"},
"interface": {"type": "string"}
},
"required": ["feature", "action"]
}
),
Tool(
name="troubleshooting",
description="Generate Cisco troubleshooting commands",
inputSchema={
"type": "object",
"properties": {
"category": {"type": "string", "enum": ["connectivity", "interface", "routing", "switching", "general"]},
"target": {"type": "string"},
"interface": {"type": "string"}
},
"required": ["category"]
}
)
]
@mcp_app.call_tool()
async def call_tool(name: str, arguments: dict):
"""Handle tool calls"""
try:
if name == "vlan_management":
return handle_vlan_management(arguments)
elif name == "interface_configuration":
return handle_interface_configuration(arguments)
elif name == "routing_configuration":
return handle_routing_configuration(arguments)
elif name == "security_configuration":
return handle_security_configuration(arguments)
elif name == "troubleshooting":
return handle_troubleshooting(arguments)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
# ============================================================================
# FASTAPI WEB ENDPOINTS
# ============================================================================
@app.get("/")
async def root():
"""Status endpoint"""
return {
"service": "cisco-mcp-server",
"status": "running",
"mcp_endpoint": "/mcp/sse",
"tools": 5
}
@app.get("/health")
async def health():
"""Health check"""
return {"status": "healthy"}
@app.get("/mcp/sse")
async def mcp_sse_endpoint(request: Request):
"""MCP SSE endpoint for agent connections"""
async def event_stream():
try:
# This would be replaced with actual MCP SSE transport
# For now, just keep connection alive
while True:
# Check if client disconnected
if await request.is_disconnected():
break
# Send keepalive
yield f"data: {json.dumps({'type': 'keepalive'})}\n\n"
await asyncio.sleep(30)
except asyncio.CancelledError:
pass
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
}
)
def main():
"""Main entry point"""
port = int(os.getenv("PORT", 7860))
print(f"πŸš€ Starting Cisco MCP Server on port {port}")
print(f"πŸ“Š Status: http://localhost:{port}/")
print(f"πŸ”— MCP SSE: http://localhost:{port}/mcp/sse")
uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
if __name__ == "__main__":
main()