| """ |
| Unit tests for PolicyEngine with thread safety and concurrency tests |
| """ |
|
|
| import pytest |
| import threading |
| import time |
| from datetime import datetime, timezone |
| from models import ReliabilityEvent, EventSeverity, HealingPolicy, HealingAction, PolicyCondition |
| from healing_policies import PolicyEngine |
|
|
|
|
class TestPolicyEngineBasics:
    """Smoke tests: engine construction and single-event evaluation."""

    def test_initialization(self, policy_engine):
        """The fixture engine exists, carries policies, and caps cooldown history at 100."""
        assert policy_engine is not None
        assert len(policy_engine.policies) > 0
        assert policy_engine.max_cooldown_history == 100

    def test_policy_evaluation_no_match(self, policy_engine, normal_event):
        """Evaluating a normal event must return only the NO_ACTION placeholder."""
        assert policy_engine.evaluate_policies(normal_event) == [HealingAction.NO_ACTION]

    def test_policy_evaluation_match(self, policy_engine, critical_event):
        """Evaluating a critical event must return at least one real action."""
        triggered = policy_engine.evaluate_policies(critical_event)
        assert len(triggered) > 0
        assert HealingAction.NO_ACTION not in triggered

    def test_policy_disabled(self, sample_policy, sample_event):
        """A copy of the sample policy with enabled=False must never fire."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'enabled': False})]
        )

        assert engine.evaluate_policies(sample_event) == [HealingAction.NO_ACTION]
|
|
|
|
class TestPolicyCooldown:
    """Cooldown behaviour across repeated evaluations of the same event."""

    def test_cooldown_prevents_immediate_re_execution(self, sample_policy, sample_event):
        """With a 60 s cooldown, a back-to-back second evaluation must yield NO_ACTION."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'cool_down_seconds': 60})]
        )

        # The first evaluation is expected to fire the policy.
        first = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in first

        # The immediate repeat falls inside the cooldown window.
        second = engine.evaluate_policies(sample_event)
        assert second == [HealingAction.NO_ACTION]

    def test_cooldown_expires(self, sample_policy, sample_event):
        """With a 1 s cooldown, the policy must fire again after waiting 1.1 s."""
        engine = PolicyEngine(
            policies=[sample_policy.model_copy(update={'cool_down_seconds': 1})]
        )

        before_wait = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in before_wait

        # Real wall-clock wait, slightly longer than the configured cooldown.
        time.sleep(1.1)

        after_wait = engine.evaluate_policies(sample_event)
        assert HealingAction.RESTART_CONTAINER in after_wait
|
|
|
|
class TestRateLimiting:
    """Per-hour execution cap tests."""

    def test_rate_limit_enforcement(self, sample_policy, sample_event):
        """With cooldown disabled and a cap of 3 per hour, the 4th evaluation yields NO_ACTION."""
        overrides = {
            'cool_down_seconds': 0,
            'max_executions_per_hour': 3
        }
        engine = PolicyEngine(policies=[sample_policy.model_copy(update=overrides)])

        # Three evaluations are within the cap; each one must fire.
        for _ in range(3):
            assert HealingAction.RESTART_CONTAINER in engine.evaluate_policies(sample_event)
            time.sleep(0.1)

        # The next evaluation exceeds the cap.
        assert engine.evaluate_policies(sample_event) == [HealingAction.NO_ACTION]
|
|
|
|
class TestThreadSafety:
    """Concurrency tests for PolicyEngine.evaluate_policies."""

    @staticmethod
    def _run_together(targets):
        """Run each callable in its own thread, released at the same instant.

        A barrier holds every worker until all of them have started, so the
        calls overlap as much as possible instead of running one after another
        as the threads are spawned.

        Args:
            targets: Zero-argument callables, one per thread.

        Returns:
            List of exceptions raised by the workers (empty on success).
            Exceptions raised inside a thread never reach the test on their
            own, so they are collected here for the caller to assert on.
        """
        start_gate = threading.Barrier(len(targets))
        errors = []

        def runner(fn):
            try:
                # Timeout keeps a stuck worker from hanging the test run.
                start_gate.wait(timeout=5)
                fn()
            except Exception as exc:  # reported to the caller, not swallowed
                errors.append(exc)

        threads = [threading.Thread(target=runner, args=(fn,)) for fn in targets]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return errors

    def test_concurrent_evaluations_no_race_condition(self, sample_policy, sample_event):
        """
        CRITICAL TEST: Verify no race condition in cooldown check

        This tests the fix for the race condition where multiple threads
        could simultaneously pass the cooldown check. Ten threads evaluate
        the same event at once against a 5 s cooldown; exactly one of them
        may trigger RESTART_CONTAINER.
        """
        policy = sample_policy.model_copy(update={'cool_down_seconds': 5})
        engine = PolicyEngine(policies=[policy])

        thread_count = 10
        results = []

        def evaluate():
            actions = engine.evaluate_policies(sample_event)
            results.append(actions)

        errors = self._run_together([evaluate] * thread_count)

        # A crashed worker would otherwise just shrink `results` silently.
        assert not errors, f"Worker threads raised: {errors!r}"
        assert len(results) == thread_count

        trigger_count = sum(
            1 for actions in results
            if HealingAction.RESTART_CONTAINER in actions
        )

        assert trigger_count == 1, f"Expected 1 trigger, got {trigger_count}"

    def test_concurrent_different_components(self, sample_policy):
        """Test that different components don't interfere with each other.

        Five threads per component evaluate concurrently; each component must
        see RESTART_CONTAINER in at least one of its results.
        """
        engine = PolicyEngine(policies=[sample_policy])

        per_service = 5
        results = {'service-1': [], 'service-2': []}

        def evaluate_service(service_name):
            event = ReliabilityEvent(
                component=service_name,
                latency_p99=400.0,
                error_rate=0.1,
                throughput=1000.0
            )
            actions = engine.evaluate_policies(event)
            results[service_name].append(actions)

        # Interleave the two services, binding the name per worker via a
        # default argument to avoid late-binding closure surprises.
        targets = []
        for _ in range(per_service):
            for name in ('service-1', 'service-2'):
                targets.append(lambda name=name: evaluate_service(name))

        errors = self._run_together(targets)

        assert not errors, f"Worker threads raised: {errors!r}"
        assert len(results['service-1']) == per_service
        assert len(results['service-2']) == per_service

        assert any(HealingAction.RESTART_CONTAINER in actions
                   for actions in results['service-1'])
        assert any(HealingAction.RESTART_CONTAINER in actions
                   for actions in results['service-2'])
|
|
|
|
class TestMemoryManagement:
    """Checks that the engine's bookkeeping stays within its configured caps."""

    def test_cooldown_history_bounded(self, sample_policy):
        """After events from 500 distinct components, last_execution stays within max_cooldown_history."""
        engine = PolicyEngine(policies=[sample_policy], max_cooldown_history=100)

        # Each iteration uses a new component name.
        for idx in range(500):
            engine.evaluate_policies(
                ReliabilityEvent(
                    component=f"service-{idx}",
                    latency_p99=400.0,
                    error_rate=0.1,
                    throughput=1000.0
                )
            )

        assert len(engine.last_execution) <= engine.max_cooldown_history

    def test_execution_history_bounded(self, sample_policy):
        """After 200 evaluations for one component, every timestamp list stays within max_execution_history."""
        engine = PolicyEngine(policies=[sample_policy], max_execution_history=50)

        # NOTE(review): if sample_policy has a non-zero cooldown, most of these
        # evaluations may be suppressed and the history might never approach
        # the cap — confirm the fixture actually fills it.
        for _ in range(200):
            engine.evaluate_policies(
                ReliabilityEvent(
                    component="test-service",
                    latency_p99=400.0,
                    error_rate=0.1,
                    throughput=1000.0
                )
            )
            time.sleep(0.01)

        assert all(
            len(stamps) <= engine.max_execution_history
            for stamps in engine.execution_timestamps.values()
        )
|
|
|
|
class TestPriorityHandling:
    """Ordering of actions when several policies match the same event."""

    def test_policies_evaluated_by_priority(self):
        """Actions of the priority=1 policy must precede those of the priority=5 policy.

        The engine is given the policies in the opposite order, so the result
        order cannot simply mirror the input order.
        """

        def latency_above_100():
            # A fresh condition object per policy, as in two separate literals.
            return PolicyCondition(metric="latency_p99", operator="gt", threshold=100.0)

        urgent = HealingPolicy(
            name="high_priority",
            conditions=[latency_above_100()],
            actions=[HealingAction.ROLLBACK],
            priority=1
        )
        relaxed = HealingPolicy(
            name="low_priority",
            conditions=[latency_above_100()],
            actions=[HealingAction.ALERT_TEAM],
            priority=5
        )

        engine = PolicyEngine(policies=[relaxed, urgent])

        # latency_p99=200.0 satisfies both policies' "gt 100.0" condition.
        actions = engine.evaluate_policies(
            ReliabilityEvent(
                component="test",
                latency_p99=200.0,
                error_rate=0.05,
                throughput=1000.0
            )
        )

        assert HealingAction.ROLLBACK in actions
        assert HealingAction.ALERT_TEAM in actions
        rollback_pos = actions.index(HealingAction.ROLLBACK)
        alert_pos = actions.index(HealingAction.ALERT_TEAM)
        assert rollback_pos < alert_pos
|
|
|
|
class TestOperatorComparisons:
    """Tests for the engine's _compare_values helper and None-valued metrics."""

    def test_greater_than_operator(self, policy_engine):
        """'gt' must be True for 100 vs 50 and False for 50 vs 100."""
        assert policy_engine._compare_values(100.0, "gt", 50.0) is True
        assert policy_engine._compare_values(50.0, "gt", 100.0) is False

    def test_less_than_operator(self, policy_engine):
        """'lt' must be True for 50 vs 100 and False for 100 vs 50."""
        assert policy_engine._compare_values(50.0, "lt", 100.0) is True
        assert policy_engine._compare_values(100.0, "lt", 50.0) is False

    def test_type_mismatch_handling(self, policy_engine):
        """Comparing a string against a float must return False."""
        assert policy_engine._compare_values("invalid", "gt", 50.0) is False

    def test_none_value_handling(self, sample_policy):
        """Evaluating an event whose cpu_util is None must still return a result."""
        engine = PolicyEngine(policies=[sample_policy])

        event_without_cpu = ReliabilityEvent(
            component="test",
            latency_p99=100.0,
            error_rate=0.05,
            throughput=1000.0,
            cpu_util=None
        )

        # Only checks that a value comes back; its content is not asserted.
        assert engine.evaluate_policies(event_without_cpu) is not None
|
|
|
|
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly reports
    # test failures to the shell/CI instead of always exiting with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))