File size: 7,180 Bytes
33b7766 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 | #!/usr/bin/env python3
import os
import json
import logging
import subprocess
import threading
from typing import Dict, Any
from flask import Flask, request, jsonify
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Configuration
COMMAND_TIMEOUT = 180 # 3 minutes timeout
app = Flask(__name__)
class CommandExecutor:
"""Class to handle command execution with better timeout management"""
def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
self.command = command
self.timeout = timeout
self.process = None
self.stdout_data = ""
self.stderr_data = ""
self.stdout_thread = None
self.stderr_thread = None
self.return_code = None
self.timed_out = False
def _read_stdout(self):
"""Thread function to continuously read stdout"""
for line in iter(self.process.stdout.readline, ''):
self.stdout_data += line
def _read_stderr(self):
"""Thread function to continuously read stderr"""
for line in iter(self.process.stderr.readline, ''):
self.stderr_data += line
def execute(self) -> Dict[str, Any]:
"""Execute the command and handle timeout gracefully"""
logger.info(f"Executing command: {self.command}")
try:
self.process = subprocess.Popen(
self.command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1 # Line buffered
)
# Start threads to read output continuously
self.stdout_thread = threading.Thread(target=self._read_stdout)
self.stderr_thread = threading.Thread(target=self._read_stderr)
self.stdout_thread.daemon = True
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
# Wait for the process to complete or timeout
try:
self.return_code = self.process.wait(timeout=self.timeout)
# Process completed, join the threads
self.stdout_thread.join()
self.stderr_thread.join()
except subprocess.TimeoutExpired:
# Process timed out but we might have partial results
self.timed_out = True
logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")
# Try to terminate gracefully first
self.process.terminate()
try:
self.process.wait(timeout=5) # Give it 5 seconds to terminate
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate
logger.warning("Process not responding to termination. Killing.")
self.process.kill()
# Update final output
self.return_code = -1
# Always consider it a success if we have output, even with timeout
success = True if self.timed_out and (self.stdout_data or self.stderr_data) else (self.return_code == 0)
return {
"stdout": self.stdout_data,
"stderr": self.stderr_data,
"return_code": self.return_code,
"success": success,
"timed_out": self.timed_out,
"partial_results": self.timed_out and (self.stdout_data or self.stderr_data)
}
except Exception as e:
logger.error(f"Error executing command: {str(e)}")
return {
"stdout": self.stdout_data,
"stderr": f"Error executing command: {str(e)}",
"return_code": -1,
"success": False,
"timed_out": False,
"partial_results": bool(self.stdout_data or self.stderr_data)
}
def execute_command(command: str) -> Dict[str, Any]:
"""Execute a shell command and return the result"""
executor = CommandExecutor(command)
return executor.execute()
@app.route("/health", methods=["GET"])
def health_check():
"""Health check endpoint."""
# Check if essential tools are installed
essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
tools_status = {}
for tool in essential_tools:
try:
result = execute_command(f"which {tool}")
tools_status[tool] = result["success"]
except:
tools_status[tool] = False
all_essential_tools_available = all(tools_status.values())
return jsonify({
"status": "healthy",
"message": "Kali Linux Tools API Server is running",
"tools_status": tools_status,
"all_essential_tools_available": all_essential_tools_available
})
@app.route("/api/command", methods=["POST"])
def generic_command():
"""Execute any command provided in the request."""
try:
params = request.json
command = params.get("command", "")
if not command:
return jsonify({"error": "Command parameter is required"}), 400
result = execute_command(command)
return jsonify(result)
except Exception as e:
logger.error(f"Error in command endpoint: {str(e)}")
return jsonify({"error": f"Server error: {str(e)}"}), 500
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
"""Return MCP tool capabilities"""
tools = [
{
"name": "execute_command",
"description": "Execute arbitrary commands on Kali Linux",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Command to execute"}
},
"required": ["command"]
}
},
{
"name": "server_health",
"description": "Check server health and tool availability",
"parameters": {"type": "object", "properties": {}}
}
]
return jsonify({
"tools": tools,
"version": "1.0.0"
})
@app.route("/mcp/tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
"""Execute MCP tools directly"""
try:
params = request.json
tool_name = tool_name.lower()
# Map tool names to their functions
tool_map = {
"execute_command": generic_command,
"server_health": health_check
}
if tool_name in tool_map:
return tool_map[tool_name]()
else:
return jsonify({"error": f"Unknown tool: {tool_name}"}), 404
except Exception as e:
logger.error(f"Error executing MCP tool {tool_name}: {str(e)}")
return jsonify({"error": f"Tool execution failed: {str(e)}"}), 500
if __name__ == "__main__":
logger.info("Starting Kali Linux MCP Server on port 5000")
app.run(host="0.0.0.0", port=5000, debug=False) |