Kali / server.py
SAUSERLA's picture
Upload 4 files
33b7766 verified
#!/usr/bin/env python3
import os
import json
import logging
import subprocess
import threading
from typing import Dict, Any
from flask import Flask, request, jsonify
# Logging setup: INFO and above, timestamped, emitted through a StreamHandler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Settings
COMMAND_TIMEOUT = 3 * 60  # seconds a command may run before it is terminated

# Flask application object that the route decorators below register on.
app = Flask(__name__)
class CommandExecutor:
    """Run a shell command with a timeout while capturing its output.

    stdout and stderr are drained line by line on two daemon threads, so
    whatever the command printed before a timeout is still returned as a
    partial result.
    """

    # Seconds to wait for the process to exit after terminate() / kill().
    _TERMINATE_GRACE = 5
    # Seconds to wait for each reader thread to finish after a timeout.
    _DRAIN_GRACE = 2

    def __init__(self, command: str, timeout: int = COMMAND_TIMEOUT):
        """Store the command and reset every result field.

        Args:
            command: Command line handed to the shell.
            timeout: Seconds to wait before the process is terminated.
        """
        self.command = command
        self.timeout = timeout
        self.process = None
        self.stdout_data = ""
        self.stderr_data = ""
        self.stdout_thread = None
        self.stderr_thread = None
        self.return_code = None
        self.timed_out = False

    def _read_stdout(self):
        """Thread target: append each stdout line to ``stdout_data`` until EOF."""
        for line in iter(self.process.stdout.readline, ''):
            self.stdout_data += line

    def _read_stderr(self):
        """Thread target: append each stderr line to ``stderr_data`` until EOF."""
        for line in iter(self.process.stderr.readline, ''):
            self.stderr_data += line

    def _stop_process(self):
        """Terminate the process, escalate to kill if needed, and reap it."""
        self.process.terminate()
        try:
            self.process.wait(timeout=self._TERMINATE_GRACE)
            return
        except subprocess.TimeoutExpired:
            logger.warning("Process not responding to termination. Killing.")
        self.process.kill()
        try:
            # Reap the killed process so it does not linger as a zombie.
            self.process.wait(timeout=self._TERMINATE_GRACE)
        except subprocess.TimeoutExpired:
            logger.warning("Process still running after kill.")

    def _close_finished_pipes(self):
        """Close the pipe of every reader thread that is no longer running.

        A pipe whose reader is still blocked in ``readline`` is left open;
        closing it from this thread could block or raise in the reader.
        """
        if self.process is None:
            return
        readers = (
            (self.stdout_thread, self.process.stdout),
            (self.stderr_thread, self.process.stderr),
        )
        for thread, stream in readers:
            if stream is None:
                continue
            if thread is not None and thread.is_alive():
                continue
            stream.close()

    def execute(self) -> Dict[str, Any]:
        """Run the command and return its captured output and status.

        Returns:
            A dict with the keys ``stdout``, ``stderr``, ``return_code``,
            ``success``, ``timed_out`` and ``partial_results``. On timeout
            ``return_code`` is -1 and ``success`` is True only if some
            output was captured. Failures while launching or supervising
            the process are reported in ``stderr`` with ``return_code`` -1
            rather than raised.
        """
        logger.info(f"Executing command: {self.command}")
        try:
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1  # Line buffered
            )
            # Drain both pipes concurrently so neither can fill up and
            # block the child while we wait on it.
            self.stdout_thread = threading.Thread(
                target=self._read_stdout, daemon=True
            )
            self.stderr_thread = threading.Thread(
                target=self._read_stderr, daemon=True
            )
            self.stdout_thread.start()
            self.stderr_thread.start()

            try:
                self.return_code = self.process.wait(timeout=self.timeout)
                # Process completed: wait for the readers to reach EOF.
                self.stdout_thread.join()
                self.stderr_thread.join()
            except subprocess.TimeoutExpired:
                self.timed_out = True
                logger.warning(f"Command timed out after {self.timeout} seconds. Terminating process.")
                self._stop_process()
                # Give the readers a bounded chance to collect what is left
                # in the pipes. The join is bounded because a grandchild of
                # the shell may still hold the pipe open.
                self.stdout_thread.join(timeout=self._DRAIN_GRACE)
                self.stderr_thread.join(timeout=self._DRAIN_GRACE)
                self.return_code = -1

            # Snapshot once so the flags agree with the returned text even
            # if a reader thread is still appending.
            stdout, stderr = self.stdout_data, self.stderr_data
            has_output = bool(stdout or stderr)
            return {
                "stdout": stdout,
                "stderr": stderr,
                "return_code": self.return_code,
                # A timeout that still produced output counts as a success.
                "success": (self.timed_out and has_output) or self.return_code == 0,
                "timed_out": self.timed_out,
                "partial_results": self.timed_out and has_output
            }
        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            return {
                "stdout": self.stdout_data,
                "stderr": f"Error executing command: {str(e)}",
                "return_code": -1,
                "success": False,
                "timed_out": False,
                "partial_results": bool(self.stdout_data or self.stderr_data)
            }
        finally:
            self._close_finished_pipes()
def execute_command(command: str) -> Dict[str, Any]:
    """Run ``command`` through a fresh CommandExecutor and return its result dict."""
    return CommandExecutor(command).execute()
@app.route("/health", methods=["GET"])
def health_check():
    """Report server liveness and which essential tools are installed.

    Each tool is probed with ``which <tool>`` via ``execute_command``.

    Returns:
        A JSON response with ``status`` (always "healthy"), ``message``,
        ``tools_status`` (tool name -> bool) and
        ``all_essential_tools_available``.
    """
    essential_tools = ["nmap", "gobuster", "dirb", "nikto"]
    tools_status = {}
    for tool in essential_tools:
        try:
            result = execute_command(f"which {tool}")
            tools_status[tool] = result["success"]
        except Exception:
            # A failed probe marks the tool unavailable instead of failing
            # the whole health check. SystemExit and KeyboardInterrupt are
            # deliberately not caught here.
            logger.exception("Health probe failed for tool %s", tool)
            tools_status[tool] = False
    all_essential_tools_available = all(tools_status.values())
    return jsonify({
        "status": "healthy",
        "message": "Kali Linux Tools API Server is running",
        "tools_status": tools_status,
        "all_essential_tools_available": all_essential_tools_available
    })
@app.route("/api/command", methods=["POST"])
def generic_command():
    """Execute the command given in the JSON request body.

    Expects a JSON object with a non-empty ``command`` entry.

    Returns:
        The result dict of ``execute_command`` as JSON, a 400 response when
        the body is not a JSON object or ``command`` is missing/empty, or a
        500 response for any other failure.
    """
    # NOTE(security): the command is run through a shell and nothing in this
    # file authenticates the caller; access must be restricted elsewhere.
    try:
        # silent=True yields None for a missing or malformed JSON body, so
        # a client mistake is reported as 400 rather than as a server error.
        params = request.get_json(silent=True)
        if not isinstance(params, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400
        command = params.get("command", "")
        if not command:
            return jsonify({"error": "Command parameter is required"}), 400
        result = execute_command(command)
        return jsonify(result)
    except Exception as e:
        logger.error(f"Error in command endpoint: {str(e)}")
        return jsonify({"error": f"Server error: {str(e)}"}), 500
@app.route("/mcp/capabilities", methods=["GET"])
def get_capabilities():
    """Describe the MCP tools exposed by this server, with their parameter schemas."""
    command_tool = {
        "name": "execute_command",
        "description": "Execute arbitrary commands on Kali Linux",
        "parameters": {
            "type": "object",
            "properties": {
                "command": {"type": "string", "description": "Command to execute"}
            },
            "required": ["command"]
        }
    }
    health_tool = {
        "name": "server_health",
        "description": "Check server health and tool availability",
        "parameters": {"type": "object", "properties": {}}
    }
    payload = {
        "tools": [command_tool, health_tool],
        "version": "1.0.0"
    }
    return jsonify(payload)
@app.route("/mcp/tools/<tool_name>", methods=["POST"])
def execute_tool(tool_name):
    """Dispatch an MCP tool call to the matching endpoint handler.

    Args:
        tool_name: Tool identifier from the URL, matched case-insensitively.

    Returns:
        The selected handler's response, a 404 JSON error for an unknown
        tool, or a 500 JSON error if the handler raises.
    """
    try:
        tool_name = tool_name.lower()
        # The request body is intentionally not parsed here: handlers that
        # need it read it themselves, and tools without parameters
        # (server_health) must work when no JSON body is sent.
        tool_map = {
            "execute_command": generic_command,
            "server_health": health_check
        }
        handler = tool_map.get(tool_name)
        if handler is None:
            return jsonify({"error": f"Unknown tool: {tool_name}"}), 404
        return handler()
    except Exception as e:
        logger.error(f"Error executing MCP tool {tool_name}: {str(e)}")
        return jsonify({"error": f"Tool execution failed: {str(e)}"}), 500
if __name__ == "__main__":
    # Script entry point: announce startup, then serve on every interface
    # (0.0.0.0), port 5000, with Flask's debug mode off.
    logger.info("Starting Kali Linux MCP Server on port 5000")
    app.run(debug=False, port=5000, host="0.0.0.0")