"""Normalization agent: schema validation, host + timestamp standardization."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from models.schemas import SecurityEvent
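def normalize_event(event: SecurityEvent) -> SecurityEvent:
    """Return a copy of *event* with timestamp, host and category normalized.

    Timestamp: a naive timestamp is labelled as UTC; an aware timestamp is
    converted to UTC, so every ``ts_iso`` value carries the same offset and
    events from different sources can be ordered and correlated directly.

    Host: surrounding whitespace is stripped and the name is lower-cased.
    A missing, empty or whitespace-only host becomes ``"unknown"``.

    Normalized mapping: the existing ``event.normalized`` entries are kept
    and ``category``, ``host_normalized`` and ``ts_iso`` are added. The
    computed values win over existing keys with the same names.

    Args:
        event: The event to normalize. This function does not assign to it.

    Returns:
        The result of ``event.model_copy(update=...)`` carrying the
        normalized ``timestamp``, ``host`` and ``normalized`` fields.
    """
    ts = event.timestamp
    if ts.utcoffset() is None:
        # NOTE(review): a naive timestamp is labelled UTC without shifting it.
        # A source that logs local time will be off by its offset; confirm
        # what the collectors emit.
        ts = ts.replace(tzinfo=timezone.utc)
    else:
        # Same instant, expressed in UTC, so ts_iso strings stay comparable.
        ts = ts.astimezone(timezone.utc)

    # Strip before applying the fallback so a whitespace-only host does not
    # end up as an empty string.
    host = (event.host or "").strip().lower() or "unknown"

    normalized = {
        # Tolerate a missing mapping; the schema is not visible here.
        **(event.normalized or {}),
        "category": _categorize(event.event_type),
        "host_normalized": host,
        "ts_iso": ts.isoformat(),
    }

    # NOTE(review): if SecurityEvent is a pydantic model, model_copy(update=...)
    # does not re-validate the updated fields, so no schema validation happens
    # in this function; confirm it is done where events are constructed.
    return event.model_copy(
        update={
            "timestamp": ts,
            "host": host,
            "normalized": normalized,
        }
    )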
def _categorize(event_type: str) -> str:
    """Map a raw event type string to a coarse category label.

    Matching is a case-insensitive substring search and the first rule that
    matches wins, so rule order matters: an event type containing both
    "auth" and "sudo" is reported as "authentication", not "privilege".
    Substring matching also means a name that merely contains a keyword
    (for example "author" contains "auth") lands in that keyword's
    category. Anything that matches no rule is "general".
    """
    # Ordered (keywords, label) pairs; earlier rules take precedence.
    rules = (
        (("ssh", "auth"), "authentication"),
        (("sudo", "privilege"), "privilege"),
        (("web", "nginx", "apache"), "web"),
        (("k8s", "kubernetes"), "orchestration"),
        (("firewall", "iptables"), "network"),
    )
    lowered = event_type.lower()
    for keywords, label in rules:
        if any(keyword in lowered for keyword in keywords):
            return label
    return "general"