| |
|
|
| import asyncio |
| import json |
| import logging |
| import sys |
| import requests |
|
|
| |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") |
| logger = logging.getLogger("kali-mcp-bridge") |
|
|
| |
| KALI_API_URL = "https://sauserla-kali.hf.space" |
|
|
| class KaliMCPBridge: |
| """MCP Bridge to Kali Linux API""" |
|
|
| def __init__(self, api_url: str = KALI_API_URL): |
| self.api_url = api_url.rstrip('/') |
| self.session = requests.Session() |
| logger.info(f"Initialized Kali MCP Bridge with API: {self.api_url}") |
|
|
| def call_api(self, endpoint: str, method: str = "GET", data: dict = None) -> dict: |
| """Make API call to Kali server""" |
| url = f"{self.api_url}{endpoint}" |
| try: |
| if method.upper() == "POST": |
| response = self.session.post(url, json=data, timeout=300) |
| else: |
| response = self.session.get(url, timeout=30) |
| response.raise_for_status() |
| return response.json() |
| except requests.exceptions.RequestException as e: |
| logger.error(f"API call failed: {str(e)}") |
| return {"error": str(e)} |
|
|
| def execute_command(self, command: str) -> dict: |
| """Execute a command via the API""" |
| data = {"command": command} |
| return self.call_api("/api/command", "POST", data) |
|
|
| def health_check(self) -> dict: |
| """Check server health""" |
| return self.call_api("/health") |
|
|
| |
| class MCPServer: |
| """Simple MCP server implementation""" |
|
|
| def __init__(self): |
| self.kali_bridge = KaliMCPBridge() |
| self.tools = { |
| "execute_command": { |
| "name": "execute_command", |
| "description": "Execute arbitrary commands on Kali Linux", |
| "inputSchema": { |
| "type": "object", |
| "properties": { |
| "command": { |
| "type": "string", |
| "description": "The shell command to execute on Kali Linux" |
| } |
| }, |
| "required": ["command"] |
| } |
| }, |
| "server_health": { |
| "name": "server_health", |
| "description": "Check the health status of the Kali Linux server", |
| "inputSchema": { |
| "type": "object", |
| "properties": {} |
| } |
| } |
| } |
|
|
| def handle_initialize(self, request: dict) -> dict: |
| """Handle initialization request""" |
| return { |
| "jsonrpc": "2.0", |
| "id": request.get("id"), |
| "result": { |
| "protocolVersion": "2024-11-05", |
| "capabilities": { |
| "tools": {} |
| }, |
| "serverInfo": { |
| "name": "kali-mcp-bridge", |
| "version": "1.0.0" |
| } |
| } |
| } |
|
|
| def handle_tools_list(self, request: dict) -> dict: |
| """Handle tools/list request""" |
| return { |
| "jsonrpc": "2.0", |
| "id": request.get("id"), |
| "result": { |
| "tools": list(self.tools.values()) |
| } |
| } |
|
|
| def handle_tools_call(self, request: dict) -> dict: |
| """Handle tools/call request""" |
| params = request.get("params", {}) |
| tool_name = params.get("name") |
| tool_args = params.get("arguments", {}) |
|
|
| try: |
| if tool_name == "execute_command": |
| command = tool_args.get("command", "") |
| if not command: |
| raise ValueError("Command parameter is required") |
|
|
| result = self.kali_bridge.execute_command(command) |
|
|
| if result.get("error"): |
| content = f"Error: {result['error']}" |
| else: |
| output_parts = [] |
| if result.get("stdout"): |
| output_parts.append(f"STDOUT:\n{result['stdout']}") |
| if result.get("stderr"): |
| output_parts.append(f"STDERR:\n{result['stderr']}") |
|
|
| status = "SUCCESS" if result.get("success") else "FAILED" |
| output_parts.append(f"Exit Code: {result.get('return_code', 'unknown')}") |
| output_parts.append(f"Status: {status}") |
|
|
| if result.get("timed_out"): |
| output_parts.append("WARNING: Command timed out after 180 seconds") |
|
|
| content = "\n\n".join(output_parts) |
|
|
| elif tool_name == "server_health": |
| health = self.kali_bridge.health_check() |
|
|
| if health.get("error"): |
| content = f"Health check failed: {health['error']}" |
| else: |
| output_parts = [] |
| output_parts.append(f"Status: {health.get('status', 'unknown')}") |
| output_parts.append(f"Message: {health.get('message', '')}") |
| output_parts.append(f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}") |
|
|
| tools_status = health.get('tools_status', {}) |
| if tools_status: |
| output_parts.append("\nTool Status:") |
| for tool_name, available in tools_status.items(): |
| status_icon = "✅" if available else "❌" |
| output_parts.append(f" {status_icon} {tool_name}") |
|
|
| content = "\n".join(output_parts) |
|
|
| else: |
| content = f"Unknown tool: {tool_name}" |
|
|
| return { |
| "jsonrpc": "2.0", |
| "id": request.get("id"), |
| "result": { |
| "content": [{"type": "text", "text": content}] |
| } |
| } |
|
|
| except Exception as e: |
| logger.error(f"Tool execution error: {str(e)}") |
| return { |
| "jsonrpc": "2.0", |
| "id": request.get("id"), |
| "error": { |
| "code": -32000, |
| "message": f"Tool execution failed: {str(e)}" |
| } |
| } |
|
|
| def process_request(self, line: str) -> str: |
| """Process a single JSON-RPC request""" |
| try: |
| request = json.loads(line.strip()) |
|
|
| method = request.get("method") |
|
|
| if method == "initialize": |
| response = self.handle_initialize(request) |
| elif method == "tools/list": |
| response = self.handle_tools_list(request) |
| elif method == "tools/call": |
| response = self.handle_tools_call(request) |
| else: |
| response = { |
| "jsonrpc": "2.0", |
| "id": request.get("id"), |
| "error": { |
| "code": -32601, |
| "message": f"Method not found: {method}" |
| } |
| } |
|
|
| return json.dumps(response) |
|
|
| except json.JSONDecodeError as e: |
| logger.error(f"Invalid JSON: {str(e)}") |
| return json.dumps({ |
| "jsonrpc": "2.0", |
| "id": None, |
| "error": { |
| "code": -32700, |
| "message": "Parse error" |
| } |
| }) |
|
|
| except Exception as e: |
| logger.error(f"Request processing error: {str(e)}") |
| return json.dumps({ |
| "jsonrpc": "2.0", |
| "id": None, |
| "error": { |
| "code": -32603, |
| "message": f"Internal error: {str(e)}" |
| } |
| }) |
|
|
| def main(): |
| """Main server loop""" |
| server = MCPServer() |
|
|
| |
| logger.info("Testing connection to Kali API...") |
| health = server.kali_bridge.health_check() |
| if health.get("error"): |
| logger.error(f"Failed to connect to Kali API: {health['error']}") |
| sys.exit(1) |
| else: |
| logger.info("✅ Successfully connected to Kali API server") |
|
|
| logger.info("Kali MCP Bridge started. Ready to accept MCP requests...") |
|
|
| |
| for line in sys.stdin: |
| if line.strip(): |
| response = server.process_request(line) |
| print(response, flush=True) |
|
|
| if __name__ == "__main__": |
| main() |