argus-mlops / tests /test_retraining.py
hodfa840's picture
Fix scroll reset for HF Spaces double-iframe context
1aa566a
"""
Tests for the intelligent retraining trigger and pipeline.
"""
from __future__ import annotations
import pytest
# Fixtures
_FEATURE_DRIFT_DETECTED = {
"drift_detected": True,
"drifted_features": ["trip_distance", "pickup_hour"],
"feature_results": {
"trip_distance": {"psi": 0.31, "ks_stat": 0.2, "ks_pvalue": 0.001, "drifted": True},
"pickup_hour": {"psi": 0.24, "ks_stat": 0.18, "ks_pvalue": 0.01, "drifted": True},
},
"n_live_samples": 500,
"timestamp": "2024-01-01T00:00:00Z",
}
_FEATURE_DRIFT_NONE = {
"drift_detected": False,
"drifted_features": [],
"feature_results": {
"trip_distance": {"psi": 0.05, "ks_stat": 0.05, "ks_pvalue": 0.5, "drifted": False},
},
"n_live_samples": 500,
"timestamp": "2024-01-01T00:00:00Z",
}
_PERF_DEGRADED = {
"drift_detected": True,
"recent_rmse": 5.2,
"baseline_rmse": 3.5,
"pct_change": 48.6,
"timestamp": "2024-01-01T00:00:00Z",
}
_PERF_OK = {
"drift_detected": False,
"recent_rmse": 3.6,
"baseline_rmse": 3.5,
"pct_change": 2.8,
"timestamp": "2024-01-01T00:00:00Z",
}
class TestRetrainingTrigger:
@pytest.fixture
def trigger(self):
from src.retraining.trigger import RetrainingTrigger
return RetrainingTrigger()
def test_trigger_when_both_drift_and_perf_degrade(self, trigger) -> None:
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_DETECTED,
performance_report=_PERF_DEGRADED,
samples_since_last_retrain=2000,
)
assert decision["should_retrain"] is True
def test_no_trigger_when_only_feature_drift(self, trigger) -> None:
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_DETECTED,
performance_report=_PERF_OK,
samples_since_last_retrain=2000,
)
assert decision["should_retrain"] is False
def test_no_trigger_when_only_perf_degraded(self, trigger) -> None:
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_NONE,
performance_report=_PERF_DEGRADED,
samples_since_last_retrain=2000,
)
assert decision["should_retrain"] is False
def test_no_trigger_insufficient_samples(self, trigger) -> None:
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_DETECTED,
performance_report=_PERF_DEGRADED,
samples_since_last_retrain=10, # way below minimum
)
assert decision["should_retrain"] is False
assert any("samples" in b.lower() for b in decision["blocking_reasons"])
def test_cooldown_blocks_retrain(self, trigger) -> None:
import time
trigger._last_retrain_time = time.time() # just retrained
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_DETECTED,
performance_report=_PERF_DEGRADED,
samples_since_last_retrain=2000,
)
assert decision["should_retrain"] is False
assert any("cooldown" in b.lower() for b in decision["blocking_reasons"])
def test_decision_structure(self, trigger) -> None:
decision = trigger.should_retrain(
feature_drift_report=_FEATURE_DRIFT_DETECTED,
performance_report=_PERF_DEGRADED,
samples_since_last_retrain=2000,
)
required_keys = {"should_retrain", "reasons", "blocking_reasons", "feature_drift",
"performance_drift", "samples_since_last_retrain", "timestamp"}
assert required_keys.issubset(set(decision.keys()))
class TestRootCauseAnalyzer:
@pytest.fixture
def rca(self):
from src.monitoring.root_cause_analyzer import RootCauseAnalyzer
return RootCauseAnalyzer()
def test_no_drift_returns_monitor_action(self, rca) -> None:
result = rca.analyze({"drift_detected": False, "feature_results": {}, "drifted_features": []})
assert result["action_recommended"] == "monitor"
assert result["root_causes"] == []
def test_drift_returns_ranked_causes(self, rca) -> None:
result = rca.analyze(_FEATURE_DRIFT_DETECTED)
assert len(result["root_causes"]) > 0
# Sorted by rca_score descending
scores = [c["rca_score"] for c in result["root_causes"]]
assert scores == sorted(scores, reverse=True)
def test_primary_cause_is_top_ranked(self, rca) -> None:
result = rca.analyze(_FEATURE_DRIFT_DETECTED)
top = result["root_causes"][0]["feature"]
assert result["primary_cause"] == top