File size: 13,071 Bytes
9ec7605
 
 
 
 
9186179
9ec7605
 
 
 
 
 
 
00342ac
9ec7605
 
00342ac
 
 
9ec7605
 
 
 
 
 
 
00342ac
 
9ec7605
 
 
 
 
 
 
 
00342ac
 
9ec7605
 
 
 
 
 
 
 
00342ac
 
9ec7605
 
 
 
 
 
 
 
 
9186179
 
9ec7605
 
 
 
 
 
 
 
00342ac
 
 
9ec7605
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
00342ac
9ec7605
00342ac
 
 
9ec7605
00342ac
 
9ec7605
 
 
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
00342ac
 
 
 
 
 
9ec7605
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
 
 
 
 
 
00342ac
9ec7605
 
 
 
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
 
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00342ac
9ec7605
00342ac
9ec7605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""
Policy Engine for Automated Healing Actions
Fixed version with thread safety and memory leak prevention
"""

import datetime
import threading
import logging
from collections import OrderedDict
from typing import Dict, List, Optional
from models import HealingPolicy, HealingAction, EventSeverity, ReliabilityEvent, PolicyCondition

logger = logging.getLogger(__name__)


# Built-in healing policies, used by PolicyEngine when the caller passes no
# policy list of its own.
#
# How PolicyEngine (below) interprets the fields:
#   - conditions: every condition of a policy must hold (AND logic); each one
#     compares the event attribute named by `metric` against `threshold`.
#   - priority: policies are sorted ascending, so a lower number is evaluated
#     first.
#   - cool_down_seconds / max_executions_per_hour: tracked separately for each
#     policy/component pair.
# Evaluation does not stop at the first match, so one event can trigger
# several of these policies at once.
# NOTE(review): threshold units are not visible here -- latency is presumably
# milliseconds and the rate/utilisation metrics presumably 0-1 fractions;
# confirm against the ReliabilityEvent model.
DEFAULT_HEALING_POLICIES = [
    # p99 latency above 500: restart the container and alert the team.
    HealingPolicy(
        name="high_latency_restart",
        conditions=[
            PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0)
        ],
        actions=[HealingAction.RESTART_CONTAINER, HealingAction.ALERT_TEAM],
        priority=1,
        cool_down_seconds=300,
        max_executions_per_hour=5
    ),
    # Error rate above 0.3: roll back, open the circuit breaker and alert.
    HealingPolicy(
        name="critical_error_rate_rollback",
        conditions=[
            PolicyCondition(metric="error_rate", operator="gt", threshold=0.3)
        ],
        actions=[HealingAction.ROLLBACK, HealingAction.CIRCUIT_BREAKER, HealingAction.ALERT_TEAM],
        priority=1,
        cool_down_seconds=600,
        max_executions_per_hour=3
    ),
    # Error rate above 0.15: shift traffic and alert. Any event that exceeds
    # the 0.3 threshold above also exceeds this one.
    HealingPolicy(
        name="high_error_rate_traffic_shift",
        conditions=[
            PolicyCondition(metric="error_rate", operator="gt", threshold=0.15)
        ],
        actions=[HealingAction.TRAFFIC_SHIFT, HealingAction.ALERT_TEAM],
        priority=2,
        cool_down_seconds=300,
        max_executions_per_hour=5
    ),
    # CPU *and* memory utilisation both above 0.9: scale out.
    HealingPolicy(
        name="resource_exhaustion_scale",
        conditions=[
            PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
            PolicyCondition(metric="memory_util", operator="gt", threshold=0.9)
        ],
        actions=[HealingAction.SCALE_OUT],
        priority=2,
        cool_down_seconds=600,
        max_executions_per_hour=10
    ),
    # p99 latency above 300: open the circuit breaker. Any event that exceeds
    # the 500 threshold of high_latency_restart also exceeds this one.
    HealingPolicy(
        name="moderate_latency_circuit_breaker",
        conditions=[
            PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0)
        ],
        actions=[HealingAction.CIRCUIT_BREAKER],
        priority=3,
        cool_down_seconds=180,
        max_executions_per_hour=8
    )
]


class PolicyEngine:
    """
    Thread-safe policy engine with cooldown and rate limiting
    
    CRITICAL FIXES:
    - Added RLock for thread safety
    - Fixed cooldown race condition (atomic check + update)
    - Implemented LRU eviction to prevent memory leak
    - Added priority-based policy evaluation
    - Added rate limiting per policy
    """
    
    def __init__(
        self,
        policies: Optional[List[HealingPolicy]] = None,
        max_cooldown_history: int = 10000,
        max_execution_history: int = 1000
    ):
        """
        Initialize policy engine
        
        Args:
            policies: List of healing policies (uses defaults if None)
            max_cooldown_history: Maximum cooldown entries to keep (LRU)
            max_execution_history: Maximum execution history per policy
        """
        self.policies = policies or DEFAULT_HEALING_POLICIES
        
        # FIXED: Added RLock for thread safety
        self._lock = threading.RLock()
        
        # FIXED: Use OrderedDict for LRU eviction (prevents memory leak)
        self.last_execution: OrderedDict[str, float] = OrderedDict()
        self.max_cooldown_history = max_cooldown_history
        
        # Rate limiting: track executions per hour per policy
        self.execution_timestamps: Dict[str, List[float]] = {}
        self.max_execution_history = max_execution_history
        
        # Sort policies by priority (lower number = higher priority)
        self.policies = sorted(self.policies, key=lambda p: p.priority)
        
        logger.info(
            f"Initialized PolicyEngine with {len(self.policies)} policies, "
            f"max_cooldown_history={max_cooldown_history}"
        )
    
    def evaluate_policies(self, event: ReliabilityEvent) -> List[HealingAction]:
        """
        Evaluate all policies against the event and return matching actions
        
        FIXED: Atomic check + update under lock (prevents race condition)
        FIXED: Priority-based evaluation
        
        Args:
            event: Reliability event to evaluate
            
        Returns:
            List of healing actions to execute
        """
        applicable_actions = []
        current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
        
        # Evaluate policies in priority order
        for policy in self.policies:
            if not policy.enabled:
                continue
            
            policy_key = f"{policy.name}_{event.component}"
            
            # FIXED: All cooldown operations under lock (atomic)
            with self._lock:
                # Check cooldown
                last_exec = self.last_execution.get(policy_key, 0)
                
                if current_time - last_exec < policy.cool_down_seconds:
                    logger.debug(
                        f"Policy {policy.name} for {event.component} on cooldown "
                        f"({current_time - last_exec:.0f}s / {policy.cool_down_seconds}s)"
                    )
                    continue
                
                # Check rate limit
                if self._is_rate_limited(policy_key, policy, current_time):
                    logger.warning(
                        f"Policy {policy.name} for {event.component} rate limited "
                        f"(max {policy.max_executions_per_hour}/hour)"
                    )
                    continue
                
                # FIXED: Only update timestamp if conditions match
                if self._evaluate_conditions(policy.conditions, event):
                    applicable_actions.extend(policy.actions)
                    
                    # Update cooldown timestamp (INSIDE lock, AFTER condition check)
                    self._update_cooldown(policy_key, current_time)
                    
                    # Track execution for rate limiting
                    self._record_execution(policy_key, current_time)
                    
                    logger.info(
                        f"Policy {policy.name} triggered for {event.component}: "
                        f"actions={[a.value for a in policy.actions]}"
                    )
        
        # Deduplicate actions while preserving order
        seen = set()
        unique_actions = []
        for action in applicable_actions:
            if action not in seen:
                seen.add(action)
                unique_actions.append(action)
        
        return unique_actions if unique_actions else [HealingAction.NO_ACTION]
    
    def _evaluate_conditions(
        self,
        conditions: List[PolicyCondition],
        event: ReliabilityEvent
    ) -> bool:
        """
        Evaluate all conditions against event (AND logic)
        
        Args:
            conditions: List of policy conditions
            event: Reliability event
            
        Returns:
            True if all conditions match, False otherwise
        """
        for condition in conditions:
            # Get event value
            event_value = getattr(event, condition.metric, None)
            
            # Handle None values
            if event_value is None:
                logger.debug(
                    f"Condition failed: {condition.metric} is None on event"
                )
                return False
            
            # Evaluate operator
            if not self._compare_values(
                event_value,
                condition.operator,
                condition.threshold
            ):
                logger.debug(
                    f"Condition failed: {event_value} {condition.operator} "
                    f"{condition.threshold} = False"
                )
                return False
        
        return True
    
    def _compare_values(
        self,
        event_value: float,
        operator: str,
        threshold: float
    ) -> bool:
        """
        Compare values based on operator with type safety
        
        FIXED: Added type checking and better error handling
        
        Args:
            event_value: Value from event
            operator: Comparison operator
            threshold: Threshold value
            
        Returns:
            Comparison result
        """
        try:
            # Type validation
            if not isinstance(event_value, (int, float)):
                logger.error(
                    f"Invalid event_value type: {type(event_value)}, expected number"
                )
                return False
            
            if not isinstance(threshold, (int, float)):
                logger.error(
                    f"Invalid threshold type: {type(threshold)}, expected number"
                )
                return False
            
            # Operator evaluation
            if operator == "gt":
                return event_value > threshold
            elif operator == "lt":
                return event_value < threshold
            elif operator == "eq":
                return abs(event_value - threshold) < 1e-6  # Float equality
            elif operator == "gte":
                return event_value >= threshold
            elif operator == "lte":
                return event_value <= threshold
            else:
                logger.error(f"Unknown operator: {operator}")
                return False
                
        except (TypeError, ValueError) as e:
            logger.error(f"Comparison error: {e}", exc_info=True)
            return False
    
    def _update_cooldown(self, policy_key: str, timestamp: float) -> None:
        """
        Update cooldown timestamp with LRU eviction
        
        FIXED: Prevents unbounded memory growth
        
        Args:
            policy_key: Policy identifier
            timestamp: Current timestamp
        """
        # Update timestamp
        self.last_execution[policy_key] = timestamp
        
        # Move to end (most recently used)
        self.last_execution.move_to_end(policy_key)
        
        # LRU eviction if too large
        while len(self.last_execution) > self.max_cooldown_history:
            evicted_key, _ = self.last_execution.popitem(last=False)
            logger.debug(f"Evicted cooldown entry: {evicted_key}")
    
    def _is_rate_limited(
        self,
        policy_key: str,
        policy: HealingPolicy,
        current_time: float
    ) -> bool:
        """
        Check if policy is rate limited
        
        Args:
            policy_key: Policy identifier
            policy: Policy configuration
            current_time: Current timestamp
            
        Returns:
            True if rate limited, False otherwise
        """
        if policy_key not in self.execution_timestamps:
            return False
        
        # Remove executions older than 1 hour
        one_hour_ago = current_time - 3600
        recent_executions = [
            ts for ts in self.execution_timestamps[policy_key]
            if ts > one_hour_ago
        ]
        
        self.execution_timestamps[policy_key] = recent_executions
        
        # Check rate limit
        return len(recent_executions) >= policy.max_executions_per_hour
    
    def _record_execution(self, policy_key: str, timestamp: float) -> None:
        """
        Record policy execution for rate limiting
        
        Args:
            policy_key: Policy identifier
            timestamp: Execution timestamp
        """
        if policy_key not in self.execution_timestamps:
            self.execution_timestamps[policy_key] = []
        
        self.execution_timestamps[policy_key].append(timestamp)
        
        # Limit history size (memory management)
        if len(self.execution_timestamps[policy_key]) > self.max_execution_history:
            self.execution_timestamps[policy_key] = \
                self.execution_timestamps[policy_key][-self.max_execution_history:]
    
    def get_policy_stats(self) -> Dict[str, Dict]:
        """
        Get statistics about policy execution
        
        Returns:
            Dictionary of policy statistics
        """
        with self._lock:
            stats = {}
            
            for policy in self.policies:
                policy_stats = {
                    "name": policy.name,
                    "priority": policy.priority,
                    "enabled": policy.enabled,
                    "cooldown_seconds": policy.cool_down_seconds,
                    "max_per_hour": policy.max_executions_per_hour,
                    "total_components": sum(
                        1 for key in self.last_execution.keys()
                        if key.startswith(f"{policy.name}_")
                    )
                }
                
                stats[policy.name] = policy_stats
            
            return stats