SentinelAI / agents /threat_detection_agent.py
iitian's picture
Sync SentinelAI project and add Hugging Face Docker Space layout.
8b3905d
"""Threat detection: rules + lightweight anomaly hooks + LLM-ready findings."""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from models.schemas import DetectionFinding, EnrichedEvent, Severity
# Sliding window: source_ip -> list of timestamps (failed ssh)
_fail_window: dict[str, list[datetime]] = defaultdict(list)
_burst_window: dict[str, list[datetime]] = defaultdict(list)
def _prune(ip: str, window: dict[str, list[datetime]], minutes: int = 15) -> None:
cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
window[ip] = [t for t in window[ip] if t > cutoff]
def detect_threats(event: EnrichedEvent) -> list[DetectionFinding]:
findings: list[DetectionFinding] = []
ip = event.source_ip or "unknown"
now = event.timestamp
if event.event_type == "auth.ssh_failed":
_prune(ip, _fail_window)
_fail_window[ip].append(now)
if len(_fail_window[ip]) >= 5:
findings.append(
DetectionFinding(
event_id=event.id,
technique="brute_force_ssh",
description=f"Clustered SSH failures from {ip} ({len(_fail_window[ip])} in window)",
confidence=min(0.55 + 0.05 * len(_fail_window[ip]), 0.98),
mitre_technique="T1110",
severity=Severity.HIGH,
)
)
if event.event_type == "auth.ssh_success" and len(_fail_window.get(ip, [])) >= 3:
findings.append(
DetectionFinding(
event_id=event.id,
technique="credential_stuffing_success",
description="Successful SSH after repeated failures β€” possible stuffing or spray success",
confidence=0.78,
mitre_technique="T1078",
severity=Severity.CRITICAL,
)
)
if event.event_type == "privilege.sudo":
cmd = str(event.normalized.get("command", "")).lower()
if any(x in cmd for x in ("curl", "wget", "chmod 777", "/tmp/", "base64")):
findings.append(
DetectionFinding(
event_id=event.id,
technique="privilege_abuse",
description="Suspicious sudo command chain consistent with post-exploitation",
confidence=0.72,
mitre_technique="T1548",
severity=Severity.HIGH,
)
)
else:
findings.append(
DetectionFinding(
event_id=event.id,
technique="privilege_escalation",
description="Interactive privilege elevation observed",
confidence=0.55,
mitre_technique="T1548",
severity=Severity.MEDIUM,
)
)
if event.event_type == "web.request":
code = event.normalized.get("status")
if code in (401, 403):
_prune(ip, _burst_window, minutes=5)
_burst_window[ip].append(now)
if len(_burst_window[ip]) >= 40:
findings.append(
DetectionFinding(
event_id=event.id,
technique="credential_spray",
description="High volume of denied web authentications",
confidence=0.68,
mitre_technique="T1110",
severity=Severity.MEDIUM,
)
)
if event.enrichment.get("reputation") == "malicious":
findings.append(
DetectionFinding(
event_id=event.id,
technique="known_malicious_source",
description="Source matches threat intelligence with elevated confidence",
confidence=float(event.enrichment.get("confidence", 0.85)),
mitre_technique="T1071",
severity=Severity.HIGH,
)
)
if event.event_type == "k8s.event":
if "backoff" in event.message.lower() or "fail" in event.message.lower():
findings.append(
DetectionFinding(
event_id=event.id,
technique="k8s_anomaly",
description="Kubernetes workload instability β€” investigate supply chain or runtime compromise",
confidence=0.5,
mitre_technique="T1190",
severity=Severity.MEDIUM,
)
)
return findings