"""Shared Pydantic schemas for SentinelAI event pipeline.""" from __future__ import annotations from datetime import datetime, timezone from enum import Enum from typing import Any, Optional from uuid import UUID, uuid4 from pydantic import BaseModel, Field def _utc_now() -> datetime: return datetime.now(timezone.utc) class Severity(str, Enum): CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" class AgentStatus(str, Enum): IDLE = "idle" RUNNING = "running" COMPLETE = "complete" ERROR = "error" class RawLogIngest(BaseModel): """Payload for /ingest-logs from collectors or demo scripts.""" source: str = Field(..., description="collector id, e.g. ssh, nginx, k8s") raw_line: str metadata: dict[str, Any] = Field(default_factory=dict) class SecurityEvent(BaseModel): """Unified security event after parse + normalize.""" id: UUID = Field(default_factory=uuid4) timestamp: datetime event_type: str source_ip: Optional[str] = None host: str = "unknown" severity: Severity = Severity.INFO message: str = "" raw: dict[str, Any] = Field(default_factory=dict) normalized: dict[str, Any] = Field(default_factory=dict) class EnrichedEvent(SecurityEvent): """Event with threat intel enrichment.""" enrichment: dict[str, Any] = Field(default_factory=dict) class DetectionFinding(BaseModel): """Output from threat detection agent.""" id: UUID = Field(default_factory=uuid4) event_id: UUID technique: str description: str confidence: float = Field(ge=0, le=1) mitre_technique: Optional[str] = None severity: Severity class IncidentNode(BaseModel): event_id: UUID label: str timestamp: datetime class IncidentEdge(BaseModel): source: UUID target: UUID relation: str class Incident(BaseModel): """Correlated attack chain.""" id: UUID = Field(default_factory=uuid4) title: str summary: str nodes: list[IncidentNode] = Field(default_factory=list) edges: list[IncidentEdge] = Field(default_factory=list) timeline: list[dict[str, Any]] = Field(default_factory=list) created_at: datetime = Field(default_factory=_utc_now) class RiskAssessment(BaseModel): incident_id: UUID risk_score: float = Field(ge=0, le=100) severity: Severity confidence: float = Field(ge=0, le=1) factors: dict[str, Any] = Field(default_factory=dict) class AnalystReport(BaseModel): incident_id: UUID executive_summary: str technical_analysis: str investigation_notes: str indicators: list[str] = Field(default_factory=list) recommended_actions: list[str] = Field( default_factory=list, description="SOC-style remediation bullets (block IP, rotate creds, etc.)", ) class RemediationPlan(BaseModel): incident_id: UUID actions: list[dict[str, Any]] = Field(default_factory=list) firewall_rules: list[str] = Field(default_factory=list) scripts: list[str] = Field(default_factory=list) k8s_patches: list[str] = Field(default_factory=list) iam_hardening: list[str] = Field(default_factory=list) class AlertPayload(BaseModel): channel: str = Field(..., description="slack|discord|email|teams|webhook") title: str body: str severity: Severity metadata: dict[str, Any] = Field(default_factory=dict) class DashboardMetrics(BaseModel): threats_detected: int = 0 active_incidents: int = 0 blocked_attacks: int = 0 events_per_minute: float = 0 top_countries: list[dict[str, Any]] = Field(default_factory=list) risk_trend: list[dict[str, Any]] = Field(default_factory=list) remediation_success_rate: float = 0.94 attack_frequency: list[dict[str, Any]] = Field(default_factory=list) class AgentActivity(BaseModel): agent: str status: AgentStatus detail: str = "" updated_at: datetime = Field(default_factory=_utc_now) class IncidentActionBody(BaseModel): incident_id: UUID class ReplayStartBody(BaseModel): delay_ms: int = Field(default=450, ge=50, le=10_000, description="Delay between replay frames") class WorkflowState(BaseModel): """Shared LangGraph-style state bag.""" events: list[EnrichedEvent] = Field(default_factory=list) findings: list[DetectionFinding] = Field(default_factory=list) incidents: list[Incident] = Field(default_factory=list) risks: list[RiskAssessment] = Field(default_factory=list) reports: list[AnalystReport] = Field(default_factory=list) remediations: list[RemediationPlan] = Field(default_factory=list) alerts_sent: list[str] = Field(default_factory=list)