| """ |
| Event models for the reliability framework. |
| Includes ReliabilityEvent, HealingAction, PolicyCondition, etc. |
| """ |
|
|
| from pydantic import BaseModel, Field, field_validator, computed_field, ConfigDict |
| from typing import Optional, List, Literal, Tuple |
| from enum import Enum |
| from datetime import datetime, timezone |
| import hashlib |
| import re |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
def validate_component_id(component: str) -> Tuple[bool, str]:
    """
    Validate a component ID without raising.

    A valid ID is a string of 1-255 characters made up solely of lowercase
    ASCII letters, digits, and hyphens.

    Args:
        component: Candidate component ID. Non-string values are rejected
            rather than raising.

    Returns:
        (is_valid, error_message). error_message is "" when the ID is valid.
    """
    if not isinstance(component, str):
        return False, "Component ID must be a string"
    if not (1 <= len(component) <= 255):
        return False, "Component ID must be 1-255 characters"
    # fullmatch rather than match with "^...$": "$" also matches just before a
    # trailing newline, which would let an ID such as "api\n" through.
    if re.fullmatch(r"[a-z0-9-]+", component) is None:
        return False, "Component ID must contain only lowercase letters, numbers, and hyphens"
    return True, ""
|
|
|
|
class EventSeverity(str, Enum):
    """
    Severity level attached to an event.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
|
|
|
|
class HealingAction(str, Enum):
    """
    Identifiers for the healing actions a policy can list.

    Members are also ``str`` instances, so they compare equal to their
    snake_case string values.
    """

    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    TRAFFIC_SHIFT = "traffic_shift"
    CIRCUIT_BREAKER = "circuit_breaker"
    ROLLBACK = "rollback"
    ALERT_TEAM = "alert_team"
    NO_ACTION = "no_action"
|
|
|
|
class HealthStatus(str, Enum):
    """
    Health state labels.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
|
|
|
|
class PolicyCondition(BaseModel):
    """
    Immutable (frozen) description of one metric-versus-threshold comparison.

    This model only stores the comparison; evaluating it is left to the
    consumer.
    """

    model_config = ConfigDict(frozen=True)

    # Which metric the condition looks at.
    metric: Literal["latency_p99", "error_rate", "cpu_util", "memory_util", "throughput"]
    # Comparison code; presumably gt/lt/eq/gte/lte read as >, <, ==, >=, <=
    # (the evaluation itself is not part of this class).
    operator: Literal["gt", "lt", "eq", "gte", "lte"]
    # Value compared against; must be non-negative.
    threshold: float = Field(ge=0)
|
|
|
|
class ReliabilityEvent(BaseModel):
    """
    Immutable (frozen) record of one component's metrics at a point in time.

    Construction is rejected when:
      * ``component`` or any dependency name contains anything other than
        lowercase letters, digits, and hyphens;
      * a numeric field falls outside the bounds declared on it;
      * the same name appears in both ``upstream_deps`` and
        ``downstream_deps`` (``ValueError`` raised from ``model_post_init``).
    """

    model_config = ConfigDict(frozen=True, validate_assignment=True)

    # Defaults to the current time in UTC when not supplied.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # 1-255 characters; character set enforced by validate_component_id below.
    component: str = Field(min_length=1, max_length=255)
    service_mesh: str = Field(default="default", min_length=1, max_length=100)
    # NOTE(review): the 300000 upper bound suggests milliseconds -- confirm.
    latency_p99: float = Field(ge=0, lt=300000)
    # Bounded to [0, 1].
    error_rate: float = Field(ge=0, le=1)
    throughput: float = Field(ge=0)
    # Optional utilisation values, each bounded to [0, 1] when present.
    cpu_util: Optional[float] = Field(default=None, ge=0, le=1)
    memory_util: Optional[float] = Field(default=None, ge=0, le=1)
    # Optional non-negative impact figures.
    revenue_impact: Optional[float] = Field(default=None, ge=0)
    user_impact: Optional[int] = Field(default=None, ge=0)
    # Dependency names; each entry is checked by validate_dependency_format.
    upstream_deps: List[str] = Field(default_factory=list)
    downstream_deps: List[str] = Field(default_factory=list)
    severity: EventSeverity = EventSeverity.LOW

    @field_validator("component")
    @classmethod
    def validate_component_id(cls, v: str) -> str:
        """
        Reject component IDs containing characters outside ``[a-z0-9-]``.

        Raises:
            ValueError: if ``v`` contains any other character.
        """
        # fullmatch rather than match with "^...$": "$" also matches just
        # before a trailing newline, which would let "api\n" through.
        if re.fullmatch(r"[a-z0-9-]+", v) is None:
            raise ValueError("Component ID must contain only lowercase letters, numbers, and hyphens")
        return v

    @field_validator("upstream_deps", "downstream_deps")
    @classmethod
    def validate_dependency_format(cls, v: List[str]) -> List[str]:
        """
        Reject dependency lists containing a name outside ``[a-z0-9-]``.

        Raises:
            ValueError: naming the first offending entry.
        """
        for dep in v:
            # Same trailing-newline concern as for the component ID.
            if re.fullmatch(r"[a-z0-9-]+", dep) is None:
                raise ValueError(f"Dependency '{dep}' must contain only lowercase letters, numbers, and hyphens")
        return v

    @computed_field
    @property
    def fingerprint(self) -> str:
        """
        SHA-256 hex digest over component, mesh, and the rounded core metrics.

        Latency and throughput are formatted to 2 decimals and error rate to
        4 before hashing, so values that round to the same text give the same
        fingerprint. The timestamp is not part of the hash.
        """
        components = [
            self.component,
            self.service_mesh,
            f"{self.latency_p99:.2f}",
            f"{self.error_rate:.4f}",
            f"{self.throughput:.2f}",
        ]
        return hashlib.sha256(":".join(components).encode()).hexdigest()

    def model_post_init(self, __context) -> None:
        """
        Reject events that list the same name as both upstream and downstream.

        Raises:
            ValueError: if the two dependency lists share any name.
        """
        circular = set(self.upstream_deps) & set(self.downstream_deps)
        if circular:
            raise ValueError(f"Circular dependencies detected: {circular}")
|
|
|
|
class HealingPolicy(BaseModel):
    """
    Immutable (frozen) policy: a named, non-empty set of conditions paired
    with a non-empty list of healing actions, plus execution limits.
    """

    model_config = ConfigDict(frozen=True)

    name: str = Field(min_length=1, max_length=255)
    # Both lists must contain at least one entry.
    conditions: List[PolicyCondition] = Field(min_length=1)
    actions: List[HealingAction] = Field(min_length=1)
    # Integer in 1..5; which end ranks higher is not defined in this class.
    priority: int = Field(default=3, ge=1, le=5)
    # Non-negative number of seconds.
    cool_down_seconds: int = Field(default=300, ge=0)
    enabled: bool = Field(default=True)
    # At least 1.
    max_executions_per_hour: int = Field(default=10, ge=1)
|
|
|
|
class AnomalyResult(BaseModel):
    """
    Immutable (frozen) result of an anomaly check.
    """

    model_config = ConfigDict(frozen=True)

    is_anomaly: bool
    # Both scores are bounded to [0, 1].
    confidence: float = Field(ge=0, le=1)
    anomaly_score: float = Field(ge=0, le=1)
    # Free-form metric names; empty by default.
    affected_metrics: List[str] = Field(default_factory=list)
    # Defaults to the current time in UTC when not supplied.
    detection_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )
|
|
|
|
class ForecastResult(BaseModel):
    """
    Immutable (frozen) forecast for a single metric.
    """

    model_config = ConfigDict(frozen=True)

    # Free-form metric name (not restricted to a fixed set here).
    metric: str
    predicted_value: float
    # Bounded to [0, 1].
    confidence: float = Field(ge=0, le=1)
    trend: Literal["increasing", "decreasing", "stable"]
    # Optional and unconstrained; the unit is not specified in this class.
    time_to_threshold: Optional[float] = Field(default=None)
    risk_level: Literal["low", "medium", "high", "critical"]
    # Defaults to the current time in UTC when not supplied.
    forecast_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )