"""Incident correlation: fuse multi-stage events into attack timelines and graphs."""

from __future__ import annotations

from collections import defaultdict
from datetime import timedelta
from typing import Iterable
from uuid import NAMESPACE_URL, uuid5

from models.schemas import DetectionFinding, EnrichedEvent, Incident, IncidentEdge, IncidentNode


def correlate(
    events: Iterable[EnrichedEvent],
    findings: Iterable[DetectionFinding],
    window_minutes: int = 45,
) -> list[Incident]:
    """Group events per source and turn each qualifying group into an Incident.

    Events are sorted by ``timestamp`` and bucketed by ``source_ip``; events
    without a source IP are bucketed under ``"host:<host>"`` instead. A bucket
    becomes an incident when it holds at least two events, or when at least
    one of its events has a detection finding.

    Args:
        events: Events to correlate; consumed once and sorted by timestamp.
        findings: Detection findings, matched to events via ``event_id``.
            An event may carry several findings; all of their techniques
            are taken into account.
        window_minutes: Base correlation window. A bucket whose first and
            last events are more than ``4 * window_minutes`` apart is
            dropped entirely (it is not split into smaller chains).

    Returns:
        One ``Incident`` per qualifying bucket, in first-seen bucket order.
        The incident id is a UUIDv5 derived only from the bucket key, so the
        same source always maps to the same id.
    """
    events_list = sorted(events, key=lambda e: e.timestamp)
    if not events_list:
        return []

    by_ip: dict[str, list[EnrichedEvent]] = defaultdict(list)
    for e in events_list:
        key = e.source_ip or f"host:{e.host}"
        by_ip[key].append(e)

    # Collect *every* technique reported for an event. A plain
    # ``{event_id: finding}`` mapping would silently keep only the last
    # finding when several detectors fire on the same event.
    techniques_by_event: defaultdict[object, set[str]] = defaultdict(set)
    for f in findings:
        techniques_by_event[f.event_id].add(f.technique)

    # Loop-invariant: maximum allowed span between first and last event.
    max_span = timedelta(minutes=window_minutes * 4)

    incidents: list[Incident] = []
    for key, chain in by_ip.items():
        # Use membership tests (not indexing) so the defaultdict does not
        # grow entries for events that have no findings.
        has_finding = any(e.id in techniques_by_event for e in chain)
        if len(chain) < 2 and not has_finding:
            continue

        # chain is already time-ordered because events_list was sorted.
        if (chain[-1].timestamp - chain[0].timestamp) > max_span:
            continue

        nodes = [
            IncidentNode(
                event_id=e.id,
                label=f"{e.event_type}: {e.message[:80]}",
                timestamp=e.timestamp,
            )
            for e in chain
        ]
        # Link each event to its immediate successor in time.
        edges: list[IncidentEdge] = [
            IncidentEdge(source=prev.id, target=nxt.id, relation="precedes")
            for prev, nxt in zip(chain, chain[1:])
        ]

        techniques: set[str] = set()
        for e in chain:
            if e.id in techniques_by_event:
                techniques |= techniques_by_event[e.id]

        title = f"Correlated activity — {key}"
        if "brute_force_ssh" in techniques and "credential_stuffing_success" in techniques:
            title = f"Likely SSH compromise chain — {key}"

        summary = _narrative(chain, techniques)
        timeline = [
            {"t": e.timestamp.isoformat(), "type": e.event_type, "msg": e.message}
            for e in chain
        ]
        stable_id = uuid5(NAMESPACE_URL, f"sentinelai|incident|{key}")
        incidents.append(
            Incident(
                id=stable_id,
                title=title,
                summary=summary,
                nodes=nodes,
                edges=edges,
                timeline=timeline,
            )
        )
    return incidents


def _narrative(chain: list[EnrichedEvent], techniques: set[str]) -> str:
    """Build a one-sentence summary of a correlated chain.

    Args:
        chain: Non-empty, time-ordered events of a single incident.
        techniques: Technique names detected on the chain's events.

    Returns:
        Semicolon-separated phrases ending in a period: the event count and
        time span first, followed by one phrase per recognised technique.
    """
    span_seconds = (chain[-1].timestamp - chain[0].timestamp).total_seconds()
    parts = [f"{len(chain)} correlated events spanning {span_seconds:.0f}s"]
    if "brute_force_ssh" in techniques:
        parts.append("brute-force pattern against SSH")
    if "credential_stuffing_success" in techniques:
        parts.append("successful authentication after failures")
    if "privilege_escalation" in techniques or "privilege_abuse" in techniques:
        parts.append("privilege escalation phase")
    if "known_malicious_source" in techniques:
        parts.append("originates from intelligence-flagged infrastructure")
    return "; ".join(parts) + "."