Spaces:
Sleeping
Sleeping
| """Threat detection: rules + lightweight anomaly hooks + LLM-ready findings.""" | |
| from __future__ import annotations | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta, timezone | |
| from models.schemas import DetectionFinding, EnrichedEvent, Severity | |
# In-memory sliding-window state, keyed by source IP. Each value is the list
# of event timestamps recorded for that IP; stale entries are removed by
# _prune() before a new timestamp is appended.
#   _fail_window  -> failed SSH authentication events
#   _burst_window -> web requests answered with 401/403
_fail_window: dict[str, list[datetime]] = defaultdict(list)
_burst_window: dict[str, list[datetime]] = defaultdict(list)
def _prune(
    ip: str,
    window: dict[str, list[datetime]],
    minutes: int = 15,
    now: datetime | None = None,
) -> None:
    """Drop the timestamps recorded for ``ip`` that fall outside the window.

    ``window[ip]`` is replaced by a new list holding only the timestamps that
    are strictly newer than ``now - minutes``. The surviving timestamps are
    kept exactly as stored (they are not converted).

    Args:
        ip: Key whose timestamp list is pruned.
        window: Mapping of key -> recorded timestamps. A missing key is
            treated as an empty list, so plain dicts work as well as
            defaultdicts; after the call ``window[ip]`` always exists.
        minutes: Length of the trailing window, in minutes.
        now: Reference time the window ends at. When omitted, the current
            UTC wall-clock time is used.

    Naive datetimes (stored timestamps or ``now``) are compared as if they
    were UTC, because comparing a naive value with the aware UTC cutoff
    would otherwise raise ``TypeError``.
    """

    def _as_aware(ts: datetime) -> datetime:
        # utcoffset() is None for naive datetimes (and for tzinfo objects
        # that report no offset); assume UTC in that case.
        if ts.utcoffset() is None:
            return ts.replace(tzinfo=timezone.utc)
        return ts

    reference = datetime.now(timezone.utc) if now is None else _as_aware(now)
    cutoff = reference - timedelta(minutes=minutes)
    window[ip] = [ts for ts in window.get(ip, ()) if _as_aware(ts) > cutoff]
def detect_threats(event: EnrichedEvent) -> list[DetectionFinding]:
    """Evaluate one enriched event against the built-in detection rules.

    Rules, in evaluation order (each appends at most one finding):
      * ``auth.ssh_failed``  - 5 or more failures from the same source inside
        the failure window -> ``brute_force_ssh``.
      * ``auth.ssh_success`` - 3 or more failures still inside the failure
        window for that source -> ``credential_stuffing_success``.
      * ``privilege.sudo``   - command contains a download/staging marker ->
        ``privilege_abuse``; any other sudo -> ``privilege_escalation``.
      * ``web.request``      - 40 or more 401/403 responses from the same
        source inside a 5-minute window -> ``credential_spray``.
      * any event type       - enrichment reputation ``"malicious"`` ->
        ``known_malicious_source``.
      * ``k8s.event``        - message mentions "backoff" or "fail" ->
        ``k8s_anomaly``.

    Side effects: the event timestamp is recorded in the module-level
    ``_fail_window`` / ``_burst_window`` sliding windows.

    Args:
        event: The enriched event to inspect.

    Returns:
        The findings raised by this event; empty when no rule matched.
    """
    findings: list[DetectionFinding] = []
    # Events without a source IP all share the single "unknown" bucket.
    ip = event.source_ip or "unknown"
    now = event.timestamp
    event_type = event.event_type

    if event_type == "auth.ssh_failed":
        _prune(ip, _fail_window)
        _fail_window[ip].append(now)
        failures = len(_fail_window[ip])
        if failures >= 5:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="brute_force_ssh",
                    description=f"Clustered SSH failures from {ip} ({failures} in window)",
                    # Confidence grows with the cluster size, capped at 0.98.
                    confidence=min(0.55 + 0.05 * failures, 0.98),
                    mitre_technique="T1110",
                    severity=Severity.HIGH,
                )
            )

    # The membership test avoids creating an empty window entry for every
    # source that logs in successfully.
    if event_type == "auth.ssh_success" and ip in _fail_window:
        # Expire stale failures first: without this, failures far older than
        # the window would still count towards the threshold below.
        _prune(ip, _fail_window)
        if len(_fail_window[ip]) >= 3:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="credential_stuffing_success",
                    description="Successful SSH after repeated failures — possible stuffing or spray success",
                    confidence=0.78,
                    mitre_technique="T1078",
                    severity=Severity.CRITICAL,
                )
            )

    if event_type == "privilege.sudo":
        cmd = str(event.normalized.get("command", "")).lower()
        suspicious_markers = ("curl", "wget", "chmod 777", "/tmp/", "base64")
        if any(marker in cmd for marker in suspicious_markers):
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="privilege_abuse",
                    description="Suspicious sudo command chain consistent with post-exploitation",
                    confidence=0.72,
                    mitre_technique="T1548",
                    severity=Severity.HIGH,
                )
            )
        else:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="privilege_escalation",
                    description="Interactive privilege elevation observed",
                    confidence=0.55,
                    mitre_technique="T1548",
                    severity=Severity.MEDIUM,
                )
            )

    if event_type == "web.request":
        # Status codes may arrive as strings (e.g. "401"); coerce them so the
        # comparison below does not silently miss them.
        try:
            status = int(event.normalized.get("status"))
        except (TypeError, ValueError, OverflowError):
            status = None
        if status in (401, 403):
            _prune(ip, _burst_window, minutes=5)
            _burst_window[ip].append(now)
            if len(_burst_window[ip]) >= 40:
                findings.append(
                    DetectionFinding(
                        event_id=event.id,
                        technique="credential_spray",
                        description="High volume of denied web authentications",
                        confidence=0.68,
                        mitre_technique="T1110",
                        severity=Severity.MEDIUM,
                    )
                )

    if event.enrichment.get("reputation") == "malicious":
        default_confidence = 0.85
        # A present-but-None or non-numeric confidence must not abort
        # detection; fall back to the default instead.
        try:
            ti_confidence = float(event.enrichment.get("confidence", default_confidence))
        except (TypeError, ValueError):
            ti_confidence = default_confidence
        if ti_confidence != ti_confidence:  # NaN never equals itself
            ti_confidence = default_confidence
        # Keep the score on the same 0..1 scale the other rules use.
        ti_confidence = min(max(ti_confidence, 0.0), 1.0)
        findings.append(
            DetectionFinding(
                event_id=event.id,
                technique="known_malicious_source",
                description="Source matches threat intelligence with elevated confidence",
                confidence=ti_confidence,
                mitre_technique="T1071",
                severity=Severity.HIGH,
            )
        )

    if event_type == "k8s.event":
        # Tolerate a missing message and lower-case it only once.
        message = (event.message or "").lower()
        if "backoff" in message or "fail" in message:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="k8s_anomaly",
                    description="Kubernetes workload instability — investigate supply chain or runtime compromise",
                    confidence=0.5,
                    mitre_technique="T1190",
                    severity=Severity.MEDIUM,
                )
            )

    return findings