Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import logging | |
| import threading | |
| import subprocess | |
| import random | |
| import string | |
| import requests | |
| import signal | |
| import time | |
| from datetime import datetime | |
| from flask import Flask, render_template_string, request, redirect, url_for, jsonify, send_file | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from dotenv import load_dotenv | |
| from fpdf import FPDF | |
| load_dotenv() | |
| app = Flask(__name__) | |
| app.secret_key = 'security_scanner_secret_key_12345' | |
| DATA_FILE = 'scanner_data.json' | |
| REPO_ID = "Kgshop/synkristest" | |
| HF_TOKEN_WRITE = os.getenv("HF_TOKEN") | |
| HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| data_lock = threading.Lock() | |
| NUCLEI_PATH = "./nuclei" | |
| SCAN_LOGS_DIR = "scan_logs" | |
| active_processes = {} | |
| if not os.path.exists(SCAN_LOGS_DIR): | |
| os.makedirs(SCAN_LOGS_DIR) | |
| def install_nuclei(): | |
| if not os.path.exists(NUCLEI_PATH): | |
| try: | |
| import platform | |
| arch = platform.machine().lower() | |
| if "x86_64" in arch or "amd64" in arch: | |
| url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_amd64.zip" | |
| else: | |
| url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_arm64.zip" | |
| r = requests.get(url) | |
| import zipfile | |
| with zipfile.ZipFile(io.BytesIO(r.content)) as z: | |
| z.extract("nuclei", path=".") | |
| os.chmod(NUCLEI_PATH, 0o755) | |
| except Exception as e: | |
| logging.error(f"Installation error: {e}") | |
| try: | |
| subprocess.run([NUCLEI_PATH, "-update-templates"], check=False) | |
| except: | |
| pass | |
| def download_db(): | |
| if not HF_TOKEN_READ and not HF_TOKEN_WRITE: | |
| return | |
| token = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE | |
| try: | |
| hf_hub_download(repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset", token=token, local_dir=".") | |
| except: | |
| if not os.path.exists(DATA_FILE): | |
| with open(DATA_FILE, 'w') as f: json.dump({}, f) | |
| def upload_db(): | |
| if not HF_TOKEN_WRITE: return | |
| try: | |
| api = HfApi() | |
| api.upload_file(path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE) | |
| except: | |
| pass | |
| def load_data(): | |
| with data_lock: | |
| if os.path.exists(DATA_FILE): | |
| try: | |
| with open(DATA_FILE, 'r') as f: return json.load(f) | |
| except: return {} | |
| return {} | |
| def save_data(data): | |
| with data_lock: | |
| with open(DATA_FILE, 'w') as f: json.dump(data, f, indent=4) | |
| upload_db() | |
| class ScannerPDF(FPDF): | |
| def header(self): | |
| if os.path.exists("DejaVuSansCondensed.ttf"): | |
| self.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True) | |
| self.set_font('DejaVu', '', 15) | |
| else: | |
| self.set_font('Arial', 'B', 15) | |
| self.cell(0, 10, 'Отчет о сканировании уязвимостей (Nuclei)', 0, 1, 'C') | |
| self.ln(5) | |
| def footer(self): | |
| self.set_y(-15) | |
| if os.path.exists("DejaVuSansCondensed.ttf"): | |
| self.set_font('DejaVu', '', 8) | |
| else: | |
| self.set_font('Arial', 'I', 8) | |
| self.cell(0, 10, f'Страница {self.page_no()}', 0, 0, 'C') | |
| def generate_pdf_report(env_id, target_url, last_scan): | |
| json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json") | |
| results = [] | |
| if os.path.exists(json_output): | |
| with open(json_output, "r") as f: | |
| for line in f: | |
| try: results.append(json.loads(line)) | |
| except: continue | |
| pdf = ScannerPDF() | |
| if os.path.exists("DejaVuSansCondensed.ttf"): | |
| pdf.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True) | |
| pdf.add_font('DejaVu', 'B', 'DejaVuSansCondensed-Bold.ttf', uni=True) | |
| main_font = 'DejaVu' | |
| else: | |
| main_font = 'Arial' | |
| pdf.add_page() | |
| pdf.set_font(main_font, 'B', 12) | |
| pdf.cell(0, 10, f"Цель: {target_url}", 0, 1) | |
| pdf.cell(0, 10, f"Дата: {last_scan}", 0, 1) | |
| pdf.ln(10) | |
| severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} | |
| for res in results: | |
| sev = res.get("info", {}).get("severity", "info").lower() | |
| severity_counts[sev] = severity_counts.get(sev, 0) + 1 | |
| pdf.set_font(main_font, 'B', 14) | |
| pdf.cell(0, 10, "Сводка:", 0, 1) | |
| pdf.set_font(main_font, '', 12) | |
| for sev, count in severity_counts.items(): | |
| pdf.cell(0, 8, f"{sev.capitalize()}: {count}", 0, 1) | |
| pdf.ln(10) | |
| pdf.set_font(main_font, 'B', 14) | |
| pdf.cell(0, 10, "Найденные уязвимости:", 0, 1) | |
| for res in results: | |
| pdf.set_font(main_font, 'B', 11) | |
| sev = res.get("info", {}).get("severity", "info").upper() | |
| if sev == "CRITICAL": pdf.set_text_color(150, 0, 0) | |
| elif sev == "HIGH": pdf.set_text_color(255, 0, 0) | |
| else: pdf.set_text_color(0, 0, 0) | |
| pdf.cell(0, 8, f"[{sev}] {res.get('info', {}).get('name')}", 0, 1) | |
| pdf.set_text_color(0, 0, 0) | |
| pdf.set_font(main_font, '', 9) | |
| pdf.multi_cell(0, 5, f"Описание: {res.get('info', {}).get('description', 'N/A')}") | |
| pdf.cell(0, 5, f"Хост: {res.get('matched-at', 'N/A')}", 0, 1) | |
| pdf.ln(3) | |
| pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf") | |
| pdf.output(pdf_path) | |
| return len(results) | |
| def run_nuclei_scan(env_id, target_url): | |
| data = load_data() | |
| if env_id not in data: return | |
| data[env_id]['status'] = 'running' | |
| data[env_id]['last_scan'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| save_data(data) | |
| log_file = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log") | |
| json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json") | |
| if os.path.exists(json_output): os.remove(json_output) | |
| with open(log_file, "w") as f: | |
| process = subprocess.Popen( | |
| [NUCLEI_PATH, "-u", target_url, "-jsonl", "-o", json_output, "-v"], | |
| stdout=f, stderr=f, text=True, preexec_fn=os.setsid | |
| ) | |
| active_processes[env_id] = process | |
| process.wait() | |
| active_processes.pop(env_id, None) | |
| data = load_data() | |
| if env_id in data: | |
| count = generate_pdf_report(env_id, target_url, data[env_id]['last_scan']) | |
| data[env_id]['status'] = 'completed' | |
| data[env_id]['results_count'] = count | |
| save_data(data) | |
| ADMHOSTO_TPL = ''' | |
| <!DOCTYPE html> | |
| <html lang="ru"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <title>Управление сканером</title> | |
| <meta name="viewport" content="width=device-width, initial-scale=1"> | |
| <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"> | |
| <style> | |
| body { background: #f4f7f6; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } | |
| .container { max-width: 900px; } | |
| .card { border: none; border-radius: 15px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); } | |
| .env-card { transition: 0.2s; } | |
| .env-card:hover { transform: translateY(-3px); } | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container py-5"> | |
| <h1 class="mb-4 text-center text-primary">Nuclei Security Panel</h1> | |
| <div class="card p-4 mb-5"> | |
| <form action="/admhosto/create" method="POST" class="row g-3"> | |
| <div class="col-md-9"> | |
| <input type="text" name="target_url" class="form-control form-control-lg" placeholder="Введите URL цели (например, example.com)" required> | |
| </div> | |
| <div class="col-md-3"> | |
| <button type="submit" class="btn btn-primary btn-lg w-100">Создать</button> | |
| </div> | |
| </form> | |
| </div> | |
| <div class="row"> | |
| {% for id, env in envs.items() %} | |
| <div class="col-md-6 mb-4"> | |
| <div class="card env-card h-100"> | |
| <div class="card-body"> | |
| <h5 class="card-title text-dark">{{ env.target_url }}</h5> | |
| <p class="text-muted small mb-1">ID: {{ id }}</p> | |
| <p class="mb-3">Статус: | |
| <span class="badge {% if env.status == 'running' %}bg-warning{% elif env.status == 'completed' %}bg-success{% else %}bg-secondary{% endif %}"> | |
| {% if env.status == 'running' %}В процессе{% elif env.status == 'completed' %}Завершено{% else %}Ожидание{% endif %} | |
| </span> | |
| </p> | |
| <div class="d-flex gap-2"> | |
| <a href="/env/{{ id }}" class="btn btn-sm btn-info text-white flex-grow-1">Дашборд</a> | |
| <form action="/admhosto/delete/{{ id }}" method="POST" onsubmit="return confirm('Удалить?');"> | |
| <button class="btn btn-sm btn-outline-danger">Удалить</button> | |
| </form> | |
| </div> | |
| </div> | |
| </div> | |
| </div> | |
| {% endfor %} | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| ''' | |
| ENV_DASHBOARD_TPL = ''' | |
| <!DOCTYPE html> | |
| <html lang="ru"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <title>Дашборд сканирования</title> | |
| <meta name="viewport" content="width=device-width, initial-scale=1"> | |
| <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"> | |
| <style> | |
| body { background: #0f172a; color: #f8fafc; } | |
| .card { background: #1e293b; border: 1px solid #334155; color: #f8fafc; } | |
| #log-container { | |
| background: #000000; | |
| color: #10b981; | |
| height: 550px; | |
| overflow-y: auto; | |
| font-family: 'Consolas', monospace; | |
| padding: 15px; | |
| border-radius: 8px; | |
| font-size: 12px; | |
| line-height: 1.4; | |
| border: 1px solid #334155; | |
| } | |
| .status-badge { font-size: 1.1rem; padding: 8px 15px; border-radius: 50px; } | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container-fluid py-4 px-4"> | |
| <div class="d-flex justify-content-between align-items-center mb-4"> | |
| <h2 class="m-0">Цель: <span class="text-info">{{ target_url }}</span></h2> | |
| <a href="/admhosto" class="btn btn-outline-light">На главную</a> | |
| </div> | |
| <div class="row g-4"> | |
| <div class="col-lg-3"> | |
| <div class="card p-4 shadow-sm h-100"> | |
| <h4 class="mb-4">Управление</h4> | |
| <div class="mb-4 text-center"> | |
| <div id="status-text" class="status-badge mb-2 bg-secondary text-white">Загрузка...</div> | |
| <div class="mt-2 small text-muted">Найдено: <span id="findings-count" class="text-white fw-bold">0</span></div> | |
| </div> | |
| <button id="scan-btn" class="btn btn-primary w-100 mb-3 py-2" onclick="startScan()">Запустить Nuclei</button> | |
| <button id="stop-btn" class="btn btn-danger w-100 mb-3 py-2 d-none" onclick="stopScan()">Остановить</button> | |
| <a id="pdf-link" href="/env/{{ env_id }}/pdf" class="btn btn-success w-100 py-2 disabled">Скачать PDF</a> | |
| </div> | |
| </div> | |
| <div class="col-lg-9"> | |
| <div class="card p-3 shadow-sm"> | |
| <div class="d-flex justify-content-between mb-2"> | |
| <span class="text-muted small">Консоль вывода (Nuclei)</span> | |
| <span class="badge bg-dark">Live</span> | |
| </div> | |
| <div id="log-container"></div> | |
| </div> | |
| </div> | |
| </div> | |
| </div> | |
| <script> | |
| const envId = "{{ env_id }}"; | |
| let pollInterval = null; | |
| function updateUI(data) { | |
| const statusEl = document.getElementById('status-text'); | |
| const findingsEl = document.getElementById('findings-count'); | |
| const scanBtn = document.getElementById('scan-btn'); | |
| const stopBtn = document.getElementById('stop-btn'); | |
| const pdfLink = document.getElementById('pdf-link'); | |
| const logContainer = document.getElementById('log-container'); | |
| findingsEl.innerText = data.results_count; | |
| logContainer.innerText = data.logs; | |
| logContainer.scrollTop = logContainer.scrollHeight; | |
| if (data.status === 'running') { | |
| statusEl.innerText = 'Сканирование...'; | |
| statusEl.className = 'status-badge mb-2 bg-warning text-dark'; | |
| scanBtn.classList.add('d-none'); | |
| stopBtn.classList.remove('d-none'); | |
| pdfLink.classList.add('disabled'); | |
| } else if (data.status === 'completed') { | |
| statusEl.innerText = 'Завершено'; | |
| statusEl.className = 'status-badge mb-2 bg-success text-white'; | |
| scanBtn.classList.remove('d-none'); | |
| stopBtn.classList.add('d-none'); | |
| pdfLink.classList.remove('disabled'); | |
| } else { | |
| statusEl.innerText = 'Ожидание'; | |
| statusEl.className = 'status-badge mb-2 bg-secondary text-white'; | |
| scanBtn.classList.remove('d-none'); | |
| stopBtn.classList.add('d-none'); | |
| } | |
| } | |
| function poll() { | |
| fetch('/env/' + envId + '/status') | |
| .then(r => r.json()) | |
| .then(data => { | |
| updateUI(data); | |
| if (data.status !== 'running' && pollInterval) { | |
| clearInterval(pollInterval); | |
| pollInterval = null; | |
| } | |
| }); | |
| } | |
| function startScan() { | |
| fetch('/env/' + envId + '/start', {method: 'POST'}) | |
| .then(() => { | |
| if (!pollInterval) pollInterval = setInterval(poll, 1000); | |
| }); | |
| } | |
| function stopScan() { | |
| fetch('/env/' + envId + '/stop', {method: 'POST'}) | |
| .then(() => { | |
| setTimeout(poll, 500); | |
| }); | |
| } | |
| poll(); | |
| pollInterval = setInterval(poll, 2000); | |
| </script> | |
| </body> | |
| </html> | |
| ''' | |
| def landing(): | |
| return redirect(url_for('admhosto')) | |
| def admhosto(): | |
| return render_template_string(ADMHOSTO_TPL, envs=load_data()) | |
| def create_env(): | |
| url = request.form.get('target_url', '').strip() | |
| if not url: return redirect(url_for('admhosto')) | |
| if not url.startswith('http'): url = 'http://' + url | |
| env_id = ''.join(random.choices(string.digits, k=6)) | |
| data = load_data() | |
| data[env_id] = {"target_url": url, "status": "idle", "results_count": 0, "last_scan": None} | |
| save_data(data) | |
| return redirect(url_for('admhosto')) | |
| def delete_env(env_id): | |
| data = load_data() | |
| if env_id in data: | |
| if env_id in active_processes: | |
| try: os.killpg(os.getpgid(active_processes[env_id].pid), signal.SIGKILL) | |
| except: pass | |
| del data[env_id] | |
| save_data(data) | |
| return redirect(url_for('admhosto')) | |
| def serve_env(env_id): | |
| data = load_data() | |
| env = data.get(env_id) | |
| if not env: return "404", 404 | |
| return render_template_string(ENV_DASHBOARD_TPL, env_id=env_id, target_url=env['target_url']) | |
| def start_scan(env_id): | |
| data = load_data() | |
| env = data.get(env_id) | |
| if env and env['status'] != 'running': | |
| threading.Thread(target=run_nuclei_scan, args=(env_id, env['target_url']), daemon=True).start() | |
| return jsonify({"status": "started"}) | |
| return jsonify({"error": "busy"}), 400 | |
| def stop_scan(env_id): | |
| if env_id in active_processes: | |
| try: | |
| os.killpg(os.getpgid(active_processes[env_id].pid), signal.SIGKILL) | |
| data = load_data() | |
| if env_id in data: | |
| data[env_id]['status'] = 'completed' | |
| save_data(data) | |
| generate_pdf_report(env_id, data[env_id]['target_url'], data[env_id]['last_scan']) | |
| return jsonify({"status": "stopped"}) | |
| except: | |
| return jsonify({"error": "failed to kill"}), 500 | |
| return jsonify({"error": "not running"}), 400 | |
| def get_status(env_id): | |
| data = load_data() | |
| env = data.get(env_id) | |
| if not env: return jsonify({"error": "not found"}), 404 | |
| log_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log") | |
| logs = "" | |
| if os.path.exists(log_path): | |
| try: | |
| with open(log_path, "r", encoding='utf-8', errors='ignore') as f: | |
| logs = f.read() | |
| except: pass | |
| return jsonify({"status": env['status'], "results_count": env['results_count'], "logs": logs}) | |
| def get_pdf(env_id): | |
| pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf") | |
| if os.path.exists(pdf_path): | |
| return send_file(pdf_path, as_attachment=True, download_name=f"report_{env_id}.pdf") | |
| return "Report not found", 404 | |
| if __name__ == '__main__': | |
| if not os.path.exists("DejaVuSansCondensed.ttf"): | |
| try: | |
| r1 = requests.get("https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed.ttf") | |
| with open("DejaVuSansCondensed.ttf", "wb") as f: f.write(r1.content) | |
| r2 = requests.get("https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed-Bold.ttf") | |
| with open("DejaVuSansCondensed-Bold.ttf", "wb") as f: f.write(r2.content) | |
| except: pass | |
| install_nuclei() | |
| download_db() | |
| port = int(os.environ.get('PORT', 7860)) | |
| app.run(debug=False, host='0.0.0.0', port=port) |