| """ |
| Policy Engine for Automated Healing Actions |
| Fixed version with thread safety and memory leak prevention |
| """ |
|
|
| import datetime |
| import threading |
| import logging |
| from collections import OrderedDict |
| from typing import Dict, List, Optional |
| from models import HealingPolicy, HealingAction, EventSeverity, ReliabilityEvent, PolicyCondition |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| DEFAULT_HEALING_POLICIES = [ |
| HealingPolicy( |
| name="high_latency_restart", |
| conditions=[ |
| PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0) |
| ], |
| actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM], |
| priority=1, |
| cool_down_seconds=300, |
| max_executions_per_hour=5 |
| ), |
| HealingPolicy( |
| name="critical_error_rate_rollback", |
| conditions=[ |
| PolicyCondition(metric="error_rate", operator="gt", threshold=0.3) |
| ], |
| actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM], |
| priority=1, |
| cool_down_seconds=600, |
| max_executions_per_hour=3 |
| ), |
| HealingPolicy( |
| name="high_error_rate_traffic_shift", |
| conditions=[ |
| PolicyCondition(metric="error_rate", operator="gt", threshold=0.15) |
| ], |
| actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM], |
| priority=2, |
| cool_down_seconds=300, |
| max_executions_per_hour=5 |
| ), |
| HealingPolicy( |
| name="resource_exhaustion_scale", |
| conditions=[ |
| PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9), |
| PolicyCondition(metric="memory_util", operator="gt", threshold=0.9) |
| ], |
| actions=[HealingAction.SCALE_OUT], |
| priority=2, |
| cool_down_seconds=600, |
| max_executions_per_hour=10 |
| ), |
| HealingPolicy( |
| name="moderate_latency_circuit_breaker", |
| conditions=[ |
| PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0) |
| ], |
| actions=[HealingAction.CIRCUIT_BREAKER], |
| priority=3, |
| cool_down_seconds=180, |
| max_executions_per_hour=8 |
| ) |
| ] |
|
|
|
|
| class PolicyEngine: |
| """ |
| Thread-safe policy engine with cooldown and rate limiting |
| |
| CRITICAL FIXES: |
| - Added RLock for thread safety |
| - Fixed cooldown race condition (atomic check + update) |
| - Implemented LRU eviction to prevent memory leak |
| - Added priority-based policy evaluation |
| - Added rate limiting per policy |
| """ |
| |
| def __init__( |
| self, |
| policies: Optional[List[HealingPolicy]] = None, |
| max_cooldown_history: int = 10000, |
| max_execution_history: int = 1000 |
| ): |
| """ |
| Initialize policy engine |
| |
| Args: |
| policies: List of healing policies (uses defaults if None) |
| max_cooldown_history: Maximum cooldown entries to keep (LRU) |
| max_execution_history: Maximum execution history per policy |
| """ |
| self.policies = policies or DEFAULT_HEALING_POLICIES |
| |
| |
| self._lock = threading.RLock() |
| |
| |
| self.last_execution: OrderedDict[str, float] = OrderedDict() |
| self.max_cooldown_history = max_cooldown_history |
| |
| |
| self.execution_timestamps: Dict[str, List[float]] = {} |
| self.max_execution_history = max_execution_history |
| |
| |
| self.policies = sorted(self.policies, key=lambda p: p.priority) |
| |
| logger.info( |
| f"Initialized PolicyEngine with {len(self.policies)} policies, " |
| f"max_cooldown_history={max_cooldown_history}" |
| ) |
| |
| def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]: |
| """ |
| Evaluate all policies against the event and return matching actions |
| |
| FIXED: Atomic check + update under lock (prevents race condition) |
| FIXED: Priority-based evaluation |
| |
| Args: |
| event: Reliability event to evaluate |
| |
| Returns: |
| List of healing actions to execute |
| """ |
| applicable_actions = [] |
| current_time = datetime.datetime.now(datetime.timezone.utc).timestamp() |
| |
| |
| for policy in self.policies: |
| if not policy.enabled: |
| continue |
| |
| policy_key = f"{policy.name}_{event.component}" |
| |
| |
| with self._lock: |
| |
| last_exec = self.last_execution.get(policy_key, 0) |
| |
| if current_time - last_exec < policy.cool_down_seconds: |
| logger.debug( |
| f"Policy {policy.name} for {event.component} on cooldown " |
| f"({current_time - last_exec:.0f}s / {policy.cool_down_seconds}s)" |
| ) |
| continue |
| |
| |
| if self._is_rate_limited(policy_key, policy, current_time): |
| logger.warning( |
| f"Policy {policy.name} for {event.component} rate limited " |
| f"(max {policy.max_executions_per_hour}/hour)" |
| ) |
| continue |
| |
| |
| if self._evaluate_conditions(policy.conditions, event): |
| applicable_actions.extend(policy.actions) |
| |
| |
| self._update_cooldown(policy_key, current_time) |
| |
| |
| self._record_execution(policy_key, current_time) |
| |
| logger.info( |
| f"Policy {policy.name} triggered for {event.component}: " |
| f"actions={[a.value for a in policy.actions]}" |
| ) |
| |
| |
| seen = set() |
| unique_actions = [] |
| for action in applicable_actions: |
| if action not in seen: |
| seen.add(action) |
| unique_actions.append(action) |
| |
| return unique_actions if unique_actions else [HealingAction.NO_ACTION] |
| |
| def _evaluate_conditions( |
| self, |
| conditions: List[PolicyCondition], |
| event: ReliabilityEvent |
| ) -> bool: |
| """ |
| Evaluate all conditions against event (AND logic) |
| |
| Args: |
| conditions: List of policy conditions |
| event: Reliability event |
| |
| Returns: |
| True if all conditions match, False otherwise |
| """ |
| for condition in conditions: |
| |
| event_value = getattr(event, condition.metric, None) |
| |
| |
| if event_value is None: |
| logger.debug( |
| f"Condition failed: {condition.metric} is None on event" |
| ) |
| return False |
| |
| |
| if not self._compare_values( |
| event_value, |
| condition.operator, |
| condition.threshold |
| ): |
| logger.debug( |
| f"Condition failed: {event_value} {condition.operator} " |
| f"{condition.threshold} = False" |
| ) |
| return False |
| |
| return True |
| |
| def _compare_values( |
| self, |
| event_value: float, |
| operator: str, |
| threshold: float |
| ) -> bool: |
| """ |
| Compare values based on operator with type safety |
| |
| FIXED: Added type checking and better error handling |
| |
| Args: |
| event_value: Value from event |
| operator: Comparison operator |
| threshold: Threshold value |
| |
| Returns: |
| Comparison result |
| """ |
| try: |
| |
| if not isinstance(event_value, (int, float)): |
| logger.error( |
| f"Invalid event_value type: {type(event_value)}, expected number" |
| ) |
| return False |
| |
| if not isinstance(threshold, (int, float)): |
| logger.error( |
| f"Invalid threshold type: {type(threshold)}, expected number" |
| ) |
| return False |
| |
| |
| if operator == "gt": |
| return event_value > threshold |
| elif operator == "lt": |
| return event_value < threshold |
| elif operator == "eq": |
| return abs(event_value - threshold) < 1e-6 |
| elif operator == "gte": |
| return event_value >= threshold |
| elif operator == "lte": |
| return event_value <= threshold |
| else: |
| logger.error(f"Unknown operator: {operator}") |
| return False |
| |
| except (TypeError, ValueError) as e: |
| logger.error(f"Comparison error: {e}", exc_info=True) |
| return False |
| |
| def _update_cooldown(self, policy_key: str, timestamp: float) -> None: |
| """ |
| Update cooldown timestamp with LRU eviction |
| |
| FIXED: Prevents unbounded memory growth |
| |
| Args: |
| policy_key: Policy identifier |
| timestamp: Current timestamp |
| """ |
| |
| self.last_execution[policy_key] = timestamp |
| |
| |
| self.last_execution.move_to_end(policy_key) |
| |
| |
| while len(self.last_execution) > self.max_cooldown_history: |
| evicted_key, _ = self.last_execution.popitem(last=False) |
| logger.debug(f"Evicted cooldown entry: {evicted_key}") |
| |
| def _is_rate_limited( |
| self, |
| policy_key: str, |
| policy: HealingPolicy, |
| current_time: float |
| ) -> bool: |
| """ |
| Check if policy is rate limited |
| |
| Args: |
| policy_key: Policy identifier |
| policy: Policy configuration |
| current_time: Current timestamp |
| |
| Returns: |
| True if rate limited, False otherwise |
| """ |
| if policy_key not in self.execution_timestamps: |
| return False |
| |
| |
| one_hour_ago = current_time - 3600 |
| recent_executions = [ |
| ts for ts in self.execution_timestamps[policy_key] |
| if ts > one_hour_ago |
| ] |
| |
| self.execution_timestamps[policy_key] = recent_executions |
| |
| |
| return len(recent_executions) >= policy.max_executions_per_hour |
| |
| def _record_execution(self, policy_key: str, timestamp: float) -> None: |
| """ |
| Record policy execution for rate limiting |
| |
| Args: |
| policy_key: Policy identifier |
| timestamp: Execution timestamp |
| """ |
| if policy_key not in self.execution_timestamps: |
| self.execution_timestamps[policy_key] = [] |
| |
| self.execution_timestamps[policy_key].append(timestamp) |
| |
| |
| if len(self.execution_timestamps[policy_key]) > self.max_execution_history: |
| self.execution_timestamps[policy_key] = \ |
| self.execution_timestamps[policy_key][-self.max_execution_history:] |
| |
| def get_policy_stats(self) -> Dict[str, Dict]: |
| """ |
| Get statistics about policy execution |
| |
| Returns: |
| Dictionary of policy statistics |
| """ |
| with self._lock: |
| stats = {} |
| |
| for policy in self.policies: |
| policy_stats = { |
| "name": policy.name, |
| "priority": policy.priority, |
| "enabled": policy.enabled, |
| "cooldown_seconds": policy.cool_down_seconds, |
| "max_per_hour": policy.max_executions_per_hour, |
| "total_components": sum( |
| 1 for key in self.last_execution.keys() |
| if key.startswith(f"{policy.name}_") |
| ) |
| } |
| |
| stats[policy.name] = policy_stats |
| |
| return stats |