File size: 4,125 Bytes
1aa566a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""
Tests for drift detection components.
"""
from __future__ import annotations

import numpy as np
import pandas as pd
import pytest

from src.monitoring.drift_detector import DriftDetector
from src.data.generator import TaxiDataGenerator
from src.data.drift_simulator import DriftSimulator


@pytest.fixture
def reference_df() -> pd.DataFrame:
    gen = TaxiDataGenerator(random_seed=0)
    return gen.generate_reference(n_samples=1000)


@pytest.fixture
def detector(reference_df: pd.DataFrame) -> DriftDetector:
    d = DriftDetector()
    d.set_reference(reference_df)
    return d


class TestPSI:
    def test_identical_distributions_psi_near_zero(self, detector: DriftDetector) -> None:
        """PSI should be ~0 when reference == current."""
        ref = np.random.default_rng(42).normal(0, 1, 1000)
        psi = DriftDetector._psi(ref, ref)
        assert psi < 0.05, f"Expected PSI≈0 for identical distributions, got {psi}"

    def test_very_different_distributions_high_psi(self, detector: DriftDetector) -> None:
        """PSI should be large for very different distributions."""
        ref = np.random.default_rng(1).normal(0, 1, 1000)
        current = np.random.default_rng(2).normal(5, 1, 1000)  # large shift
        psi = DriftDetector._psi(ref, current)
        assert psi >= 0.2, f"Expected PSI>=0.2 for large shift, got {psi}"

    def test_psi_non_negative(self) -> None:
        rng = np.random.default_rng(99)
        ref = rng.uniform(0, 10, 500)
        cur = rng.uniform(0, 10, 500)
        psi = DriftDetector._psi(ref, cur)
        assert psi >= 0, "PSI must be non-negative"


class TestFeatureDrift:
    def test_no_drift_on_same_data(self, detector: DriftDetector, reference_df: pd.DataFrame) -> None:
        report = detector.detect_feature_drift(reference_df)
        # Most features should not show drift on the same data
        assert "drift_detected" in report
        assert "feature_results" in report

    def test_drift_detected_after_shift(self, detector: DriftDetector, reference_df: pd.DataFrame) -> None:
        simulator = DriftSimulator(random_seed=7)
        feature_cols = ["trip_distance", "passenger_count", "pickup_hour"]
        drifted = reference_df.copy()
        drifted_features = simulator.apply(
            drifted[feature_cols],
            drift_type="sudden",
            severity=3.0,
        )
        for col in feature_cols:
            drifted[col] = drifted_features[col].values

        report = detector.detect_feature_drift(drifted, features=feature_cols)
        assert report["drift_detected"] is True, "Drift should be detected after large shift"
        assert len(report["drifted_features"]) > 0

    def test_report_structure(self, detector: DriftDetector, reference_df: pd.DataFrame) -> None:
        report = detector.detect_feature_drift(reference_df, features=["trip_distance"])
        assert "drift_detected" in report
        assert "feature_results" in report
        assert "drifted_features" in report
        assert "n_live_samples" in report
        assert "timestamp" in report
        assert "trip_distance" in report["feature_results"]
        feat = report["feature_results"]["trip_distance"]
        assert "psi" in feat
        assert "ks_stat" in feat
        assert "ks_pvalue" in feat


class TestPerformanceDrift:
    def test_no_degradation_when_same_rmse(self, detector: DriftDetector) -> None:
        report = detector.detect_performance_drift(recent_rmse=3.5, baseline_rmse=3.5)
        assert report["drift_detected"] is False

    def test_degradation_detected_above_threshold(self, detector: DriftDetector) -> None:
        # 20% increase should trigger (threshold is 15%)
        report = detector.detect_performance_drift(recent_rmse=3.5 * 1.20, baseline_rmse=3.5)
        assert report["drift_detected"] is True

    def test_borderline_case(self, detector: DriftDetector) -> None:
        # Exactly at threshold: 15% → not triggered (strict >)
        report = detector.detect_performance_drift(recent_rmse=3.5 * 1.15, baseline_rmse=3.5)
        assert isinstance(report["drift_detected"], bool)