"""
Tests for the intelligent retraining trigger and the drift root-cause analyzer.
"""
from __future__ import annotations
import pytest
# Fixtures
# Feature-drift report in which drift is flagged overall and for both listed
# features (each per-feature entry carries "drifted": True).
# NOTE(review): "psi" / "ks_*" presumably hold PSI and Kolmogorov-Smirnov
# results - confirm against the drift detector that produces these reports.
_FEATURE_DRIFT_DETECTED = {
    "drift_detected": True,
    "drifted_features": ["trip_distance", "pickup_hour"],
    "feature_results": {
        "trip_distance": {
            "psi": 0.31,
            "ks_stat": 0.2,
            "ks_pvalue": 0.001,
            "drifted": True,
        },
        "pickup_hour": {
            "psi": 0.24,
            "ks_stat": 0.18,
            "ks_pvalue": 0.01,
            "drifted": True,
        },
    },
    "n_live_samples": 500,
    "timestamp": "2024-01-01T00:00:00Z",
}
# Feature-drift report in which nothing is flagged: the overall flag is False,
# the drifted-feature list is empty, and the single per-feature entry carries
# "drifted": False.
_FEATURE_DRIFT_NONE = {
    "drift_detected": False,
    "drifted_features": [],
    "feature_results": {
        "trip_distance": {
            "psi": 0.05,
            "ks_stat": 0.05,
            "ks_pvalue": 0.5,
            "drifted": False,
        },
    },
    "n_live_samples": 500,
    "timestamp": "2024-01-01T00:00:00Z",
}
# Performance report with the drift flag set: recent RMSE (5.2) is above the
# baseline (3.5), and pct_change matches (5.2 - 3.5) / 3.5 expressed in
# percent, rounded to one decimal.
_PERF_DEGRADED = {
    "drift_detected": True,
    "recent_rmse": 5.2,
    "baseline_rmse": 3.5,
    "pct_change": 48.6,
    "timestamp": "2024-01-01T00:00:00Z",
}
# Performance report with the drift flag cleared: recent RMSE (3.6) is only
# slightly above the baseline (3.5).
_PERF_OK = {
    "drift_detected": False,
    "recent_rmse": 3.6,
    "baseline_rmse": 3.5,
    # (3.6 - 3.5) / 3.5 is 2.857 %, i.e. 2.9 when rounded to one decimal, the
    # same rounding the degraded-performance report uses for its pct_change.
    "pct_change": 2.9,
    "timestamp": "2024-01-01T00:00:00Z",
}
class TestRetrainingTrigger:
    """Checks of the decision returned by ``RetrainingTrigger.should_retrain``.

    Each test passes a feature-drift report, a performance report and a
    sample count, then inspects the returned decision mapping.
    """

    @pytest.fixture
    def trigger(self):
        """Return a ``RetrainingTrigger`` constructed without arguments."""
        # Imported inside the fixture so the project module is only loaded
        # when a test actually requests the fixture.
        from src.retraining.trigger import RetrainingTrigger

        return RetrainingTrigger()

    def test_trigger_when_both_drift_and_perf_degrade(self, trigger) -> None:
        """Feature drift plus degraded performance yields a retrain decision."""
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is True

    def test_no_trigger_when_only_feature_drift(self, trigger) -> None:
        """Feature drift with healthy performance does not trigger a retrain."""
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_OK,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False

    def test_no_trigger_when_only_perf_degraded(self, trigger) -> None:
        """Degraded performance without feature drift does not trigger a retrain."""
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_NONE,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False

    def test_no_trigger_insufficient_samples(self, trigger) -> None:
        """Too few new samples blocks the retrain and is reported as a reason."""
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=10,  # way below minimum
        )
        assert decision["should_retrain"] is False
        assert any("samples" in b.lower() for b in decision["blocking_reasons"])

    def test_cooldown_blocks_retrain(self, trigger) -> None:
        """A retrain recorded as happening just now blocks a new one."""
        import time

        # Stamp the trigger's last-retrain time with "now" to simulate a
        # retrain that has just finished.
        trigger._last_retrain_time = time.time()
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        assert decision["should_retrain"] is False
        assert any("cooldown" in b.lower() for b in decision["blocking_reasons"])

    def test_decision_structure(self, trigger) -> None:
        """The decision mapping exposes every key listed in ``required_keys``."""
        decision = trigger.should_retrain(
            feature_drift_report=_FEATURE_DRIFT_DETECTED,
            performance_report=_PERF_DEGRADED,
            samples_since_last_retrain=2000,
        )
        required_keys = {
            "should_retrain",
            "reasons",
            "blocking_reasons",
            "feature_drift",
            "performance_drift",
            "samples_since_last_retrain",
            "timestamp",
        }
        # Compute the absent keys explicitly so a failure names them instead
        # of reporting a bare ``False`` from ``issubset``.
        missing = required_keys.difference(decision.keys())
        assert not missing, f"decision is missing keys: {sorted(missing)}"
class TestRootCauseAnalyzer:
    """Checks of the mapping returned by ``RootCauseAnalyzer.analyze``."""

    @pytest.fixture
    def rca(self):
        """Return a ``RootCauseAnalyzer`` constructed without arguments."""
        from src.monitoring.root_cause_analyzer import RootCauseAnalyzer

        analyzer = RootCauseAnalyzer()
        return analyzer

    def test_no_drift_returns_monitor_action(self, rca) -> None:
        """A report without drift yields the "monitor" action and no causes."""
        no_drift_report = {
            "drift_detected": False,
            "feature_results": {},
            "drifted_features": [],
        }
        outcome = rca.analyze(no_drift_report)
        assert outcome["action_recommended"] == "monitor"
        assert outcome["root_causes"] == []

    def test_drift_returns_ranked_causes(self, rca) -> None:
        """A drifted report yields causes ordered by descending ``rca_score``."""
        outcome = rca.analyze(_FEATURE_DRIFT_DETECTED)
        assert len(outcome["root_causes"]) > 0
        # The scores, in returned order, must already be sorted descending.
        observed = [cause["rca_score"] for cause in outcome["root_causes"]]
        expected = sorted(observed, reverse=True)
        assert observed == expected

    def test_primary_cause_is_top_ranked(self, rca) -> None:
        """``primary_cause`` names the feature of the first root cause."""
        outcome = rca.analyze(_FEATURE_DRIFT_DETECTED)
        leading_feature = outcome["root_causes"][0]["feature"]
        assert outcome["primary_cause"] == leading_feature