import os import io import json import logging import threading import subprocess import random import string import requests import signal import time from datetime import datetime from flask import Flask, render_template_string, request, redirect, url_for, jsonify, send_file from huggingface_hub import HfApi, hf_hub_download from dotenv import load_dotenv from fpdf import FPDF load_dotenv() app = Flask(__name__) app.secret_key = 'security_scanner_secret_key_12345' DATA_FILE = 'scanner_data.json' REPO_ID = "Kgshop/synkristest" HF_TOKEN_WRITE = os.getenv("HF_TOKEN") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') data_lock = threading.Lock() NUCLEI_PATH = "./nuclei" SCAN_LOGS_DIR = "scan_logs" active_processes = {} if not os.path.exists(SCAN_LOGS_DIR): os.makedirs(SCAN_LOGS_DIR) def install_nuclei(): if not os.path.exists(NUCLEI_PATH): try: import platform arch = platform.machine().lower() if "x86_64" in arch or "amd64" in arch: url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_amd64.zip" else: url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_arm64.zip" r = requests.get(url) import zipfile with zipfile.ZipFile(io.BytesIO(r.content)) as z: z.extract("nuclei", path=".") os.chmod(NUCLEI_PATH, 0o755) except Exception as e: logging.error(f"Installation error: {e}") try: subprocess.run([NUCLEI_PATH, "-update-templates"], check=False) except: pass def download_db(): if not HF_TOKEN_READ and not HF_TOKEN_WRITE: return token = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE try: hf_hub_download(repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset", token=token, local_dir=".") except: if not os.path.exists(DATA_FILE): with open(DATA_FILE, 'w') as f: json.dump({}, f) def upload_db(): if not HF_TOKEN_WRITE: return try: api = HfApi() api.upload_file(path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE) except: pass def load_data(): with data_lock: if os.path.exists(DATA_FILE): try: with open(DATA_FILE, 'r') as f: return json.load(f) except: return {} return {} def save_data(data): with data_lock: with open(DATA_FILE, 'w') as f: json.dump(data, f, indent=4) upload_db() class ScannerPDF(FPDF): def header(self): if os.path.exists("DejaVuSansCondensed.ttf"): self.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True) self.set_font('DejaVu', '', 15) else: self.set_font('Arial', 'B', 15) self.cell(0, 10, 'Отчет о сканировании уязвимостей (Nuclei)', 0, 1, 'C') self.ln(5) def footer(self): self.set_y(-15) if os.path.exists("DejaVuSansCondensed.ttf"): self.set_font('DejaVu', '', 8) else: self.set_font('Arial', 'I', 8) self.cell(0, 10, f'Страница {self.page_no()}', 0, 0, 'C') def generate_pdf_report(env_id, target_url, last_scan): json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json") results = [] if os.path.exists(json_output): with open(json_output, "r") as f: for line in f: try: results.append(json.loads(line)) except: continue pdf = ScannerPDF() if os.path.exists("DejaVuSansCondensed.ttf"): pdf.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True) pdf.add_font('DejaVu', 'B', 'DejaVuSansCondensed-Bold.ttf', uni=True) main_font = 'DejaVu' else: main_font = 'Arial' pdf.add_page() pdf.set_font(main_font, 'B', 12) pdf.cell(0, 10, f"Цель: {target_url}", 0, 1) pdf.cell(0, 10, f"Дата: {last_scan}", 0, 1) pdf.ln(10) severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} for res in results: sev = res.get("info", {}).get("severity", "info").lower() severity_counts[sev] = severity_counts.get(sev, 0) + 1 pdf.set_font(main_font, 'B', 14) pdf.cell(0, 10, "Сводка:", 0, 1) pdf.set_font(main_font, '', 12) for sev, count in severity_counts.items(): pdf.cell(0, 8, f"{sev.capitalize()}: {count}", 0, 1) pdf.ln(10) pdf.set_font(main_font, 'B', 14) pdf.cell(0, 10, "Найденные уязвимости:", 0, 1) for res in results: pdf.set_font(main_font, 'B', 11) sev = res.get("info", {}).get("severity", "info").upper() if sev == "CRITICAL": pdf.set_text_color(150, 0, 0) elif sev == "HIGH": pdf.set_text_color(255, 0, 0) else: pdf.set_text_color(0, 0, 0) pdf.cell(0, 8, f"[{sev}] {res.get('info', {}).get('name')}", 0, 1) pdf.set_text_color(0, 0, 0) pdf.set_font(main_font, '', 9) pdf.multi_cell(0, 5, f"Описание: {res.get('info', {}).get('description', 'N/A')}") pdf.cell(0, 5, f"Хост: {res.get('matched-at', 'N/A')}", 0, 1) pdf.ln(3) pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf") pdf.output(pdf_path) return len(results) def run_nuclei_scan(env_id, target_url): data = load_data() if env_id not in data: return data[env_id]['status'] = 'running' data[env_id]['last_scan'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') save_data(data) log_file = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log") json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json") if os.path.exists(json_output): os.remove(json_output) with open(log_file, "w") as f: process = subprocess.Popen( [NUCLEI_PATH, "-u", target_url, "-jsonl", "-o", json_output, "-v"], stdout=f, stderr=f, text=True, preexec_fn=os.setsid ) active_processes[env_id] = process process.wait() active_processes.pop(env_id, None) data = load_data() if env_id in data: count = generate_pdf_report(env_id, target_url, data[env_id]['last_scan']) data[env_id]['status'] = 'completed' data[env_id]['results_count'] = count save_data(data) ADMHOSTO_TPL = '''
ID: {{ id }}
Статус: {% if env.status == 'running' %}В процессе{% elif env.status == 'completed' %}Завершено{% else %}Ожидание{% endif %}