Cybersec / app.py
Kgshop's picture
Update app.py
9426874 verified
import os
import io
import json
import logging
import threading
import subprocess
import random
import string
import requests
import signal
import time
from datetime import datetime
from flask import Flask, render_template_string, request, redirect, url_for, jsonify, send_file
from huggingface_hub import HfApi, hf_hub_download
from dotenv import load_dotenv
from fpdf import FPDF
load_dotenv()
app = Flask(__name__)
app.secret_key = 'security_scanner_secret_key_12345'
DATA_FILE = 'scanner_data.json'
REPO_ID = "Kgshop/synkristest"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
data_lock = threading.Lock()
NUCLEI_PATH = "./nuclei"
SCAN_LOGS_DIR = "scan_logs"
active_processes = {}
if not os.path.exists(SCAN_LOGS_DIR):
os.makedirs(SCAN_LOGS_DIR)
def install_nuclei():
if not os.path.exists(NUCLEI_PATH):
try:
import platform
arch = platform.machine().lower()
if "x86_64" in arch or "amd64" in arch:
url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_amd64.zip"
else:
url = "https://github.com/projectdiscovery/nuclei/releases/download/v3.3.5/nuclei_3.3.5_linux_arm64.zip"
r = requests.get(url)
import zipfile
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
z.extract("nuclei", path=".")
os.chmod(NUCLEI_PATH, 0o755)
except Exception as e:
logging.error(f"Installation error: {e}")
try:
subprocess.run([NUCLEI_PATH, "-update-templates"], check=False)
except:
pass
def download_db():
if not HF_TOKEN_READ and not HF_TOKEN_WRITE:
return
token = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE
try:
hf_hub_download(repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset", token=token, local_dir=".")
except:
if not os.path.exists(DATA_FILE):
with open(DATA_FILE, 'w') as f: json.dump({}, f)
def upload_db():
if not HF_TOKEN_WRITE: return
try:
api = HfApi()
api.upload_file(path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
except:
pass
def load_data():
with data_lock:
if os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, 'r') as f: return json.load(f)
except: return {}
return {}
def save_data(data):
with data_lock:
with open(DATA_FILE, 'w') as f: json.dump(data, f, indent=4)
upload_db()
class ScannerPDF(FPDF):
def header(self):
if os.path.exists("DejaVuSansCondensed.ttf"):
self.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True)
self.set_font('DejaVu', '', 15)
else:
self.set_font('Arial', 'B', 15)
self.cell(0, 10, 'Отчет о сканировании уязвимостей (Nuclei)', 0, 1, 'C')
self.ln(5)
def footer(self):
self.set_y(-15)
if os.path.exists("DejaVuSansCondensed.ttf"):
self.set_font('DejaVu', '', 8)
else:
self.set_font('Arial', 'I', 8)
self.cell(0, 10, f'Страница {self.page_no()}', 0, 0, 'C')
def generate_pdf_report(env_id, target_url, last_scan):
json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json")
results = []
if os.path.exists(json_output):
with open(json_output, "r") as f:
for line in f:
try: results.append(json.loads(line))
except: continue
pdf = ScannerPDF()
if os.path.exists("DejaVuSansCondensed.ttf"):
pdf.add_font('DejaVu', '', 'DejaVuSansCondensed.ttf', uni=True)
pdf.add_font('DejaVu', 'B', 'DejaVuSansCondensed-Bold.ttf', uni=True)
main_font = 'DejaVu'
else:
main_font = 'Arial'
pdf.add_page()
pdf.set_font(main_font, 'B', 12)
pdf.cell(0, 10, f"Цель: {target_url}", 0, 1)
pdf.cell(0, 10, f"Дата: {last_scan}", 0, 1)
pdf.ln(10)
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
for res in results:
sev = res.get("info", {}).get("severity", "info").lower()
severity_counts[sev] = severity_counts.get(sev, 0) + 1
pdf.set_font(main_font, 'B', 14)
pdf.cell(0, 10, "Сводка:", 0, 1)
pdf.set_font(main_font, '', 12)
for sev, count in severity_counts.items():
pdf.cell(0, 8, f"{sev.capitalize()}: {count}", 0, 1)
pdf.ln(10)
pdf.set_font(main_font, 'B', 14)
pdf.cell(0, 10, "Найденные уязвимости:", 0, 1)
for res in results:
pdf.set_font(main_font, 'B', 11)
sev = res.get("info", {}).get("severity", "info").upper()
if sev == "CRITICAL": pdf.set_text_color(150, 0, 0)
elif sev == "HIGH": pdf.set_text_color(255, 0, 0)
else: pdf.set_text_color(0, 0, 0)
pdf.cell(0, 8, f"[{sev}] {res.get('info', {}).get('name')}", 0, 1)
pdf.set_text_color(0, 0, 0)
pdf.set_font(main_font, '', 9)
pdf.multi_cell(0, 5, f"Описание: {res.get('info', {}).get('description', 'N/A')}")
pdf.cell(0, 5, f"Хост: {res.get('matched-at', 'N/A')}", 0, 1)
pdf.ln(3)
pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf")
pdf.output(pdf_path)
return len(results)
def run_nuclei_scan(env_id, target_url):
data = load_data()
if env_id not in data: return
data[env_id]['status'] = 'running'
data[env_id]['last_scan'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
save_data(data)
log_file = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log")
json_output = os.path.join(SCAN_LOGS_DIR, f"{env_id}.json")
if os.path.exists(json_output): os.remove(json_output)
with open(log_file, "w") as f:
process = subprocess.Popen(
[NUCLEI_PATH, "-u", target_url, "-jsonl", "-o", json_output, "-v"],
stdout=f, stderr=f, text=True, preexec_fn=os.setsid
)
active_processes[env_id] = process
process.wait()
active_processes.pop(env_id, None)
data = load_data()
if env_id in data:
count = generate_pdf_report(env_id, target_url, data[env_id]['last_scan'])
data[env_id]['status'] = 'completed'
data[env_id]['results_count'] = count
save_data(data)
ADMHOSTO_TPL = '''
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<title>Управление сканером</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background: #f4f7f6; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; }
.container { max-width: 900px; }
.card { border: none; border-radius: 15px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); }
.env-card { transition: 0.2s; }
.env-card:hover { transform: translateY(-3px); }
</style>
</head>
<body>
<div class="container py-5">
<h1 class="mb-4 text-center text-primary">Nuclei Security Panel</h1>
<div class="card p-4 mb-5">
<form action="/admhosto/create" method="POST" class="row g-3">
<div class="col-md-9">
<input type="text" name="target_url" class="form-control form-control-lg" placeholder="Введите URL цели (например, example.com)" required>
</div>
<div class="col-md-3">
<button type="submit" class="btn btn-primary btn-lg w-100">Создать</button>
</div>
</form>
</div>
<div class="row">
{% for id, env in envs.items() %}
<div class="col-md-6 mb-4">
<div class="card env-card h-100">
<div class="card-body">
<h5 class="card-title text-dark">{{ env.target_url }}</h5>
<p class="text-muted small mb-1">ID: {{ id }}</p>
<p class="mb-3">Статус:
<span class="badge {% if env.status == 'running' %}bg-warning{% elif env.status == 'completed' %}bg-success{% else %}bg-secondary{% endif %}">
{% if env.status == 'running' %}В процессе{% elif env.status == 'completed' %}Завершено{% else %}Ожидание{% endif %}
</span>
</p>
<div class="d-flex gap-2">
<a href="/env/{{ id }}" class="btn btn-sm btn-info text-white flex-grow-1">Дашборд</a>
<form action="/admhosto/delete/{{ id }}" method="POST" onsubmit="return confirm('Удалить?');">
<button class="btn btn-sm btn-outline-danger">Удалить</button>
</form>
</div>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
</body>
</html>
'''
ENV_DASHBOARD_TPL = '''
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<title>Дашборд сканирования</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background: #0f172a; color: #f8fafc; }
.card { background: #1e293b; border: 1px solid #334155; color: #f8fafc; }
#log-container {
background: #000000;
color: #10b981;
height: 550px;
overflow-y: auto;
font-family: 'Consolas', monospace;
padding: 15px;
border-radius: 8px;
font-size: 12px;
line-height: 1.4;
border: 1px solid #334155;
}
.status-badge { font-size: 1.1rem; padding: 8px 15px; border-radius: 50px; }
</style>
</head>
<body>
<div class="container-fluid py-4 px-4">
<div class="d-flex justify-content-between align-items-center mb-4">
<h2 class="m-0">Цель: <span class="text-info">{{ target_url }}</span></h2>
<a href="/admhosto" class="btn btn-outline-light">На главную</a>
</div>
<div class="row g-4">
<div class="col-lg-3">
<div class="card p-4 shadow-sm h-100">
<h4 class="mb-4">Управление</h4>
<div class="mb-4 text-center">
<div id="status-text" class="status-badge mb-2 bg-secondary text-white">Загрузка...</div>
<div class="mt-2 small text-muted">Найдено: <span id="findings-count" class="text-white fw-bold">0</span></div>
</div>
<button id="scan-btn" class="btn btn-primary w-100 mb-3 py-2" onclick="startScan()">Запустить Nuclei</button>
<button id="stop-btn" class="btn btn-danger w-100 mb-3 py-2 d-none" onclick="stopScan()">Остановить</button>
<a id="pdf-link" href="/env/{{ env_id }}/pdf" class="btn btn-success w-100 py-2 disabled">Скачать PDF</a>
</div>
</div>
<div class="col-lg-9">
<div class="card p-3 shadow-sm">
<div class="d-flex justify-content-between mb-2">
<span class="text-muted small">Консоль вывода (Nuclei)</span>
<span class="badge bg-dark">Live</span>
</div>
<div id="log-container"></div>
</div>
</div>
</div>
</div>
<script>
const envId = "{{ env_id }}";
let pollInterval = null;
function updateUI(data) {
const statusEl = document.getElementById('status-text');
const findingsEl = document.getElementById('findings-count');
const scanBtn = document.getElementById('scan-btn');
const stopBtn = document.getElementById('stop-btn');
const pdfLink = document.getElementById('pdf-link');
const logContainer = document.getElementById('log-container');
findingsEl.innerText = data.results_count;
logContainer.innerText = data.logs;
logContainer.scrollTop = logContainer.scrollHeight;
if (data.status === 'running') {
statusEl.innerText = 'Сканирование...';
statusEl.className = 'status-badge mb-2 bg-warning text-dark';
scanBtn.classList.add('d-none');
stopBtn.classList.remove('d-none');
pdfLink.classList.add('disabled');
} else if (data.status === 'completed') {
statusEl.innerText = 'Завершено';
statusEl.className = 'status-badge mb-2 bg-success text-white';
scanBtn.classList.remove('d-none');
stopBtn.classList.add('d-none');
pdfLink.classList.remove('disabled');
} else {
statusEl.innerText = 'Ожидание';
statusEl.className = 'status-badge mb-2 bg-secondary text-white';
scanBtn.classList.remove('d-none');
stopBtn.classList.add('d-none');
}
}
function poll() {
fetch('/env/' + envId + '/status')
.then(r => r.json())
.then(data => {
updateUI(data);
if (data.status !== 'running' && pollInterval) {
clearInterval(pollInterval);
pollInterval = null;
}
});
}
function startScan() {
fetch('/env/' + envId + '/start', {method: 'POST'})
.then(() => {
if (!pollInterval) pollInterval = setInterval(poll, 1000);
});
}
function stopScan() {
fetch('/env/' + envId + '/stop', {method: 'POST'})
.then(() => {
setTimeout(poll, 500);
});
}
poll();
pollInterval = setInterval(poll, 2000);
</script>
</body>
</html>
'''
@app.route('/')
def landing():
return redirect(url_for('admhosto'))
@app.route('/admhosto')
def admhosto():
return render_template_string(ADMHOSTO_TPL, envs=load_data())
@app.route('/admhosto/create', methods=['POST'])
def create_env():
url = request.form.get('target_url', '').strip()
if not url: return redirect(url_for('admhosto'))
if not url.startswith('http'): url = 'http://' + url
env_id = ''.join(random.choices(string.digits, k=6))
data = load_data()
data[env_id] = {"target_url": url, "status": "idle", "results_count": 0, "last_scan": None}
save_data(data)
return redirect(url_for('admhosto'))
@app.route('/admhosto/delete/<env_id>', methods=['POST'])
def delete_env(env_id):
data = load_data()
if env_id in data:
if env_id in active_processes:
try: os.killpg(os.getpgid(active_processes[env_id].pid), signal.SIGKILL)
except: pass
del data[env_id]
save_data(data)
return redirect(url_for('admhosto'))
@app.route('/env/<env_id>')
def serve_env(env_id):
data = load_data()
env = data.get(env_id)
if not env: return "404", 404
return render_template_string(ENV_DASHBOARD_TPL, env_id=env_id, target_url=env['target_url'])
@app.route('/env/<env_id>/start', methods=['POST'])
def start_scan(env_id):
data = load_data()
env = data.get(env_id)
if env and env['status'] != 'running':
threading.Thread(target=run_nuclei_scan, args=(env_id, env['target_url']), daemon=True).start()
return jsonify({"status": "started"})
return jsonify({"error": "busy"}), 400
@app.route('/env/<env_id>/stop', methods=['POST'])
def stop_scan(env_id):
if env_id in active_processes:
try:
os.killpg(os.getpgid(active_processes[env_id].pid), signal.SIGKILL)
data = load_data()
if env_id in data:
data[env_id]['status'] = 'completed'
save_data(data)
generate_pdf_report(env_id, data[env_id]['target_url'], data[env_id]['last_scan'])
return jsonify({"status": "stopped"})
except:
return jsonify({"error": "failed to kill"}), 500
return jsonify({"error": "not running"}), 400
@app.route('/env/<env_id>/status')
def get_status(env_id):
data = load_data()
env = data.get(env_id)
if not env: return jsonify({"error": "not found"}), 404
log_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.log")
logs = ""
if os.path.exists(log_path):
try:
with open(log_path, "r", encoding='utf-8', errors='ignore') as f:
logs = f.read()
except: pass
return jsonify({"status": env['status'], "results_count": env['results_count'], "logs": logs})
@app.route('/env/<env_id>/pdf')
def get_pdf(env_id):
pdf_path = os.path.join(SCAN_LOGS_DIR, f"{env_id}.pdf")
if os.path.exists(pdf_path):
return send_file(pdf_path, as_attachment=True, download_name=f"report_{env_id}.pdf")
return "Report not found", 404
if __name__ == '__main__':
if not os.path.exists("DejaVuSansCondensed.ttf"):
try:
r1 = requests.get("https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed.ttf")
with open("DejaVuSansCondensed.ttf", "wb") as f: f.write(r1.content)
r2 = requests.get("https://github.com/reingart/pyfpdf/raw/master/fpdf/font/DejaVuSansCondensed-Bold.ttf")
with open("DejaVuSansCondensed-Bold.ttf", "wb") as f: f.write(r2.content)
except: pass
install_nuclei()
download_db()
port = int(os.environ.get('PORT', 7860))
app.run(debug=False, host='0.0.0.0', port=port)