File size: 4,674 Bytes
8b3905d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Threat detection: rules + lightweight anomaly hooks + LLM-ready findings."""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta, timezone

from models.schemas import DetectionFinding, EnrichedEvent, Severity

# Sliding windows of event timestamps, keyed by source IP (or "unknown" when
# the event carries no source IP). Module-level state shared across calls.
#   _fail_window  - timestamps of "auth.ssh_failed" events
#   _burst_window - timestamps of "web.request" events answered with 401/403
_fail_window: dict[str, list[datetime]] = defaultdict(list)
_burst_window: dict[str, list[datetime]] = defaultdict(list)


def _prune(
    ip: str,
    window: dict[str, list[datetime]],
    minutes: int = 15,
    now: datetime | None = None,
) -> None:
    """Drop timestamps recorded for *ip* that fall outside the sliding window.

    Args:
        ip: Key into *window*.
        window: Mapping of key -> recorded timestamps; mutated in place.
        minutes: Length of the window, in minutes.
        now: Instant the window ends at. Defaults to the current UTC wall
            clock. Callers that store event timestamps should pass the
            event's own timestamp so that replayed or delayed events are
            judged against event time, not processing time.
    """
    recorded = window.get(ip)
    if recorded is None:
        # .get() avoids materialising an empty defaultdict entry for a key
        # that has never been seen.
        return

    reference = datetime.now(timezone.utc) if now is None else now
    cutoff = reference - timedelta(minutes=minutes)
    kept = [stamp for stamp in recorded if stamp > cutoff]
    if kept:
        window[ip] = kept
    else:
        # Remove emptied keys instead of keeping empty lists around.
        del window[ip]


# Rule thresholds used by detect_threats().
_SSH_FAILURE_THRESHOLD = 5        # failures in window -> brute_force_ssh
_SSH_SUCCESS_AFTER_FAILURES = 3   # failures in window before a success
_WEB_DENIED_THRESHOLD = 40        # 401/403 responses in window
_WEB_DENIED_WINDOW_MINUTES = 5
_DEFAULT_INTEL_CONFIDENCE = 0.85  # used when enrichment has no usable value
_SUSPICIOUS_SUDO_TOKENS = ("curl", "wget", "chmod 777", "/tmp/", "base64")


def detect_threats(event: EnrichedEvent) -> list[DetectionFinding]:
    """Evaluate one enriched event against the detection rules.

    Rules applied (each may contribute one finding):
      * ``auth.ssh_failed``  - 5+ failures from the same source in the window.
      * ``auth.ssh_success`` - success while 3+ recent failures are recorded.
      * ``privilege.sudo``   - always reported; flagged as abuse when the
        command contains a suspicious token.
      * ``web.request``      - 40+ responses with status 401/403 in 5 minutes.
      * enrichment ``reputation == "malicious"`` (any event type).
      * ``k8s.event``        - message mentions "backoff" or "fail".

    Side effects:
        Updates the module-level ``_fail_window`` and ``_burst_window``.

    Args:
        event: The event to inspect.

    Returns:
        The findings raised by this event; empty when no rule matched.
    """
    findings: list[DetectionFinding] = []
    ip = event.source_ip or "unknown"

    now = event.timestamp
    if now.tzinfo is None:
        # assumes naive timestamps are UTC - TODO confirm against the ingest
        # pipeline. Normalising keeps every stored value comparable.
        now = now.replace(tzinfo=timezone.utc)

    event_type = event.event_type

    if event_type == "auth.ssh_failed":
        # Prune against the event's own time so the window stays consistent
        # with the timestamps stored in it.
        _prune(ip, _fail_window, now=now)
        _fail_window[ip].append(now)
        if len(_fail_window[ip]) >= _SSH_FAILURE_THRESHOLD:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="brute_force_ssh",
                    description=f"Clustered SSH failures from {ip} ({len(_fail_window[ip])} in window)",
                    confidence=min(0.55 + 0.05 * len(_fail_window[ip]), 0.98),
                    mitre_technique="T1110",
                    severity=Severity.HIGH,
                )
            )

    if event_type == "auth.ssh_success":
        # Expire stale failures first; otherwise failures of any age would
        # still count towards this rule.
        _prune(ip, _fail_window, now=now)
        if len(_fail_window.get(ip, ())) >= _SSH_SUCCESS_AFTER_FAILURES:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="credential_stuffing_success",
                    description="Successful SSH after repeated failures — possible stuffing or spray success",
                    confidence=0.78,
                    mitre_technique="T1078",
                    severity=Severity.CRITICAL,
                )
            )

    if event_type == "privilege.sudo":
        cmd = str(event.normalized.get("command", "")).lower()
        if any(token in cmd for token in _SUSPICIOUS_SUDO_TOKENS):
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="privilege_abuse",
                    description="Suspicious sudo command chain consistent with post-exploitation",
                    confidence=0.72,
                    mitre_technique="T1548",
                    severity=Severity.HIGH,
                )
            )
        else:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="privilege_escalation",
                    description="Interactive privilege elevation observed",
                    confidence=0.55,
                    mitre_technique="T1548",
                    severity=Severity.MEDIUM,
                )
            )

    if event_type == "web.request" and event.normalized.get("status") in (401, 403):
        _prune(ip, _burst_window, minutes=_WEB_DENIED_WINDOW_MINUTES, now=now)
        _burst_window[ip].append(now)
        if len(_burst_window[ip]) >= _WEB_DENIED_THRESHOLD:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="credential_spray",
                    description="High volume of denied web authentications",
                    confidence=0.68,
                    mitre_technique="T1110",
                    severity=Severity.MEDIUM,
                )
            )

    if event.enrichment.get("reputation") == "malicious":
        try:
            intel_confidence = float(
                event.enrichment.get("confidence", _DEFAULT_INTEL_CONFIDENCE)
            )
        except (TypeError, ValueError):
            # A present-but-unusable value (None, non-numeric text) must not
            # abort detection; fall back to the default.
            intel_confidence = _DEFAULT_INTEL_CONFIDENCE
        findings.append(
            DetectionFinding(
                event_id=event.id,
                technique="known_malicious_source",
                description="Source matches threat intelligence with elevated confidence",
                confidence=intel_confidence,
                mitre_technique="T1071",
                severity=Severity.HIGH,
            )
        )

    if event_type == "k8s.event":
        # Lower-case once; tolerate a missing message.
        message = (event.message or "").lower()
        if "backoff" in message or "fail" in message:
            findings.append(
                DetectionFinding(
                    event_id=event.id,
                    technique="k8s_anomaly",
                    description="Kubernetes workload instability — investigate supply chain or runtime compromise",
                    confidence=0.5,
                    mitre_technique="T1190",
                    severity=Severity.MEDIUM,
                )
            )

    return findings