"""Policy Engine for Automated Healing Actions."""

import threading
import logging
import datetime
from collections import OrderedDict
from typing import Dict, List, Optional, Any

from agentic_reliability_framework.core.models.event import (
    HealingPolicy,
    HealingAction,
    ReliabilityEvent,
    PolicyCondition,
)

logger = logging.getLogger(__name__)

DEFAULT_HEALING_POLICIES = [
    HealingPolicy(
        name="high_latency_restart",
        conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0)],
        actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
        priority=1,
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    HealingPolicy(
        name="critical_error_rate_rollback",
        conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.3)],
        actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
        priority=1,
        cool_down_seconds=600,
        max_executions_per_hour=3,
    ),
    HealingPolicy(
        name="high_error_rate_traffic_shift",
        conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.15)],
        actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
        priority=2,
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    HealingPolicy(
        name="resource_exhaustion_scale",
        conditions=[
            PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
            PolicyCondition(metric="memory_util", operator="gt", threshold=0.9),
        ],
        actions=[HealingAction.SCALE_OUT],
        priority=2,
        cool_down_seconds=600,
        max_executions_per_hour=10,
    ),
    HealingPolicy(
        name="moderate_latency_circuit_breaker",
        conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0)],
        actions=[HealingAction.CIRCUIT_BREAKER],
        priority=3,
        cool_down_seconds=180,
        max_executions_per_hour=8,
    ),
]

# Maps a PolicyCondition.operator string to a predicate (value, threshold) -> bool.
# "eq" compares with an absolute tolerance of 1e-6 rather than exact float equality.
_CONDITION_OPERATORS = {
    "gt": lambda val, thresh: val > thresh,
    "lt": lambda val, thresh: val < thresh,
    "eq": lambda val, thresh: abs(val - thresh) < 1e-6,
    "gte": lambda val, thresh: val >= thresh,
    "lte": lambda val, thresh: val <= thresh,
}


class PolicyEngine:
    """
    Thread-safe policy engine with cooldown and rate limiting.

    Policies are evaluated in ascending ``priority`` order. A policy fires for a
    (policy, component) pair only if it is enabled, its cooldown has elapsed,
    it is under its hourly execution limit, and all of its conditions hold.
    """

    def __init__(
        self,
        policies: Optional[List[HealingPolicy]] = None,
        max_cooldown_history: int = 10000,
        max_execution_history: int = 1000,
    ):
        """
        Create an engine.

        Args:
            policies: Policies to evaluate. A falsy value (``None`` or an empty
                list) falls back to ``DEFAULT_HEALING_POLICIES``.
            max_cooldown_history: Maximum number of (policy, component) keys
                whose last execution time is retained. Beyond this, the least
                recently executed keys are evicted (together with their
                rate-limit history) so memory stays bounded.
            max_execution_history: Maximum number of execution timestamps
                kept per (policy, component) key for rate limiting.
        """
        self._lock = threading.RLock()
        # sorted() builds a new list, so the module-level default list is never mutated.
        self.policies = sorted(
            policies or DEFAULT_HEALING_POLICIES, key=lambda p: p.priority
        )
        # Insertion order == execution recency (see _mark_executed); the front
        # holds the least recently executed keys, which are evicted first.
        self.last_execution: "OrderedDict[str, float]" = OrderedDict()
        self.max_cooldown_history = max_cooldown_history
        self.execution_timestamps: Dict[str, List[float]] = {}
        self.max_execution_history = max_execution_history
        logger.info("Initialized PolicyEngine with %d policies", len(self.policies))

    def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
        """
        Return the healing actions triggered by ``event``.

        Each matching policy's actions are collected in priority order and
        de-duplicated, keeping the first occurrence. Firing a policy records
        its execution for cooldown and rate-limit purposes.

        Returns:
            The unique actions, or ``[HealingAction.NO_ACTION]`` if none fired.
        """
        applicable_actions: List[HealingAction] = []
        current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()

        for policy in self.policies:
            if not policy.enabled:
                continue
            # Cooldown and rate limiting are tracked per policy *and* component.
            policy_key = f"{policy.name}_{event.component}"
            with self._lock:
                last_exec = self.last_execution.get(policy_key, 0)
                if current_time - last_exec < policy.cool_down_seconds:
                    continue
                if self._is_rate_limited(policy_key, policy, current_time):
                    continue
                if self._evaluate_conditions(policy.conditions, event):
                    applicable_actions.extend(policy.actions)
                    self._mark_executed(policy_key, current_time)
                    self._record_execution(policy_key, current_time)

        # dict.fromkeys de-duplicates while preserving first-seen order.
        unique_actions = list(dict.fromkeys(applicable_actions))
        return unique_actions or [HealingAction.NO_ACTION]

    def _mark_executed(self, key: str, ts: float) -> None:
        """
        Store ``ts`` as the last execution time of ``key``, enforcing max_cooldown_history.

        Must be called with ``self._lock`` held.
        """
        self.last_execution[key] = ts
        # Re-assigning an existing key does not reorder an OrderedDict, so move
        # it explicitly to keep the front = least recently executed.
        self.last_execution.move_to_end(key)
        # Never evict the key just recorded, even with a limit below 1.
        limit = max(1, self.max_cooldown_history)
        while len(self.last_execution) > limit:
            evicted_key, _ = self.last_execution.popitem(last=False)
            # Keep execution_timestamps bounded by the same key set.
            self.execution_timestamps.pop(evicted_key, None)

    def _evaluate_conditions(self, conditions: List[PolicyCondition], event: ReliabilityEvent) -> bool:
        """
        Return True only if every condition holds for ``event``.

        A condition fails when the event has no such attribute, the attribute
        is ``None``, or the operator is not one of gt/lt/eq/gte/lte.
        """
        for cond in conditions:
            val = getattr(event, cond.metric, None)
            if val is None:
                return False
            compare = _CONDITION_OPERATORS.get(cond.operator)
            if compare is None:
                return False
            if not compare(val, cond.threshold):
                return False
        return True

    def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
        """
        Return True if ``key`` already ran ``max_executions_per_hour`` times in the last hour.

        Also prunes this key's timestamps older than one hour. Must be called
        with ``self._lock`` held.
        """
        if key not in self.execution_timestamps:
            return False
        one_hour_ago = now - 3600
        recent = [ts for ts in self.execution_timestamps[key] if ts > one_hour_ago]
        self.execution_timestamps[key] = recent
        return len(recent) >= policy.max_executions_per_hour

    def _record_execution(self, key: str, ts: float) -> None:
        """
        Append ``ts`` to ``key``'s execution history, keeping at most max_execution_history entries.

        Must be called with ``self._lock`` held.
        """
        history = self.execution_timestamps.setdefault(key, [])
        history.append(ts)
        if len(history) > self.max_execution_history:
            self.execution_timestamps[key] = history[-self.max_execution_history:]