"""
Event models for the reliability framework.

Includes ReliabilityEvent, HealingAction, PolicyCondition, etc.
"""

import hashlib
import re
from datetime import datetime, timezone
from enum import Enum
from typing import List, Literal, Optional, Tuple

from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator

# Note: The following constants are not used directly in this file,
# but they are kept for potential future extensions or consistency with other modules.
# from agentic_reliability_framework.core.config.constants import (
#     LATENCY_WARNING, LATENCY_CRITICAL, LATENCY_EXTREME,
#     ERROR_RATE_WARNING, ERROR_RATE_HIGH, ERROR_RATE_CRITICAL,
#     CPU_WARNING, CPU_CRITICAL,
#     MEMORY_WARNING, MEMORY_CRITICAL
# )

# Single source of truth for component / dependency identifiers. It is applied
# with ``fullmatch`` on purpose: ``re.match`` with a trailing ``$`` also accepts
# a value that ends in a newline (e.g. "api\n"), which would let malformed IDs
# through validation.
_IDENTIFIER_PATTERN = re.compile(r"[a-z0-9-]+")


def validate_component_id(component: str) -> Tuple[bool, str]:
    """
    Validate component ID format (lowercase letters, digits and hyphens only).

    Args:
        component: Candidate component ID. Non-string values are rejected
            rather than raising.

    Returns:
        ``(is_valid, error_message)``; ``error_message`` is the empty string
        when the ID is valid.
    """
    if not isinstance(component, str):
        return False, "Component ID must be a string"
    if not (1 <= len(component) <= 255):
        return False, "Component ID must be 1-255 characters"
    if _IDENTIFIER_PATTERN.fullmatch(component) is None:
        return False, "Component ID must contain only lowercase letters, numbers, and hyphens"
    return True, ""


class EventSeverity(str, Enum):
    """Severity levels that can be attached to a reliability event."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class HealingAction(str, Enum):
    """Remediation actions that a healing policy can list."""

    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    TRAFFIC_SHIFT = "traffic_shift"
    CIRCUIT_BREAKER = "circuit_breaker"
    ROLLBACK = "rollback"
    ALERT_TEAM = "alert_team"
    NO_ACTION = "no_action"


class HealthStatus(str, Enum):
    """Health states, including ``UNKNOWN``."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


class PolicyCondition(BaseModel):
    """Immutable comparison of one metric against a non-negative threshold."""

    metric: Literal["latency_p99", "error_rate", "cpu_util", "memory_util", "throughput"]
    operator: Literal["gt", "lt", "eq", "gte", "lte"]
    threshold: float = Field(ge=0)

    model_config = ConfigDict(frozen=True)


class ReliabilityEvent(BaseModel):
    """
    Immutable snapshot of a component's reliability metrics.

    ``timestamp`` defaults to the current UTC time. ``component`` and every
    entry of ``upstream_deps`` / ``downstream_deps`` must consist solely of
    lowercase letters, digits and hyphens, and a name may not appear in both
    dependency lists.
    """

    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    component: str = Field(min_length=1, max_length=255)
    service_mesh: str = Field(default="default", min_length=1, max_length=100)
    # NOTE(review): the 300000 upper bound suggests milliseconds — confirm the unit.
    latency_p99: float = Field(ge=0, lt=300000)
    error_rate: float = Field(ge=0, le=1)
    throughput: float = Field(ge=0)
    cpu_util: Optional[float] = Field(default=None, ge=0, le=1)
    memory_util: Optional[float] = Field(default=None, ge=0, le=1)
    revenue_impact: Optional[float] = Field(default=None, ge=0)
    user_impact: Optional[int] = Field(default=None, ge=0)
    upstream_deps: List[str] = Field(default_factory=list)
    downstream_deps: List[str] = Field(default_factory=list)
    severity: EventSeverity = EventSeverity.LOW

    model_config = ConfigDict(frozen=True, validate_assignment=True)

    @field_validator("component")
    @classmethod
    def validate_component_id(cls, v: str) -> str:
        """Reject component IDs containing anything but ``[a-z0-9-]``.

        Raises:
            ValueError: If ``v`` contains any other character, including a
                trailing newline.
        """
        if _IDENTIFIER_PATTERN.fullmatch(v) is None:
            raise ValueError("Component ID must contain only lowercase letters, numbers, and hyphens")
        return v

    @field_validator("upstream_deps", "downstream_deps")
    @classmethod
    def validate_dependency_format(cls, v: List[str]) -> List[str]:
        """Apply the component-ID character rule to every dependency name.

        Raises:
            ValueError: For the first dependency that is empty or contains a
                character outside ``[a-z0-9-]``.
        """
        for dep in v:
            if _IDENTIFIER_PATTERN.fullmatch(dep) is None:
                raise ValueError(f"Dependency '{dep}' must contain only lowercase letters, numbers, and hyphens")
        return v

    @computed_field
    @property
    def fingerprint(self) -> str:
        """SHA-256 hex digest of the component, mesh and rounded core metrics.

        The timestamp is not part of the digest, so two events with the same
        component, service mesh and (rounded) latency / error rate /
        throughput share a fingerprint.
        """
        components = [
            self.component,
            self.service_mesh,
            f"{self.latency_p99:.2f}",
            f"{self.error_rate:.4f}",
            f"{self.throughput:.2f}",
        ]
        return hashlib.sha256(":".join(components).encode()).hexdigest()

    def model_post_init(self, __context) -> None:
        """Reject events that list the same name as upstream and downstream.

        Raises:
            ValueError: If the two dependency lists intersect.
        """
        circular = set(self.upstream_deps) & set(self.downstream_deps)
        if circular:
            raise ValueError(f"Circular dependencies detected: {circular}")


class HealingPolicy(BaseModel):
    """Immutable policy pairing one or more conditions with healing actions."""

    name: str = Field(min_length=1, max_length=255)
    conditions: List[PolicyCondition] = Field(min_length=1)
    actions: List[HealingAction] = Field(min_length=1)
    priority: int = Field(ge=1, le=5, default=3)
    cool_down_seconds: int = Field(ge=0, default=300)
    enabled: bool = Field(default=True)
    max_executions_per_hour: int = Field(ge=1, default=10)

    model_config = ConfigDict(frozen=True)


class AnomalyResult(BaseModel):
    """Immutable anomaly-detection result; the timestamp defaults to now (UTC)."""

    is_anomaly: bool
    confidence: float = Field(ge=0, le=1)
    anomaly_score: float = Field(ge=0, le=1)
    affected_metrics: List[str] = Field(default_factory=list)
    detection_timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    model_config = ConfigDict(frozen=True)


class ForecastResult(BaseModel):
    """Immutable forecast for one metric; the timestamp defaults to now (UTC)."""

    metric: str
    predicted_value: float
    confidence: float = Field(ge=0, le=1)
    trend: Literal["increasing", "decreasing", "stable"]
    # NOTE(review): the unit of time_to_threshold is not visible here — confirm with the producer.
    time_to_threshold: Optional[float] = Field(default=None)
    risk_level: Literal["low", "medium", "high", "critical"]
    forecast_timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    model_config = ConfigDict(frozen=True)