| |
|
|
import json
import logging
import os
import shutil
import subprocess
import threading
from typing import Any, Dict

from flask import Flask, jsonify, request
|
|
| |
# Root logging: INFO and above, timestamped, emitted through a StreamHandler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Default number of seconds a command may run before it is terminated.
COMMAND_TIMEOUT = 180

# Flask application object the route decorators below register against.
app = Flask(__name__)
|
|
class CommandExecutor:
    """Run a shell command, capture its output and enforce a timeout.

    stdout and stderr are drained by two daemon threads while the command
    runs, so whatever the command printed before a timeout is still
    available in the result.
    """

    # Seconds to wait for the process to exit after terminate(), and for the
    # reader threads to finish once a timed-out process has been stopped.
    _GRACE_PERIOD = 5

    def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
        """Store the command line and the timeout (seconds) for execute()."""
        self.command = command
        self.timeout = timeout
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False

    def _read_stdout(self):
        """Thread target: append stdout lines until EOF, then close the pipe."""
        with self.process.stdout as pipe:
            for line in pipe:
                self.stdout_data += line

    def _read_stderr(self):
        """Thread target: append stderr lines until EOF, then close the pipe."""
        with self.process.stderr as pipe:
            for line in pipe:
                self.stderr_data += line

    def execute(self) -> Dict[str, Any]:
        """Execute the command and return a JSON-serialisable result dict.

        Returns:
            A dict with the keys ``stdout``, ``stderr``, ``return_code``,
            ``success``, ``timed_out`` and ``partial_results``.  On timeout
            ``return_code`` is -1 and ``success`` is True only if some output
            was captured.  ``partial_results`` is always a bool.

        Any exception raised while launching or supervising the process is
        logged and reported in the ``stderr`` field instead of propagating.
        """
        logger.info(f"Executing command: {self.command}")

        try:
            # NOTE(security): shell=True hands the string to the shell as-is.
            # Running arbitrary commands is this class's purpose, so the
            # caller is responsible for who is allowed to reach it.
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
            )

            self.stdout_thread = threading.Thread(
                target=self._read_stdout, daemon=True
            )
            self.stderr_thread = threading.Thread(
                target=self._read_stderr, daemon=True
            )
            self.stdout_thread.start()
            self.stderr_thread.start()

            try:
                self.return_code = self.process.wait(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                self.timed_out = True
                logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")

                self.process.terminate()
                try:
                    self.process.wait(timeout=self._GRACE_PERIOD)
                except subprocess.TimeoutExpired:
                    logger.warning("Process not responding to termination. Killing.")
                    self.process.kill()
                    # Reap the killed child so it does not linger as a zombie.
                    self.process.wait()

                self.return_code = -1

                # Let the readers flush what is already in the pipes, but do
                # not wait forever: a grandchild that survived the shell can
                # keep the pipe open indefinitely.
                self.stdout_thread.join(timeout=self._GRACE_PERIOD)
                self.stderr_thread.join(timeout=self._GRACE_PERIOD)
            else:
                # Normal exit: wait for the readers to reach EOF.
                self.stdout_thread.join()
                self.stderr_thread.join()

            # Always a real bool, so the JSON field has a single type.
            partial = bool(
                self.timed_out and (self.stdout_data or self.stderr_data)
            )
            # A timed-out run that still produced output counts as a success.
            success = partial or self.return_code == 0

            return {
                "stdout": self.stdout_data,
                "stderr": self.stderr_data,
                "return_code": self.return_code,
                "success": success,
                "timed_out": self.timed_out,
                "partial_results": partial,
            }

        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            # Do not leave a started child running if supervision failed.
            if self.process is not None and self.process.poll() is None:
                self.process.kill()
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data),
            }
|
|
def execute_command(command: str, timeout: int = COMMAND_TIMEOUT) -> Dict[str, Any]:
    """Execute a shell command and return the result.

    Args:
        command: Command line to run through CommandExecutor.
        timeout: Seconds to allow before the command is terminated;
            defaults to COMMAND_TIMEOUT, so existing one-argument callers
            behave as before.

    Returns:
        The result dict produced by CommandExecutor.execute().
    """
    return CommandExecutor(command, timeout=timeout).execute()
|
|
@app.route("/health", methods=["GET"])
def health_check():
    """Health check endpoint.

    Reports whether each essential tool can be found on the server's PATH,
    plus an overall flag that is true only when all of them are present.
    """
    essential_tools = ["nmap", "gobuster", "dirb", "nikto"]

    # shutil.which does the PATH lookup in-process: no shell, no reader
    # threads and no exception to swallow for each probed tool.
    tools_status = {
        tool: shutil.which(tool) is not None for tool in essential_tools
    }

    all_essential_tools_available = all(tools_status.values())

    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })
|
|
@app.route("/api/command", methods=["POST"])
def generic_command():
    """Execute any command provided in the request.

    Expects a JSON object body with a non-empty string ``command`` field.
    Responds with the execute_command() result, 400 for a malformed request,
    or 500 if execution itself raises.
    """
    try:
        # silent=True returns None for a missing, non-JSON or unparsable body
        # instead of raising, so a bad request is answered with 400 below
        # rather than falling into the 500 handler.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        command = params.get("command", "")

        if not command:
            return jsonify({"error": "Command parameter is required"}), 400
        if not isinstance(command, str):
            return jsonify({"error": "Command parameter must be a string"}), 400

        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in command endpoint: {str(e)}")
        return jsonify({"error": f"Server error: {str(e)}"}), 500
|
|
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
    """Describe the MCP tools this server exposes, with their parameter schemas."""
    command_schema = {
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "Command to execute"}
        },
        "required": ["command"],
    }
    run_command_tool = {
        "name": "execute_command",
        "description": "Execute arbitrary commands on Kali Linux",
        "parameters": command_schema,
    }
    health_tool = {
        "name": "server_health",
        "description": "Check server health and tool availability",
        "parameters": {"type": "object", "properties": {}},
    }

    payload = {
        "tools": [run_command_tool, health_tool],
        "version": "1.0.0",
    }
    return jsonify(payload)
|
|
@app.route("/mcp/tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
    """Execute MCP tools directly.

    Looks up ``tool_name`` (case-insensitively) and delegates to the matching
    endpoint function.  Responds 404 for an unknown tool and 500 if the
    handler raises.
    """
    tool_name = tool_name.lower()

    # Each handler reads the request itself, so the body is deliberately not
    # parsed here: parameterless tools such as server_health must also work
    # when the POST carries no JSON body.
    tool_map = {
        "execute_command": generic_command,
        "server_health": health_check
    }

    handler = tool_map.get(tool_name)
    if handler is None:
        return jsonify({"error": f"Unknown tool: {tool_name}"}), 404

    try:
        return handler()
    except Exception as e:
        logger.error(f"Error executing MCP tool {tool_name}: {str(e)}")
        return jsonify({"error": f"Tool execution failed: {str(e)}"}), 500
|
|
if __name__ == "__main__":
    # Script entry point: announce start-up, then serve on every network
    # interface (0.0.0.0) on port 5000 with Flask's debug mode switched off.
    logger.info("Starting Kali Linux MCP Server on port 5000")
    app.run(debug=False, port=5000, host="0.0.0.0")