File size: 5,136 Bytes
37f9172 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | #!/usr/bin/env python3
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Sequence
# MCP SDK imports
from mcp import Tool
from mcp.server import Server
from mcp.types import (
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
# HTTP client for API calls
import requests
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kali-mcp-server")
# Configuration
KALI_API_URL = os.environ.get("KALI_API_URL", "https://sauserla-kali.hf.space")
class KaliMCPServer:
"""MCP Server for Kali Linux tools"""
def __init__(self, api_url: str = KALI_API_URL):
self.api_url = api_url.rstrip('/')
self.session = requests.Session()
logger.info(f"Initialized Kali MCP Server with API: {self.api_url}")
def _call_api(self, endpoint: str, method: str = "GET", data: Dict = None) -> Dict[str, Any]:
"""Make API call to Kali server"""
url = f"{self.api_url}{endpoint}"
try:
if method.upper() == "POST":
response = self.session.post(url, json=data, timeout=300)
else:
response = self.session.get(url, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"API call failed: {str(e)}")
return {"error": str(e)}
def get_capabilities(self) -> Dict[str, Any]:
"""Get MCP tool capabilities"""
return self._call_api("/mcp/capabilities")
def execute_command(self, command: str) -> Dict[str, Any]:
"""Execute a command via the API"""
data = {"command": command}
return self._call_api("/api/command", "POST", data)
def health_check(self) -> Dict[str, Any]:
"""Check server health"""
return self._call_api("/health")
# Global server instance
kali_server = KaliMCPServer()
# MCP Server setup
server = Server("kali-mcp-server")
@server.tool()
async def execute_command(command: str) -> str:
"""Execute arbitrary commands on Kali Linux
Args:
command: The shell command to execute on the Kali Linux system
Returns:
Command execution results including stdout, stderr, and exit code
"""
logger.info(f"Executing command: {command}")
try:
result = kali_server.execute_command(command)
if result.get("error"):
return f"Error: {result['error']}"
output = []
if result.get("stdout"):
output.append(f"STDOUT:\n{result['stdout']}")
if result.get("stderr"):
output.append(f"STDERR:\n{result['stderr']}")
status = "SUCCESS" if result.get("success") else "FAILED"
output.append(f"Exit Code: {result.get('return_code', 'unknown')}")
output.append(f"Status: {status}")
if result.get("timed_out"):
output.append("WARNING: Command timed out after 180 seconds")
return "\n\n".join(output)
except Exception as e:
logger.error(f"Tool execution error: {str(e)}")
return f"Execution failed: {str(e)}"
@server.tool()
async def server_health() -> str:
"""Check the health status of the Kali Linux server and available tools
Returns:
Server health information and tool availability status
"""
try:
health = kali_server.health_check()
if health.get("error"):
return f"Health check failed: {health['error']}"
output = []
output.append(f"Status: {health.get('status', 'unknown')}")
output.append(f"Message: {health.get('message', '')}")
output.append(f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}")
tools_status = health.get('tools_status', {})
if tools_status:
output.append("\nTool Status:")
for tool, available in tools_status.items():
status_icon = "✅" if available else "❌"
output.append(f" {status_icon} {tool}")
return "\n".join(output)
except Exception as e:
logger.error(f"Health check error: {str(e)}")
return f"Health check failed: {str(e)}"
async def main():
"""Main server entry point"""
# Import here to avoid issues if MCP SDK is not available
from mcp.server.stdio import stdio_server
logger.info("Starting Kali Linux MCP Server")
# Test connection on startup
try:
health = kali_server.health_check()
if health.get("error"):
logger.error(f"Failed to connect to Kali API: {health['error']}")
sys.exit(1)
else:
logger.info("✅ Successfully connected to Kali API server")
except Exception as e:
logger.error(f"Connection test failed: {str(e)}")
sys.exit(1)
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main()) |