SentinelAI / agents /normalization_agent.py
iitian's picture
Sync SentinelAI project and add Hugging Face Docker Space layout.
8b3905d
"""Normalization agent: schema validation, host + timestamp standardization."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from models.schemas import SecurityEvent
def normalize_event(event: SecurityEvent) -> SecurityEvent:
ts = event.timestamp
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
host = (event.host or "unknown").strip().lower()
cat = _categorize(event.event_type)
normalized = {
**event.normalized,
"category": cat,
"host_normalized": host,
"ts_iso": ts.isoformat(),
}
return event.model_copy(
update={
"timestamp": ts,
"host": host,
"normalized": normalized,
}
)
def _categorize(event_type: str) -> str:
et = event_type.lower()
if "ssh" in et or "auth" in et:
return "authentication"
if "sudo" in et or "privilege" in et:
return "privilege"
if "web" in et or "nginx" in et or "apache" in et:
return "web"
if "k8s" in et or "kubernetes" in et:
return "orchestration"
if "firewall" in et or "iptables" in et:
return "network"
return "general"