SentinelAI / agents /incident_correlation_agent.py
iitian's picture
Sync SentinelAI project and add Hugging Face Docker Space layout.
8b3905d
"""Incident correlation: fuse multi-stage events into attack timelines and graphs."""
from __future__ import annotations
from collections import defaultdict
from datetime import timedelta
from typing import Iterable
from uuid import NAMESPACE_URL, uuid5
from models.schemas import DetectionFinding, EnrichedEvent, Incident, IncidentEdge, IncidentNode
def correlate(
events: Iterable[EnrichedEvent],
findings: Iterable[DetectionFinding],
window_minutes: int = 45,
) -> list[Incident]:
events_list = sorted(events, key=lambda e: e.timestamp)
if not events_list:
return []
by_ip: dict[str, list[EnrichedEvent]] = defaultdict(list)
for e in events_list:
key = e.source_ip or f"host:{e.host}"
by_ip[key].append(e)
incidents: list[Incident] = []
finding_by_event = {f.event_id: f for f in findings}
for key, chain in by_ip.items():
if len(chain) < 2 and not any(finding_by_event.get(e.id) for e in chain):
continue
start = chain[0].timestamp
end = chain[-1].timestamp
if (end - start) > timedelta(minutes=window_minutes * 4):
continue
nodes = [
IncidentNode(event_id=e.id, label=f"{e.event_type}: {e.message[:80]}", timestamp=e.timestamp)
for e in chain
]
edges: list[IncidentEdge] = []
for i in range(len(chain) - 1):
edges.append(
IncidentEdge(
source=chain[i].id,
target=chain[i + 1].id,
relation="precedes",
)
)
techniques = {finding_by_event[e.id].technique for e in chain if e.id in finding_by_event}
title = f"Correlated activity β€” {key}"
if "brute_force_ssh" in techniques and "credential_stuffing_success" in techniques:
title = f"Likely SSH compromise chain β€” {key}"
summary = _narrative(chain, techniques)
timeline = [
{"t": e.timestamp.isoformat(), "type": e.event_type, "msg": e.message} for e in chain
]
stable_id = uuid5(NAMESPACE_URL, f"sentinelai|incident|{key}")
incidents.append(
Incident(
id=stable_id,
title=title,
summary=summary,
nodes=nodes,
edges=edges,
timeline=timeline,
)
)
return incidents
def _narrative(chain: list[EnrichedEvent], techniques: set[str]) -> str:
parts = [
f"{len(chain)} correlated events spanning "
f"{(chain[-1].timestamp - chain[0].timestamp).total_seconds():.0f}s"
]
if "brute_force_ssh" in techniques:
parts.append("brute-force pattern against SSH")
if "credential_stuffing_success" in techniques:
parts.append("successful authentication after failures")
if "privilege_escalation" in techniques or "privilege_abuse" in techniques:
parts.append("privilege escalation phase")
if "known_malicious_source" in techniques:
parts.append("originates from intelligence-flagged infrastructure")
return "; ".join(parts) + "."