"""Flask control panel that runs Nuclei scans and produces PDF reports.

Each "environment" is a target URL identified by a 6-digit ID. Environment
metadata is stored in a local JSON file that is mirrored to a Hugging Face
dataset repository when tokens are configured. Scans run as background
subprocesses; their console output and JSONL findings are written to
``SCAN_LOGS_DIR`` and rendered into a PDF report afterwards.
"""

import io
import json
import logging
import os
import platform
import random
import signal
import string
import subprocess
import threading
import time
import zipfile
from datetime import datetime

import requests
from dotenv import load_dotenv
from flask import Flask, render_template_string, request, redirect, url_for, jsonify, send_file
from fpdf import FPDF
from huggingface_hub import HfApi, hf_hub_download

load_dotenv()

app = Flask(__name__)
# The historical hard-coded key stays as the default so existing deployments
# keep working; set SECRET_KEY in the environment to override it.
app.secret_key = os.getenv('SECRET_KEY', 'security_scanner_secret_key_12345')

DATA_FILE = 'scanner_data.json'
REPO_ID = "Kgshop/synkristest"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Serialises reads and writes of DATA_FILE.
data_lock = threading.Lock()

NUCLEI_PATH = "./nuclei"
SCAN_LOGS_DIR = "scan_logs"

# env_id -> subprocess.Popen of the scan currently running for that environment.
active_processes = {}

# Socket timeout (seconds) for outbound HTTP downloads.
_HTTP_TIMEOUT = 30

_FONT_REGULAR = "DejaVuSansCondensed.ttf"
_FONT_BOLD = "DejaVuSansCondensed-Bold.ttf"
_FONT_URLS = {
    _FONT_REGULAR: "https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed.ttf",
    _FONT_BOLD: "https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed-Bold.ttf",
}

os.makedirs(SCAN_LOGS_DIR, exist_ok=True)


def install_nuclei():
    """Download the Nuclei binary if it is missing, then update its templates.

    Failures are logged rather than raised so that the web panel can still
    start without a working scanner.
    """
    if not os.path.exists(NUCLEI_PATH):
        try:
            arch = platform.machine().lower()
            if "x86_64" in arch or "amd64" in arch:
                url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_amd64.zip"
            else:
                url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_arm64.zip"
            response = requests.get(url, timeout=_HTTP_TIMEOUT)
            # Without this an HTTP error page would be handed to ZipFile.
            response.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
                archive.extract("nuclei", path=".")
            os.chmod(NUCLEI_PATH, 0o755)
        except Exception as e:
            logging.error(f"Installation error: {e}")
    try:
        subprocess.run([NUCLEI_PATH, "-update-templates"], check=False)
    except OSError as e:
        # Raised when the binary is absent or not executable.
        logging.warning("Could not update nuclei templates: %s", e)


def download_db():
    """Fetch DATA_FILE from the Hugging Face dataset repository.

    Does nothing when no token is configured. When the download fails and no
    local copy exists, an empty JSON object is written instead.
    """
    token = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE
    if not token:
        return
    try:
        hf_hub_download(repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset", token=token, local_dir=".")
    except Exception as e:
        logging.warning("Could not download %s: %s", DATA_FILE, e)
        if not os.path.exists(DATA_FILE):
            with open(DATA_FILE, 'w') as f:
                json.dump({}, f)


def upload_db():
    """Upload DATA_FILE to the Hugging Face dataset repository (best effort)."""
    if not HF_TOKEN_WRITE:
        return
    try:
        api = HfApi()
        api.upload_file(path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE,
                        repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
    except Exception as e:
        # A failed mirror must not break the request that triggered the save.
        logging.warning("Could not upload %s: %s", DATA_FILE, e)


def load_data():
    """Return the environment table stored in DATA_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    with data_lock:
        if not os.path.exists(DATA_FILE):
            return {}
        try:
            with open(DATA_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            logging.warning("Could not read %s: %s", DATA_FILE, e)
            return {}
    # Every caller indexes the result by env_id, so anything but a dict is unusable.
    return data if isinstance(data, dict) else {}


def save_data(data):
    """Write the environment table to DATA_FILE and mirror it remotely."""
    with data_lock:
        with open(DATA_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)
    # The network upload happens outside the lock so readers are not blocked by it.
    upload_db()


class ScannerPDF(FPDF):
    """FPDF document with the report title as header and a page-number footer."""

    def header(self):
        """Draw the report title, using DejaVu when the font file is present."""
        if os.path.exists(_FONT_REGULAR):
            self.add_font('DejaVu', '', _FONT_REGULAR, uni=True)
            self.set_font('DejaVu', '', 15)
        else:
            self.set_font('Arial', 'B', 15)
        self.cell(0, 10, 'Отчет о сканировании уязвимостей (Nuclei)', 0, 1, 'C')
        self.ln(5)

    def footer(self):
        """Draw the centred page number 15 units above the bottom edge."""
        self.set_y(-15)
        if os.path.exists(_FONT_REGULAR):
            self.set_font('DejaVu', '', 8)
        else:
            self.set_font('Arial', 'I', 8)
        self.cell(0, 10, f'Страница {self.page_no()}', 0, 0, 'C')


def _load_scan_results(env_id):
    """Return the findings parsed from the environment's JSONL output file.

    Lines that are not valid JSON, or that do not decode to an object, are
    skipped. A missing file yields an empty list.
    """
    json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json")
    results = []
    if not os.path.exists(json_output):
        return results
    with open(json_output, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            try:
                record = json.loads(line)
            except ValueError:
                continue
            if isinstance(record, dict):
                results.append(record)
    return results


def _finding_info(res):
    """Return the ``info`` mapping of a finding, or an empty dict if absent."""
    info = res.get("info")
    return info if isinstance(info, dict) else {}


def _finding_severity(res):
    """Return the severity string of a finding, defaulting to ``"info"``."""
    return str(_finding_info(res).get("severity") or "info")


def generate_pdf_report(env_id, target_url, last_scan):
    """Render the findings of an environment into ``<SCAN_LOGS_DIR>/<env_id>.pdf``.

    Args:
        env_id: Environment identifier; selects the JSONL input and PDF output.
        target_url: Scanned URL, printed in the report heading.
        last_scan: Timestamp string of the scan, printed in the report heading.

    Returns:
        The number of findings included in the report.
    """
    results = _load_scan_results(env_id)

    pdf = ScannerPDF()
    if os.path.exists(_FONT_REGULAR):
        pdf.add_font('DejaVu', '', _FONT_REGULAR, uni=True)
        # Fall back to the regular face when the bold file is missing instead
        # of failing on a non-existent font file.
        bold_font = _FONT_BOLD if os.path.exists(_FONT_BOLD) else _FONT_REGULAR
        pdf.add_font('DejaVu', 'B', bold_font, uni=True)
        main_font = 'DejaVu'
    else:
        # NOTE(review): the built-in Arial face presumably cannot encode the
        # Cyrillic labels below - confirm, and make sure the DejaVu fonts are
        # installed in production.
        main_font = 'Arial'

    pdf.add_page()
    pdf.set_font(main_font, 'B', 12)
    pdf.cell(0, 10, f"Цель: {target_url}", 0, 1)
    pdf.cell(0, 10, f"Дата: {last_scan}", 0, 1)
    pdf.ln(10)

    severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
    for res in results:
        sev = _finding_severity(res).lower()
        severity_counts[sev] = severity_counts.get(sev, 0) + 1

    pdf.set_font(main_font, 'B', 14)
    pdf.cell(0, 10, "Сводка:", 0, 1)
    pdf.set_font(main_font, '', 12)
    for sev, count in severity_counts.items():
        pdf.cell(0, 8, f"{sev.capitalize()}: {count}", 0, 1)

    pdf.ln(10)
    pdf.set_font(main_font, 'B', 14)
    pdf.cell(0, 10, "Найденные уязвимости:", 0, 1)

    for res in results:
        info = _finding_info(res)
        pdf.set_font(main_font, 'B', 11)
        sev = _finding_severity(res).upper()
        if sev == "CRITICAL":
            pdf.set_text_color(150, 0, 0)
        elif sev == "HIGH":
            pdf.set_text_color(255, 0, 0)
        else:
            pdf.set_text_color(0, 0, 0)
        pdf.cell(0, 8, f"[{sev}] {info.get('name')}", 0, 1)
        pdf.set_text_color(0, 0, 0)
        pdf.set_font(main_font, '', 9)
        pdf.multi_cell(0, 5, f"Описание: {info.get('description', 'N/A')}")
        pdf.cell(0, 5, f"Хост: {res.get('matched-at', 'N/A')}", 0, 1)
        pdf.ln(3)

    pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf")
    pdf.output(pdf_path)
    return len(results)


def _kill_scan(env_id):
    """Send SIGKILL to the process group of the environment's running scan.

    Returns:
        True when a registered process was signalled, False when no scan is
        registered for ``env_id``.

    Raises:
        OSError: if the process group could not be signalled.
    """
    process = active_processes.get(env_id)
    if process is None:
        return False
    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
    return True


def run_nuclei_scan(env_id, target_url):
    """Run Nuclei against ``target_url`` and record the outcome for ``env_id``.

    Intended to run in a background thread. Marks the environment as
    ``running``, blocks until the subprocess exits, then builds the PDF
    report and marks the environment as ``completed``. If the scanner cannot
    be launched the environment is reset to ``idle`` so a new scan can be
    started.
    """
    data = load_data()
    if env_id not in data:
        return
    data[env_id]['status'] = 'running'
    data[env_id]['last_scan'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    save_data(data)

    log_file = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log")
    json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json")
    # Findings from a previous run must not leak into this report.
    if os.path.exists(json_output):
        os.remove(json_output)

    try:
        with open(log_file, "w") as f:
            process = subprocess.Popen(
                [NUCLEI_PATH, "-u", target_url, "-jsonl", "-o", json_output, "-v"],
                stdout=f,
                stderr=f,
                text=True,
                # Own session/process group so stop and delete can kill the
                # whole tree with os.killpg.
                start_new_session=True,
            )
            active_processes[env_id] = process
            try:
                process.wait()
            finally:
                active_processes.pop(env_id, None)
    except OSError as e:
        logging.error("Could not run nuclei for %s: %s", env_id, e)
        data = load_data()
        if env_id in data:
            # Do not leave the environment stuck in 'running'.
            data[env_id]['status'] = 'idle'
            save_data(data)
        return

    data = load_data()
    if env_id in data:
        try:
            count = generate_pdf_report(env_id, target_url, data[env_id]['last_scan'])
        except Exception:
            # A report failure must not leave the scan marked as running.
            logging.exception("Could not build PDF report for %s", env_id)
            count = len(_load_scan_results(env_id))
        data[env_id]['status'] = 'completed'
        data[env_id]['results_count'] = count
        save_data(data)


# NOTE(review): both templates below contain no HTML markup in the reviewed
# source; the tags were presumably stripped before review. The text is kept
# exactly as found - restore the markup from the original file.
ADMHOSTO_TPL = ''' Управление сканером

Nuclei Security Panel

{% for id, env in envs.items() %}
{{ env.target_url }}

ID: {{ id }}

Статус: {% if env.status == 'running' %}В процессе{% elif env.status == 'completed' %}Завершено{% else %}Ожидание{% endif %}

Дашборд
{% endfor %}
'''

ENV_DASHBOARD_TPL = ''' Дашборд сканирования

Цель: {{ target_url }}

На главную

Управление

Загрузка...
Найдено: 0
Скачать PDF
Консоль вывода (Nuclei) Live
'''


@app.route('/')
def landing():
    """Redirect the site root to the admin panel."""
    return redirect(url_for('admhosto'))


@app.route('/admhosto')
def admhosto():
    """Render the admin panel listing every environment."""
    return render_template_string(ADMHOSTO_TPL, envs=load_data())


@app.route('/admhosto/create', methods=['POST'])
def create_env():
    """Create an environment for the submitted ``target_url`` form field."""
    url = request.form.get('target_url', '').strip()
    if not url:
        return redirect(url_for('admhosto'))
    # A bare 'http' prefix test would also accept hosts such as 'httpbin.org'.
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    data = load_data()
    # Re-draw on collision so an existing environment is never overwritten.
    env_id = ''.join(random.choices(string.digits, k=6))
    while env_id in data:
        env_id = ''.join(random.choices(string.digits, k=6))
    data[env_id] = {"target_url": url, "status": "idle", "results_count": 0, "last_scan": None}
    save_data(data)
    return redirect(url_for('admhosto'))


@app.route('/admhosto/delete/<env_id>', methods=['POST'])
def delete_env(env_id):
    """Delete an environment, killing its scan first if one is running."""
    data = load_data()
    if env_id in data:
        try:
            _kill_scan(env_id)
        except OSError as e:
            # The process may already have exited; deletion proceeds regardless.
            logging.warning("Could not kill scan for %s: %s", env_id, e)
        del data[env_id]
        save_data(data)
    return redirect(url_for('admhosto'))


@app.route('/env/<env_id>')
def serve_env(env_id):
    """Render the dashboard of a single environment."""
    data = load_data()
    env = data.get(env_id)
    if not env:
        return "404", 404
    return render_template_string(ENV_DASHBOARD_TPL, env_id=env_id, target_url=env['target_url'])


@app.route('/env/<env_id>/start', methods=['POST'])
def start_scan(env_id):
    """Start a background scan unless the environment is unknown or busy."""
    data = load_data()
    env = data.get(env_id)
    if env and env['status'] != 'running':
        threading.Thread(target=run_nuclei_scan, args=(env_id, env['target_url']), daemon=True).start()
        return jsonify({"status": "started"})
    return jsonify({"error": "busy"}), 400


@app.route('/env/<env_id>/stop', methods=['POST'])
def stop_scan(env_id):
    """Kill the running scan and build a report from the findings so far."""
    try:
        killed = _kill_scan(env_id)
    except OSError as e:
        logging.warning("Could not kill scan for %s: %s", env_id, e)
        return jsonify({"error": "failed to kill"}), 500
    if not killed:
        return jsonify({"error": "not running"}), 400

    data = load_data()
    env = data.get(env_id)
    # The environment may have been deleted while the scan was running.
    if env is not None:
        env['status'] = 'completed'
        try:
            env['results_count'] = generate_pdf_report(env_id, env['target_url'], env['last_scan'])
        except Exception:
            logging.exception("Could not build PDF report for %s", env_id)
        save_data(data)
    return jsonify({"status": "stopped"})


@app.route('/env/<env_id>/status')
def get_status(env_id):
    """Return the status, findings count and console log of an environment."""
    data = load_data()
    env = data.get(env_id)
    if not env:
        return jsonify({"error": "not found"}), 404
    log_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log")
    logs = ""
    if os.path.exists(log_path):
        try:
            with open(log_path, "r", encoding='utf-8', errors='ignore') as f:
                logs = f.read()
        except OSError as e:
            logging.warning("Could not read %s: %s", log_path, e)
    return jsonify({"status": env['status'], "results_count": env.get('results_count', 0), "logs": logs})


@app.route('/env/<env_id>/pdf')
def get_pdf(env_id):
    """Send the PDF report of an environment as a download."""
    pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf")
    if os.path.exists(pdf_path):
        return send_file(pdf_path, as_attachment=True, download_name=f"report_{env_id}.pdf")
    return "Report not found", 404


def _ensure_fonts():
    """Download any missing DejaVu font file used for the PDF reports.

    Failures are logged and otherwise ignored; report generation then falls
    back to the built-in font.
    """
    for filename, url in _FONT_URLS.items():
        if os.path.exists(filename):
            continue
        try:
            response = requests.get(url, timeout=_HTTP_TIMEOUT)
            # Never store an HTTP error page as a font file.
            response.raise_for_status()
            with open(filename, "wb") as f:
                f.write(response.content)
        except (requests.RequestException, OSError) as e:
            logging.warning("Could not download font %s: %s", filename, e)


if __name__ == '__main__':
    _ensure_fonts()
    install_nuclei()
    download_db()
    port = int(os.environ.get('PORT', 7860))
    app.run(debug=False, host='0.0.0.0', port=port)