petter2025 commited on
Commit
f5b825d
·
verified ·
1 Parent(s): 03ff9cc

Create policy_engine.py

Browse files
Files changed (1) hide show
  1. policy_engine.py +147 -0
policy_engine.py ADDED
@@ -0,0 +1,147 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Policy Engine for Automated Healing Actions.
3
+ """
4
+
5
+ import threading
6
+ import logging
7
+ from collections import OrderedDict
8
+ from typing import Dict, List, Optional, Any
9
+
10
+ from agentic_reliability_framework.core.models.event import HealingPolicy, HealingAction, ReliabilityEvent
11
+
12
+ logger = logging.getLogger(__name__)
13
+
14
+
15
+ DEFAULT_HEALING_POLICIES = [
16
+ HealingPolicy(
17
+ name="high_latency_restart",
18
+ conditions=[{"metric": "latency_p99", "operator": "gt", "threshold": 500.0}],
19
+ actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
20
+ priority=1,
21
+ cool_down_seconds=300,
22
+ max_executions_per_hour=5
23
+ ),
24
+ HealingPolicy(
25
+ name="critical_error_rate_rollback",
26
+ conditions=[{"metric": "error_rate", "operator": "gt", "threshold": 0.3}],
27
+ actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
28
+ priority=1,
29
+ cool_down_seconds=600,
30
+ max_executions_per_hour=3
31
+ ),
32
+ HealingPolicy(
33
+ name="high_error_rate_traffic_shift",
34
+ conditions=[{"metric": "error_rate", "operator": "gt", "threshold": 0.15}],
35
+ actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
36
+ priority=2,
37
+ cool_down_seconds=300,
38
+ max_executions_per_hour=5
39
+ ),
40
+ HealingPolicy(
41
+ name="resource_exhaustion_scale",
42
+ conditions=[
43
+ {"metric": "cpu_util", "operator": "gt", "threshold": 0.9},
44
+ {"metric": "memory_util", "operator": "gt", "threshold": 0.9}
45
+ ],
46
+ actions=[HealingAction.SCALE_OUT],
47
+ priority=2,
48
+ cool_down_seconds=600,
49
+ max_executions_per_hour=10
50
+ ),
51
+ HealingPolicy(
52
+ name="moderate_latency_circuit_breaker",
53
+ conditions=[{"metric": "latency_p99", "operator": "gt", "threshold": 300.0}],
54
+ actions=[HealingAction.CIRCUIT_BREAKER],
55
+ priority=3,
56
+ cool_down_seconds=180,
57
+ max_executions_per_hour=8
58
+ )
59
+ ]
60
+
61
+
62
+ class PolicyEngine:
63
+ """
64
+ Thread-safe policy engine with cooldown and rate limiting.
65
+ """
66
+
67
+ def __init__(
68
+ self,
69
+ policies: Optional[List[HealingPolicy]] = None,
70
+ max_cooldown_history: int = 10000,
71
+ max_execution_history: int = 1000
72
+ ):
73
+ self.policies = policies or DEFAULT_HEALING_POLICIES
74
+ self._lock = threading.RLock()
75
+ self.last_execution: Dict[str, float] = OrderedDict()
76
+ self.max_cooldown_history = max_cooldown_history
77
+ self.execution_timestamps: Dict[str, List[float]] = {}
78
+ self.max_execution_history = max_execution_history
79
+ self.policies = sorted(self.policies, key=lambda p: p.priority)
80
+ logger.info(f"Initialized PolicyEngine with {len(self.policies)} policies")
81
+
82
+ def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
83
+ applicable_actions = []
84
+ current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
85
+ for policy in self.policies:
86
+ if not policy.enabled:
87
+ continue
88
+ policy_key = f"{policy.name}_{event.component}"
89
+ with self._lock:
90
+ last_exec = self.last_execution.get(policy_key, 0)
91
+ if current_time - last_exec < policy.cool_down_seconds:
92
+ continue
93
+ if self._is_rate_limited(policy_key, policy, current_time):
94
+ continue
95
+ if self._evaluate_conditions(policy.conditions, event):
96
+ applicable_actions.extend(policy.actions)
97
+ self.last_execution[policy_key] = current_time
98
+ self._record_execution(policy_key, current_time)
99
+ seen = set()
100
+ unique = []
101
+ for a in applicable_actions:
102
+ if a not in seen:
103
+ seen.add(a)
104
+ unique.append(a)
105
+ return unique if unique else [HealingAction.NO_ACTION]
106
+
107
+ def _evaluate_conditions(self, conditions: List[Dict[str, Any]], event: ReliabilityEvent) -> bool:
108
+ for cond in conditions:
109
+ metric = cond["metric"]
110
+ op = cond["operator"]
111
+ thresh = cond["threshold"]
112
+ val = getattr(event, metric, None)
113
+ if val is None:
114
+ return False
115
+ if op == "gt":
116
+ if not (val > thresh):
117
+ return False
118
+ elif op == "lt":
119
+ if not (val < thresh):
120
+ return False
121
+ elif op == "eq":
122
+ if not (abs(val - thresh) < 1e-6):
123
+ return False
124
+ elif op == "gte":
125
+ if not (val >= thresh):
126
+ return False
127
+ elif op == "lte":
128
+ if not (val <= thresh):
129
+ return False
130
+ else:
131
+ return False
132
+ return True
133
+
134
+ def _is_rate_limited(self, key: str, policy: HealingPolicy, now: float) -> bool:
135
+ if key not in self.execution_timestamps:
136
+ return False
137
+ one_hour_ago = now - 3600
138
+ recent = [ts for ts in self.execution_timestamps[key] if ts > one_hour_ago]
139
+ self.execution_timestamps[key] = recent
140
+ return len(recent) >= policy.max_executions_per_hour
141
+
142
+ def _record_execution(self, key: str, ts: float):
143
+ if key not in self.execution_timestamps:
144
+ self.execution_timestamps[key] = []
145
+ self.execution_timestamps[key].append(ts)
146
+ if len(self.execution_timestamps[key]) > self.max_execution_history:
147
+ self.execution_timestamps[key] = self.execution_timestamps[key][-self.max_execution_history:]