Spaces:
Sleeping
Sleeping
| # utils/ai_engine.py | |
| # ----------------------------------------- | |
| # Lightweight "AI" engine using rules + templates | |
| # No heavy ML model – safe for your laptop 🙂 | |
| from collections import Counter | |
| from datetime import datetime | |
def _normalize_label(label: object) -> str:
    """Return *label* as a stripped, upper-cased string for lookups.

    Missing labels (``None``, any other falsy value, or text that is empty
    once surrounding whitespace is removed) collapse to ``"UNKNOWN"`` so
    callers never have to deal with an empty label.
    """
    # Strip BEFORE applying the fallback: a whitespace-only label would
    # otherwise normalize to "" instead of "UNKNOWN".
    text = str(label or "").strip()
    return text.upper() if text else "UNKNOWN"
| # 1️⃣ Explain a single threat/event | |
# Simple knowledge base: normalized label -> explanation template.
# Templates may use the placeholders that explain_threat() fills in:
# {label}, {risk}, {src}, {dst}, {proto}, {sport}, {dport}.
_THREAT_EXPLANATIONS = {
    "VPN": (
        "Traffic from {src} to {dst} over {proto} looks like VPN usage. "
        "VPN tunnels encrypt traffic and can hide the real origin of an attacker. "
        "Review if this VPN endpoint is expected for this host."
    ),
    "TOR": (
        "Traffic appears to be routed through the Tor anonymity network. "
        "Tor is commonly used to hide attacker identity. "
        "Investigate the host at {src} and check if Tor usage is allowed."
    ),
    "I2P": (
        "Detected I2P (Invisible Internet Project) style traffic. "
        "I2P is an anonymity network similar to Tor and can be abused for C2 channels."
    ),
    "FREENET": (
        "Traffic resembles Freenet P2P anonymity network. "
        "Such networks can be used to exchange illegal or malicious content."
    ),
    "ZERONET": (
        "ZeroNet-like traffic detected. ZeroNet hosts sites over a P2P network. "
        "This may bypass normal web filtering and logging."
    ),
    # CICIDS-style examples – extend as you like
    "DOS HULK": (
        "High-rate HTTP traffic typical of DoS-Hulk attack was detected. "
        "This can exhaust web server resources and cause service disruption."
    ),
    "DOS SLOWLORIS": (
        "Slowloris-style DoS traffic detected. It keeps many HTTP connections open "
        "to slowly exhaust server connection limits."
    ),
    "BOT": (
        "Behavior suggests the host may be part of a botnet. "
        "Correlate with outbound connections and run malware scans on {src}."
    ),
    "BENIGN": (
        "This flow is classified as BENIGN. No immediate malicious pattern detected, "
        "but you should still monitor for anomalies over time."
    ),
}

# Used when no knowledge-base entry applies to the label.
_GENERIC_EXPLANATION = (
    "The traffic is classified as '{label}' with a risk level of {risk}. "
    "Review source {src} → destination {dst}, protocol {proto}, "
    "and ports {sport} → {dport} for suspicious patterns."
)

# Longest keys first, so that the most specific entry wins if a shorter key
# is ever added that is contained in a longer one (sorted() is stable, so
# keys of equal length keep their knowledge-base order).
_EXPLANATION_KEYS = tuple(sorted(_THREAT_EXPLANATIONS, key=len, reverse=True))


def _first_value(event: dict, keys: tuple, default):
    """Return the first value under *keys* that is neither None nor ""."""
    for key in keys:
        value = event.get(key)
        # Explicit checks instead of truthiness so that a legitimate 0
        # (e.g. a port number) is not mistaken for a missing field.
        if value is not None and value != "":
            return value
    return default


def _label_mentions(label: str, key: str) -> bool:
    """Return True if *key* starts a word in *label* and is not negated.

    A plain ``key in label`` test would hand the Tor/VPN explanation to
    labels such as "NON-TOR" or "NONVPN", and the botnet explanation to
    "ROBOT". Only the left edge is checked, so "BOTNET" still matches "BOT".
    """
    start = label.find(key)
    while start != -1:
        prefix = label[:start]
        starts_a_word = not prefix or not prefix[-1].isalnum()
        negated = prefix.rstrip(" -_").endswith("NON")
        if starts_a_word and not negated:
            return True
        start = label.find(key, start + 1)
    return False


def explain_threat(event: dict) -> str:
    """Return a human-readable explanation for a single event.

    Args:
        event: Event dict (from logger / recent()). The keys read are
            ``prediction``, ``risk_level``, ``src_ip``/``src``,
            ``dst_ip``/``dst``, ``proto``, ``sport``/``src_port`` and
            ``dport``/``dst_port``. Missing, ``None`` or empty values fall
            back to placeholder text; ``None`` is treated as an empty event.

    Returns:
        The knowledge-base text for the event's label. An exact label match
        is preferred, then the most specific entry mentioned in the label,
        and finally a generic sentence listing the flow details.
    """
    event = event or {}

    label = _normalize_label(event.get("prediction"))
    risk_level = str(_first_value(event, ("risk_level",), "Low")).title()
    src_ip = _first_value(event, ("src_ip", "src"), "Unknown source")
    dst_ip = _first_value(event, ("dst_ip", "dst"), "Unknown destination")
    proto = _first_value(event, ("proto",), "Unknown")
    sport = _first_value(event, ("sport", "src_port"), "?")
    dport = _first_value(event, ("dport", "dst_port"), "?")

    # Pick the best match: exact label first, then a label that mentions a
    # known key, otherwise the generic template.
    template = _THREAT_EXPLANATIONS.get(label)
    if template is None:
        matched = next(
            (key for key in _EXPLANATION_KEYS if _label_mentions(label, key)),
            None,
        )
        if matched is None:
            template = _GENERIC_EXPLANATION
        else:
            template = _THREAT_EXPLANATIONS[matched]

    # str.format ignores keyword arguments a template does not reference,
    # so every template can be given the full set of fields.
    return template.format(
        label=label,
        risk=risk_level,
        src=src_ip,
        dst=dst_ip,
        proto=proto,
        sport=sport,
        dport=dport,
    )
| # 2️⃣ Summarize multiple events (for report) | |
def summarize_events(events, model: str = "bcc") -> str:
    """Return a high-level English summary of a batch of events.

    Args:
        events: Iterable of event dicts; only the ``prediction`` key is
            read. ``None`` or an empty iterable yields a fixed
            "no events" message. Generators are accepted.
        model: Name of the model that produced the events. It is compared
            case-insensitively; ``"bcc"`` selects the BCC closing note and
            any other name gets the CICIDS note. ``None`` or an empty
            value is treated as the default ``"bcc"``.

    Returns:
        A single string of space-joined sentences: a header with the local
        generation time and model name, the total event count, a high-risk
        line, an optional anonymity/tunneling line, the top three labels,
        and a model-specific closing note.
    """
    # Materialize once so generators work with both the emptiness check
    # and len().
    events = list(events) if events is not None else []
    if not events:
        return "No recent events available for summary."

    # Normalize the model name once so the header and the model-specific
    # note below can never disagree (e.g. for model="BCC").
    model_key = str(model or "bcc").strip().lower()

    counts = Counter(_normalize_label(e.get("prediction")) for e in events)
    total = len(events)

    high_risk_keywords = (
        "DDOS", "DOS", "BRUTE", "SQL", "BOT", "INFILTRATION", "HULK",
        "SLOWLORIS", "SLOWHTTPTEST",
    )
    # Evaluate the keyword predicate once; both the event total and the
    # label list in the high-risk sentence come from this mapping.
    high_risk_counts = {
        lbl: cnt
        for lbl, cnt in counts.items()
        if any(keyword in lbl for keyword in high_risk_keywords)
    }
    high_risk = sum(high_risk_counts.values())

    # Exact label matches only, so labels that merely contain one of
    # these names are not counted as anonymity traffic.
    tor_like = sum(
        counts.get(lbl, 0) for lbl in ("TOR", "I2P", "ZERONET", "FREENET", "VPN")
    )

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Build readable summary
    parts = [
        f"AI Summary generated at {ts} for model '{model_key.upper()}'.",
        f"Total analysed events: {total}.",
    ]

    if high_risk:
        high_risk_labels = ", ".join(high_risk_counts)
        parts.append(
            f"High-risk attacks detected: {high_risk} events "
            f"({high_risk_labels})."
        )
    else:
        parts.append("No high-risk attack pattern strongly detected in this window.")

    if tor_like:
        parts.append(
            f"Anonymity or tunneling traffic (VPN/TOR/I2P/etc.) observed in {tor_like} events. "
            "Verify if this usage is expected and authorized."
        )

    # top 3 labels
    label_str = ", ".join(f"{lbl}: {cnt}" for lbl, cnt in counts.most_common(3))
    parts.append(f"Top traffic classes: {label_str}.")

    if model_key == "bcc":
        parts.append(
            "BCC model focuses on live packet patterns; consider correlating with host logs "
            "for deeper forensic analysis."
        )
    else:
        parts.append(
            "CICIDS model analyses flow-level statistics; consider exporting flows for "
            "offline investigation if anomalies increase."
        )

    return " ".join(parts)