"""Threat detection: rules + lightweight anomaly hooks + LLM-ready findings.""" from __future__ import annotations from collections import defaultdict from datetime import datetime, timedelta, timezone from models.schemas import DetectionFinding, EnrichedEvent, Severity # Sliding window: source_ip -> list of timestamps (failed ssh) _fail_window: dict[str, list[datetime]] = defaultdict(list) _burst_window: dict[str, list[datetime]] = defaultdict(list) def _prune(ip: str, window: dict[str, list[datetime]], minutes: int = 15) -> None: cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes) window[ip] = [t for t in window[ip] if t > cutoff] def detect_threats(event: EnrichedEvent) -> list[DetectionFinding]: findings: list[DetectionFinding] = [] ip = event.source_ip or "unknown" now = event.timestamp if event.event_type == "auth.ssh_failed": _prune(ip, _fail_window) _fail_window[ip].append(now) if len(_fail_window[ip]) >= 5: findings.append( DetectionFinding( event_id=event.id, technique="brute_force_ssh", description=f"Clustered SSH failures from {ip} ({len(_fail_window[ip])} in window)", confidence=min(0.55 + 0.05 * len(_fail_window[ip]), 0.98), mitre_technique="T1110", severity=Severity.HIGH, ) ) if event.event_type == "auth.ssh_success" and len(_fail_window.get(ip, [])) >= 3: findings.append( DetectionFinding( event_id=event.id, technique="credential_stuffing_success", description="Successful SSH after repeated failures — possible stuffing or spray success", confidence=0.78, mitre_technique="T1078", severity=Severity.CRITICAL, ) ) if event.event_type == "privilege.sudo": cmd = str(event.normalized.get("command", "")).lower() if any(x in cmd for x in ("curl", "wget", "chmod 777", "/tmp/", "base64")): findings.append( DetectionFinding( event_id=event.id, technique="privilege_abuse", description="Suspicious sudo command chain consistent with post-exploitation", confidence=0.72, mitre_technique="T1548", severity=Severity.HIGH, ) ) else: findings.append( DetectionFinding( event_id=event.id, technique="privilege_escalation", description="Interactive privilege elevation observed", confidence=0.55, mitre_technique="T1548", severity=Severity.MEDIUM, ) ) if event.event_type == "web.request": code = event.normalized.get("status") if code in (401, 403): _prune(ip, _burst_window, minutes=5) _burst_window[ip].append(now) if len(_burst_window[ip]) >= 40: findings.append( DetectionFinding( event_id=event.id, technique="credential_spray", description="High volume of denied web authentications", confidence=0.68, mitre_technique="T1110", severity=Severity.MEDIUM, ) ) if event.enrichment.get("reputation") == "malicious": findings.append( DetectionFinding( event_id=event.id, technique="known_malicious_source", description="Source matches threat intelligence with elevated confidence", confidence=float(event.enrichment.get("confidence", 0.85)), mitre_technique="T1071", severity=Severity.HIGH, ) ) if event.event_type == "k8s.event": if "backoff" in event.message.lower() or "fail" in event.message.lower(): findings.append( DetectionFinding( event_id=event.id, technique="k8s_anomaly", description="Kubernetes workload instability — investigate supply chain or runtime compromise", confidence=0.5, mitre_technique="T1190", severity=Severity.MEDIUM, ) ) return findings