ai-nids-backend / utils /ai_engine.py
CodebaseAi's picture
deploy: updated backend structure and ignored frontend
87f8e11
# utils/ai_engine.py
# -----------------------------------------
# Lightweight "AI" engine using rules + templates
# No heavy ML model – safe for your laptop 🙂
from collections import Counter
from datetime import datetime
def _normalize_label(label: str) -> str:
return str(label or "Unknown").strip().upper()
# 1️⃣ Explain a single threat/event
# Knowledge base: normalized (upper-case) label -> explanation template.
# Built once at import time rather than on every call.  Templates may use the
# placeholders {label}, {risk}, {src}, {dst}, {proto}, {sport} and {dport}.
_THREAT_EXPLANATIONS = {
    "VPN": (
        "Traffic from {src} to {dst} over {proto} looks like VPN usage. "
        "VPN tunnels encrypt traffic and can hide the real origin of an attacker. "
        "Review if this VPN endpoint is expected for this host."
    ),
    "TOR": (
        "Traffic appears to be routed through the Tor anonymity network. "
        "Tor is commonly used to hide attacker identity. "
        "Investigate the host at {src} and check if Tor usage is allowed."
    ),
    "I2P": (
        "Detected I2P (Invisible Internet Project) style traffic. "
        "I2P is an anonymity network similar to Tor and can be abused for C2 channels."
    ),
    "FREENET": (
        "Traffic resembles Freenet P2P anonymity network. "
        "Such networks can be used to exchange illegal or malicious content."
    ),
    "ZERONET": (
        "ZeroNet-like traffic detected. ZeroNet hosts sites over a P2P network. "
        "This may bypass normal web filtering and logging."
    ),
    # CICIDS-style examples – extend as you like
    "DOS HULK": (
        "High-rate HTTP traffic typical of DoS-Hulk attack was detected. "
        "This can exhaust web server resources and cause service disruption."
    ),
    "DOS SLOWLORIS": (
        "Slowloris-style DoS traffic detected. It keeps many HTTP connections open "
        "to slowly exhaust server connection limits."
    ),
    "BOT": (
        "Behavior suggests the host may be part of a botnet. "
        "Correlate with outbound connections and run malware scans on {src}."
    ),
    "BENIGN": (
        "This flow is classified as BENIGN. No immediate malicious pattern detected, "
        "but you should still monitor for anomalies over time."
    ),
}

# Fallback template used when no knowledge-base entry matches the label.
_GENERIC_EXPLANATION = (
    "The traffic is classified as '{label}' with a risk level of {risk}. "
    "Review source {src} → destination {dst}, protocol {proto}, "
    "and ports {sport} → {dport} for suspicious patterns."
)


def _lookup_explanation(label: str):
    """Return the knowledge-base template for *label*, or ``None`` if none fits.

    An exact key match wins.  Otherwise the first key (in knowledge-base
    order) that occurs as a substring of *label* is used, unless that
    occurrence is directly preceded by "NON" (optionally followed by a space,
    hyphen or underscore).
    """
    exact = _THREAT_EXPLANATIONS.get(label)
    if exact is not None:
        return exact

    for key, template in _THREAT_EXPLANATIONS.items():
        pos = label.find(key)
        if pos == -1:
            continue
        # Labels such as "NONVPN" or "NON-TOR" name the *absence* of the
        # class; describing them as VPN/Tor traffic would be misleading, so
        # they fall through to the generic explanation instead.
        if label[:pos].rstrip(" -_").endswith("NON"):
            continue
        return template
    return None


def explain_threat(event: dict) -> str:
    """Return a human-readable explanation for a single event.

    Args:
        event: Mapping describing one event.  Keys read here are
            ``prediction``, ``risk_level``, ``src_ip``/``src``,
            ``dst_ip``/``dst``, ``proto``, ``sport``/``src_port`` and
            ``dport``/``dst_port``.  Every key is optional; ``None`` is
            treated like an empty event.

    Returns:
        The knowledge-base text matching the event's normalized prediction
        label, or a generic sentence listing label, risk level, endpoints,
        protocol and ports when no entry matches.
    """
    event = event or {}

    label = _normalize_label(event.get("prediction"))
    # ``or`` (not a .get default) so that a key present with value None falls
    # back too, instead of being rendered as the literal text "None".
    risk_level = str(event.get("risk_level") or "Low").title()
    src_ip = event.get("src_ip") or event.get("src") or "Unknown source"
    dst_ip = event.get("dst_ip") or event.get("dst") or "Unknown destination"
    proto = event.get("proto")
    if proto is None or proto == "":
        # Only None/"" count as missing here, so a numeric protocol value of
        # 0 is still shown as-is.
        proto = "Unknown"
    sport = event.get("sport") or event.get("src_port") or "?"
    dport = event.get("dport") or event.get("dst_port") or "?"

    template = _lookup_explanation(label) or _GENERIC_EXPLANATION

    # str.format ignores keyword arguments a template does not reference, so
    # every template can be filled with the full set of fields.
    return template.format(
        label=label,
        risk=risk_level,
        src=src_ip,
        dst=dst_ip,
        proto=proto,
        sport=sport,
        dport=dport,
    )
# 2️⃣ Summarize multiple events (for report)
def summarize_events(events, model: str = "bcc") -> str:
    """Return a high-level English summary of a batch of events.

    Args:
        events: Iterable of event dicts; only the ``prediction`` key of each
            event is read.  ``None`` or an empty iterable yields a fixed
            "no events" message.
        model: Name of the model that produced the events.  It is shown
            upper-cased in the banner; the name "bcc" (compared
            case-insensitively) selects the BCC closing note, and any other
            name selects the CICIDS note.  ``None`` or ``""`` falls back to
            "bcc".

    Returns:
        A single string of space-joined sentences: banner with timestamp,
        total count, high-risk findings, anonymity/tunneling findings (when
        present), the three most common labels and a model-specific note.
    """
    # Materialize so generators and other one-shot iterables work with len().
    events = list(events or ())
    if not events:
        return "No recent events available for summary."

    # Resolve the model name once so the banner and the closing note agree
    # (previously "BCC" printed a BCC banner followed by the CICIDS note).
    model_name = str(model or "bcc")
    is_bcc = model_name.strip().lower() == "bcc"

    counts = Counter(_normalize_label(e.get("prediction")) for e in events)
    total = len(events)

    high_risk_keywords = (
        "DDOS", "DOS", "BRUTE", "SQL", "BOT", "INFILTRATION", "HULK",
        "SLOWLORIS", "SLOWHTTPTEST",
    )
    # Evaluate the keyword test once per distinct label; the result feeds both
    # the event count and the label list shown in the summary.
    high_risk_labels = [
        lbl for lbl in counts
        if any(keyword in lbl for keyword in high_risk_keywords)
    ]
    high_risk = sum(counts[lbl] for lbl in high_risk_labels)

    # Anonymity/tunneling classes are matched by exact label only.
    tor_like = sum(
        counts[lbl] for lbl in ("TOR", "I2P", "ZERONET", "FREENET", "VPN")
    )

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Build readable summary
    parts = [
        f"AI Summary generated at {ts} for model '{model_name.upper()}'.",
        f"Total analysed events: {total}.",
    ]

    if high_risk:
        parts.append(
            f"High-risk attacks detected: {high_risk} events "
            f"({', '.join(high_risk_labels)})."
        )
    else:
        parts.append("No high-risk attack pattern strongly detected in this window.")

    if tor_like:
        parts.append(
            f"Anonymity or tunneling traffic (VPN/TOR/I2P/etc.) observed in {tor_like} events. "
            "Verify if this usage is expected and authorized."
        )

    # top 3 labels
    label_str = ", ".join(f"{lbl}: {cnt}" for lbl, cnt in counts.most_common(3))
    parts.append(f"Top traffic classes: {label_str}.")

    if is_bcc:
        parts.append(
            "BCC model focuses on live packet patterns; consider correlating with host logs "
            "for deeper forensic analysis."
        )
    else:
        parts.append(
            "CICIDS model analyses flow-level statistics; consider exporting flows for "
            "offline investigation if anomalies increase."
        )

    return " ".join(parts)