Local_Model / mcp_server.py
automajicly's picture
Upload 5 files
f8bc565 verified
#!/usr/bin/env python3
import subprocess, json, sys
from flask import Flask, request, jsonify
import logging
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SUPPORTED_TOOLS = ["run_command", "run_masscan", "run_nmap", "run_netstat", "run_sqlmap", "run_nikto", "run_hydra", "run_searchsploit", "run_curl", "run_wget", "write_file", "read_file"]
PRIVILEGED_TOOLS = {"masscan", "nmap", "arp-scan", "wireshark", "tcpdump", "iptables", "ip6tables", "ufw", "hashcat", "airmon-ng", "aircrack-ng", "hydra", "metasploit", "burpsuite"}
class ToolExecutor:
def __init__(self):
self.execution_log = []
self.error_recovery_attempts = {}
def execute_tool(self, tool, params):
if tool == "run_command": return self._run_command(params.get("command", ""))
elif tool == "run_masscan": return self._run_masscan(params.get("target", ""), params.get("ports", "1-65535"), params.get("rate", "1000"))
elif tool == "run_nmap": return self._run_nmap(params.get("target", ""), params.get("flags", "-sV"))
elif tool == "run_netstat": return self._run_netstat(params.get("flags", "-tuln"))
elif tool == "write_file": return self._write_file(params.get("filename", ""), params.get("content", ""))
elif tool == "read_file": return self._read_file(params.get("filename", ""))
return {"status": "error", "error_type": "unsupported_tool", "message": f"Tool '{tool}' not supported"}
def _execute_command(self, command, retry_with_sudo=False):
if retry_with_sudo and not command.strip().startswith("sudo"): command = f"sudo {command}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=300)
if result.returncode == 0: return {"status": "success", "stdout": result.stdout.strip(), "stderr": result.stderr.strip()}
else:
stderr = result.stderr.lower()
if "permission denied" in stderr or "operation not permitted" in stderr:
if not retry_with_sudo: return self._execute_command(command, retry_with_sudo=True)
return {"status": "error", "error_type": "permission_denied", "message": result.stderr}
elif "not found" in stderr: return {"status": "error", "error_type": "command_not_found", "message": result.stderr}
else: return {"status": "error", "error_type": "command_failed", "message": result.stderr if result.stderr else result.stdout}
except subprocess.TimeoutExpired: return {"status": "error", "error_type": "timeout", "message": "Command timed out"}
except Exception as e: return {"status": "error", "error_type": "execution_error", "message": str(e)}
def _run_command(self, command):
if not command: return {"status": "error", "error_type": "invalid_params", "message": "No command"}
result = self._execute_command(command)
self.execution_log.append({"tool": "run_command", "result": result})
return result
def _run_masscan(self, target, ports, rate):
if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"}
command = f"masscan {target} -p {ports} --rate {rate}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_masscan", "result": result})
return result
def _run_nmap(self, target, flags):
if not target: return {"status": "error", "error_type": "invalid_params", "message": "No target"}
command = f"nmap {flags} {target}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_nmap", "result": result})
return result
def _run_netstat(self, flags):
command = f"netstat {flags}"
result = self._execute_command(command)
self.execution_log.append({"tool": "run_netstat", "result": result})
return result
def _write_file(self, filename, content):
if not filename: return {"status": "error", "message": "No filename"}
try:
with open(filename, 'w') as f: f.write(content)
return {"status": "success", "message": f"File written", "filename": filename}
except Exception as e: return {"status": "error", "message": str(e)}
def _read_file(self, filename):
if not filename: return {"status": "error", "message": "No filename"}
try:
with open(filename, 'r') as f: content = f.read()
return {"status": "success", "filename": filename, "content": content}
except Exception as e: return {"status": "error", "message": str(e)}
executor = ToolExecutor()
@app.route('/', methods=['POST'])
def execute():
try:
data = request.get_json()
if not data:​​​​​​​​​​​​​​​​