CyberSecure / routers /reports.py
cloud450's picture
Upload 20 files
6f3b14e verified
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, List
import io
import base64
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak, Table, TableStyle
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT
import matplotlib
matplotlib.use('Agg') # Non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import os
from groq import Groq
router = APIRouter()
class ReportRequest(BaseModel):
attack_summary: Dict[str, int]
classification_report: Dict
threat_statistics: Dict
attack_counts: Dict[str, int]
protocol_counts: Dict[str, int]
def create_pie_chart(data: Dict[str, int], title: str) -> str:
"""Create a pie chart and return as base64 image"""
fig, ax = plt.subplots(figsize=(6, 4))
labels = list(data.keys())
sizes = list(data.values())
colors_list = ['#00C851', '#ff4444', '#ff8800', '#33b5e5', '#aa66cc', '#2BBBAD']
ax.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors_list[:len(labels)], startangle=90)
ax.set_title(title, fontsize=14, fontweight='bold')
# Save to bytes
buf = io.BytesIO()
plt.savefig(buf, format='png', dpi=150, bbox_inches='tight')
buf.seek(0)
plt.close()
return buf
def create_bar_chart(data: Dict[str, int], title: str, color='#33b5e5') -> str:
"""Create a bar chart and return as base64 image"""
fig, ax = plt.subplots(figsize=(8, 4))
labels = list(data.keys())
values = list(data.values())
ax.barh(labels, values, color=color)
ax.set_xlabel('Count', fontsize=10)
ax.set_title(title, fontsize=14, fontweight='bold')
ax.grid(axis='x', alpha=0.3)
# Save to bytes
buf = io.BytesIO()
plt.savefig(buf, format='png', dpi=150, bbox_inches='tight')
buf.seek(0)
plt.close()
return buf
@router.post("/generate-pdf")
async def generate_pdf_report(data: ReportRequest):
try:
# Create PDF buffer
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=letter, topMargin=0.5*inch, bottomMargin=0.5*inch)
# Styles
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
textColor=colors.HexColor('#1a1a2e'),
spaceAfter=30,
alignment=TA_CENTER,
fontName='Helvetica-Bold'
)
heading_style = ParagraphStyle(
'CustomHeading',
parent=styles['Heading2'],
fontSize=16,
textColor=colors.HexColor('#2d4059'),
spaceAfter=12,
spaceBefore=20,
fontName='Helvetica-Bold'
)
body_style = ParagraphStyle(
'CustomBody',
parent=styles['BodyText'],
fontSize=11,
leading=16,
spaceAfter=12,
alignment=TA_LEFT
)
# Build content
content = []
# Title
content.append(Paragraph("Network Intrusion Detection System", title_style))
content.append(Paragraph("Comprehensive Threat Analysis Report", styles['Heading2']))
content.append(Spacer(1, 0.2*inch))
# Metadata
meta_data = [
['Report Generated:', datetime.now().strftime('%Y-%m-%d %H:%M:%S')],
['Total Flows Analyzed:', str(data.threat_statistics.get('total', 0))],
['Malicious Flows:', str(data.threat_statistics.get('malicious', 0))],
['Benign Flows:', str(data.threat_statistics.get('benign', 0))],
]
meta_table = Table(meta_data, colWidths=[2*inch, 3*inch])
meta_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.HexColor('#e8eaf6')),
('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey)
]))
content.append(meta_table)
content.append(Spacer(1, 0.3*inch))
# Executive Summary (AI Generated)
content.append(Paragraph("Executive Summary", heading_style))
# Generate AI summary using Groq
api_key = os.getenv("GROQ_API_KEY")
client = Groq(api_key=api_key)
summary_prompt = f"""Generate a professional executive summary for a network security report with these statistics:
Total Flows: {data.threat_statistics.get('total', 0)}
Malicious: {data.threat_statistics.get('malicious', 0)}
Attack Distribution: {data.attack_summary}
Provide a 3-4 sentence executive summary in plain text (no markdown) covering:
1. Overall security posture
2. Key threats detected
3. Critical recommendations
Keep it professional and concise."""
try:
response = client.chat.completions.create(
messages=[{"role": "user", "content": summary_prompt}],
model="llama-3.1-8b-instant",
temperature=0.7,
max_tokens=200
)
summary_text = response.choices[0].message.content
except:
summary_text = f"Analysis of {data.threat_statistics.get('total', 0)} network flows revealed {data.threat_statistics.get('malicious', 0)} malicious activities. The system successfully identified multiple attack vectors requiring immediate attention."
content.append(Paragraph(summary_text, body_style))
content.append(Spacer(1, 0.2*inch))
# Attack Distribution Chart
content.append(Paragraph("Attack Distribution Analysis", heading_style))
pie_buf = create_pie_chart(data.attack_counts, "Attack Type Distribution")
img = Image(pie_buf, width=5*inch, height=3.3*inch)
content.append(img)
content.append(Spacer(1, 0.2*inch))
# Attack Statistics Table
attack_data = [['Attack Type', 'Count', 'Percentage']]
total = sum(data.attack_counts.values())
for attack, count in sorted(data.attack_counts.items(), key=lambda x: x[1], reverse=True):
percentage = f"{(count/total*100):.1f}%" if total > 0 else "0%"
attack_data.append([attack, str(count), percentage])
attack_table = Table(attack_data, colWidths=[2.5*inch, 1.5*inch, 1.5*inch])
attack_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#3f51b5')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
content.append(attack_table)
content.append(PageBreak())
# Protocol Distribution
content.append(Paragraph("Protocol Distribution Analysis", heading_style))
bar_buf = create_bar_chart(data.protocol_counts, "Top Protocols by Flow Count", '#33b5e5')
img2 = Image(bar_buf, width=6*inch, height=3*inch)
content.append(img2)
content.append(Spacer(1, 0.3*inch))
# Recommendations
content.append(Paragraph("Security Recommendations", heading_style))
recommendations = [
"Implement rate limiting and DDoS protection for high-volume attack vectors",
"Enable multi-factor authentication to prevent brute force attacks",
"Deploy Web Application Firewall (WAF) rules for detected web attack patterns",
"Conduct regular security audits and penetration testing",
"Update intrusion detection signatures based on identified attack patterns"
]
for i, rec in enumerate(recommendations, 1):
content.append(Paragraph(f"{i}. {rec}", body_style))
content.append(Spacer(1, 0.3*inch))
# Footer
content.append(Spacer(1, 0.5*inch))
footer_style = ParagraphStyle('Footer', parent=styles['Normal'], fontSize=9, textColor=colors.grey, alignment=TA_CENTER)
content.append(Paragraph("This report was generated by SkyFort IDS - Network Intrusion Detection System", footer_style))
content.append(Paragraph(f"Report ID: RPT-{datetime.now().strftime('%Y%m%d-%H%M%S')}", footer_style))
# Build PDF
doc.build(content)
# Get PDF bytes
pdf_bytes = buffer.getvalue()
buffer.close()
# Return as base64
pdf_base64 = base64.b64encode(pdf_bytes).decode('utf-8')
return {
"pdf": pdf_base64,
"filename": f"threat_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
}
except Exception as e:
import traceback
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.post("/generate")
async def generate_text_report(data: ReportRequest):
"""Generate a text-based threat report"""
try:
api_key = os.getenv("GROQ_API_KEY")
client = Groq(api_key=api_key)
# Prepare statistics
total = data.threat_statistics.get('total', 0)
malicious = data.threat_statistics.get('malicious', 0)
benign = data.threat_statistics.get('benign', 0)
# Create attack summary string
attack_summary = ", ".join([f"{k}: {v}" for k, v in data.attack_counts.items()])
prompt = f"""Generate a professional cybersecurity threat analysis report based on these statistics:
Total Network Flows Analyzed: {total}
Malicious Flows Detected: {malicious}
Benign Flows: {benign}
Attack Distribution: {attack_summary}
Create a comprehensive report with the following sections:
1. Executive Summary (2-3 sentences)
2. Threat Overview (detailed analysis of detected attacks)
3. Attack Pattern Analysis (breakdown by type)
4. Risk Assessment
5. Recommended Actions (5-7 specific mitigation strategies)
6. Conclusion
Format the report professionally with clear section headers. Keep it factual and actionable."""
response = client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model="llama-3.1-8b-instant",
temperature=0.7,
max_tokens=1500
)
report_text = response.choices[0].message.content
# Add header and footer
full_report = f"""
╔══════════════════════════════════════════════════════════════════╗
β•‘ NETWORK INTRUSION DETECTION SYSTEM (IDS) β•‘
β•‘ THREAT ANALYSIS REPORT β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Report ID: RPT-{datetime.now().strftime('%Y%m%d-%H%M%S')}
{report_text}
═══════════════════════════════════════════════════════════════════
This report was automatically generated by SkyFort IDS
For questions or concerns, contact your security team
═══════════════════════════════════════════════════════════════════
"""
return {"report": full_report}
except Exception as e:
import traceback
print(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))