| import json | |
| import os | |
| from datetime import datetime | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.utils import simpleSplit | |
class ReportGenerator:
    """Build security reports from the event database as JSON, text and PDF.

    Reports are written into ``self.report_dir`` (created on construction),
    and every generated report is recorded in the ``reports`` table.
    """

    def __init__(self, db):
        """Store the database handle and ensure the output directory exists.

        Args:
            db: Database wrapper. This class calls ``_get_connection()``,
                ``get_recent_events()``, ``get_threat_summary()`` and
                ``get_blocked_ips()`` on it.
        """
        self.db = db
        self.report_dir = "reports"
        os.makedirs(self.report_dir, exist_ok=True)

    def _get_cursor(self):
        """Return the cursor half of ``self.db._get_connection()``."""
        _, cursor = self.db._get_connection()
        return cursor

    def generate_report(self, reason="scheduled"):
        """Write JSON, text and PDF reports and log the run in the database.

        Args:
            reason: Value stored in the ``type`` column of the ``reports``
                table for this run.

        Returns:
            Tuple ``(json_path, txt_path, pdf_path)`` of the files written.
        """
        ts = datetime.now()
        base_name = f"security_report_{ts.strftime('%Y%m%d_%H%M%S')}"
        data = self._collect_data(ts)

        # Explicit UTF-8 so output does not depend on the platform default
        # encoding (free-text fields such as block reasons may be non-ASCII).
        json_path = os.path.join(self.report_dir, base_name + ".json")
        with open(json_path, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        txt_path = os.path.join(self.report_dir, base_name + ".txt")
        with open(txt_path, 'w', encoding="utf-8") as f:
            f.write(self._format_text(data))

        pdf_path = os.path.join(self.report_dir, base_name + ".pdf")
        self._generate_pdf(data, pdf_path)

        # Use ONE connection/cursor pair for both the INSERT and the commit,
        # so the commit is guaranteed to apply to the connection that ran
        # the statement.
        conn, cursor = self.db._get_connection()
        cursor.execute(
            'INSERT INTO reports (timestamp, report_path, type) VALUES (?, ?, ?)',
            (ts.isoformat(), json_path, reason),
        )
        conn.commit()
        return json_path, txt_path, pdf_path

    def _collect_data(self, ts):
        """Gather the report payload from the database.

        Args:
            ts: ``datetime`` used as the report timestamp.

        Returns:
            JSON-serialisable dict with ``report_time``, ``summary``,
            ``threat_breakdown``, ``blocked_ips_list``, ``top_suspicious``
            and ``recommendations`` keys.
        """
        events = self.db.get_recent_events(200)
        threats = self.db.get_threat_summary(24)
        blocked = self.db.get_blocked_ips()

        # NOTE(review): event rows are read positionally; index 2 is treated
        # as the source address and index 7 as the threat marker. Confirm
        # against the events table schema.
        unique_src = {e[2] for e in events}
        threat_count = sum(1 for e in events if e[7] is not None)

        return {
            "report_time": ts.isoformat(),
            "summary": {
                "total_events": len(events),
                "unique_sources": len(unique_src),
                "threats_detected": threat_count,
                "blocked_ips": len(blocked),
            },
            "threat_breakdown": [
                {"type": t[0], "count": t[1], "avg_risk": t[2]} for t in threats
            ],
            "blocked_ips_list": [
                {"ip": b[0], "time": b[1], "reason": b[2]} for b in blocked
            ],
            "top_suspicious": self._get_top_ips(events),
            "recommendations": self._generate_recs(threats, len(blocked)),
        }

    def _get_top_ips(self, events, n=5):
        """Return the ``n`` most frequent values of ``event[2]`` with counts.

        Args:
            events: Iterable of event rows; only index 2 is read.
            n: Maximum number of entries to return.

        Returns:
            List of ``{"ip": ..., "packets": count}`` dicts, highest count
            first. Ties keep first-seen order because the sort is stable.
        """
        freq = {}
        for e in events:
            ip = e[2]
            freq[ip] = freq.get(ip, 0) + 1
        ranked = sorted(freq.items(), key=lambda item: item[1], reverse=True)[:n]
        return [{"ip": ip, "packets": cnt} for ip, cnt in ranked]

    def _generate_recs(self, threats, blocked_count):
        """Derive human-readable recommendations from the threat summary.

        Args:
            threats: Sequence of rows whose first element is the threat type.
            blocked_count: Number of currently blocked IPs.

        Returns:
            Non-empty list of recommendation strings.
        """
        recs = []
        if any(t[0] == "PORT_SCAN" for t in threats):
            recs.append("Enable port knocking or move SSH/RDP to non-standard ports.")
        if any(t[0] == "BRUTE_FORCE" for t in threats):
            recs.append("Enforce strong passwords and consider account lockout policies.")
        if blocked_count > 10:
            recs.append("Review blocked IP list; consider using an IP reputation feed.")
        if not recs:
            recs.append("No immediate action required. Continue monitoring.")
        return recs

    def _format_text(self, data):
        """Render the report payload as a plain-text document.

        Args:
            data: Dict in the shape produced by ``_collect_data``.

        Returns:
            The report as a single newline-joined string. Only the first
            ten blocked IPs are listed.
        """
        lines = []
        lines.append("="*60)
        lines.append(f"MayOne Security Report - {data['report_time']}")
        lines.append("="*60)

        lines.append("\nSUMMARY")
        for k, v in data['summary'].items():
            lines.append(f" {k}: {v}")

        lines.append("\nTHREAT BREAKDOWN")
        for t in data['threat_breakdown']:
            # The average may be missing (None); a float format spec would
            # raise TypeError on it, so print a placeholder instead.
            risk = t['avg_risk']
            risk_text = f"{risk:.1f}" if risk is not None else "n/a"
            lines.append(f" {t['type']}: {t['count']} events, avg risk {risk_text}")

        lines.append("\nBLOCKED IPs")
        for b in data['blocked_ips_list'][:10]:
            lines.append(f" {b['ip']} - {b['reason']} (since {b['time']})")

        lines.append("\nRECOMMENDATIONS")
        for r in data['recommendations']:
            lines.append(f" - {r}")
        return "\n".join(lines)

    def _generate_pdf(self, data, path):
        """Write a one-page PDF with the summary and the top three recommendations.

        Args:
            data: Dict in the shape produced by ``_collect_data``.
            path: Destination file path for the PDF.
        """
        font_name, font_size = "Helvetica", 12
        c = canvas.Canvas(path, pagesize=letter)
        # Set the font explicitly so the wrap measurement below is computed
        # with the same font that the drawing calls use.
        c.setFont(font_name, font_size)
        width, height = letter

        # y is the current baseline; it moves down the page after each line.
        y = height - 50
        c.drawString(50, y, f"MayOne Security Report - {data['report_time']}")
        y -= 30

        c.drawString(50, y, "Summary")
        y -= 20
        for k, v in data['summary'].items():
            c.drawString(70, y, f"{k}: {v}")
            y -= 15

        y -= 10
        c.drawString(50, y, "Top Recommendations")
        y -= 20
        for rec in data['recommendations'][:3]:
            # Wrap long recommendations to the printable width.
            for line in simpleSplit(rec, font_name, font_size, width-100):
                c.drawString(70, y, f"- {line}")
                y -= 15
        c.save()