"""Shared Pydantic schemas for SentinelAI event pipeline."""
from __future__ import annotations
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware ``datetime`` in UTC.

    Used as a ``default_factory`` so each model instance gets its own
    creation-time stamp rather than one value fixed at import time.
    """
    return datetime.now(timezone.utc)
class Severity(str, Enum):
    """Severity levels shared by the schemas in this module.

    Members also subclass ``str``, so each one compares equal to its
    lowercase value (e.g. ``Severity.HIGH == "high"``).
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class AgentStatus(str, Enum):
    """Status values an agent can report.

    Members also subclass ``str``, so each one compares equal to its
    lowercase value (e.g. ``AgentStatus.RUNNING == "running"``).
    """

    IDLE = "idle"
    RUNNING = "running"
    COMPLETE = "complete"
    ERROR = "error"
class RawLogIngest(BaseModel):
    """Payload for /ingest-logs from collectors or demo scripts."""

    # Required: id of the collector that produced the line.
    source: str = Field(..., description="collector id, e.g. ssh, nginx, k8s")
    # Required: the log line itself (presumably unparsed text; confirm with callers).
    raw_line: str
    # Free-form extras; default_factory gives every instance its own dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SecurityEvent(BaseModel):
    """Unified security event after parse + normalize."""

    # Generated with uuid4 when the caller does not supply one.
    id: UUID = Field(default_factory=uuid4)
    # Required, no default.
    # NOTE(review): nothing here enforces timezone awareness; presumably UTC - confirm.
    timestamp: datetime
    # Required free-form type label; no enum constrains it in this module.
    event_type: str
    # None when the event has no associated source address.
    source_ip: Optional[str] = None
    host: str = "unknown"
    severity: Severity = Severity.INFO
    message: str = ""
    # Two separate free-form dicts, each created fresh per instance.
    # Presumably `raw` is the pre-normalization data and `normalized` the
    # post-normalization fields - verify against the parser.
    raw: dict[str, Any] = Field(default_factory=dict)
    normalized: dict[str, Any] = Field(default_factory=dict)
class EnrichedEvent(SecurityEvent):
    """Event with threat intel enrichment."""

    # Added on top of the inherited SecurityEvent fields; empty dict by default.
    enrichment: dict[str, Any] = Field(default_factory=dict)
class DetectionFinding(BaseModel):
    """Output from threat detection agent."""

    # Generated with uuid4 when the caller does not supply one.
    id: UUID = Field(default_factory=uuid4)
    # Required; presumably the `id` of the SecurityEvent that triggered this
    # finding - verify against the detection agent.
    event_id: UUID
    technique: str
    description: str
    # Required; validated to lie in the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Optional; presumably a MITRE ATT&CK technique id - TODO confirm format.
    mitre_technique: Optional[str] = None
    # Required (unlike SecurityEvent.severity, there is no default here).
    severity: Severity
class IncidentNode(BaseModel):
    """Node entry used in ``Incident.nodes``; all fields are required."""

    # Presumably the `id` of the event this node represents - confirm.
    event_id: UUID
    label: str
    timestamp: datetime
class IncidentEdge(BaseModel):
    """Edge entry used in ``Incident.edges``; all fields are required."""

    # Endpoints of the edge. Presumably these match IncidentNode.event_id
    # values - verify against the code that builds incidents.
    source: UUID
    target: UUID
    # Free-form label for how `source` relates to `target`.
    relation: str
class Incident(BaseModel):
    """Correlated attack chain."""

    # Generated with uuid4 when the caller does not supply one.
    id: UUID = Field(default_factory=uuid4)
    title: str
    summary: str
    # Graph representation; both lists start empty and are per-instance.
    nodes: list[IncidentNode] = Field(default_factory=list)
    edges: list[IncidentEdge] = Field(default_factory=list)
    # Free-form timeline entries; no schema is enforced on the dicts.
    timeline: list[dict[str, Any]] = Field(default_factory=list)
    # Timezone-aware UTC timestamp taken when the instance is created.
    created_at: datetime = Field(default_factory=_utc_now)
class RiskAssessment(BaseModel):
    """Risk scoring attached to an incident by its UUID."""

    incident_id: UUID
    # Required; validated to lie in the closed interval [0, 100].
    risk_score: float = Field(ge=0, le=100)
    severity: Severity
    # Required; validated to lie in the closed interval [0, 1].
    confidence: float = Field(ge=0, le=1)
    # Free-form contributing factors; no schema is enforced on the dict.
    factors: dict[str, Any] = Field(default_factory=dict)
class AnalystReport(BaseModel):
    """Written report attached to an incident by its UUID."""

    incident_id: UUID
    # The three narrative sections are required strings.
    executive_summary: str
    technical_analysis: str
    investigation_notes: str
    # Presumably indicators of compromise as plain strings - confirm format.
    indicators: list[str] = Field(default_factory=list)
    recommended_actions: list[str] = Field(
        default_factory=list,
        description="SOC-style remediation bullets (block IP, rotate creds, etc.)",
    )
class RemediationPlan(BaseModel):
    """Remediation items attached to an incident by its UUID.

    Only ``incident_id`` is required; every list defaults to empty.
    """

    incident_id: UUID
    # Free-form action records; no schema is enforced on the dicts.
    actions: list[dict[str, Any]] = Field(default_factory=list)
    # The remaining lists hold plain strings; this model does not validate
    # their contents (rule syntax, script language, patch format).
    firewall_rules: list[str] = Field(default_factory=list)
    scripts: list[str] = Field(default_factory=list)
    k8s_patches: list[str] = Field(default_factory=list)
    iam_hardening: list[str] = Field(default_factory=list)
class AlertPayload(BaseModel):
    """Alert message addressed to a named channel."""

    # Required. The description lists the expected values, but the type is a
    # plain str, so other values are not rejected by this model.
    channel: str = Field(..., description="slack|discord|email|teams|webhook")
    title: str
    body: str
    severity: Severity
    # Free-form extras; default_factory gives every instance its own dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
class DashboardMetrics(BaseModel):
    """Metrics snapshot; every field has a default, so none are required."""

    threats_detected: int = 0
    active_incidents: int = 0
    blocked_attacks: int = 0
    events_per_minute: float = 0
    # Free-form series; no schema is enforced on the dict entries.
    top_countries: list[dict[str, Any]] = Field(default_factory=list)
    risk_trend: list[dict[str, Any]] = Field(default_factory=list)
    # NOTE(review): non-zero default of 0.94 looks like a demo placeholder and
    # is reported whenever the caller does not set the field - confirm intent.
    remediation_success_rate: float = 0.94
    attack_frequency: list[dict[str, Any]] = Field(default_factory=list)
class AgentActivity(BaseModel):
    """Status record for a named agent."""

    agent: str
    status: AgentStatus
    detail: str = ""
    # Timezone-aware UTC timestamp taken when the instance is created.
    updated_at: datetime = Field(default_factory=_utc_now)
class IncidentActionBody(BaseModel):
    """Body carrying only the UUID of the incident to act on.

    Presumably used as a request body by incident action endpoints; the
    endpoints themselves are not defined in this module.
    """

    incident_id: UUID
class ReplayStartBody(BaseModel):
    """Options for starting a replay."""

    # Defaults to 450; validated to lie in the closed range [50, 10_000].
    delay_ms: int = Field(
        default=450,
        ge=50,
        le=10_000,
        description="Delay between replay frames",
    )
class WorkflowState(BaseModel):
    """Shared LangGraph-style state bag."""

    # One list per artifact type. All start empty, and default_factory gives
    # each WorkflowState its own list objects rather than shared ones.
    events: list[EnrichedEvent] = Field(default_factory=list)
    findings: list[DetectionFinding] = Field(default_factory=list)
    incidents: list[Incident] = Field(default_factory=list)
    risks: list[RiskAssessment] = Field(default_factory=list)
    reports: list[AnalystReport] = Field(default_factory=list)
    remediations: list[RemediationPlan] = Field(default_factory=list)
    # Plain strings; presumably ids or channel names of delivered alerts -
    # verify against the alerting code.
    alerts_sent: list[str] = Field(default_factory=list)
|