#!/usr/bin/env python3 import subprocess, json, sys from flask import Flask, request, jsonify import logging app = Flask(__name__) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) SUPPORTED_TOOLS = ["run_command", "run_masscan", "run_nmap", "run_netstat", "run_sqlmap", "run_nikto", "run_hydra", "run_searchsploit", "run_curl", "run_wget", "write_file", "read_file"] PRIVILEGED_TOOLS = {"masscan", "nmap", "arp-scan", "wireshark", "tcpdump", "iptables", "ip6tables", "ufw", "hashcat", "airmon-ng", "aircrack-ng", "hydra", "metasploit", "burpsuite"} class ToolExecutor: def __init__(self): self.execution_log = [] self.error_recovery_attempts = {} def execute_tool(self, tool, params): if tool == "run_command": return self._run_command(params.get("command", "")) elif tool == "run_masscan": return self._run_masscan(params.get("target", ""), params.get("ports", "1-65535"), params.get("rate", "1000")) elif tool == "run_nmap": return self._run_nmap(params.get("target", ""), params.get("flags", "-sV")) elif tool == "run_netstat": return self._run_netstat(params.get("flags", "-tuln")) elif tool == "write_file": return self._write_file(params.get("filename", ""), params.get("content", "")) elif tool == "read_file": return self._read_file(params.get("filename", "")) return {"status": "error", "error_type": "unsupported_tool", "message": f"Tool '{tool}' not supported"} def _execute_command(self, command, retry_with_sudo=False): if retry_with_sudo and not command.strip().startswith("sudo"): command = f"sudo {command}" try: result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=300) if result.returncode == 0: return {"status": "success", "stdout": result.stdout.strip(), "stderr": result.stderr.strip()} else: stderr = result.stderr.lower() if "permission denied" in stderr or "operation not permitted" in stderr: if not retry_with_sudo: return self._execute_command(command, retry_with_sudo=True) return {"status": "error", "error_type": "permission_denied", "message": result.stderr} elif "not found" in stderr: return {"status": "error", "error_type": "command_not_found", "message": result.stderr} else: return {"status": "error", "error_type": "command_failed", "message": result.stderr if result.stderr else result.stdout} except subprocess.TimeoutExpired: return {"status": "error", "error_type": "timeout", "message": "Command timed out"} except Exception as e: return {"status": "error", "error_type": "execution_error", "message": str(e)} def _run_command(self, command): if not command: return {"status": "error", "error_type": "invalid_params", "message": "No command"} result = self._execute_command(command) self.execution_log.append({"tool": "run_command", "result": result}) return result def _run_masscan(self, target, ports, rate): if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"} command = f"masscan {target} -p {ports} --rate {rate}" result = self._execute_command(command) self.execution_log.append({"tool": "run_masscan", "result": result}) return result def _run_nmap(self, target, flags): if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"} command = f"nmap {flags} {target}" result = self._execute_command(command) self.execution_log.append({"tool": "run_nmap", "result": result}) return result def _run_netstat(self, flags): command = f"netstat {flags}" result = self._execute_command(command) self.execution_log.append({"tool": "run_netstat", "result": result}) return result def _write_file(self, filename, content): if not filename: return {"status": "error", "message": "No filename"} try: with open(filename, 'w') as f: f.write(content) return {"status": "success", "message": f"File written", "filename": filename} except Exception as e: return {"status": "error", "message": str(e)} def _read_file(self, filename): if not filename: return {"status": "error", "message": "No filename"} try: with open(filename, 'r') as f: content = f.read() return {"status": "success", "filename": filename, "content": content} except Exception as e: return {"status": "error", "message": str(e)} executor = ToolExecutor() @app.route('/', methods=['POST']) def execute(): try: data = request.get_json() if not data:​​​​​​​​​​​​​​​​