# NOTE: hosting-page residue, not part of the module: petter2025, commit 48e9ee1 ("Update event.py"), 5.36 kB.
"""
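Event and policy models for the reliability framework.

Defines the EventSeverity, HealingAction and HealthStatus enums, the Pydantic
models PolicyCondition, ReliabilityEvent, HealingPolicy, AnomalyResult and
ForecastResult, and the validate_component_id helper.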
"""
from pydantic import BaseModel, Field, field_validator, computed_field, ConfigDict
from typing import Optional, List, Literal, Tuple
from enum import Enum
from datetime import datetime, timezone
import hashlib
import re
# Note: the threshold constants in the import below are not used in this file.
# The import is left commented out so it can be restored for future extensions
# or for consistency with other modules.
# from agentic_reliability_framework.core.config.constants import (
# LATENCY_WARNING, LATENCY_CRITICAL, LATENCY_EXTREME,
# ERROR_RATE_WARNING, ERROR_RATE_HIGH, ERROR_RATE_CRITICAL,
# CPU_WARNING, CPU_CRITICAL,
# MEMORY_WARNING, MEMORY_CRITICAL
# )
def validate_component_id(component: str) -> Tuple[bool, str]:
    """
    Validate a component ID without raising.

    A valid ID is a string of 1-255 characters consisting solely of lowercase
    ASCII letters, digits, and hyphens.

    Args:
        component: Candidate component ID. Non-string values are rejected
            with an error message instead of raising.

    Returns:
        ``(True, "")`` when the ID is valid, otherwise ``(False, message)``
        where ``message`` describes the first rule that failed.
    """
    if not isinstance(component, str):
        return False, "Component ID must be a string"
    if not (1 <= len(component) <= 255):
        return False, "Component ID must be 1-255 characters"
    # fullmatch rather than match(r"^...$"): "$" also matches just before a
    # trailing newline, which would let an ID such as "svc\n" through.
    if not re.fullmatch(r"[a-z0-9-]+", component):
        return False, "Component ID must contain only lowercase letters, numbers, and hyphens"
    return True, ""
class EventSeverity(str, Enum):
    """
    Severity levels that can be attached to a reliability event.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (for example ``EventSeverity.LOW == "low"``).
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class HealingAction(str, Enum):
    """
    Names of the remediation actions a healing policy can list.

    Members are also ``str`` instances and compare equal to their snake_case
    string value. This enum only names the actions; nothing here performs them.
    """

    RESTART_CONTAINER = "restart_container"
    SCALE_OUT = "scale_out"
    TRAFFIC_SHIFT = "traffic_shift"
    CIRCUIT_BREAKER = "circuit_breaker"
    ROLLBACK = "rollback"
    ALERT_TEAM = "alert_team"
    NO_ACTION = "no_action"
class HealthStatus(str, Enum):
    """
    Coarse health states, including an explicit ``UNKNOWN`` value.

    Members are also ``str`` instances and compare equal to their lowercase
    string value.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
class PolicyCondition(BaseModel):
    """
    One immutable metric comparison, as listed in ``HealingPolicy.conditions``.

    Holds the metric name, the comparison operator, and a non-negative
    threshold. The model only stores the condition; nothing here evaluates it.
    """

    # Frozen: attributes cannot be reassigned once the instance exists.
    model_config = ConfigDict(frozen=True)

    # Metric the condition refers to; restricted to this fixed set of names.
    metric: Literal["latency_p99", "error_rate", "cpu_util", "memory_util", "throughput"]
    # Comparison operator code.
    operator: Literal["gt", "lt", "eq", "gte", "lte"]
    # Value the metric is compared against; must be >= 0.
    threshold: float = Field(ge=0)
class ReliabilityEvent(BaseModel):
    """
    Immutable record of one component's reliability metrics.

    Field constraints are enforced at construction. In addition:

    * ``component`` and every entry of ``upstream_deps`` / ``downstream_deps``
      must consist solely of lowercase ASCII letters, digits, and hyphens.
    * A name may not appear in both ``upstream_deps`` and ``downstream_deps``.

    The computed ``fingerprint`` field is included when the model is
    serialized.
    """

    # Defaults to the current UTC time when not supplied.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    component: str = Field(min_length=1, max_length=255)
    service_mesh: str = Field(default="default", min_length=1, max_length=100)
    # NOTE(review): upper bound 300000 suggests milliseconds; confirm the unit.
    latency_p99: float = Field(ge=0, lt=300000)
    # Fractions in [0, 1], not percentages.
    error_rate: float = Field(ge=0, le=1)
    throughput: float = Field(ge=0)
    cpu_util: Optional[float] = Field(default=None, ge=0, le=1)
    memory_util: Optional[float] = Field(default=None, ge=0, le=1)
    revenue_impact: Optional[float] = Field(default=None, ge=0)
    user_impact: Optional[int] = Field(default=None, ge=0)
    upstream_deps: List[str] = Field(default_factory=list)
    downstream_deps: List[str] = Field(default_factory=list)
    severity: EventSeverity = EventSeverity.LOW

    model_config = ConfigDict(frozen=True, validate_assignment=True)

    @field_validator("component")
    @classmethod
    def validate_component_id(cls, v: str) -> str:
        """
        Ensure the component ID uses only lowercase letters, digits, and hyphens.

        Raises:
            ValueError: If ``v`` contains any other character.
        """
        # fullmatch rather than match(r"^...$"): "$" also matches just before
        # a trailing newline, so "api\n" was previously accepted.
        if not re.fullmatch(r"[a-z0-9-]+", v):
            raise ValueError("Component ID must contain only lowercase letters, numbers, and hyphens")
        return v

    @field_validator("upstream_deps", "downstream_deps")
    @classmethod
    def validate_dependency_format(cls, v: List[str]) -> List[str]:
        """
        Ensure every dependency name uses only lowercase letters, digits, and hyphens.

        An empty list is valid.

        Raises:
            ValueError: On the first entry that is empty or contains any
                other character.
        """
        for dep in v:
            # Same trailing-newline concern as for the component ID.
            if not re.fullmatch(r"[a-z0-9-]+", dep):
                raise ValueError(f"Dependency '{dep}' must contain only lowercase letters, numbers, and hyphens")
        return v

    @computed_field
    @property
    def fingerprint(self) -> str:
        """
        SHA-256 hex digest of the component, mesh, and core metrics.

        The digest input is ``component``, ``service_mesh``, ``latency_p99``
        (2 decimals), ``error_rate`` (4 decimals), and ``throughput``
        (2 decimals) joined with ":". The timestamp and the remaining fields
        do not contribute, so events that agree on these rounded values share
        a fingerprint.
        """
        components = [
            self.component,
            self.service_mesh,
            f"{self.latency_p99:.2f}",
            f"{self.error_rate:.4f}",
            f"{self.throughput:.2f}",
        ]
        return hashlib.sha256(":".join(components).encode()).hexdigest()

    def model_post_init(self, __context) -> None:
        """
        Reject events that list the same name as both upstream and downstream.

        Raises:
            ValueError: If ``upstream_deps`` and ``downstream_deps`` overlap.
        """
        circular = set(self.upstream_deps) & set(self.downstream_deps)
        if circular:
            raise ValueError(f"Circular dependencies detected: {circular}")
class HealingPolicy(BaseModel):
    """
    Immutable definition of a healing policy.

    Pairs at least one ``PolicyCondition`` with at least one ``HealingAction``
    and carries scheduling limits. How conditions are combined, and whether 1
    or 5 is the highest priority, is not defined in this model.
    """

    # Frozen: attributes cannot be reassigned once the instance exists.
    model_config = ConfigDict(frozen=True)

    name: str = Field(min_length=1, max_length=255)
    # Both lists must be non-empty.
    conditions: List[PolicyCondition] = Field(min_length=1)
    actions: List[HealingAction] = Field(min_length=1)
    # Integer in 1..5; defaults to 3.
    priority: int = Field(3, ge=1, le=5)
    # Non-negative number of seconds; defaults to 300.
    cool_down_seconds: int = Field(300, ge=0)
    enabled: bool = True
    # At least 1; defaults to 10.
    max_executions_per_hour: int = Field(10, ge=1)
class AnomalyResult(BaseModel):
    """
    Immutable outcome of an anomaly check.

    ``confidence`` and ``anomaly_score`` are both constrained to [0, 1]. The
    detection timestamp defaults to the current UTC time.
    """

    # Frozen: attributes cannot be reassigned once the instance exists.
    model_config = ConfigDict(frozen=True)

    is_anomaly: bool
    confidence: float = Field(ge=0, le=1)
    anomaly_score: float = Field(ge=0, le=1)
    # Free-form metric names; empty by default.
    affected_metrics: List[str] = Field(default_factory=list)
    detection_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )
class ForecastResult(BaseModel):
    """
    Immutable forecast for a single metric.

    Holds the predicted value, a confidence in [0, 1], a trend label, and a
    risk level. The forecast timestamp defaults to the current UTC time.
    """

    # Frozen: attributes cannot be reassigned once the instance exists.
    model_config = ConfigDict(frozen=True)

    metric: str
    predicted_value: float
    confidence: float = Field(ge=0, le=1)
    trend: Literal["increasing", "decreasing", "stable"]
    # Optional and unconstrained; the unit is not specified in this model.
    time_to_threshold: Optional[float] = None
    risk_level: Literal["low", "medium", "high", "critical"]
    forecast_timestamp: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc)
    )