File size: 4,667 Bytes
1aa566a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Tests for the intelligent retraining trigger and pipeline.
"""
from __future__ import annotations

import pytest


# Fixtures
_FEATURE_DRIFT_DETECTED = {
    "drift_detected": True,
    "drifted_features": ["trip_distance", "pickup_hour"],
    "feature_results": {
        "trip_distance": {"psi": 0.31, "ks_stat": 0.2, "ks_pvalue": 0.001, "drifted": True},
        "pickup_hour": {"psi": 0.24, "ks_stat": 0.18, "ks_pvalue": 0.01, "drifted": True},
    },
    "n_live_samples": 500,
    "timestamp": "2024-01-01T00:00:00Z",
}

_FEATURE_DRIFT_NONE = {
    "drift_detected": False,
    "drifted_features": [],
    "feature_results": {
        "trip_distance": {"psi": 0.05, "ks_stat": 0.05, "ks_pvalue": 0.5, "drifted": False},
    },
    "n_live_samples": 500,
    "timestamp": "2024-01-01T00:00:00Z",
}

_PERF_DEGRADED = {
    "drift_detected": True,
    "recent_rmse": 5.2,
    "baseline_rmse": 3.5,
    "pct_change": 48.6,
    "timestamp": "2024-01-01T00:00:00Z",
}

_PERF_OK = {
    "drift_detected": False,
    "recent_rmse": 3.6,
    "baseline_rmse": 3.5,
    "pct_change": 2.8,
    "timestamp": "2024-01-01T00:00:00Z",
}


class TestRetrainingTrigger:
    @pytest.fixture
    def trigger(self):
        from src.retraining.trigger import RetrainingTrigger
        return RetrainingTrigger()

    def test_trigger_when_both_drift_and_perf_degrade(self, trigger) -> None:
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is True

    def test_no_trigger_when_only_feature_drift(self, trigger) -> None:
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_OK,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False

    def test_no_trigger_when_only_perf_degraded(self, trigger) -> None:
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_NONE,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False

    def test_no_trigger_insufficient_samples(self, trigger) -> None:
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=10,  # way below minimum
        )
        assert decision["should_retrain"] is False
        assert any("samples" in b.lower() for b in decision["blocking_reasons"])

    def test_cooldown_blocks_retrain(self, trigger) -> None:
        import time
        trigger._last_retrain_time = time.time()  # just retrained
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False
        assert any("cooldown" in b.lower() for b in decision["blocking_reasons"])

    def test_decision_structure(self, trigger) -> None:
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        required_keys = {"should_retrain", "reasons", "blocking_reasons", "feature_drift",
                         "performance_drift", "samples_since_last_retrain", "timestamp"}
        assert required_keys.issubset(set(decision.keys()))


class TestRootCauseAnalyzer:
    @pytest.fixture
    def rca(self):
        from src.monitoring.root_cause_analyzer import RootCauseAnalyzer
        return RootCauseAnalyzer()

    def test_no_drift_returns_monitor_action(self, rca) -> None:
        result = rca.analyze({"drift_detected": False, "feature_results": {}, "drifted_features": []})
        assert result["action_recommended"] == "monitor"
        assert result["root_causes"] == []

    def test_drift_returns_ranked_causes(self, rca) -> None:
        result = rca.analyze(_FEATURE_DRIFT_DETECTED)
        assert len(result["root_causes"]) > 0
        # Sorted by rca_score descending
        scores = [c["rca_score"] for c in result["root_causes"]]
        assert scores == sorted(scores, reverse=True)

    def test_primary_cause_is_top_ranked(self, rca) -> None:
        result = rca.analyze(_FEATURE_DRIFT_DETECTED)
        top = result["root_causes"][0]["feature"]
        assert result["primary_cause"] == top