Spaces:
Running
Running
| """Incident correlation: fuse multi-stage events into attack timelines and graphs.""" | |
| from __future__ import annotations | |
| from collections import defaultdict | |
| from datetime import timedelta | |
| from typing import Iterable | |
| from uuid import NAMESPACE_URL, uuid5 | |
| from models.schemas import DetectionFinding, EnrichedEvent, Incident, IncidentEdge, IncidentNode | |
| def correlate( | |
| events: Iterable[EnrichedEvent], | |
| findings: Iterable[DetectionFinding], | |
| window_minutes: int = 45, | |
| ) -> list[Incident]: | |
| events_list = sorted(events, key=lambda e: e.timestamp) | |
| if not events_list: | |
| return [] | |
| by_ip: dict[str, list[EnrichedEvent]] = defaultdict(list) | |
| for e in events_list: | |
| key = e.source_ip or f"host:{e.host}" | |
| by_ip[key].append(e) | |
| incidents: list[Incident] = [] | |
| finding_by_event = {f.event_id: f for f in findings} | |
| for key, chain in by_ip.items(): | |
| if len(chain) < 2 and not any(finding_by_event.get(e.id) for e in chain): | |
| continue | |
| start = chain[0].timestamp | |
| end = chain[-1].timestamp | |
| if (end - start) > timedelta(minutes=window_minutes * 4): | |
| continue | |
| nodes = [ | |
| IncidentNode(event_id=e.id, label=f"{e.event_type}: {e.message[:80]}", timestamp=e.timestamp) | |
| for e in chain | |
| ] | |
| edges: list[IncidentEdge] = [] | |
| for i in range(len(chain) - 1): | |
| edges.append( | |
| IncidentEdge( | |
| source=chain[i].id, | |
| target=chain[i + 1].id, | |
| relation="precedes", | |
| ) | |
| ) | |
| techniques = {finding_by_event[e.id].technique for e in chain if e.id in finding_by_event} | |
| title = f"Correlated activity β {key}" | |
| if "brute_force_ssh" in techniques and "credential_stuffing_success" in techniques: | |
| title = f"Likely SSH compromise chain β {key}" | |
| summary = _narrative(chain, techniques) | |
| timeline = [ | |
| {"t": e.timestamp.isoformat(), "type": e.event_type, "msg": e.message} for e in chain | |
| ] | |
| stable_id = uuid5(NAMESPACE_URL, f"sentinelai|incident|{key}") | |
| incidents.append( | |
| Incident( | |
| id=stable_id, | |
| title=title, | |
| summary=summary, | |
| nodes=nodes, | |
| edges=edges, | |
| timeline=timeline, | |
| ) | |
| ) | |
| return incidents | |
| def _narrative(chain: list[EnrichedEvent], techniques: set[str]) -> str: | |
| parts = [ | |
| f"{len(chain)} correlated events spanning " | |
| f"{(chain[-1].timestamp - chain[0].timestamp).total_seconds():.0f}s" | |
| ] | |
| if "brute_force_ssh" in techniques: | |
| parts.append("brute-force pattern against SSH") | |
| if "credential_stuffing_success" in techniques: | |
| parts.append("successful authentication after failures") | |
| if "privilege_escalation" in techniques or "privilege_abuse" in techniques: | |
| parts.append("privilege escalation phase") | |
| if "known_malicious_source" in techniques: | |
| parts.append("originates from intelligence-flagged infrastructure") | |
| return "; ".join(parts) + "." | |