#!/usr/bin/env python3
"""MCP stdio bridge that forwards tool calls to a remote Kali Linux HTTP API.

The script reads newline-delimited JSON-RPC 2.0 messages from stdin, dispatches
them to :class:`MCPServer`, and prints one JSON response per request to stdout.
Log records go through ``logging.basicConfig``'s default handler (stderr), so
stdout carries protocol traffic only.
"""
import asyncio  # unused in this module; kept from the original import list
import json
import logging
import os
import sys
from typing import Any, Dict, Optional, Tuple

import requests

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("kali-mcp-bridge")

# Configuration
DEFAULT_KALI_API_URL = "https://sauserla-kali.hf.space"
# The KALI_API_URL environment variable overrides the built-in default; an
# unset or empty variable falls back to DEFAULT_KALI_API_URL.
KALI_API_URL = os.environ.get("KALI_API_URL") or DEFAULT_KALI_API_URL

# JSON-RPC 2.0 error codes used by this server.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
TOOL_EXECUTION_ERROR = -32000  # implementation-defined server error


class KaliMCPBridge:
    """MCP Bridge to Kali Linux API.

    Wraps a ``requests.Session`` and converts every transport or decoding
    failure into an ``{"error": "<message>"}`` dict instead of raising.
    """

    # Per-request timeouts in seconds. POST is longer because it carries
    # command execution; GET is only used for the health endpoint here.
    GET_TIMEOUT = 30
    POST_TIMEOUT = 300

    def __init__(self, api_url: str = KALI_API_URL):
        """Create the bridge.

        Args:
            api_url: Base URL of the Kali API; a trailing slash is stripped.
        """
        self.api_url = api_url.rstrip('/')
        self.session = requests.Session()
        logger.info("Initialized Kali MCP Bridge with API: %s", self.api_url)

    def call_api(self, endpoint: str, method: str = "GET", data: Optional[dict] = None) -> dict:
        """Make API call to Kali server.

        Args:
            endpoint: Path appended verbatim to the base URL (e.g. ``"/health"``).
            method: ``"POST"`` (case-insensitive) sends ``data`` as JSON; any
                other value issues a GET.
            data: JSON body for POST requests; ignored for GET.

        Returns:
            The decoded JSON object on success, or ``{"error": "<message>"}``
            when the request fails, the status is an HTTP error, the body is
            not valid JSON, or the body is not a JSON object.
        """
        url = f"{self.api_url}{endpoint}"
        try:
            if method.upper() == "POST":
                response = self.session.post(url, json=data, timeout=self.POST_TIMEOUT)
            else:
                response = self.session.get(url, timeout=self.GET_TIMEOUT)
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException subclass.
            logger.error("API call failed: %s", e)
            return {"error": str(e)}

        if not isinstance(payload, dict):
            # Callers immediately use .get() on the result, so anything other
            # than a JSON object is reported as an error rather than returned.
            message = f"Unexpected non-object JSON response: {type(payload).__name__}"
            logger.error("API call failed: %s", message)
            return {"error": message}
        return payload

    def execute_command(self, command: str) -> dict:
        """Execute a command via the API (POST ``/api/command``)."""
        return self.call_api("/api/command", "POST", {"command": command})

    def health_check(self) -> dict:
        """Check server health (GET ``/health``)."""
        return self.call_api("/health")


# MCP Protocol Implementation (Simplified)
class MCPServer:
    """Simple MCP server implementation.

    Handles ``initialize``, ``ping``, ``tools/list`` and ``tools/call``; every
    other method is answered with a JSON-RPC "method not found" error.
    """

    def __init__(self):
        self.kali_bridge = KaliMCPBridge()
        self.tools = {
            "execute_command": {
                "name": "execute_command",
                "description": "Execute arbitrary commands on Kali Linux",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "command": {
                            "type": "string",
                            "description": "The shell command to execute on Kali Linux",
                        }
                    },
                    "required": ["command"],
                },
            },
            "server_health": {
                "name": "server_health",
                "description": "Check the health status of the Kali Linux server",
                "inputSchema": {
                    "type": "object",
                    "properties": {},
                },
            },
        }
        # Method name -> bound handler; consulted by process_request().
        self._handlers = {
            "initialize": self.handle_initialize,
            "ping": self.handle_ping,
            "tools/list": self.handle_tools_list,
            "tools/call": self.handle_tools_call,
        }

    @staticmethod
    def _result(request_id: Any, result: dict) -> dict:
        """Build a JSON-RPC success envelope."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error(request_id: Any, code: int, message: str) -> dict:
        """Build a JSON-RPC error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    def handle_initialize(self, request: dict) -> dict:
        """Handle initialization request."""
        return self._result(request.get("id"), {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},
            },
            "serverInfo": {
                "name": "kali-mcp-bridge",
                "version": "1.0.0",
            },
        })

    def handle_ping(self, request: dict) -> dict:
        """Handle ping request by returning an empty result."""
        return self._result(request.get("id"), {})

    def handle_tools_list(self, request: dict) -> dict:
        """Handle tools/list request."""
        return self._result(request.get("id"), {"tools": list(self.tools.values())})

    @staticmethod
    def _format_command_result(result: dict) -> Tuple[str, bool]:
        """Render an execute_command API reply as ``(text, is_error)``."""
        if result.get("error"):
            return f"Error: {result['error']}", True

        sections = []
        if result.get("stdout"):
            sections.append(f"STDOUT:\n{result['stdout']}")
        if result.get("stderr"):
            sections.append(f"STDERR:\n{result['stderr']}")
        status = "SUCCESS" if result.get("success") else "FAILED"
        sections.append(f"Exit Code: {result.get('return_code', 'unknown')}")
        sections.append(f"Status: {status}")
        if result.get("timed_out"):
            sections.append("WARNING: Command timed out after 180 seconds")
        return "\n\n".join(sections), False

    @staticmethod
    def _format_health(health: dict) -> Tuple[str, bool]:
        """Render a health-check API reply as ``(text, is_error)``."""
        if health.get("error"):
            return f"Health check failed: {health['error']}", True

        lines = [
            f"Status: {health.get('status', 'unknown')}",
            f"Message: {health.get('message', '')}",
            f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}",
        ]
        tools_status = health.get('tools_status', {})
        if tools_status:
            lines.append("\nTool Status:")
            for name, available in tools_status.items():
                status_icon = "✅" if available else "❌"
                lines.append(f" {status_icon} {name}")
        return "\n".join(lines), False

    def handle_tools_call(self, request: dict) -> dict:
        """Handle tools/call request.

        Returns:
            A result envelope whose ``content`` holds the rendered text and
            whose ``isError`` flag is true when the Kali API reported an
            error; an ``INVALID_PARAMS`` error for malformed params or an
            unknown tool name; a ``TOOL_EXECUTION_ERROR`` error when running
            or formatting the tool raises (including a missing ``command``).
        """
        request_id = request.get("id")

        # Explicit nulls are treated the same as absent keys.
        params = request.get("params") or {}
        if not isinstance(params, dict):
            return self._error(request_id, INVALID_PARAMS, "params must be an object")
        tool_name = params.get("name")
        tool_args = params.get("arguments") or {}
        if not isinstance(tool_args, dict):
            return self._error(request_id, INVALID_PARAMS, "arguments must be an object")

        try:
            if tool_name == "execute_command":
                command = tool_args.get("command", "")
                if not command:
                    raise ValueError("Command parameter is required")
                text, is_error = self._format_command_result(
                    self.kali_bridge.execute_command(command)
                )
            elif tool_name == "server_health":
                text, is_error = self._format_health(self.kali_bridge.health_check())
            else:
                # An unknown tool is a protocol-level error, not a tool result.
                return self._error(request_id, INVALID_PARAMS, f"Unknown tool: {tool_name}")
        except Exception as e:
            # Boundary handler: any failure is reported to the client rather
            # than terminating the request loop.
            logger.error("Tool execution error: %s", e)
            return self._error(request_id, TOOL_EXECUTION_ERROR, f"Tool execution failed: {e}")

        return self._result(request_id, {
            "content": [{"type": "text", "text": text}],
            "isError": is_error,
        })

    def process_request(self, line: str) -> str:
        """Process a single JSON-RPC message.

        Args:
            line: One line of input containing a JSON document.

        Returns:
            The serialized JSON-RPC response, or an empty string when the
            message is a notification (has no ``id`` member) and therefore
            must not be answered. Notifications are not dispatched to any
            handler, because there is no way to deliver their outcome.
        """
        try:
            request = json.loads(line.strip())
        except json.JSONDecodeError as e:
            logger.error("Invalid JSON: %s", e)
            return json.dumps(self._error(None, PARSE_ERROR, "Parse error"))

        if not isinstance(request, dict):
            logger.error("Invalid request: expected a JSON object, got %s", type(request).__name__)
            return json.dumps(self._error(None, INVALID_REQUEST, "Invalid Request"))

        if "id" not in request:
            logger.debug("Ignoring notification: %s", request.get("method"))
            return ""

        request_id = request.get("id")
        try:
            method = request.get("method")
            handler = self._handlers.get(method)
            if handler is None:
                response = self._error(request_id, METHOD_NOT_FOUND, f"Method not found: {method}")
            else:
                response = handler(request)
            return json.dumps(response)
        except Exception as e:
            # Boundary handler so one bad message cannot stop the server loop.
            logger.error("Request processing error: %s", e)
            return json.dumps(self._error(request_id, INTERNAL_ERROR, f"Internal error: {e}"))


def main() -> None:
    """Main server loop.

    Exits with status 1 if the startup health check reports an error;
    otherwise serves requests from stdin until it is closed.
    """
    server = MCPServer()

    # Test connection on startup
    logger.info("Testing connection to Kali API...")
    health = server.kali_bridge.health_check()
    if health.get("error"):
        logger.error("Failed to connect to Kali API: %s", health['error'])
        sys.exit(1)
    logger.info("✅ Successfully connected to Kali API server")

    logger.info("Kali MCP Bridge started. Ready to accept MCP requests...")

    # Main request/response loop
    for line in sys.stdin:
        if not line.strip():
            continue
        response = server.process_request(line)
        if response:
            # Flush per message: the peer waits for each reply before
            # sending its next request.
            print(response, flush=True)


if __name__ == "__main__":
    main()