"""Flask dashboard for the MayOne Security Framework.

Serves the dashboard page, JSON APIs backed by the project ``Database``,
manual block/unblock endpoints, and PDF / PCAP exports.
"""
import io
import ipaddress
import os
import sys
import tempfile
from collections import Counter
from datetime import datetime

from flask import (
    Flask,
    jsonify,
    render_template_string,
    request,
    send_file,
    send_from_directory,
)
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.lib.utils import ImageReader
from reportlab.pdfgen import canvas
from scapy.utils import wrpcap

# The parent of this file's directory must be on sys.path before the
# project-local imports below can resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from database.db import Database
from response.block_ip import block_ip_windows, unblock_ip_windows
from geoip.blocker import GeoIPBlocker

app = Flask(__name__)

# Module-level state; populated by run_dashboard() before the server starts.
db = None
pcap_sniffer = None
geoip = None
geoip_available = False

# Upper bound on the number of entries returned by /api/threats.
_MAX_THREATS = 100


# ---------- PDF Helpers ----------
def add_page_number(c, page_num):
    """Draw ``Page <n>`` in the bottom-right corner of the current page."""
    c.saveState()
    c.setFont("Helvetica", 8)
    c.drawString(letter[0] - 80, 30, f"Page {page_num}")
    c.restoreState()


def draw_header(c, title, subtitle=""):
    """Draw the bold title, the subtitle and a horizontal rule at the page top.

    Leaves the canvas font set to Helvetica 10.
    """
    c.setFont("Helvetica-Bold", 16)
    c.drawString(50, letter[1] - 50, title)
    c.setFont("Helvetica", 10)
    c.drawString(50, letter[1] - 70, subtitle)
    c.line(50, letter[1] - 80, letter[0] - 50, letter[1] - 80)


def draw_watermark(c, logo_path):
    """Draw the logo at 20% alpha, centred on the current page (best effort).

    Does nothing when ``logo_path`` does not exist. A failure to read or draw
    the image is logged and otherwise ignored so report generation continues.
    """
    if not os.path.exists(logo_path):
        return
    c.saveState()
    try:
        img = ImageReader(logo_path)
        c.setFillAlpha(0.2)
        c.drawImage(
            img,
            letter[0] / 2 - 1.5 * inch,
            letter[1] / 2 - 1.5 * inch,
            width=3 * inch,
            height=3 * inch,
            mask='auto',
            preserveAspectRatio=True,
        )
    except Exception as exc:  # deliberate best-effort: the watermark is optional
        app.logger.warning("Watermark skipped for %s: %s", logo_path, exc)
    finally:
        # Always pair with saveState() so the canvas state stack stays balanced.
        c.restoreState()


def _text(value):
    """Return ``value`` as a string, mapping ``None`` to the empty string."""
    return "" if value is None else str(value)


def _column_layout(titles, widths, left=50):
    """Return ``[(x, title), ...]`` with x accumulated from ``left`` by width."""
    layout = []
    x = left
    for title, width in zip(titles, widths):
        layout.append((x, title))
        x += width
    return layout


def _draw_table_header(c, y, columns, font_size):
    """Draw bold column titles at height ``y``, then switch to the regular font."""
    c.setFont("Helvetica-Bold", font_size)
    for x, title in columns:
        c.drawString(x, y, title)
    c.setFont("Helvetica", font_size)


def _start_continuation_page(c, logo_path, page_num, subtitle):
    """Finish the current page, open a new one and return its page number.

    The watermark is drawn right after ``showPage()`` so it lands on the new
    page, underneath the text that follows.
    """
    add_page_number(c, page_num)
    c.showPage()
    draw_watermark(c, logo_path)
    draw_header(c, "MayOne Security Framework (cont.)", subtitle)
    return page_num + 1


def generate_blocked_ips_pdf():
    """Build the blocked-IP report and return it as a rewound ``BytesIO``.

    Rows come from ``db.get_blocked_ips()`` and are unpacked as
    ``(ip, block_time, reason)``. ``None`` fields are rendered as empty text.
    """
    blocked = db.get_blocked_ips()
    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    logo_path = os.path.join(os.path.dirname(__file__), 'static', 'logo.png')
    columns = [(50, "IP Address"), (160, "Blocked Since"), (300, "Reason")]
    page_num = 1

    draw_watermark(c, logo_path)
    draw_header(c, "MayOne Security Framework", "Blocked IP Addresses Report")
    y = letter[1] - 110
    c.drawString(50, y, f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    y -= 20
    c.drawString(50, y, f"Total Blocked IPs: {len(blocked)}")
    y -= 40
    _draw_table_header(c, y, columns, 10)
    y -= 20

    for ip, block_time, reason in blocked:
        if y < 80:  # out of vertical space: continue on a fresh page
            page_num = _start_continuation_page(c, logo_path, page_num, "Blocked IPs")
            y = letter[1] - 110
            _draw_table_header(c, y, columns, 10)
            y -= 20
        reason_text = _text(reason)
        if len(reason_text) > 60:
            reason_text = reason_text[:60] + "..."
        c.drawString(50, y, _text(ip))
        c.drawString(160, y, _text(block_time)[:19])
        c.drawString(300, y, reason_text)
        y -= 20

    add_page_number(c, page_num)
    c.save()
    buffer.seek(0)
    return buffer


def generate_all_ips_pdf():
    """Build the traffic-log report and return it as a rewound ``BytesIO``.

    Uses ``db.get_recent_events(limit=1000)``. Each event is indexed
    positionally: 1=timestamp, 2=source IP, 3=destination IP, 4=protocol,
    5=port, 6=size, 7=threat, 8=risk.
    """
    events = db.get_recent_events(limit=1000)
    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    logo_path = os.path.join(os.path.dirname(__file__), 'static', 'logo.png')
    headers = ["Timestamp", "Src IP", "Dst IP", "Proto", "Port", "Size", "Threat", "Risk"]
    col_widths = [100, 80, 80, 40, 40, 50, 70, 40]
    columns = _column_layout(headers, col_widths)
    page_num = 1

    draw_watermark(c, logo_path)
    draw_header(c, "MayOne Security Framework", "Complete IP Traffic Log")
    y = letter[1] - 110
    c.drawString(50, y, f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    y -= 20
    c.drawString(50, y, f"Total Events: {len(events)}")
    y -= 40
    _draw_table_header(c, y, columns, 8)
    y -= 15

    for ev in events:
        if y < 80:  # out of vertical space: continue on a fresh page
            page_num = _start_continuation_page(c, logo_path, page_num, "IP Traffic Log")
            y = letter[1] - 110
            _draw_table_header(c, y, columns, 8)
            y -= 15
        cells = [
            _text(ev[1])[:19],
            _text(ev[2])[:15],
            _text(ev[3])[:15],
            _text(ev[4])[:4],
            str(ev[5]) if ev[5] else "",
            str(ev[6]),
            str(ev[7]) if ev[7] else "Normal",
            str(ev[8]) if ev[8] else "0",
        ]
        for (x, _title), cell in zip(columns, cells):
            c.drawString(x, y, cell)
        y -= 12

    add_page_number(c, page_num)
    c.save()
    buffer.seek(0)
    return buffer


# ---------- HTML Template (with GeoIP warning and conditional toggle) ----------
# NOTE(review): the template text below contains no HTML tags or scripts; it
# looks like the markup was stripped somewhere upstream - confirm against the
# original file. The string is left exactly as found.
HTML_TEMPLATE = ''' MayOne Security Framework

🛡️ MayOne Security Framework

AI‑powered Intrusion Detection & Response

📊 Live Statistics

0
Total Events
0
Unique Sources
0
Blocked IPs

📡 Traffic Analysis

Protocol Distribution

Top 5 Ports

🚨 Recent Threats

Time Source IP Country Protocol Dest Port Threat Type Risk Action
Loading...

🚫 Blocked IPs & Manual Control

IP AddressBlocked SinceReasonAction
Loading...
'''


# ---------- Request Helpers ----------
def _json_body():
    """Return the request's JSON object, or ``{}`` if it is missing or not a dict."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _clean_ip(raw):
    """Return ``raw`` stripped if it is a valid IP address or network, else ``None``.

    The value is handed to the firewall helpers, so anything that does not
    parse as an IPv4/IPv6 address or CIDR network is rejected.
    """
    if not isinstance(raw, str):
        return None
    candidate = raw.strip()
    for parse in (ipaddress.ip_address, ipaddress.ip_network):
        try:
            if parse is ipaddress.ip_network:
                parse(candidate, strict=False)
            else:
                parse(candidate)
        except ValueError:
            continue
        return candidate
    return None


def _ip_from_body(data):
    """Return ``(ip, error_response)``; exactly one of the two is ``None``."""
    raw = data.get('ip', '')
    if not raw or (isinstance(raw, str) and not raw.strip()):
        return None, (jsonify({'success': False, 'error': 'IP required'}), 400)
    ip = _clean_ip(raw)
    if ip is None:
        return None, (jsonify({'success': False, 'error': 'Invalid IP address'}), 400)
    return ip, None


# ---------- Flask Routes ----------
@app.route('/')
def dashboard():
    """Render the dashboard page, passing the GeoIP availability flag."""
    return render_template_string(HTML_TEMPLATE, geoip_available=geoip_available)


@app.route('/api/stats')
def api_stats():
    """Return total event count, distinct source count and blocked-IP count."""
    total_events = db.get_total_event_count()
    conn, cursor = db._get_connection()
    cursor.execute('SELECT DISTINCT src_ip FROM events')
    unique_src = len(cursor.fetchall())
    blocked = db.get_blocked_ips()
    return jsonify({
        'total_events': total_events,
        'unique_src': unique_src,
        'blocked_count': len(blocked)
    })


@app.route('/api/traffic_stats')
def api_traffic_stats():
    """Return protocol counts and the five most common ports of recent events."""
    events = db.get_recent_events(limit=2000)
    protocol_counter = Counter()
    port_counter = Counter()
    for ev in events:
        proto = ev[4]
        port = ev[5]
        protocol_counter[proto] += 1
        if port:  # falsy ports (None / 0) are not counted
            port_counter[port] += 1
    top_ports = dict(port_counter.most_common(5))
    return jsonify({
        'protocols': dict(protocol_counter),
        'top_ports': top_ports
    })


@app.route('/api/threats')
def api_threats():
    """Return up to 100 recent events that have a threat type or a positive risk."""
    events = db.get_recent_events(500)
    threats = []
    for e in events:
        # A missing risk value counts as 0 instead of raising on `None > 0`.
        if not (e[7] or (e[8] or 0) > 0):
            continue
        src_ip = e[2]
        country = None
        if geoip and geoip.reader:
            country = geoip.get_country_code(src_ip)
        threats.append({
            'time': e[1],
            'src_ip': src_ip,
            'country': country,
            'protocol': e[4],
            'port': e[5] if e[5] else None,
            'threat_type': e[7] or 'MONITORED',
            'risk': e[8],
            'action': e[9]
        })
        if len(threats) >= _MAX_THREATS:
            break  # only the first 100 are returned; skip further lookups
    return jsonify(threats)


@app.route('/api/blocked_ips')
def api_blocked_ips():
    """Return the blocked-IP rows as a list of ``{ip, time, reason}`` objects."""
    blocked = db.get_blocked_ips()
    return jsonify([{'ip': b[0], 'time': b[1], 'reason': b[2]} for b in blocked])


@app.route('/api/block', methods=['POST'])
def api_block():
    """Block the IP given in the JSON body and record it in the database.

    Responds 400 when the IP is missing or invalid and 500 when
    ``block_ip_windows`` reports failure.
    """
    data = _json_body()
    ip, error = _ip_from_body(data)
    if error is not None:
        return error
    reason = data.get('reason', 'Manual block from dashboard')
    if not block_ip_windows(ip, reason):
        return jsonify({'success': False, 'error': 'Firewall rule failed'}), 500
    db.insert_blocked_ip(ip, reason)
    return jsonify({'success': True})


@app.route('/api/unblock', methods=['POST'])
def api_unblock():
    """Unblock the IP given in the JSON body and delete its database row.

    Responds 400 when the IP is missing or invalid and 500 when
    ``unblock_ip_windows`` reports failure.
    """
    ip, error = _ip_from_body(_json_body())
    if error is not None:
        return error
    if not unblock_ip_windows(ip):
        return jsonify({'success': False, 'error': 'Unblock failed'}), 500
    conn, cursor = db._get_connection()
    cursor.execute('DELETE FROM blocked_ips WHERE ip = ?', (ip,))
    conn.commit()
    return jsonify({'success': True})


@app.route('/api/download_blocked_ips_pdf')
def download_blocked_ips_pdf():
    """Send the blocked-IP report as a PDF attachment."""
    pdf_buffer = generate_blocked_ips_pdf()
    return send_file(pdf_buffer, as_attachment=True,
                     download_name='blocked_ips_log.pdf', mimetype='application/pdf')


@app.route('/api/download_all_ips_pdf')
def download_all_ips_pdf():
    """Send the traffic-log report as a PDF attachment."""
    pdf_buffer = generate_all_ips_pdf()
    return send_file(pdf_buffer, as_attachment=True,
                     download_name='all_ip_traffic_log.pdf', mimetype='application/pdf')


@app.route('/api/download_pcap')
def download_pcap():
    """Send the sniffer's buffered packets as a ``.pcap`` attachment.

    Responds 500 when no sniffer is configured or the export fails, and 404
    when the buffer is empty.
    """
    if not pcap_sniffer:
        return jsonify({'error': 'PCAP capture not available'}), 500
    packets = pcap_sniffer.get_pcap_buffer()
    if not packets:
        return jsonify({'error': 'No packets captured yet'}), 404
    tmp_path = None
    try:
        # Reserve a path and close the handle first, so wrpcap is the only
        # writer of the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap') as tmp:
            tmp_path = tmp.name
        wrpcap(tmp_path, packets)
        with open(tmp_path, 'rb') as f:
            pcap_data = f.read()
        return send_file(io.BytesIO(pcap_data), as_attachment=True,
                         download_name='capture.pcap',
                         mimetype='application/vnd.tcpdump.pcap')
    except Exception as e:
        app.logger.error(f"PCAP export failed: {e}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Remove the temp file on success and on failure alike.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


@app.route('/api/geoip_status')
def geoip_status():
    """Return the ``geoip_enabled`` flag of the ``main`` module (default False)."""
    import main  # imported lazily, as in the original code
    return jsonify({'enabled': getattr(main, 'geoip_enabled', False)})


@app.route('/api/geoip_toggle', methods=['POST'])
def geoip_toggle():
    """Set ``main.geoip_enabled`` from the ``enabled`` field of the JSON body."""
    import main  # imported lazily, as in the original code
    main.geoip_enabled = bool(_json_body().get('enabled', False))
    return jsonify({'success': True})


# The view needs a ``filename`` URL variable; a bare '/static/' rule can never
# supply it, so the path converter is declared explicitly.
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve ``filename`` from the ``static`` directory next to this file."""
    static_dir = os.path.join(os.path.dirname(__file__), 'static')
    return send_from_directory(static_dir, filename)


# ---------- Startup ----------
def run_dashboard(host='127.0.0.1', port=5000, sniffer=None):
    """Initialise module state and start the Flask server (blocking).

    Args:
        host: interface to bind.
        port: TCP port to listen on.
        sniffer: optional object providing ``get_pcap_buffer()``, used by the
            PCAP download route.
    """
    global db, pcap_sniffer, geoip, geoip_available
    db = Database()
    pcap_sniffer = sniffer
    geoip = GeoIPBlocker()
    geoip_available = geoip.reader is not None
    if not geoip_available:
        print("[Dashboard] GeoIP database not found. Country lookup and blocking disabled.")
    static_folder = os.path.join(os.path.dirname(__file__), 'static')
    os.makedirs(static_folder, exist_ok=True)
    logo_path = os.path.join(static_folder, 'logo.png')
    if not os.path.exists(logo_path):
        print("[Dashboard] No logo.png found in dashboard/static/. Please add your logo for watermark.")
    app.run(host=host, port=port, debug=False, use_reloader=False)