File size: 3,386 Bytes
644b8d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Pytest configuration and shared fixtures
"""

import pytest
import tempfile
import os
from datetime import datetime, timezone
from models import ReliabilityEvent, EventSeverity, HealingPolicy, HealingAction, PolicyCondition
from healing_policies import PolicyEngine
from app import (
    ThreadSafeEventStore,
    AdvancedAnomalyDetector,
    BusinessImpactCalculator,
    SimplePredictiveEngine,
    EnhancedReliabilityEngine,
    OrchestrationManager
)


@pytest.fixture
def sample_event():
    """Provide a MEDIUM-severity ReliabilityEvent for ``test-service``.

    The metric values are fixed so tests can rely on them being stable.
    """
    event_fields = {
        "component": "test-service",
        "latency_p99": 250.0,
        "error_rate": 0.08,
        "throughput": 1000.0,
        "cpu_util": 0.75,
        "memory_util": 0.65,
        "severity": EventSeverity.MEDIUM,
    }
    return ReliabilityEvent(**event_fields)


@pytest.fixture
def critical_event():
    """Provide a CRITICAL-severity ReliabilityEvent for ``critical-service``.

    Every metric is set noticeably worse than in the ``sample_event`` fixture
    (higher latency, error rate and utilisation; lower throughput).
    """
    event_fields = {
        "component": "critical-service",
        "latency_p99": 600.0,
        "error_rate": 0.35,
        "throughput": 500.0,
        "cpu_util": 0.95,
        "memory_util": 0.92,
        "severity": EventSeverity.CRITICAL,
    }
    return ReliabilityEvent(**event_fields)


@pytest.fixture
def normal_event():
    """Provide a LOW-severity ReliabilityEvent for ``healthy-service``.

    Represents the "nothing is wrong" baseline used by the tests.
    """
    event_fields = {
        "component": "healthy-service",
        "latency_p99": 80.0,
        "error_rate": 0.01,
        "throughput": 2000.0,
        "cpu_util": 0.40,
        "memory_util": 0.35,
        "severity": EventSeverity.LOW,
    }
    return ReliabilityEvent(**event_fields)


@pytest.fixture
def sample_policy():
    """Provide a HealingPolicy named ``test_policy``.

    The policy has a single condition (``latency_p99`` ``gt`` 300.0) and a
    single action (``HealingAction.RESTART_CONTAINER``).
    """
    # The only trigger for this policy: p99 latency above the threshold.
    latency_condition = PolicyCondition(
        metric="latency_p99",
        operator="gt",
        threshold=300.0,
    )
    return HealingPolicy(
        name="test_policy",
        conditions=[latency_condition],
        actions=[HealingAction.RESTART_CONTAINER],
        priority=2,
        cool_down_seconds=60,
        max_executions_per_hour=5,
    )


@pytest.fixture
def policy_engine():
    """Provide a new PolicyEngine instance for each test.

    Both history limits (cooldown and execution) are set to 100.
    """
    history_limit = 100
    engine = PolicyEngine(
        max_cooldown_history=history_limit,
        max_execution_history=history_limit,
    )
    return engine


@pytest.fixture
def event_store():
    """Provide a new ThreadSafeEventStore constructed with ``max_size=100``."""
    store = ThreadSafeEventStore(max_size=100)
    return store


@pytest.fixture
def anomaly_detector():
    """Provide a new AdvancedAnomalyDetector built with default arguments."""
    detector = AdvancedAnomalyDetector()
    return detector


@pytest.fixture
def business_calculator():
    """Provide a new BusinessImpactCalculator built with default arguments."""
    calculator = BusinessImpactCalculator()
    return calculator


@pytest.fixture
def predictive_engine():
    """Provide a new SimplePredictiveEngine with ``history_window=20``."""
    predictor = SimplePredictiveEngine(history_window=20)
    return predictor


@pytest.fixture
def temp_dir():
    """Yield the path of a scratch directory that is removed after the test.

    The directory is created before the test body runs and cleaned up once
    the fixture is torn down, whether or not the test passed.
    """
    scratch = tempfile.TemporaryDirectory()
    try:
        # ``name`` is the same path string the context-manager form hands out.
        yield scratch.name
    finally:
        scratch.cleanup()


@pytest.fixture
def mock_faiss_index(temp_dir):
    """Placeholder for a FAISS index fixture; currently provides ``None``.

    Building a real index would require FAISS to be installed, so this
    fixture returns ``None`` to let tests skip FAISS operations. The
    ``temp_dir`` fixture is requested but not used by the body.
    """
    placeholder_index = None
    return placeholder_index


@pytest.fixture
def reliability_engine(
    policy_engine,
    event_store,
    anomaly_detector,
    business_calculator
):
    """Create a fully initialized reliability engine.

    Wires a new ``OrchestrationManager`` together with the shared
    ``policy_engine``, ``event_store``, ``anomaly_detector`` and
    ``business_calculator`` fixtures into an ``EnhancedReliabilityEngine``.

    This is deliberately a plain (synchronous) fixture. The body never
    awaits anything, and an ``async def`` decorated with ``@pytest.fixture``
    is only resolved when an async plugin is configured to adopt it;
    otherwise tests receive an un-awaited coroutine object instead of the
    engine. A synchronous fixture yields the engine in every configuration.

    Returns:
        The constructed ``EnhancedReliabilityEngine`` instance.
    """
    # NOTE(review): assumes neither constructor needs a running event loop
    # at construction time -- confirm against app.py.
    orchestrator = OrchestrationManager()

    return EnhancedReliabilityEngine(
        orchestrator=orchestrator,
        policy_engine=policy_engine,
        event_store=event_store,
        anomaly_detector=anomaly_detector,
        business_calculator=business_calculator
    )