"""
Event models for the reliability framework.
Includes ReliabilityEvent, HealingAction, PolicyCondition, etc.
"""
from pydantic import BaseModel, Field, field_validator, computed_field, ConfigDict
from typing import Optional, List, Literal, Tuple
from enum import Enum
from datetime import datetime, timezone
import hashlib
import re
from agentic_reliability_framework.core.config.constants import (
LATENCY_WARNING, LATENCY_CRITICAL, LATENCY_EXTREME,
ERROR_RATE_WARNING, ERROR_RATE_HIGH, ERROR_RATE_CRITICAL,
CPU_WARNING, CPU_CRITICAL,
MEMORY_WARNING, MEMORY_CRITICAL
)
def validate_component_id(component: str) -> Tuple[bool, str]:
    """Check that a component ID is well formed.

    A valid ID is a string of 1-255 characters consisting only of lowercase
    ASCII letters, digits, and hyphens.

    Args:
        component: Candidate component identifier.

    Returns:
        A ``(is_valid, error_message)`` tuple. ``error_message`` is the empty
        string when the ID is valid.
    """
    if not isinstance(component, str):
        return False, "Component ID must be a string"
    if not (1 <= len(component) <= 255):
        return False, "Component ID must be 1-255 characters"
    # Use fullmatch rather than match with "$": "$" also matches just before
    # a trailing newline, which would let an ID such as "api\n" through.
    if re.fullmatch(r"[a-z0-9-]+", component) is None:
        return False, "Component ID must contain only lowercase letters, numbers, and hyphens"
    return True, ""
class EventSeverity(str, Enum):
    """Severity level attached to an event.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class HealingAction(str, Enum):
    """Identifier of a remediation action.

    Members are also ``str`` instances, so they compare equal to their
    snake_case string values.
    """

    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    TRAFFIC_SHIFT = "traffic_shift"
    CIRCUIT_BREAKER = "circuit_breaker"
    ROLLBACK = "rollback"
    ALERT_TEAM = "alert_team"
    NO_ACTION = "no_action"
class HealthStatus(str, Enum):
    """Health state label.

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
class PolicyCondition(BaseModel):
    """One metric/operator/threshold triple used as a policy condition.

    Instances are frozen (immutable after construction).
    """

    model_config = ConfigDict(frozen=True)

    # Name of the metric this condition refers to.
    metric: Literal["latency_p99", "error_rate", "cpu_util", "memory_util", "throughput"]
    # Operator code; evaluation of the operator happens outside this model.
    operator: Literal["gt", "lt", "eq", "gte", "lte"]
    # Required, non-negative threshold value.
    threshold: float = Field(ge=0)
class ReliabilityEvent(BaseModel):
    """Immutable record of reliability metrics reported for one component.

    The model is frozen. ``component`` and every dependency name must consist
    only of lowercase letters, digits, and hyphens, and a name may not appear
    in both ``upstream_deps`` and ``downstream_deps``.
    """

    # Defaults to the current time in UTC when not supplied.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Component identifier, 1-255 characters; format checked by a validator below.
    component: str = Field(min_length=1, max_length=255)
    service_mesh: str = Field(default="default", min_length=1, max_length=100)
    # Bounded to [0, 300000); NOTE(review): presumably milliseconds -- confirm.
    latency_p99: float = Field(ge=0, lt=300000)
    # Bounded to [0, 1].
    error_rate: float = Field(ge=0, le=1)
    throughput: float = Field(ge=0)
    # Optional utilisation values, each bounded to [0, 1] when present.
    cpu_util: Optional[float] = Field(default=None, ge=0, le=1)
    memory_util: Optional[float] = Field(default=None, ge=0, le=1)
    # Optional non-negative impact figures.
    revenue_impact: Optional[float] = Field(default=None, ge=0)
    user_impact: Optional[int] = Field(default=None, ge=0)
    # Dependency names; each entry is format-checked by a validator below.
    upstream_deps: List[str] = Field(default_factory=list)
    downstream_deps: List[str] = Field(default_factory=list)
    severity: EventSeverity = EventSeverity.LOW

    model_config = ConfigDict(frozen=True, validate_assignment=True)

    @field_validator("component")
    @classmethod
    def validate_component_id(cls, v: str) -> str:
        """Reject component IDs containing anything but ``[a-z0-9-]``.

        Raises:
            ValueError: If ``v`` contains any other character.
        """
        # fullmatch instead of match with "$": "$" also matches just before a
        # trailing newline, which would accept an ID such as "api\n".
        if re.fullmatch(r"[a-z0-9-]+", v) is None:
            raise ValueError("Component ID must contain only lowercase letters, numbers, and hyphens")
        return v

    @field_validator("upstream_deps", "downstream_deps")
    @classmethod
    def validate_dependency_format(cls, v: List[str]) -> List[str]:
        """Reject dependency names containing anything but ``[a-z0-9-]``.

        Raises:
            ValueError: For the first entry that contains any other character
                (an empty entry is rejected as well).
        """
        for dep in v:
            # Same trailing-newline concern as for the component ID.
            if re.fullmatch(r"[a-z0-9-]+", dep) is None:
                raise ValueError(f"Dependency '{dep}' must contain only lowercase letters, numbers, and hyphens")
        return v

    @computed_field
    @property
    def fingerprint(self) -> str:
        """Return a SHA-256 hex digest identifying this event's content.

        The digest covers the component, the service mesh, and the latency,
        error rate, and throughput rounded to 2, 4, and 2 decimal places
        respectively, joined with ":". The timestamp is not part of it.
        """
        components = [
            self.component,
            self.service_mesh,
            f"{self.latency_p99:.2f}",
            f"{self.error_rate:.4f}",
            f"{self.throughput:.2f}",
        ]
        return hashlib.sha256(":".join(components).encode()).hexdigest()

    def model_post_init(self, __context) -> None:
        """Reject events that list the same name as upstream and downstream.

        Raises:
            ValueError: If any name appears in both dependency lists.
        """
        circular = set(self.upstream_deps) & set(self.downstream_deps)
        if circular:
            raise ValueError(f"Circular dependencies detected: {circular}")
class HealingPolicy(BaseModel):
    """Named, frozen pairing of conditions with healing actions.

    At least one condition and at least one action are required.
    """

    model_config = ConfigDict(frozen=True)

    # Policy name, 1-255 characters.
    name: str = Field(min_length=1, max_length=255)
    # Non-empty list of conditions.
    conditions: List[PolicyCondition] = Field(min_length=1)
    # Non-empty list of actions.
    actions: List[HealingAction] = Field(min_length=1)
    # Integer in [1, 5]; defaults to 3.
    priority: int = Field(default=3, ge=1, le=5)
    # Non-negative number of seconds; defaults to 300.
    cool_down_seconds: int = Field(default=300, ge=0)
    enabled: bool = True
    # At least 1; defaults to 10.
    max_executions_per_hour: int = Field(default=10, ge=1)
class AnomalyResult(BaseModel):
    """Frozen result record of an anomaly check."""

    model_config = ConfigDict(frozen=True)

    is_anomaly: bool
    # Both values are bounded to [0, 1].
    confidence: float = Field(ge=0, le=1)
    anomaly_score: float = Field(ge=0, le=1)
    # Names of the metrics involved; empty by default.
    affected_metrics: List[str] = Field(default_factory=list)
    # Defaults to the current time in UTC when not supplied.
    detection_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )
class ForecastResult(BaseModel):
    """Frozen result record of a forecast for a single metric."""

    model_config = ConfigDict(frozen=True)

    metric: str
    predicted_value: float
    # Bounded to [0, 1].
    confidence: float = Field(ge=0, le=1)
    trend: Literal["increasing", "decreasing", "stable"]
    # Optional; None when not supplied.
    time_to_threshold: Optional[float] = None
    risk_level: Literal["low", "medium", "high", "critical"]
    # Defaults to the current time in UTC when not supplied.
    forecast_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )