# SentinelAI/models/schemas.py
# iitian's picture
# Sync SentinelAI project and add Hugging Face Docker Space layout.
# 8b3905d
"""Shared Pydantic schemas for SentinelAI event pipeline."""
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(timezone.utc)
class Severity(str, Enum):
    # Severity levels referenced by events, findings, risk assessments and
    # alerts in this module.  The ``str`` mixin makes each member compare
    # equal to its lowercase string value.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class AgentStatus(str, Enum):
    # Status values carried by ``AgentActivity.status``.  The ``str`` mixin
    # makes each member compare equal to its lowercase string value.
    IDLE = "idle"
    RUNNING = "running"
    COMPLETE = "complete"
    ERROR = "error"
class RawLogIngest(BaseModel):
    """Payload for /ingest-logs from collectors or demo scripts."""

    # Required identifier of the collector that produced the line.
    source: str = Field(..., description="collector id, e.g. ssh, nginx, k8s")
    # Required log line text.
    # NOTE(review): presumably the unparsed line as emitted — confirm with callers.
    raw_line: str
    # Free-form extra context; a fresh empty dict is created per instance.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SecurityEvent(BaseModel):
    """Unified security event after parse + normalize."""

    # Random UUID4 generated per instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    # Required; no default is provided, so the producer must set it.
    timestamp: datetime
    # Required free-form event category string.
    event_type: str
    # Optional originating IP; ``None`` when not supplied.
    source_ip: Optional[str] = None
    # Falls back to the literal "unknown" when the host is not supplied.
    host: str = "unknown"
    # Defaults to the ``INFO`` severity level.
    severity: Severity = Severity.INFO
    # Human-readable text; empty string by default.
    message: str = ""
    # Two free-form dicts, each defaulting to a fresh empty dict per instance.
    raw: dict[str, Any] = Field(default_factory=dict)
    normalized: dict[str, Any] = Field(default_factory=dict)
class EnrichedEvent(SecurityEvent):
    """Event with threat intel enrichment."""

    # Inherits every ``SecurityEvent`` field and adds one free-form dict,
    # which defaults to a fresh empty dict per instance.
    enrichment: dict[str, Any] = Field(default_factory=dict)
class DetectionFinding(BaseModel):
    """Output from threat detection agent."""

    # Random UUID4 generated per instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    # Required UUID of the event this finding refers to.
    event_id: UUID
    # Required free-form strings naming and describing the finding.
    technique: str
    description: str
    # Required; validated to lie within the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Optional string; ``None`` when not supplied.
    # NOTE(review): presumably a MITRE ATT&CK technique id — confirm format.
    mitre_technique: Optional[str] = None
    # Required (unlike ``SecurityEvent.severity`` there is no default here).
    severity: Severity
class IncidentNode(BaseModel):
    # One entry of ``Incident.nodes``; all three fields are required.
    event_id: UUID
    label: str
    timestamp: datetime
class IncidentEdge(BaseModel):
    # One entry of ``Incident.edges``; all three fields are required.
    # NOTE(review): ``source``/``target`` presumably hold the ``event_id`` of
    # the connected ``IncidentNode`` objects — confirm against the producer.
    source: UUID
    target: UUID
    # Free-form label describing how ``source`` relates to ``target``.
    relation: str
class Incident(BaseModel):
    """Correlated attack chain."""

    # Random UUID4 generated per instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    # Both text fields are required.
    title: str
    summary: str
    # Graph of the incident; each list defaults to a fresh empty list.
    nodes: list[IncidentNode] = Field(default_factory=list)
    edges: list[IncidentEdge] = Field(default_factory=list)
    # Free-form dict entries; the schema is not constrained here.
    timeline: list[dict[str, Any]] = Field(default_factory=list)
    # Timezone-aware UTC timestamp taken at instantiation when not supplied.
    created_at: datetime = Field(default_factory=_utc_now)
class RiskAssessment(BaseModel):
    # Required UUID of the incident being assessed.
    incident_id: UUID
    # Required; validated to lie within the closed interval [0, 100].
    risk_score: float = Field(ge=0, le=100)
    # Required severity level.
    severity: Severity
    # Required; validated to lie within the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Free-form dict; defaults to a fresh empty dict per instance.
    factors: dict[str, Any] = Field(default_factory=dict)
class AnalystReport(BaseModel):
    # Required UUID of the incident the report covers.
    incident_id: UUID
    # Three required free-text sections.
    executive_summary: str
    technical_analysis: str
    investigation_notes: str
    # List of strings; defaults to a fresh empty list per instance.
    indicators: list[str] = Field(default_factory=list)
    recommended_actions: list[str] = Field(
        default_factory=list,
        description="SOC-style remediation bullets (block IP, rotate creds, etc.)",
    )
class RemediationPlan(BaseModel):
    # Required UUID of the incident the plan applies to.
    incident_id: UUID
    # Free-form dict entries; the schema is not constrained here.
    actions: list[dict[str, Any]] = Field(default_factory=list)
    # Four lists of strings, each defaulting to a fresh empty list.
    firewall_rules: list[str] = Field(default_factory=list)
    scripts: list[str] = Field(default_factory=list)
    k8s_patches: list[str] = Field(default_factory=list)
    iam_hardening: list[str] = Field(default_factory=list)
class AlertPayload(BaseModel):
    # Required.  The description lists the expected channel names, but the
    # type is a plain ``str`` so the value is not validated against them.
    channel: str = Field(..., description="slack|discord|email|teams|webhook")
    # Required alert text.
    title: str
    body: str
    # Required severity level.
    severity: Severity
    # Free-form extra context; defaults to a fresh empty dict per instance.
    metadata: dict[str, Any] = Field(default_factory=dict)
class DashboardMetrics(BaseModel):
    # Every field is optional; counters and the rate start at zero.
    threats_detected: int = 0
    active_incidents: int = 0
    blocked_attacks: int = 0
    events_per_minute: float = 0
    # Free-form dict entries; the schema is not constrained here.
    top_countries: list[dict[str, Any]] = Field(default_factory=list)
    risk_trend: list[dict[str, Any]] = Field(default_factory=list)
    # NOTE(review): unlike the zero-initialised fields above, this defaults to
    # a hard-coded 0.94 — confirm the placeholder value is intended.
    remediation_success_rate: float = 0.94
    attack_frequency: list[dict[str, Any]] = Field(default_factory=list)
class AgentActivity(BaseModel):
    # Required agent name and its current status.
    agent: str
    status: AgentStatus
    # Optional free text; empty string by default.
    detail: str = ""
    # Timezone-aware UTC timestamp taken at instantiation when not supplied.
    updated_at: datetime = Field(default_factory=_utc_now)
class IncidentActionBody(BaseModel):
    # Body carrying a single required incident UUID.
    incident_id: UUID
class ReplayStartBody(BaseModel):
    # Optional delay; defaults to 450 and is validated to lie within the
    # closed interval [50, 10000].
    delay_ms: int = Field(
        default=450,
        ge=50,
        le=10_000,
        description="Delay between replay frames",
    )
class WorkflowState(BaseModel):
    """Shared LangGraph-style state bag."""

    # Every field defaults to a fresh empty list, so ``WorkflowState()`` is a
    # valid empty state.
    events: list[EnrichedEvent] = Field(default_factory=list)
    findings: list[DetectionFinding] = Field(default_factory=list)
    incidents: list[Incident] = Field(default_factory=list)
    risks: list[RiskAssessment] = Field(default_factory=list)
    reports: list[AnalystReport] = Field(default_factory=list)
    remediations: list[RemediationPlan] = Field(default_factory=list)
    # Plain strings; their format is not constrained here.
    alerts_sent: list[str] = Field(default_factory=list)