from flask import Flask, render_template_string, jsonify, request, send_file
import sys
import os
import io
from datetime import datetime
from collections import Counter
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.utils import ImageReader
from reportlab.lib.units import inch
from scapy.utils import wrpcap
import tempfile
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database.db import Database
from response.block_ip import block_ip_windows, unblock_ip_windows
from geoip.blocker import GeoIPBlocker
# Flask application object; every route handler in this module is registered on it.
app = Flask(__name__)
# Module-level state shared by the route handlers. All four names are
# (re)assigned by run_dashboard() before the server starts.
# Database handle used by the API routes and the PDF generators.
db = None
# Sniffer object whose get_pcap_buffer() feeds the PCAP download route.
pcap_sniffer = None
# GeoIPBlocker instance used for country lookups.
geoip = None
# True when the GeoIPBlocker exposes a non-None reader.
geoip_available = False
# ---------- PDF Helpers ----------
def add_page_number(c, page_num):
    """Stamp "Page N" near the bottom-right corner of the current page.

    The canvas state is saved before the font is changed and restored
    afterwards, so the caller's drawing settings are left untouched.

    Args:
        c: Canvas to draw on.
        page_num: Number printed after the word "Page".
    """
    page_width = letter[0]
    c.saveState()
    c.setFont("Helvetica", 8)
    label = f"Page {page_num}"
    c.drawString(page_width - 80, 30, label)
    c.restoreState()
def draw_header(c, title, subtitle=""):
    """Draw the report heading: bold title, smaller subtitle, and a rule.

    The canvas font is left at Helvetica 10 when this returns.

    Args:
        c: Canvas to draw on.
        title: Text drawn in Helvetica-Bold 16.
        subtitle: Text drawn in Helvetica 10 below the title.
    """
    page_width, page_height = letter[0], letter[1]
    left = 50
    right = page_width - 50

    c.setFont("Helvetica-Bold", 16)
    c.drawString(left, page_height - 50, title)

    c.setFont("Helvetica", 10)
    c.drawString(left, page_height - 70, subtitle)

    # Horizontal rule separating the heading from the page body.
    rule_y = page_height - 80
    c.line(left, rule_y, right, rule_y)
def draw_watermark(c, logo_path):
    """Draw the logo as a faint 3x3 inch watermark in the middle of the page.

    This is best-effort: if the logo file does not exist nothing is drawn,
    and if the image cannot be read or drawn the failure is logged and
    report generation continues without a watermark.

    Args:
        c: Canvas to draw on.
        logo_path: Filesystem path of the logo image.
    """
    # Local import: this module does not import logging at the top level.
    import logging

    if not os.path.exists(logo_path):
        return

    log = logging.getLogger(__name__)
    try:
        img = ImageReader(logo_path)
    except Exception as exc:
        log.warning("Watermark skipped, cannot read %s: %s", logo_path, exc)
        return

    c.saveState()
    try:
        c.setFillAlpha(0.2)
        c.drawImage(img, letter[0] / 2 - 1.5 * inch, letter[1] / 2 - 1.5 * inch,
                    width=3 * inch, height=3 * inch, mask='auto',
                    preserveAspectRatio=True)
    except Exception as exc:
        log.warning("Watermark skipped, cannot draw %s: %s", logo_path, exc)
    finally:
        # Always pair saveState() with restoreState(); otherwise the 0.2 fill
        # alpha set above would stay active for everything drawn afterwards.
        c.restoreState()
def _draw_blocked_ips_columns(c, y):
    """Draw the bold column titles at height ``y`` and switch back to the row font."""
    c.setFont("Helvetica-Bold", 10)
    c.drawString(50, y, "IP Address")
    c.drawString(160, y, "Blocked Since")
    c.drawString(300, y, "Reason")
    c.setFont("Helvetica", 10)


def generate_blocked_ips_pdf():
    """Build the "Blocked IP Addresses" PDF report in memory.

    Rows come from ``db.get_blocked_ips()`` and are unpacked as
    ``(ip, block_time, reason)``. Every page gets the watermark, a header,
    the column titles and a page number.

    Returns:
        io.BytesIO: Buffer holding the finished PDF, rewound to offset 0.
    """
    blocked = db.get_blocked_ips()
    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    logo_path = os.path.join(os.path.dirname(__file__), 'static', 'logo.png')
    page_num = 1

    # Watermark first so the text of the page is drawn on top of it.
    draw_watermark(c, logo_path)
    draw_header(c, "MayOne Security Framework", "Blocked IP Addresses Report")
    y = letter[1] - 110
    c.drawString(50, y, f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    y -= 20
    c.drawString(50, y, f"Total Blocked IPs: {len(blocked)}")
    y -= 40
    _draw_blocked_ips_columns(c, y)
    y -= 20

    for ip, block_time, reason in blocked:
        if y < 80:
            # Finish the current page, then decorate the NEW page. Drawing the
            # watermark before showPage() would stamp the old page a second
            # time and leave the final page without one.
            add_page_number(c, page_num)
            c.showPage()
            page_num += 1
            draw_watermark(c, logo_path)
            draw_header(c, "MayOne Security Framework (cont.)", "Blocked IPs")
            y = letter[1] - 110
            _draw_blocked_ips_columns(c, y)
            y -= 20

        # Tolerate missing values instead of failing on len()/slicing of None.
        reason_text = str(reason or "")
        if len(reason_text) > 60:
            reason_text = reason_text[:60] + "..."
        c.drawString(50, y, ip)
        c.drawString(160, y, str(block_time or "")[:19])
        c.drawString(300, y, reason_text)
        y -= 20

    add_page_number(c, page_num)
    c.save()
    buffer.seek(0)
    return buffer
def _draw_traffic_columns(c, y, headers, col_widths):
    """Draw the bold column titles at height ``y`` and switch back to the row font."""
    c.setFont("Helvetica-Bold", 8)
    x = 50
    for title, width in zip(headers, col_widths):
        c.drawString(x, y, title)
        x += width
    c.setFont("Helvetica", 8)


def generate_all_ips_pdf():
    """Build the "Complete IP Traffic Log" PDF report in memory.

    Uses up to 1000 rows from ``db.get_recent_events()``. Each row is read by
    position: 1 timestamp, 2 source IP, 3 destination IP, 4 protocol, 5 port,
    6 size, 7 threat, 8 risk. Every page gets the watermark, a header, the
    column titles and a page number.

    Returns:
        io.BytesIO: Buffer holding the finished PDF, rewound to offset 0.
    """
    events = db.get_recent_events(limit=1000)
    buffer = io.BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    logo_path = os.path.join(os.path.dirname(__file__), 'static', 'logo.png')
    page_num = 1
    headers = ["Timestamp", "Src IP", "Dst IP", "Proto", "Port", "Size", "Threat", "Risk"]
    col_widths = [100, 80, 80, 40, 40, 50, 70, 40]

    # Watermark first so the text of the page is drawn on top of it.
    draw_watermark(c, logo_path)
    draw_header(c, "MayOne Security Framework", "Complete IP Traffic Log")
    y = letter[1] - 110
    c.drawString(50, y, f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    y -= 20
    c.drawString(50, y, f"Total Events: {len(events)}")
    y -= 40
    _draw_traffic_columns(c, y, headers, col_widths)
    y -= 15

    for ev in events:
        if y < 80:
            # Finish the current page, then decorate the NEW page. Drawing the
            # watermark before showPage() would stamp the old page a second
            # time and leave the final page without one.
            add_page_number(c, page_num)
            c.showPage()
            page_num += 1
            draw_watermark(c, logo_path)
            draw_header(c, "MayOne Security Framework (cont.)", "IP Traffic Log")
            y = letter[1] - 110
            _draw_traffic_columns(c, y, headers, col_widths)
            y -= 15

        # One string per column, truncated to the same lengths as before;
        # None in a text field becomes an empty cell instead of raising.
        cells = (
            str(ev[1] or "")[:19],
            str(ev[2] or "")[:15],
            str(ev[3] or "")[:15],
            str(ev[4] or "")[:4],
            str(ev[5]) if ev[5] else "",
            str(ev[6]),
            ev[7] or "Normal",
            str(ev[8]) if ev[8] else "0",
        )
        x = 50
        for text, width in zip(cells, col_widths):
            c.drawString(x, y, text)
            x += width
        y -= 12

    add_page_number(c, page_num)
    c.save()
    buffer.seek(0)
    return buffer
# ---------- HTML Template (with GeoIP warning and conditional toggle) ----------
# Page source passed to render_template_string() by the dashboard() route.
# NOTE(review): the text below contains no HTML tags, scripts or Jinja
# expressions, so the geoip_available value supplied by dashboard() is never
# referenced here — confirm the original markup was not lost from this file.
HTML_TEMPLATE = '''
MayOne Security Framework
⚠️ GeoIP database not found. Download GeoLite2-Country.mmdb and place it in the 'geoip/' folder to enable country lookup and automatic blocking.
📡 Traffic Analysis
Protocol Distribution
Top 5 Ports
🚨 Recent Threats
| Time |
Source IP |
Country |
Protocol |
Dest Port |
Threat Type |
Risk |
Action |
| Loading... |
| IP Address | Blocked Since | Reason | Action |
| Loading... |
'''
# ---------- Flask Routes ----------
@app.route('/')
def dashboard():
    """Serve the dashboard page rendered from HTML_TEMPLATE.

    The current ``geoip_available`` flag is passed to the template as a
    variable of the same name.
    """
    context = {'geoip_available': geoip_available}
    return render_template_string(HTML_TEMPLATE, **context)
@app.route('/api/stats')
def api_stats():
    """Return headline counters for the dashboard as JSON.

    Response keys:
        total_events: Value of ``db.get_total_event_count()``.
        unique_src: Number of distinct ``src_ip`` values in ``events``.
        blocked_count: Number of rows returned by ``db.get_blocked_ips()``.
    """
    total_events = db.get_total_event_count()
    _, cursor = db._get_connection()
    # Count inside the database instead of fetching every distinct address
    # into Python only to take len(); the subquery yields the same number the
    # previous SELECT DISTINCT + len(fetchall()) produced.
    cursor.execute('SELECT COUNT(*) FROM (SELECT DISTINCT src_ip FROM events)')
    unique_src = cursor.fetchone()[0]
    blocked = db.get_blocked_ips()
    return jsonify({
        'total_events': total_events,
        'unique_src': unique_src,
        'blocked_count': len(blocked)
    })
@app.route('/api/traffic_stats')
def api_traffic_stats():
    """Return protocol and port frequencies over the latest 2000 events.

    Response keys:
        protocols: Mapping of protocol (event field 4) to occurrence count.
        top_ports: Mapping of the five most common ports (event field 5) to
            their counts; events with a falsy port are not counted.
    """
    by_protocol = Counter()
    by_port = Counter()
    for row in db.get_recent_events(limit=2000):
        by_protocol[row[4]] += 1
        dest_port = row[5]
        if not dest_port:
            continue
        by_port[dest_port] += 1

    payload = {
        'protocols': dict(by_protocol),
        'top_ports': dict(by_port.most_common(5)),
    }
    return jsonify(payload)
@app.route('/api/threats')
def api_threats():
    """Return up to 100 threat entries from the latest 500 events as JSON.

    An event is reported when it has a threat type (field 7) or a risk
    (field 8) greater than zero. Each entry carries time, src_ip, country,
    protocol, port, threat_type, risk and action; ``country`` is None unless
    a GeoIP reader is available.
    """
    max_results = 100
    events = db.get_recent_events(500)
    threats = []
    for e in events:
        if len(threats) >= max_results:
            # Only the first 100 matches are returned, so stop here rather
            # than doing GeoIP lookups for entries that would be discarded.
            break
        threat_type = e[7]
        risk = e[8]
        # A missing risk counts as 0; comparing None with an int would raise.
        if not (threat_type or (risk or 0) > 0):
            continue
        src_ip = e[2]
        country = None
        if geoip and geoip.reader:
            country = geoip.get_country_code(src_ip)
        threats.append({
            'time': e[1],
            'src_ip': src_ip,
            'country': country,
            'protocol': e[4],
            'port': e[5] if e[5] else None,
            'threat_type': threat_type or 'MONITORED',
            'risk': risk,
            'action': e[9]
        })
    return jsonify(threats)
@app.route('/api/blocked_ips')
def api_blocked_ips():
    """Return the blocked-IP list as JSON objects with ip, time and reason."""
    payload = []
    for row in db.get_blocked_ips():
        payload.append({'ip': row[0], 'time': row[1], 'reason': row[2]})
    return jsonify(payload)
@app.route('/api/block', methods=['POST'])
def api_block():
    """Block an IP through the firewall helper and record it in the database.

    Expects a JSON object with ``ip`` and an optional ``reason``.

    Returns:
        200 ``{'success': True}`` when the firewall rule was added,
        400 when no usable ``ip`` was supplied (including a missing,
        malformed or non-object JSON body),
        500 when ``block_ip_windows`` reports failure.
    """
    default_reason = 'Manual block from dashboard'
    # silent=True yields None instead of an error for a missing or malformed
    # body; a body that is not a JSON object is treated the same way.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    ip = data.get('ip', '')
    ip = ip.strip() if isinstance(ip, str) else ''
    reason = data.get('reason', default_reason)
    if not isinstance(reason, str) or not reason.strip():
        # The PDF report slices the stored reason, so it must be a string.
        reason = default_reason

    if not ip:
        return jsonify({'success': False, 'error': 'IP required'}), 400

    # NOTE(review): ip is passed on as received apart from strip(); confirm
    # that block_ip_windows validates the address before building its rule.
    if block_ip_windows(ip, reason):
        db.insert_blocked_ip(ip, reason)
        return jsonify({'success': True})
    return jsonify({'success': False, 'error': 'Firewall rule failed'}), 500
@app.route('/api/unblock', methods=['POST'])
def api_unblock():
    """Remove the firewall block for an IP and delete its database record.

    Expects a JSON object with ``ip``.

    Returns:
        200 ``{'success': True}`` when the block was removed,
        400 when no usable ``ip`` was supplied (including a missing,
        malformed or non-object JSON body),
        500 when ``unblock_ip_windows`` reports failure.
    """
    # silent=True yields None instead of an error for a missing or malformed
    # body; a body that is not a JSON object is treated the same way.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    ip = data.get('ip', '')
    ip = ip.strip() if isinstance(ip, str) else ''
    if not ip:
        return jsonify({'success': False, 'error': 'IP required'}), 400

    if not unblock_ip_windows(ip):
        return jsonify({'success': False, 'error': 'Unblock failed'}), 500

    conn, cursor = db._get_connection()
    cursor.execute('DELETE FROM blocked_ips WHERE ip = ?', (ip,))
    conn.commit()
    return jsonify({'success': True})
@app.route('/api/download_blocked_ips_pdf')
def download_blocked_ips_pdf():
    """Send the blocked-IP report as a PDF attachment."""
    return send_file(
        generate_blocked_ips_pdf(),
        mimetype='application/pdf',
        as_attachment=True,
        download_name='blocked_ips_log.pdf',
    )
@app.route('/api/download_all_ips_pdf')
def download_all_ips_pdf():
    """Send the full IP traffic log as a PDF attachment."""
    return send_file(
        generate_all_ips_pdf(),
        mimetype='application/pdf',
        as_attachment=True,
        download_name='all_ip_traffic_log.pdf',
    )
@app.route('/api/download_pcap')
def download_pcap():
    """Send the sniffer's buffered packets as a ``capture.pcap`` attachment.

    Returns:
        The PCAP file on success,
        500 when no sniffer is attached or the export fails,
        404 when the sniffer has no packets buffered.
    """
    if not pcap_sniffer:
        return jsonify({'error': 'PCAP capture not available'}), 500
    packets = pcap_sniffer.get_pcap_buffer()
    if not packets:
        return jsonify({'error': 'No packets captured yet'}), 404

    tmp_path = None
    try:
        # Reserve a unique file name and close the handle before wrpcap
        # writes to that path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pcap') as tmp:
            tmp_path = tmp.name
        wrpcap(tmp_path, packets)
        with open(tmp_path, 'rb') as f:
            pcap_data = f.read()
        return send_file(io.BytesIO(pcap_data), as_attachment=True,
                         download_name='capture.pcap',
                         mimetype='application/vnd.tcpdump.pcap')
    except Exception as e:
        app.logger.error(f"PCAP export failed: {e}")
        return jsonify({'error': str(e)}), 500
    finally:
        # Remove the temp file on every path; previously it was left behind
        # whenever writing or reading it failed.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as cleanup_error:
                app.logger.warning(f"Could not remove temp PCAP {tmp_path}: {cleanup_error}")
@app.route('/api/geoip_status')
def geoip_status():
    """Report the ``geoip_enabled`` flag of the ``main`` module as JSON.

    The flag is reported as False when ``main`` has no such attribute.
    """
    import main

    enabled = getattr(main, 'geoip_enabled', False)
    payload = {'enabled': enabled}
    return jsonify(payload)
@app.route('/api/geoip_toggle', methods=['POST'])
def geoip_toggle():
    """Set the ``geoip_enabled`` flag on the ``main`` module.

    Expects a JSON object; its ``enabled`` value (False when absent) is
    stored on ``main.geoip_enabled``.

    Returns:
        200 ``{'success': True}`` once the flag is stored,
        400 when the body is missing, malformed or not a JSON object.
    """
    import main

    # silent=True yields None instead of an error for a missing or malformed
    # body; reject anything that is not a JSON object rather than crashing on
    # .get() or silently switching the flag off.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'JSON object required'}), 400

    main.geoip_enabled = data.get('enabled', False)
    return jsonify({'success': True})
# The rule must declare the ``filename`` variable the view function expects;
# with the bare '/static/' rule every request failed with a missing-argument
# TypeError. The ``path`` converter lets the value contain slashes.
# NOTE(review): Flask(__name__) normally registers its own
# /static/<path:filename> route as well — confirm this view is still needed.
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve ``filename`` from the ``static`` directory next to this module.

    Args:
        filename: Path of the requested file relative to the static directory.
    """
    from flask import send_from_directory
    static_dir = os.path.join(os.path.dirname(__file__), 'static')
    return send_from_directory(static_dir, filename)
# ---------- Startup ----------
def run_dashboard(host='127.0.0.1', port=5000, sniffer=None):
    """Initialise the module-level state and start the Flask server.

    Creates the Database and GeoIPBlocker instances, stores the sniffer,
    makes sure the ``static`` directory exists, prints a notice when the
    GeoIP reader or the logo file is missing, then runs the app with the
    debugger and reloader turned off.

    Args:
        host: Interface the server binds to.
        port: TCP port the server listens on.
        sniffer: Object stored as ``pcap_sniffer`` for the PCAP download route.
    """
    global db, pcap_sniffer, geoip, geoip_available

    db = Database()
    pcap_sniffer = sniffer

    geoip = GeoIPBlocker()
    geoip_available = geoip.reader is not None
    if not geoip_available:
        print("[Dashboard] GeoIP database not found. Country lookup and blocking disabled.")

    module_dir = os.path.dirname(__file__)
    static_dir = os.path.join(module_dir, 'static')
    os.makedirs(static_dir, exist_ok=True)

    logo_file = os.path.join(static_dir, 'logo.png')
    logo_present = os.path.exists(logo_file)
    if not logo_present:
        print("[Dashboard] No logo.png found in dashboard/static/. Please add your logo for watermark.")

    app.run(host=host, port=port, debug=False, use_reloader=False)