Spaces:
Running
Running
| """Shared Pydantic schemas for SentinelAI event pipeline.""" | |
| from __future__ import annotations | |
| from datetime import datetime, timezone | |
| from enum import Enum | |
| from typing import Any, Optional | |
| from uuid import UUID, uuid4 | |
| from pydantic import BaseModel, Field | |
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC.

    Used as a ``default_factory`` so that each model instance is stamped
    at creation time rather than at import time.
    """
    return datetime.now(timezone.utc)
class Severity(str, Enum):
    """Severity labels, declared from CRITICAL down to INFO.

    The ``str`` mix-in makes every member an actual string equal to its
    lowercase value, so members can be compared against plain strings and
    looked up by value (``Severity("high")``).
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class AgentStatus(str, Enum):
    """Status labels an agent can report.

    The ``str`` mix-in makes every member an actual string equal to its
    lowercase value, so members can be compared against plain strings and
    looked up by value (``AgentStatus("running")``).
    """

    IDLE = "idle"
    RUNNING = "running"
    COMPLETE = "complete"
    ERROR = "error"
class RawLogIngest(BaseModel):
    """Payload for /ingest-logs from collectors or demo scripts."""

    # Required (``...`` means no default): identifies the sending collector.
    source: str = Field(..., description="collector id, e.g. ssh, nginx, k8s")
    # Required: the log line as a single string; no parsing happens in this model.
    raw_line: str
    # Optional free-form extras; default_factory gives each instance its own dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SecurityEvent(BaseModel):
    """Unified security event after parse + normalize."""

    # Generated with uuid4 for each instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    # Required. NOTE(review): a plain ``datetime`` accepts naive and aware
    # values alike -- confirm that producers always send timezone-aware times.
    timestamp: datetime
    # Required free-form string; the set of valid values is not defined here.
    event_type: str
    # Optional; kept as a plain string, so no IP-format validation is applied.
    source_ip: Optional[str] = None
    # Falls back to the literal "unknown" when no host is given.
    host: str = "unknown"
    # Events are INFO unless stated otherwise.
    severity: Severity = Severity.INFO
    message: str = ""
    # Each instance gets its own empty dict. Presumably ``raw`` holds the
    # fields as parsed and ``normalized`` their canonical form -- confirm
    # against the parser/normalizer.
    raw: dict[str, Any] = Field(default_factory=dict)
    normalized: dict[str, Any] = Field(default_factory=dict)
class EnrichedEvent(SecurityEvent):
    """Event with threat intel enrichment."""

    # Adds one field on top of everything inherited from SecurityEvent.
    # The dict's keys are not constrained by this model; each instance
    # starts with its own empty dict.
    enrichment: dict[str, Any] = Field(default_factory=dict)
class DetectionFinding(BaseModel):
    """Output from threat detection agent."""

    # Generated with uuid4 for each instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    # Required. Presumably the ``id`` of the event that triggered this
    # finding -- confirm against the detection agent.
    event_id: UUID
    technique: str
    description: str
    # Required (no default) and validated to lie in the closed range [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Optional. Presumably a MITRE ATT&CK technique identifier -- the
    # format is not validated here; confirm with the producer.
    mitre_technique: Optional[str] = None
    # Required here, unlike SecurityEvent where severity defaults to INFO.
    severity: Severity
# One entry in Incident.nodes. All three fields are required.
class IncidentNode(BaseModel):
    # Presumably the id of the event this node stands for -- confirm
    # against the code that builds incidents.
    event_id: UUID
    label: str
    timestamp: datetime
# One entry in Incident.edges: a labelled link between two UUIDs.
# All three fields are required.
class IncidentEdge(BaseModel):
    # NOTE(review): presumably these match IncidentNode.event_id values;
    # nothing in this model enforces that -- confirm with the correlator.
    source: UUID
    target: UUID
    # Free-form description of how source relates to target.
    relation: str
class Incident(BaseModel):
    """Correlated attack chain."""

    # Generated with uuid4 for each instance when the caller supplies none.
    id: UUID = Field(default_factory=uuid4)
    title: str
    summary: str
    # Graph representation; each instance starts with its own empty lists.
    nodes: list[IncidentNode] = Field(default_factory=list)
    edges: list[IncidentEdge] = Field(default_factory=list)
    # Free-form timeline entries; the dict schema is not defined by this model.
    timeline: list[dict[str, Any]] = Field(default_factory=list)
    # Stamped with the current UTC time when the instance is created.
    created_at: datetime = Field(default_factory=_utc_now)
# Risk scoring result attached to an incident through incident_id.
class RiskAssessment(BaseModel):
    incident_id: UUID
    # Required and validated to lie in the closed range [0, 100].
    risk_score: float = Field(ge=0, le=100)
    severity: Severity
    # Required and validated to lie in the closed range [0, 1] -- note the
    # different scale from risk_score.
    confidence: float = Field(ge=0, le=1)
    # Free-form scoring inputs; the keys are not constrained by this model.
    factors: dict[str, Any] = Field(default_factory=dict)
# Written report attached to an incident through incident_id.
# The three text sections are required; both lists default to empty.
class AnalystReport(BaseModel):
    incident_id: UUID
    executive_summary: str
    technical_analysis: str
    investigation_notes: str
    # Plain strings; the indicator format is not validated here.
    indicators: list[str] = Field(default_factory=list)
    recommended_actions: list[str] = Field(
        default_factory=list,
        description="SOC-style remediation bullets (block IP, rotate creds, etc.)",
    )
# Remediation artifacts attached to an incident through incident_id.
# Every list defaults to its own empty list per instance.
class RemediationPlan(BaseModel):
    incident_id: UUID
    # Free-form action records; the dict schema is not defined by this model.
    actions: list[dict[str, Any]] = Field(default_factory=list)
    # The remaining fields are lists of plain strings. This model only
    # stores them: nothing here validates or executes their contents.
    firewall_rules: list[str] = Field(default_factory=list)
    scripts: list[str] = Field(default_factory=list)
    k8s_patches: list[str] = Field(default_factory=list)
    iam_hardening: list[str] = Field(default_factory=list)
# A single alert message addressed to one delivery channel.
class AlertPayload(BaseModel):
    # Required. The description lists the expected channel names, but the
    # field is a plain ``str``, so other values pass validation too.
    channel: str = Field(..., description="slack|discord|email|teams|webhook")
    title: str
    body: str
    severity: Severity
    # Optional free-form extras; each instance gets its own dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
# Aggregate figures for a dashboard. Every field has a default, so the
# model can be built with no arguments.
class DashboardMetrics(BaseModel):
    threats_detected: int = 0
    active_incidents: int = 0
    blocked_attacks: int = 0
    events_per_minute: float = 0
    # Series data as free-form dicts; their keys are not defined by this model.
    top_countries: list[dict[str, Any]] = Field(default_factory=list)
    risk_trend: list[dict[str, Any]] = Field(default_factory=list)
    # NOTE(review): the only metric with a non-zero default. 0.94 looks like
    # a placeholder/demo value that would be reported when nothing has been
    # measured -- confirm it is intentional. No range constraint is applied.
    remediation_success_rate: float = 0.94
    attack_frequency: list[dict[str, Any]] = Field(default_factory=list)
# Status report for one named agent.
class AgentActivity(BaseModel):
    # Required agent name; a free-form string.
    agent: str
    status: AgentStatus
    # Optional human-readable detail; empty by default.
    detail: str = ""
    # Stamped with the current UTC time when the instance is created.
    # It is not refreshed automatically when other fields change.
    updated_at: datetime = Field(default_factory=_utc_now)
# Carries a single required incident UUID. Presumably the request body
# for endpoints that act on an existing incident -- confirm against the
# API routes.
class IncidentActionBody(BaseModel):
    incident_id: UUID
# Options for starting a replay. Presumably the request body of a
# replay-start endpoint -- confirm against the API routes.
class ReplayStartBody(BaseModel):
    # Milliseconds between replay frames: 450 by default, validated to lie
    # in the closed range [50, 10000].
    delay_ms: int = Field(
        default=450,
        ge=50,
        le=10_000,
        description="Delay between replay frames",
    )
class WorkflowState(BaseModel):
    """Shared LangGraph-style state bag."""

    # One list per kind of pipeline output. Every field defaults to its own
    # empty list, so a fresh state can be created with no arguments.
    events: list[EnrichedEvent] = Field(default_factory=list)
    findings: list[DetectionFinding] = Field(default_factory=list)
    incidents: list[Incident] = Field(default_factory=list)
    risks: list[RiskAssessment] = Field(default_factory=list)
    reports: list[AnalystReport] = Field(default_factory=list)
    remediations: list[RemediationPlan] = Field(default_factory=list)
    # Plain strings; what each string identifies is not defined by this
    # model -- confirm with the alerting code.
    alerts_sent: list[str] = Field(default_factory=list)