#!/usr/bin/env python3
"""Flask API server that runs shell commands and exposes them as MCP tools.

Endpoints:
    GET  /health                 -- server status plus availability of tools
    POST /api/command            -- run the shell command from the JSON body
    GET  /mcp/capabilities       -- describe the MCP tools offered
    POST /mcp/tools/<tool_name>  -- invoke one MCP tool by name

SECURITY NOTE: ``/api/command`` passes request input straight to a shell
(``shell=True``), the server binds to 0.0.0.0, and this file performs no
authentication.  That is the purpose of the service, but it must only be
reachable from trusted clients.
"""

import json
import logging
import os
import shutil
import subprocess
import threading
from typing import Any, Dict

from flask import Flask, request, jsonify

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Configuration
COMMAND_TIMEOUT = 180  # 3 minutes timeout

app = Flask(__name__)


class CommandExecutor:
    """Run one shell command and capture its output, even when it times out.

    stdout and stderr are drained by two daemon threads while the command
    runs, so whatever was produced before a timeout is still returned to the
    caller as partial results.
    """

    # Seconds to wait after terminate() before escalating to kill().
    _TERMINATE_GRACE_SECONDS = 5
    # Seconds to let each reader thread drain its pipe after a timeout.
    _DRAIN_SECONDS = 2

    def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
        """Store the command line and the timeout (seconds) for ``execute``."""
        self.command = command
        self.timeout = timeout
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False
        # Lines are collected in lists and joined once; repeated ``+=`` on an
        # attribute is quadratic for commands that print a lot.
        self._stdout_chunks = []
        self._stderr_chunks = []

    def _read_stdout(self):
        """Thread function to continuously read stdout until EOF."""
        for line in iter(self.process.stdout.readline, ''):
            self._stdout_chunks.append(line)

    def _read_stderr(self):
        """Thread function to continuously read stderr until EOF."""
        for line in iter(self.process.stderr.readline, ''):
            self._stderr_chunks.append(line)

    def _snapshot_output(self):
        """Publish everything read so far into stdout_data / stderr_data."""
        # Copy first so a reader thread that is still running cannot mutate
        # the list while it is being joined.
        self.stdout_data = "".join(list(self._stdout_chunks))
        self.stderr_data = "".join(list(self._stderr_chunks))

    def _stop_process(self):
        """Terminate the timed-out process, escalating to kill, and reap it."""
        # Try to terminate gracefully first
        self.process.terminate()
        try:
            self.process.wait(timeout=self._TERMINATE_GRACE_SECONDS)
        except subprocess.TimeoutExpired:
            # Force kill if it doesn't terminate
            logger.warning("Process not responding to termination. Killing.")
            self.process.kill()
            # Reap the killed process so it does not linger as a zombie.
            self.process.wait()

        # Give the readers a bounded chance to pick up output that is already
        # in the pipes.  The wait is bounded because, with shell=True, the
        # signal goes to the shell only; a surviving child could keep the
        # pipe open and the reader would otherwise block indefinitely.
        for reader in (self.stdout_thread, self.stderr_thread):
            reader.join(timeout=self._DRAIN_SECONDS)

    def execute(self) -> Dict[str, Any]:
        """Execute the command and handle timeout gracefully.

        Returns:
            A dict with keys ``stdout``, ``stderr``, ``return_code``,
            ``success``, ``timed_out`` and ``partial_results``.  On timeout
            ``return_code`` is -1 and ``success`` is True only if some output
            was captured.  Exceptions raised while launching or waiting are
            caught and reported in ``stderr`` with ``success`` False.
        """
        logger.info("Executing command: %s", self.command)

        try:
            # NOTE(security): shell=True on caller-supplied text is deliberate
            # here (the API exists to run arbitrary commands); see module
            # docstring.
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1  # Line buffered
            )

            # Start threads to read output continuously
            self.stdout_thread = threading.Thread(
                target=self._read_stdout, daemon=True
            )
            self.stderr_thread = threading.Thread(
                target=self._read_stderr, daemon=True
            )
            self.stdout_thread.start()
            self.stderr_thread.start()

            # Wait for the process to complete or timeout
            try:
                self.return_code = self.process.wait(timeout=self.timeout)
                # Process completed, join the threads
                self.stdout_thread.join()
                self.stderr_thread.join()
            except subprocess.TimeoutExpired:
                # Process timed out but we might have partial results
                self.timed_out = True
                logger.warning(
                    "Command timed out after %s seconds. Terminating process.",
                    self.timeout,
                )
                self._stop_process()
                self.return_code = -1

            self._snapshot_output()
            has_output = bool(self.stdout_data or self.stderr_data)
            partial_results = self.timed_out and has_output

            # Always consider it a success if we have output, even with timeout
            success = partial_results or self.return_code == 0

            return {
                "stdout": self.stdout_data,
                "stderr": self.stderr_data,
                "return_code": self.return_code,
                "success": success,
                "timed_out": self.timed_out,
                "partial_results": partial_results
            }

        except Exception as e:
            logger.error("Error executing command: %s", e)
            self._snapshot_output()
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data)
            }


def execute_command(command: str) -> Dict[str, Any]:
    """Execute a shell command with the default timeout and return the result.

    See ``CommandExecutor.execute`` for the shape of the returned dict.
    """
    executor = CommandExecutor(command)
    return executor.execute()


@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint.

    Reports, per essential tool, whether an executable of that name is found
    on PATH, plus an overall flag that is True only when all are present.
    """
    # Check if essential tools are installed
    essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
    tools_status = {
        tool: shutil.which(tool) is not None for tool in essential_tools
    }

    all_essential_tools_available = all(tools_status.values())

    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })


@app.route("/api/command", methods=["POST"])
def generic_command():
    """Execute any command provided in the request.

    Expects a JSON object body with a non-empty ``command`` entry.  Responds
    400 when it is missing (including when the body is absent or is not a
    JSON object), 500 on unexpected errors, otherwise the execution result.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so such requests get the 400 below rather than a 500.
        params = request.get_json(silent=True)
        command = params.get("command", "") if isinstance(params, dict) else ""

        if not command:
            return jsonify({"error": "Command parameter is required"}), 400

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error("Error in command endpoint: %s", e)
        return jsonify({"error": f"Server error: {str(e)}"}), 500


@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
    """Return MCP tool capabilities: tool names, descriptions, parameters."""
    tools = [
        {
            "name": "execute_command",
            "description": "Execute arbitrary commands on Kali Linux",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "Command to execute"
                    }
                },
                "required": ["command"]
            }
        },
        {
            "name": "server_health",
            "description": "Check server health and tool availability",
            "parameters": {"type": "object", "properties": {}}
        }
    ]

    return jsonify({
        "tools": tools,
        "version": "1.0.0"
    })


# The URL rule must carry the <tool_name> variable: the view function takes
# ``tool_name`` as an argument and Flask fills it from the matched path.
@app.route("/mcp/tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
    """Execute MCP tools directly.

    Dispatches ``tool_name`` (case-insensitive) to the matching endpoint
    handler, which reads any parameters from the current request itself.
    Responds 404 for an unknown tool and 500 if the handler raises.
    """
    try:
        tool_name = tool_name.lower()

        # Map tool names to their functions
        tool_map = {
            "execute_command": generic_command,
            "server_health": health_check
        }

        handler = tool_map.get(tool_name)
        if handler is None:
            return jsonify({"error": f"Unknown tool: {tool_name}"}), 404
        return handler()
    except Exception as e:
        logger.error("Error executing MCP tool %s: %s", tool_name, e)
        return jsonify({"error": f"Tool execution failed: {str(e)}"}), 500


if __name__ == "__main__":
    logger.info("Starting Kali Linux MCP Server on port 5000")
    # NOTE(security): binds to every interface; see module docstring.
    app.run(host="0.0.0.0", port=5000, debug=False)