petter2025 commited on
Commit
e40632a
·
verified ·
1 Parent(s): 811ceea

Delete healing_policies.py

Browse files
Files changed (1) hide show
  1. healing_policies.py +0 -375
healing_policies.py DELETED
@@ -1,375 +0,0 @@
1
- """
2
- Policy Engine for Automated Healing Actions
3
- Fixed version with thread safety and memory leak prevention
4
- """
5
-
6
- import datetime
7
- import threading
8
- import logging
9
- from collections import OrderedDict
10
- from typing import Dict, List, Optional
11
- from models import HealingPolicy, HealingAction, EventSeverity, ReliabilityEvent, PolicyCondition
12
-
13
- logger = logging.getLogger(__name__)
14
-
15
-
16
- # Default healing policies with structured conditions
17
- DEFAULT_HEALING_POLICIES = [
18
- HealingPolicy(
19
- name="high_latency_restart",
20
- conditions=[
21
- PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0)
22
- ],
23
- actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
24
- priority=1,
25
- cool_down_seconds=300,
26
- max_executions_per_hour=5
27
- ),
28
- HealingPolicy(
29
- name="critical_error_rate_rollback",
30
- conditions=[
31
- PolicyCondition(metric="error_rate", operator="gt", threshold=0.3)
32
- ],
33
- actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
34
- priority=1,
35
- cool_down_seconds=600,
36
- max_executions_per_hour=3
37
- ),
38
- HealingPolicy(
39
- name="high_error_rate_traffic_shift",
40
- conditions=[
41
- PolicyCondition(metric="error_rate", operator="gt", threshold=0.15)
42
- ],
43
- actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
44
- priority=2,
45
- cool_down_seconds=300,
46
- max_executions_per_hour=5
47
- ),
48
- HealingPolicy(
49
- name="resource_exhaustion_scale",
50
- conditions=[
51
- PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
52
- PolicyCondition(metric="memory_util", operator="gt", threshold=0.9)
53
- ],
54
- actions=[HealingAction.SCALE_OUT],
55
- priority=2,
56
- cool_down_seconds=600,
57
- max_executions_per_hour=10
58
- ),
59
- HealingPolicy(
60
- name="moderate_latency_circuit_breaker",
61
- conditions=[
62
- PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0)
63
- ],
64
- actions=[HealingAction.CIRCUIT_BREAKER],
65
- priority=3,
66
- cool_down_seconds=180,
67
- max_executions_per_hour=8
68
- )
69
- ]
70
-
71
-
72
- class PolicyEngine:
73
- """
74
- Thread-safe policy engine with cooldown and rate limiting
75
-
76
- CRITICAL FIXES:
77
- - Added RLock for thread safety
78
- - Fixed cooldown race condition (atomic check + update)
79
- - Implemented LRU eviction to prevent memory leak
80
- - Added priority-based policy evaluation
81
- - Added rate limiting per policy
82
- """
83
-
84
- def __init__(
85
- self,
86
- policies: Optional[List[HealingPolicy]] = None,
87
- max_cooldown_history: int = 10000,
88
- max_execution_history: int = 1000
89
- ):
90
- """
91
- Initialize policy engine
92
-
93
- Args:
94
- policies: List of healing policies (uses defaults if None)
95
- max_cooldown_history: Maximum cooldown entries to keep (LRU)
96
- max_execution_history: Maximum execution history per policy
97
- """
98
- self.policies = policies or DEFAULT_HEALING_POLICIES
99
-
100
- # FIXED: Added RLock for thread safety
101
- self._lock = threading.RLock()
102
-
103
- # FIXED: Use OrderedDict for LRU eviction (prevents memory leak)
104
- self.last_execution: OrderedDict[str, float] = OrderedDict()
105
- self.max_cooldown_history = max_cooldown_history
106
-
107
- # Rate limiting: track executions per hour per policy
108
- self.execution_timestamps: Dict[str, List[float]] = {}
109
- self.max_execution_history = max_execution_history
110
-
111
- # Sort policies by priority (lower number = higher priority)
112
- self.policies = sorted(self.policies, key=lambda p: p.priority)
113
-
114
- logger.info(
115
- f"Initialized PolicyEngine with {len(self.policies)} policies, "
116
- f"max_cooldown_history={max_cooldown_history}"
117
- )
118
-
119
- def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
120
- """
121
- Evaluate all policies against the event and return matching actions
122
-
123
- FIXED: Atomic check + update under lock (prevents race condition)
124
- FIXED: Priority-based evaluation
125
-
126
- Args:
127
- event: Reliability event to evaluate
128
-
129
- Returns:
130
- List of healing actions to execute
131
- """
132
- applicable_actions = []
133
- current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
134
-
135
- # Evaluate policies in priority order
136
- for policy in self.policies:
137
- if not policy.enabled:
138
- continue
139
-
140
- policy_key = f"{policy.name}_{event.component}"
141
-
142
- # FIXED: All cooldown operations under lock (atomic)
143
- with self._lock:
144
- # Check cooldown
145
- last_exec = self.last_execution.get(policy_key, 0)
146
-
147
- if current_time - last_exec < policy.cool_down_seconds:
148
- logger.debug(
149
- f"Policy {policy.name} for {event.component} on cooldown "
150
- f"({current_time - last_exec:.0f}s / {policy.cool_down_seconds}s)"
151
- )
152
- continue
153
-
154
- # Check rate limit
155
- if self._is_rate_limited(policy_key, policy, current_time):
156
- logger.warning(
157
- f"Policy {policy.name} for {event.component} rate limited "
158
- f"(max {policy.max_executions_per_hour}/hour)"
159
- )
160
- continue
161
-
162
- # FIXED: Only update timestamp if conditions match
163
- if self._evaluate_conditions(policy.conditions, event):
164
- applicable_actions.extend(policy.actions)
165
-
166
- # Update cooldown timestamp (INSIDE lock, AFTER condition check)
167
- self._update_cooldown(policy_key, current_time)
168
-
169
- # Track execution for rate limiting
170
- self._record_execution(policy_key, current_time)
171
-
172
- logger.info(
173
- f"Policy {policy.name} triggered for {event.component}: "
174
- f"actions={[a.value for a in policy.actions]}"
175
- )
176
-
177
- # Deduplicate actions while preserving order
178
- seen = set()
179
- unique_actions = []
180
- for action in applicable_actions:
181
- if action not in seen:
182
- seen.add(action)
183
- unique_actions.append(action)
184
-
185
- return unique_actions if unique_actions else [HealingAction.NO_ACTION]
186
-
187
- def _evaluate_conditions(
188
- self,
189
- conditions: List[PolicyCondition],
190
- event: ReliabilityEvent
191
- ) -> bool:
192
- """
193
- Evaluate all conditions against event (AND logic)
194
-
195
- Args:
196
- conditions: List of policy conditions
197
- event: Reliability event
198
-
199
- Returns:
200
- True if all conditions match, False otherwise
201
- """
202
- for condition in conditions:
203
- # Get event value
204
- event_value = getattr(event, condition.metric, None)
205
-
206
- # Handle None values
207
- if event_value is None:
208
- logger.debug(
209
- f"Condition failed: {condition.metric} is None on event"
210
- )
211
- return False
212
-
213
- # Evaluate operator
214
- if not self._compare_values(
215
- event_value,
216
- condition.operator,
217
- condition.threshold
218
- ):
219
- logger.debug(
220
- f"Condition failed: {event_value} {condition.operator} "
221
- f"{condition.threshold} = False"
222
- )
223
- return False
224
-
225
- return True
226
-
227
- def _compare_values(
228
- self,
229
- event_value: float,
230
- operator: str,
231
- threshold: float
232
- ) -> bool:
233
- """
234
- Compare values based on operator with type safety
235
-
236
- FIXED: Added type checking and better error handling
237
-
238
- Args:
239
- event_value: Value from event
240
- operator: Comparison operator
241
- threshold: Threshold value
242
-
243
- Returns:
244
- Comparison result
245
- """
246
- try:
247
- # Type validation
248
- if not isinstance(event_value, (int, float)):
249
- logger.error(
250
- f"Invalid event_value type: {type(event_value)}, expected number"
251
- )
252
- return False
253
-
254
- if not isinstance(threshold, (int, float)):
255
- logger.error(
256
- f"Invalid threshold type: {type(threshold)}, expected number"
257
- )
258
- return False
259
-
260
- # Operator evaluation
261
- if operator == "gt":
262
- return event_value > threshold
263
- elif operator == "lt":
264
- return event_value < threshold
265
- elif operator == "eq":
266
- return abs(event_value - threshold) < 1e-6 # Float equality
267
- elif operator == "gte":
268
- return event_value >= threshold
269
- elif operator == "lte":
270
- return event_value <= threshold
271
- else:
272
- logger.error(f"Unknown operator: {operator}")
273
- return False
274
-
275
- except (TypeError, ValueError) as e:
276
- logger.error(f"Comparison error: {e}", exc_info=True)
277
- return False
278
-
279
- def _update_cooldown(self, policy_key: str, timestamp: float) -> None:
280
- """
281
- Update cooldown timestamp with LRU eviction
282
-
283
- FIXED: Prevents unbounded memory growth
284
-
285
- Args:
286
- policy_key: Policy identifier
287
- timestamp: Current timestamp
288
- """
289
- # Update timestamp
290
- self.last_execution[policy_key] = timestamp
291
-
292
- # Move to end (most recently used)
293
- self.last_execution.move_to_end(policy_key)
294
-
295
- # LRU eviction if too large
296
- while len(self.last_execution) > self.max_cooldown_history:
297
- evicted_key, _ = self.last_execution.popitem(last=False)
298
- logger.debug(f"Evicted cooldown entry: {evicted_key}")
299
-
300
- def _is_rate_limited(
301
- self,
302
- policy_key: str,
303
- policy: HealingPolicy,
304
- current_time: float
305
- ) -> bool:
306
- """
307
- Check if policy is rate limited
308
-
309
- Args:
310
- policy_key: Policy identifier
311
- policy: Policy configuration
312
- current_time: Current timestamp
313
-
314
- Returns:
315
- True if rate limited, False otherwise
316
- """
317
- if policy_key not in self.execution_timestamps:
318
- return False
319
-
320
- # Remove executions older than 1 hour
321
- one_hour_ago = current_time - 3600
322
- recent_executions = [
323
- ts for ts in self.execution_timestamps[policy_key]
324
- if ts > one_hour_ago
325
- ]
326
-
327
- self.execution_timestamps[policy_key] = recent_executions
328
-
329
- # Check rate limit
330
- return len(recent_executions) >= policy.max_executions_per_hour
331
-
332
- def _record_execution(self, policy_key: str, timestamp: float) -> None:
333
- """
334
- Record policy execution for rate limiting
335
-
336
- Args:
337
- policy_key: Policy identifier
338
- timestamp: Execution timestamp
339
- """
340
- if policy_key not in self.execution_timestamps:
341
- self.execution_timestamps[policy_key] = []
342
-
343
- self.execution_timestamps[policy_key].append(timestamp)
344
-
345
- # Limit history size (memory management)
346
- if len(self.execution_timestamps[policy_key]) > self.max_execution_history:
347
- self.execution_timestamps[policy_key] = \
348
- self.execution_timestamps[policy_key][-self.max_execution_history:]
349
-
350
- def get_policy_stats(self) -> Dict[str, Dict]:
351
- """
352
- Get statistics about policy execution
353
-
354
- Returns:
355
- Dictionary of policy statistics
356
- """
357
- with self._lock:
358
- stats = {}
359
-
360
- for policy in self.policies:
361
- policy_stats = {
362
- "name": policy.name,
363
- "priority": policy.priority,
364
- "enabled": policy.enabled,
365
- "cooldown_seconds": policy.cool_down_seconds,
366
- "max_per_hour": policy.max_executions_per_hour,
367
- "total_components": sum(
368
- 1 for key in self.last_execution.keys()
369
- if key.startswith(f"{policy.name}_")
370
- )
371
- }
372
-
373
- stats[policy.name] = policy_stats
374
-
375
- return stats