File size: 4,882 Bytes
f8bc565 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | #!/usr/bin/env python3
import subprocess, json, sys
from flask import Flask, request, jsonify
import logging
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SUPPORTED_TOOLS = ["run_command", "run_masscan", "run_nmap", "run_netstat", "run_sqlmap", "run_nikto", "run_hydra", "run_searchsploit", "run_curl", "run_wget", "write_file", "read_file"]
PRIVILEGED_TOOLS = {"masscan", "nmap", "arp-scan", "wireshark", "tcpdump", "iptables", "ip6tables", "ufw", "hashcat", "airmon-ng", "aircrack-ng", "hydra", "metasploit", "burpsuite"}
class ToolExecutor:
def __init__(self):
self.execution_log = []
self.error_recovery_attempts = {}
def execute_tool(self, tool, params):
if tool == "run_command": return self._run_command(params.get("command", ""))
elif tool == "run_masscan": return self._run_masscan(params.get("target", ""), params.get("ports", "1-65535"), params.get("rate", "1000"))
elif tool == "run_nmap": return self._run_nmap(params.get("target", ""), params.get("flags", "-sV"))
elif tool == "run_netstat": return self._run_netstat(params.get("flags", "-tuln"))
elif tool == "write_file": return self._write_file(params.get("filename", ""), params.get("content", ""))
elif tool == "read_file": return self._read_file(params.get("filename", ""))
return {"status": "error", "error_type": "unsupported_tool", "message": f"Tool '{tool}' not supported"}
def _execute_command(self, command, retry_with_sudo=False):
if retry_with_sudo and not command.strip().startswith("sudo"): command = f"sudo {command}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=300)
if result.returncode == 0: return {"status": "success", "stdout": result.stdout.strip(), "stderr": result.stderr.strip()}
else:
stderr = result.stderr.lower()
if "permission denied" in stderr or "operation not permitted" in stderr:
if not retry_with_sudo: return self._execute_command(command, retry_with_sudo=True)
return {"status": "error", "error_type": "permission_denied", "message": result.stderr}
elif "not found" in stderr: return {"status": "error", "error_type": "command_not_found", "message": result.stderr}
else: return {"status": "error", "error_type": "command_failed", "message": result.stderr if result.stderr else result.stdout}
except subprocess.TimeoutExpired: return {"status": "error", "error_type": "timeout", "message": "Command timed out"}
except Exception as e: return {"status": "error", "error_type": "execution_error", "message": str(e)}
def _run_command(self, command):
if not command: return {"status": "error", "error_type": "invalid_params", "message": "No command"}
result = self._execute_command(command)
self.execution_log.append({"tool": "run_command", "result": result})
return result
def _run_masscan(self, target, ports, rate):
if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"}
command = f"masscan {target} -p {ports} --rate {rate}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_masscan", "result": result})
return result
def _run_nmap(self, target, flags):
if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"}
command = f"nmap {flags} {target}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_nmap", "result": result})
return result
def _run_netstat(self, flags):
command = f"netstat {flags}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_netstat", "result": result})
return result
def _write_file(self, filename, content):
if not filename: return {"status": "error", "message": "No filename"}
try:
with open(filename, 'w') as f: f.write(content)
return {"status": "success", "message": f"File written", "filename": filename}
except Exception as e: return {"status": "error", "message": str(e)}
def _read_file(self, filename):
if not filename: return {"status": "error", "message": "No filename"}
try:
with open(filename, 'r') as f: content = f.read()
return {"status": "success", "filename": filename, "content": content}
except Exception as e: return {"status": "error", "message": str(e)}
executor = ToolExecutor()
@app.route('/', methods=['POST'])
def execute():
try:
data = request.get_json()
if not data:ββββββββββββββββ
|