Spaces:
Sleeping
Sleeping
File size: 5,668 Bytes
0f8fe33 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# utils/ai_engine.py
# -----------------------------------------
# Lightweight "AI" engine using rules + templates
# No heavy ML model – safe for your laptop 🙂
from collections import Counter
from datetime import datetime
def _normalize_label(label: object) -> str:
    """Return *label* as a trimmed, upper-cased string.

    ``None`` and blank (empty or whitespace-only) labels collapse to
    ``"UNKNOWN"``. Other falsy values, such as the integer class id ``0``,
    are kept (``0`` -> ``"0"``) rather than being mistaken for a missing
    label.
    """
    if label is None:
        return "UNKNOWN"
    # Strip *before* testing for emptiness so "   " does not come back as "".
    cleaned = str(label).strip().upper()
    return cleaned or "UNKNOWN"
# 1️⃣ Explain a single threat/event
# Knowledge base: normalised label -> explanation template. Built once at
# import time rather than on every call. Placeholders ({src}, {dst}, {proto},
# ...) are filled in by ``str.format`` inside ``explain_threat``.
_THREAT_EXPLANATIONS = {
    "VPN": (
        "Traffic from {src} to {dst} over {proto} looks like VPN usage. "
        "VPN tunnels encrypt traffic and can hide the real origin of an attacker. "
        "Review if this VPN endpoint is expected for this host."
    ),
    "TOR": (
        "Traffic appears to be routed through the Tor anonymity network. "
        "Tor is commonly used to hide attacker identity. "
        "Investigate the host at {src} and check if Tor usage is allowed."
    ),
    "I2P": (
        "Detected I2P (Invisible Internet Project) style traffic. "
        "I2P is an anonymity network similar to Tor and can be abused for C2 channels."
    ),
    "FREENET": (
        "Traffic resembles Freenet P2P anonymity network. "
        "Such networks can be used to exchange illegal or malicious content."
    ),
    "ZERONET": (
        "ZeroNet-like traffic detected. ZeroNet hosts sites over a P2P network. "
        "This may bypass normal web filtering and logging."
    ),
    # CICIDS-style examples – extend as you like
    "DOS HULK": (
        "High-rate HTTP traffic typical of DoS-Hulk attack was detected. "
        "This can exhaust web server resources and cause service disruption."
    ),
    "DOS SLOWLORIS": (
        "Slowloris-style DoS traffic detected. It keeps many HTTP connections open "
        "to slowly exhaust server connection limits."
    ),
    "BOT": (
        "Behavior suggests the host may be part of a botnet. "
        "Correlate with outbound connections and run malware scans on {src}."
    ),
    "BENIGN": (
        "This flow is classified as BENIGN. No immediate malicious pattern detected, "
        "but you should still monitor for anomalies over time."
    ),
}

# Generic template used when no knowledge-base entry matches the label.
_GENERIC_EXPLANATION = (
    "The traffic is classified as '{label}' with a risk level of {risk}. "
    "Review source {src} → destination {dst}, protocol {proto}, "
    "and ports {sport} → {dport} for suspicious patterns."
)


def explain_threat(event: dict) -> str:
    """Return a human-readable explanation for a single event.

    Args:
        event: Event mapping. The keys read are ``prediction``,
            ``risk_level``, ``src_ip``/``src``, ``dst_ip``/``dst``,
            ``proto``, ``sport``/``src_port`` and ``dport``/``dst_port``.
            Missing, ``None`` or otherwise falsy values fall back to a
            placeholder. ``None`` is treated as an empty event.

    Returns:
        The explanation text with the event's fields substituted in.
    """
    event = event or {}

    label = _normalize_label(event.get("prediction"))
    # Use ``or`` fallbacks (like the address/port fields below) so a key that
    # is present but set to None does not render as the literal text "None".
    risk_level = str(event.get("risk_level") or "Low").title()
    src_ip = event.get("src_ip") or event.get("src") or "Unknown source"
    dst_ip = event.get("dst_ip") or event.get("dst") or "Unknown destination"
    proto = event.get("proto") or "Unknown"
    sport = event.get("sport") or event.get("src_port") or "?"
    dport = event.get("dport") or event.get("dst_port") or "?"

    # Exact label match first; otherwise the first knowledge-base key (in
    # definition order) that occurs as a substring of the label; otherwise
    # the generic template.
    template = _THREAT_EXPLANATIONS.get(label)
    if template is None:
        template = next(
            (text for key, text in _THREAT_EXPLANATIONS.items() if key in label),
            _GENERIC_EXPLANATION,
        )

    # Event values are passed as format *arguments*, never spliced into the
    # template, so braces inside them are not interpreted. Templates that do
    # not use a given placeholder simply ignore it.
    return template.format(
        label=label,
        risk=risk_level,
        src=src_ip,
        dst=dst_ip,
        proto=proto,
        sport=sport,
        dport=dport,
    )
# 2️⃣ Summarize multiple events (for report)
def summarize_events(events, model: str = "bcc") -> str:
    """Return a high-level English summary of a batch of events.

    Args:
        events: Iterable of event mappings; only the ``prediction`` key of
            each event is read. ``None`` or an empty iterable yields a
            "no events" message.
        model: Name of the model that produced the events. It is matched
            case-insensitively: ``"bcc"`` selects the BCC closing note and
            any other name selects the CICIDS note. ``None`` or an empty
            value falls back to ``"bcc"``.

    Returns:
        A single string of space-joined sentences: a timestamped header
        (local time), the event total, high-risk and anonymity-traffic
        findings, the three most common labels, and a model-specific note.
    """
    # Materialise the labels first so any iterable (not just a list) works
    # and the total can be taken from the same pass.
    labels = [_normalize_label(e.get("prediction")) for e in (events or ())]
    if not labels:
        return "No recent events available for summary."

    counts = Counter(labels)
    total = len(labels)

    # Normalise once so the header and the closing note agree on the model;
    # previously "BCC" printed as BCC but received the CICIDS note.
    model_name = str(model or "bcc").strip()
    is_bcc = model_name.lower() == "bcc"

    high_risk_keywords = (
        "DDOS", "DOS", "BRUTE", "SQL", "BOT", "INFILTRATION", "HULK",
        "SLOWLORIS", "SLOWHTTPTEST",
    )
    anonymity_labels = ("TOR", "I2P", "ZERONET", "FREENET", "VPN")

    # A label is high-risk when any keyword occurs in it as a substring.
    # Computed once and reused for both the count and the label listing.
    high_risk_labels = [
        lbl for lbl in counts
        if any(keyword in lbl for keyword in high_risk_keywords)
    ]
    high_risk = sum(counts[lbl] for lbl in high_risk_labels)

    # Anonymity/tunnelling traffic is counted by exact label match only.
    tor_like = sum(counts.get(lbl, 0) for lbl in anonymity_labels)

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    parts = [
        f"AI Summary generated at {ts} for model '{model_name.upper()}'.",
        f"Total analysed events: {total}.",
    ]

    if high_risk:
        parts.append(
            f"High-risk attacks detected: {high_risk} events "
            f"({', '.join(high_risk_labels)})."
        )
    else:
        parts.append("No high-risk attack pattern strongly detected in this window.")

    if tor_like:
        parts.append(
            f"Anonymity or tunneling traffic (VPN/TOR/I2P/etc.) observed in {tor_like} events. "
            "Verify if this usage is expected and authorized."
        )

    # Three most frequent labels with their counts.
    label_str = ", ".join(f"{lbl}: {cnt}" for lbl, cnt in counts.most_common(3))
    parts.append(f"Top traffic classes: {label_str}.")

    if is_bcc:
        parts.append(
            "BCC model focuses on live packet patterns; consider correlating with host logs "
            "for deeper forensic analysis."
        )
    else:
        parts.append(
            "CICIDS model analyses flow-level statistics; consider exporting flows for "
            "offline investigation if anomalies increase."
        )

    return " ".join(parts)
|