| """ |
| Policy Engine for Automated Healing Actions. |
| """ |
|
|
| import threading |
| import logging |
| import datetime |
| from collections import OrderedDict |
| from typing import Dict, List, Optional, Any |
|
|
| from agentic_reliability_framework.core.models.event import HealingPolicy, HealingAction, PolicyCondition, ReliabilityEvent |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
# Built-in healing policies, used when PolicyEngine is constructed without an
# explicit policy list. PolicyEngine sorts policies by ascending ``priority``,
# so priority-1 policies are evaluated before priority-2, and so on.
DEFAULT_HEALING_POLICIES = [
    # Severe tail latency: restart and page the team.
    # NOTE(review): threshold units come from ReliabilityEvent.latency_p99
    # (presumably milliseconds) — confirm.
    HealingPolicy(
        name="high_latency_restart",
        priority=1,
        conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0)],
        actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    # Error rate above 0.3: roll back, open the circuit breaker, alert.
    HealingPolicy(
        name="critical_error_rate_rollback",
        priority=1,
        conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.3)],
        actions=[
            HealingAction.ROLLBACK,
            HealingAction.CIRCUIT_BREAKER,
            HealingAction.ALERT_TEAM,
        ],
        cool_down_seconds=600,
        max_executions_per_hour=3,
    ),
    # Error rate above 0.15: shift traffic away and alert.
    HealingPolicy(
        name="high_error_rate_traffic_shift",
        priority=2,
        conditions=[PolicyCondition(metric="error_rate", operator="gt", threshold=0.15)],
        actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    # Both CPU and memory utilisation above 0.9 (conditions are AND-ed by
    # the engine): scale out.
    HealingPolicy(
        name="resource_exhaustion_scale",
        priority=2,
        conditions=[
            PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
            PolicyCondition(metric="memory_util", operator="gt", threshold=0.9),
        ],
        actions=[HealingAction.SCALE_OUT],
        cool_down_seconds=600,
        max_executions_per_hour=10,
    ),
    # Moderate tail latency: trip the circuit breaker only.
    HealingPolicy(
        name="moderate_latency_circuit_breaker",
        priority=3,
        conditions=[PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0)],
        actions=[HealingAction.CIRCUIT_BREAKER],
        cool_down_seconds=180,
        max_executions_per_hour=8,
    ),
]
|
|
|
|
class PolicyEngine:
    """
    Thread-safe policy engine with cooldown and rate limiting.

    Policies are evaluated in ascending ``priority`` order. Each policy has:
      - conditions (AND logic: every condition must hold)
      - a cooldown per (policy, component) key
      - an hourly execution limit per (policy, component) key

    Per-key state lives in two maps that are kept in step:
      - ``last_execution``: an LRU (OrderedDict) of last execution
        timestamps, capped at ``max_cooldown_history`` keys.
      - ``execution_timestamps``: recent execution timestamps per key, each
        list capped at ``max_execution_history`` entries. Keys evicted from
        the LRU are dropped here as well, so memory stays bounded.
    """

    # Comparison operators accepted in PolicyCondition.operator.
    # "eq" is a float comparison with an absolute tolerance of 1e-6.
    _COMPARATORS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "eq": lambda value, threshold: abs(value - threshold) < 1e-6,
        "gte": lambda value, threshold: value >= threshold,
        "lte": lambda value, threshold: value <= threshold,
    }

    def __init__(
        self,
        policies: Optional[List[HealingPolicy]] = None,
        max_cooldown_history: int = 10000,
        max_execution_history: int = 1000,
    ):
        """
        Args:
            policies: HealingPolicy objects to evaluate. If None,
                DEFAULT_HEALING_POLICIES is used. An explicit empty list
                means "no policies" (evaluation always yields NO_ACTION).
            max_cooldown_history: Maximum number of (policy, component) keys
                whose execution history is retained.
            max_execution_history: Maximum number of timestamps stored per
                key for rate limiting.
        """
        # Only None selects the defaults; `policies or ...` would also
        # replace an explicit empty list.
        if policies is None:
            policies = DEFAULT_HEALING_POLICIES
        self._lock = threading.RLock()

        self.last_execution: OrderedDict[str, float] = OrderedDict()
        self.max_cooldown_history = max_cooldown_history
        self.execution_timestamps: Dict[str, List[float]] = {}
        self.max_execution_history = max_execution_history
        # sorted() is stable and returns a new list, so the caller's list
        # (or the module-level defaults) is never reordered in place.
        self.policies = sorted(policies, key=lambda p: p.priority)
        logger.info("Initialized PolicyEngine with %d policies", len(self.policies))

    def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
        """
        Evaluate every enabled policy against ``event``.

        A policy fires when its cooldown for ``(policy, event.component)``
        has elapsed, it is not rate limited, and all of its conditions hold.
        Firing records the execution for both cooldown and rate limiting.

        Returns:
            The actions of all fired policies, deduplicated and kept in the
            order they were first produced (policy priority order), or
            ``[HealingAction.NO_ACTION]`` if no policy fired.
        """
        applicable_actions: List[HealingAction] = []
        current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
        for policy in self.policies:
            if not policy.enabled:
                continue
            policy_key = f"{policy.name}_{event.component}"
            # Check and record under one lock acquisition so concurrent
            # evaluations cannot both pass the cooldown for the same key.
            with self._lock:
                last_exec = self.last_execution.get(policy_key, 0)
                if current_time - last_exec < policy.cool_down_seconds:
                    continue
                if self._is_rate_limited(policy_key, policy, current_time):
                    continue
                if not self._evaluate_conditions(policy.conditions, event):
                    continue
                applicable_actions.extend(policy.actions)
                self._mark_executed(policy_key, current_time)

        # dict preserves insertion order: dedupe while keeping first-seen order.
        unique = list(dict.fromkeys(applicable_actions))
        return unique if unique else [HealingAction.NO_ACTION]

    def _mark_executed(self, key: str, now: float) -> None:
        """Record a firing of ``key`` for cooldown (LRU) and rate limiting; caller holds the lock."""
        self.last_execution[key] = now
        # Re-assigning an existing key keeps its old position, so move it
        # explicitly to mark it most recently used.
        self.last_execution.move_to_end(key)
        self._record_execution(key, now)
        while len(self.last_execution) > self.max_cooldown_history:
            evicted_key, _ = self.last_execution.popitem(last=False)
            # Forget rate-limit history for evicted keys too; otherwise
            # execution_timestamps grows by one key per distinct
            # (policy, component) forever. The key just written is only
            # evicted when max_cooldown_history <= 0; keep its history then.
            if evicted_key != key:
                self.execution_timestamps.pop(evicted_key, None)

    def _evaluate_conditions(self, conditions: List[PolicyCondition], event: ReliabilityEvent) -> bool:
        """
        Return True if all conditions are satisfied by ``event``.

        A condition fails when the event has no attribute named by
        ``cond.metric`` (or it is None), or when ``cond.operator`` is not one
        of the supported operators (a warning is logged in that case).
        """
        for cond in conditions:
            value = getattr(event, cond.metric, None)
            if value is None:
                return False
            compare = self._COMPARATORS.get(cond.operator)
            if compare is None:
                logger.warning(
                    "Unknown policy condition operator %r for metric %r; treating as not satisfied",
                    cond.operator,
                    cond.metric,
                )
                return False
            if not compare(value, cond.threshold):
                return False
        return True

    def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
        """
        Return True if ``key`` already executed ``policy.max_executions_per_hour``
        times in the hour before ``now``. Prunes older timestamps as a side
        effect. Caller holds the lock.
        """
        if key not in self.execution_timestamps:
            return False
        one_hour_ago = now - 3600
        recent = [ts for ts in self.execution_timestamps[key] if ts > one_hour_ago]
        self.execution_timestamps[key] = recent
        return len(recent) >= policy.max_executions_per_hour

    def _record_execution(self, key: str, ts: float) -> None:
        """Append an execution timestamp for ``key``, keeping at most ``max_execution_history`` entries."""
        history = self.execution_timestamps.setdefault(key, [])
        history.append(ts)
        # Explicit trim instead of history[-n:], which keeps the whole list
        # when n == 0.
        excess = len(history) - self.max_execution_history
        if excess > 0:
            del history[:excess]

    def evaluate(self, event_type: str, severity: str, component: str) -> List[Dict[str, Any]]:
        """
        Simplified policy evaluation for the governance demo.

        Returns a list of recommended actions derived from ``severity`` (and,
        for "medium", ``event_type``). Each action is a dict with keys
        ``policy``, ``action`` and ``reason``. Unmatched inputs yield ``[]``.
        ``component`` is accepted for interface compatibility but not used.
        """
        actions: List[Dict[str, Any]] = []
        if severity == "critical":
            actions.append({
                "policy": "POL-002",
                "action": "isolate_affected",
                "reason": "Critical failure detected",
            })
        elif severity == "high":
            actions.append({
                "policy": "POL-004",
                "action": "require_approval",
                "reason": "High risk",
            })
        elif severity == "medium" and event_type == "text_generation":
            actions.append({
                "policy": "POL-001",
                "action": "regenerate",
                "reason": "Low confidence",
            })
        return actions