| """Normalization agent: schema validation, host + timestamp standardization.""" | |
| from __future__ import annotations | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from models.schemas import SecurityEvent | |
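def normalize_event(event: SecurityEvent) -> SecurityEvent:
    """Return a copy of *event* with standardized timestamp, host and category.

    The function never assigns to *event*; the result is built with
    ``event.model_copy(update=...)``.

    Args:
        event: Event to normalize. Its ``timestamp``, ``host``,
            ``event_type`` and ``normalized`` attributes are read.

    Returns:
        A copy whose ``timestamp`` is timezone-aware, whose ``host`` is
        trimmed and lower-cased (``"unknown"`` when absent or blank), and
        whose ``normalized`` mapping additionally carries ``category``,
        ``host_normalized`` and ``ts_iso``.
    """
    ts = event.timestamp
    if ts.tzinfo is None:
        # A naive timestamp is labelled UTC; the clock value is not shifted.
        # NOTE(review): a source that logs local time ends up off by its UTC
        # offset here -- confirm upstream parsers emit UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    # NOTE(review): aware timestamps keep their original offset, so ts_iso
    # values may carry mixed offsets; compare parsed datetimes, not strings.

    # Apply the "unknown" fallback after trimming: a whitespace-only host
    # would otherwise normalize to "" and pool unrelated events under an
    # empty host key during per-host correlation.
    host = (event.host or "").strip().lower() or "unknown"
    category = _categorize(event.event_type)

    # Computed keys come after the spread, so they overwrite any same-named
    # keys already present. A missing mapping is treated as empty.
    normalized = {
        **(event.normalized or {}),
        "category": category,
        "host_normalized": host,
        "ts_iso": ts.isoformat(),
    }
    return event.model_copy(
        update={
            "timestamp": ts,
            "host": host,
            "normalized": normalized,
        }
    )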
def _categorize(event_type: str) -> str:
    """Map a raw event type onto a coarse category by substring matching.

    Matching is case-insensitive and the first matching rule wins, so a type
    containing both "auth" and "sudo" is labelled "authentication". Keywords
    match anywhere in the string, so the result is a routing hint rather than
    a verified classification.

    Args:
        event_type: Source-provided event type string.

    Returns:
        One of "authentication", "privilege", "web", "orchestration",
        "network", or "general" when no keyword matches.
    """
    lowered = event_type.lower()
    # Ordered rules: earlier entries take precedence over later ones.
    rules = (
        (("ssh", "auth"), "authentication"),
        (("sudo", "privilege"), "privilege"),
        (("web", "nginx", "apache"), "web"),
        (("k8s", "kubernetes"), "orchestration"),
        (("firewall", "iptables"), "network"),
    )
    for keywords, category in rules:
        if any(keyword in lowered for keyword in keywords):
            return category
    return "general"