#!/usr/bin/env python3 import asyncio import json import logging import os import sys from typing import Any, Dict, List, Sequence # MCP SDK imports from mcp import Tool from mcp.server import Server from mcp.types import ( TextContent, ImageContent, EmbeddedResource, LoggingLevel ) # HTTP client for API calls import requests # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("kali-mcp-server") # Configuration KALI_API_URL = os.environ.get("KALI_API_URL", "https://sauserla-kali.hf.space") class KaliMCPServer: """MCP Server for Kali Linux tools""" def __init__(self, api_url: str = KALI_API_URL): self.api_url = api_url.rstrip('/') self.session = requests.Session() logger.info(f"Initialized Kali MCP Server with API: {self.api_url}") def _call_api(self, endpoint: str, method: str = "GET", data: Dict = None) -> Dict[str, Any]: """Make API call to Kali server""" url = f"{self.api_url}{endpoint}" try: if method.upper() == "POST": response = self.session.post(url, json=data, timeout=300) else: response = self.session.get(url, timeout=30) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"API call failed: {str(e)}") return {"error": str(e)} def get_capabilities(self) -> Dict[str, Any]: """Get MCP tool capabilities""" return self._call_api("/mcp/capabilities") def execute_command(self, command: str) -> Dict[str, Any]: """Execute a command via the API""" data = {"command": command} return self._call_api("/api/command", "POST", data) def health_check(self) -> Dict[str, Any]: """Check server health""" return self._call_api("/health") # Global server instance kali_server = KaliMCPServer() # MCP Server setup server = Server("kali-mcp-server") @server.tool() async def execute_command(command: str) -> str: """Execute arbitrary commands on Kali Linux Args: command: The shell command to execute on the Kali Linux system Returns: Command execution results including stdout, stderr, and exit code """ logger.info(f"Executing command: {command}") try: result = kali_server.execute_command(command) if result.get("error"): return f"Error: {result['error']}" output = [] if result.get("stdout"): output.append(f"STDOUT:\n{result['stdout']}") if result.get("stderr"): output.append(f"STDERR:\n{result['stderr']}") status = "SUCCESS" if result.get("success") else "FAILED" output.append(f"Exit Code: {result.get('return_code', 'unknown')}") output.append(f"Status: {status}") if result.get("timed_out"): output.append("WARNING: Command timed out after 180 seconds") return "\n\n".join(output) except Exception as e: logger.error(f"Tool execution error: {str(e)}") return f"Execution failed: {str(e)}" @server.tool() async def server_health() -> str: """Check the health status of the Kali Linux server and available tools Returns: Server health information and tool availability status """ try: health = kali_server.health_check() if health.get("error"): return f"Health check failed: {health['error']}" output = [] output.append(f"Status: {health.get('status', 'unknown')}") output.append(f"Message: {health.get('message', '')}") output.append(f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}") tools_status = health.get('tools_status', {}) if tools_status: output.append("\nTool Status:") for tool, available in tools_status.items(): status_icon = "✅" if available else "❌" output.append(f" {status_icon} {tool}") return "\n".join(output) except Exception as e: logger.error(f"Health check error: {str(e)}") return f"Health check failed: {str(e)}" async def main(): """Main server entry point""" # Import here to avoid issues if MCP SDK is not available from mcp.server.stdio import stdio_server logger.info("Starting Kali Linux MCP Server") # Test connection on startup try: health = kali_server.health_check() if health.get("error"): logger.error(f"Failed to connect to Kali API: {health['error']}") sys.exit(1) else: logger.info("✅ Successfully connected to Kali API server") except Exception as e: logger.error(f"Connection test failed: {str(e)}") sys.exit(1) async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, server.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())