| #!/usr/bin/env python3 | |
| import subprocess, json, sys | |
| from flask import Flask, request, jsonify | |
| import logging | |
| app = Flask(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| SUPPORTED_TOOLS = ["run_command", "run_masscan", "run_nmap", "run_netstat", "run_sqlmap", "run_nikto", "run_hydra", "run_searchsploit", "run_curl", "run_wget", "write_file", "read_file"] | |
| PRIVILEGED_TOOLS = {"masscan", "nmap", "arp-scan", "wireshark", "tcpdump", "iptables", "ip6tables", "ufw", "hashcat", "airmon-ng", "aircrack-ng", "hydra", "metasploit", "burpsuite"} | |
| class ToolExecutor: | |
| def __init__(self): | |
| self.execution_log = [] | |
| self.error_recovery_attempts = {} | |
| def execute_tool(self, tool, params): | |
| if tool == "run_command": return self._run_command(params.get("command", "")) | |
| elif tool == "run_masscan": return self._run_masscan(params.get("target", ""), params.get("ports", "1-65535"), params.get("rate", "1000")) | |
| elif tool == "run_nmap": return self._run_nmap(params.get("target", ""), params.get("flags", "-sV")) | |
| elif tool == "run_netstat": return self._run_netstat(params.get("flags", "-tuln")) | |
| elif tool == "write_file": return self._write_file(params.get("filename", ""), params.get("content", "")) | |
| elif tool == "read_file": return self._read_file(params.get("filename", "")) | |
| return {"status": "error", "error_type": "unsupported_tool", "message": f"Tool '{tool}' not supported"} | |
| def _execute_command(self, command, retry_with_sudo=False): | |
| if retry_with_sudo and not command.strip().startswith("sudo"): command = f"sudo {command}" | |
| try: | |
| result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=300) | |
| if result.returncode == 0: return {"status": "success", "stdout": result.stdout.strip(), "stderr": result.stderr.strip()} | |
| else: | |
| stderr = result.stderr.lower() | |
| if "permission denied" in stderr or "operation not permitted" in stderr: | |
| if not retry_with_sudo: return self._execute_command(command, retry_with_sudo=True) | |
| return {"status": "error", "error_type": "permission_denied", "message": result.stderr} | |
| elif "not found" in stderr: return {"status": "error", "error_type": "command_not_found", "message": result.stderr} | |
| else: return {"status": "error", "error_type": "command_failed", "message": result.stderr if result.stderr else result.stdout} | |
| except subprocess.TimeoutExpired: return {"status": "error", "error_type": "timeout", "message": "Command timed out"} | |
| except Exception as e: return {"status": "error", "error_type": "execution_error", "message": str(e)} | |
| def _run_command(self, command): | |
| if not command: return {"status": "error", "error_type": "invalid_params", "message": "No command"} | |
| result = self._execute_command(command) | |
| self.execution_log.append({"tool": "run_command", "result": result}) | |
| return result | |
| def _run_masscan(self, target, ports, rate): | |
| if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"} | |
| command = f"masscan {target} -p {ports} --rate {rate}" | |
| result = self._execute_command(command) | |
| self.execution_log.append({"tool": "run_masscan", "result": result}) | |
| return result | |
| def _run_nmap(self, target, flags): | |
| if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"} | |
| command = f"nmap {flags} {target}" | |
| result = self._execute_command(command) | |
| self.execution_log.append({"tool": "run_nmap", "result": result}) | |
| return result | |
| def _run_netstat(self, flags): | |
| command = f"netstat {flags}" | |
| result = self._execute_command(command) | |
| self.execution_log.append({"tool": "run_netstat", "result": result}) | |
| return result | |
| def _write_file(self, filename, content): | |
| if not filename: return {"status": "error", "message": "No filename"} | |
| try: | |
| with open(filename, 'w') as f: f.write(content) | |
| return {"status": "success", "message": f"File written", "filename": filename} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| def _read_file(self, filename): | |
| if not filename: return {"status": "error", "message": "No filename"} | |
| try: | |
| with open(filename, 'r') as f: content = f.read() | |
| return {"status": "success", "filename": filename, "content": content} | |
| except Exception as e: return {"status": "error", "message": str(e)} | |
| executor = ToolExecutor() | |
| def execute(): | |
| try: | |
| data = request.get_json() | |
| if not data:ββββββββββββββββ | |