petter2025 committed on
Commit
cc0e9a4
·
verified ·
1 Parent(s): ca98764

Create test_policy_engine.py

Browse files
Files changed (1) hide show
  1. test_policy_engine.py +291 -0
test_policy_engine.py ADDED
@@ -0,0 +1,291 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Unit tests for PolicyEngine with thread safety and concurrency tests
3
+ """
4
+
5
+ import pytest
6
+ import threading
7
+ import time
8
+ from datetime import datetime, timezone
9
+ from models import ReliabilityEvent, EventSeverity, HealingPolicy, HealingAction, PolicyCondition
10
+ from healing_policies import PolicyEngine
11
+
12
+
13
class TestPolicyEngineBasics:
    """Smoke tests: engine construction and basic policy matching."""

    def test_initialization(self, policy_engine):
        """The fixture engine exists, holds policies and has a cooldown cap of 100."""
        assert policy_engine is not None
        policy_count = len(policy_engine.policies)
        assert policy_count > 0
        assert policy_engine.max_cooldown_history == 100

    def test_policy_evaluation_no_match(self, policy_engine, normal_event):
        """A normal event yields only the NO_ACTION placeholder."""
        assert policy_engine.evaluate_policies(normal_event) == [HealingAction.NO_ACTION]

    def test_policy_evaluation_match(self, policy_engine, critical_event):
        """A critical event yields at least one real action and no NO_ACTION."""
        triggered = policy_engine.evaluate_policies(critical_event)
        assert len(triggered) > 0
        assert HealingAction.NO_ACTION not in triggered

    def test_policy_disabled(self, sample_policy, sample_event):
        """A policy copied with ``enabled=False`` never fires."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'enabled': False})]
        )

        assert engine.evaluate_policies(sample_event) == [HealingAction.NO_ACTION]


class TestPolicyCooldown:
    """Cooldown behaviour between consecutive evaluations of the same event."""

    def test_cooldown_prevents_immediate_re_execution(self, sample_policy, sample_event):
        """With a 60 s cooldown, an immediate second evaluation yields NO_ACTION."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'cool_down_seconds': 60})]
        )

        # The very first evaluation is allowed through.
        first = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in first

        # The follow-up lands inside the cooldown window and is suppressed.
        second = engine.evaluate_policies(sample_event)
        assert second == [HealingAction.NO_ACTION]

    def test_cooldown_expires(self, sample_policy, sample_event):
        """With a 1 s cooldown, the policy fires again after sleeping 1.1 s."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'cool_down_seconds': 1})]
        )

        before_wait = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in before_wait

        # Sleep slightly longer than the cooldown so the window has elapsed.
        time.sleep(1.1)

        after_wait = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in after_wait


class TestRateLimiting:
    """Per-hour execution cap on a policy."""

    def test_rate_limit_enforcement(self, sample_policy, sample_event):
        """With cooldown disabled and a cap of 3, the 4th evaluation yields NO_ACTION."""
        overrides = {
            'cool_down_seconds': 0,  # No cooldown
            'max_executions_per_hour': 3,
        }
        engine = PolicyEngine(policies=[sample_policy.model_copy(update=overrides)])

        # The first three evaluations are within the hourly budget.
        for _ in range(3):
            allowed = engine.evaluate_policies(sample_event)
            assert HealingAction.RESTART_CONTAINER in allowed
            time.sleep(0.1)  # Small delay to avoid race

        # The budget is now exhausted, so the next evaluation is refused.
        over_budget = engine.evaluate_policies(sample_event)
        assert over_budget == [HealingAction.NO_ACTION]


class TestThreadSafety:
    """Thread-safety tests for ``PolicyEngine.evaluate_policies``."""

    def test_concurrent_evaluations_no_race_condition(self, sample_policy, sample_event):
        """
        CRITICAL TEST: Verify no race condition in cooldown check

        Ten threads evaluate the same event against a policy with a 5 s
        cooldown; exactly one of them may trigger RESTART_CONTAINER.

        A barrier releases all workers at the same instant so they actually
        contend on the cooldown check, and worker exceptions are collected so
        a crashed thread fails the test instead of silently shrinking the
        result set.
        """
        worker_count = 10
        policy = sample_policy.model_copy(update={'cool_down_seconds': 5})
        engine = PolicyEngine(policies=[policy])

        start_gate = threading.Barrier(worker_count)
        results = []
        errors = []

        def evaluate():
            try:
                # Block until every worker is ready, then race together.
                start_gate.wait(timeout=5)
                results.append(engine.evaluate_policies(sample_event))
            except Exception as exc:  # re-surfaced by the assertion below
                errors.append(exc)

        threads = [threading.Thread(target=evaluate) for _ in range(worker_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Every worker must have completed without raising.
        assert not errors, f"Worker threads raised: {errors!r}"
        assert len(results) == worker_count

        # Count how many actually triggered the policy
        trigger_count = sum(
            1 for actions in results
            if HealingAction.RESTART_CONTAINER in actions
        )

        # Only ONE should have triggered (atomic check + update)
        assert trigger_count == 1, f"Expected 1 trigger, got {trigger_count}"

    def test_concurrent_different_components(self, sample_policy):
        """
        Test that different components don't interfere with each other

        Five threads per service evaluate concurrently on one shared engine;
        each service must see RESTART_CONTAINER at least once.
        """
        runs_per_service = 5
        service_names = ('service-1', 'service-2')
        engine = PolicyEngine(policies=[sample_policy])

        results = {name: [] for name in service_names}
        errors = []
        start_gate = threading.Barrier(runs_per_service * len(service_names))

        def evaluate_service(service_name):
            try:
                event = ReliabilityEvent(
                    component=service_name,
                    latency_p99=400.0,
                    error_rate=0.1,
                    throughput=1000.0
                )
                # Release all workers together so both services overlap.
                start_gate.wait(timeout=5)
                results[service_name].append(engine.evaluate_policies(event))
            except Exception as exc:  # re-surfaced by the assertion below
                errors.append(exc)

        # Run both services concurrently multiple times
        threads = []
        for _ in range(runs_per_service):
            for name in service_names:
                threads.append(threading.Thread(target=evaluate_service, args=(name,)))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Every worker must have completed without raising.
        assert not errors, f"Worker threads raised: {errors!r}"
        for name in service_names:
            assert len(results[name]) == runs_per_service

        # Each service should have triggered at least once
        assert any(HealingAction.RESTART_CONTAINER in actions
                   for actions in results['service-1'])
        assert any(HealingAction.RESTART_CONTAINER in actions
                   for actions in results['service-2'])


class TestMemoryManagement:
    """Bounds on the engine's internal bookkeeping structures."""

    def test_cooldown_history_bounded(self, sample_policy):
        """After 500 distinct components, ``last_execution`` stays within its cap."""
        engine = PolicyEngine(policies=[sample_policy], max_cooldown_history=100)

        # Each iteration uses a fresh component name.
        for index in range(500):
            engine.evaluate_policies(
                ReliabilityEvent(
                    component=f"service-{index}",
                    latency_p99=400.0,
                    error_rate=0.1,
                    throughput=1000.0,
                )
            )

        tracked_entries = len(engine.last_execution)
        assert tracked_entries <= engine.max_cooldown_history

    def test_execution_history_bounded(self, sample_policy):
        """After 200 evaluations of one component, every timestamp list stays within its cap."""
        engine = PolicyEngine(policies=[sample_policy], max_execution_history=50)

        # Repeatedly evaluate the same component with a short pause between calls.
        for _ in range(200):
            engine.evaluate_policies(
                ReliabilityEvent(
                    component="test-service",
                    latency_p99=400.0,
                    error_rate=0.1,
                    throughput=1000.0,
                )
            )
            time.sleep(0.01)

        # Check each recorded timestamp list against the configured cap.
        for recorded in engine.execution_timestamps.values():
            assert len(recorded) <= engine.max_execution_history


class TestPriorityHandling:
    """Ordering of actions when several policies match the same event."""

    def test_policies_evaluated_by_priority(self):
        """The priority-1 policy's action precedes the priority-5 policy's action."""

        def latency_above_100():
            # Each policy gets its own condition instance, as in the original setup.
            return PolicyCondition(metric="latency_p99", operator="gt", threshold=100.0)

        high_priority = HealingPolicy(
            name="high_priority",
            conditions=[latency_above_100()],
            actions=[HealingAction.ROLLBACK],
            priority=1,
        )
        low_priority = HealingPolicy(
            name="low_priority",
            conditions=[latency_above_100()],
            actions=[HealingAction.ALERT_TEAM],
            priority=5,
        )

        # Register the policies in reverse priority order on purpose.
        engine = PolicyEngine(policies=[low_priority, high_priority])

        slow_event = ReliabilityEvent(
            component="test",
            latency_p99=200.0,
            error_rate=0.05,
            throughput=1000.0,
        )
        actions = engine.evaluate_policies(slow_event)

        # Both policies match; the priority-1 action must be listed first.
        assert HealingAction.ROLLBACK in actions
        assert HealingAction.ALERT_TEAM in actions
        rollback_at = actions.index(HealingAction.ROLLBACK)
        alert_at = actions.index(HealingAction.ALERT_TEAM)
        assert rollback_at < alert_at


class TestOperatorComparisons:
    """Checks of ``_compare_values`` and of evaluation with a ``None`` metric."""

    def test_greater_than_operator(self, policy_engine):
        """``gt`` is True for 100 vs 50 and False for 50 vs 100."""
        assert policy_engine._compare_values(100.0, "gt", 50.0) is True
        assert policy_engine._compare_values(50.0, "gt", 100.0) is False

    def test_less_than_operator(self, policy_engine):
        """``lt`` is True for 50 vs 100 and False for 100 vs 50."""
        assert policy_engine._compare_values(50.0, "lt", 100.0) is True
        assert policy_engine._compare_values(100.0, "lt", 50.0) is False

    def test_type_mismatch_handling(self, policy_engine):
        """Comparing a string against a float returns False."""
        outcome = policy_engine._compare_values("invalid", "gt", 50.0)
        assert outcome is False

    def test_none_value_handling(self, sample_policy):
        """Evaluating an event whose ``cpu_util`` is None returns a non-None result."""
        engine = PolicyEngine(policies=[sample_policy])

        event_without_cpu = ReliabilityEvent(
            component="test",
            latency_p99=100.0,
            error_rate=0.05,
            throughput=1000.0,
            cpu_util=None,  # None value
        )

        # Evaluation must complete and hand back a result.
        assert engine.evaluate_policies(event_without_cpu) is not None


if __name__ == "__main__":
    # pytest.main() returns the run's exit code; propagate it so that running
    # this file as a script reports test failures through the process status.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))