"""
Drift Detection System
Detects data drift and model performance degradation
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from scipy import stats
import json


class DriftDetector:
    """
    Drift Detection System

    Detects statistical drift in data distributions by comparing each
    feature of a "current" DataFrame against a stored reference DataFrame
    using a two-sample Kolmogorov-Smirnov test, plus simple mean/std shift
    statistics. Results of each completed detection run are kept in an
    in-memory, size-capped history (``drift_history``).
    """

    # Class-level fallback so instances created without __init__ running
    # (e.g. restored from an older pickle) still have a history cap.
    max_history: int = 100

    def __init__(self, drift_threshold: float = 0.15, reference_window: int = 30,
                 max_history: int = 100):
        """
        Initialize Drift Detector

        Args:
            drift_threshold: Significance level for the KS test. A feature is
                flagged as drifted when its KS p-value is strictly below this.
            reference_window: Days to use as reference baseline. Stored on the
                instance; not used by the methods of this class.
            max_history: Maximum number of detection results retained in
                ``drift_history``; the oldest entries are discarded first.
        """
        self.drift_threshold = drift_threshold
        self.reference_window = reference_window
        self.max_history = max_history
        self.reference_data = None
        self.drift_history = []

    def update_reference(self, data: pd.DataFrame):
        """
        Update reference baseline

        Args:
            data: Reference data (should be historical stable period). A copy
                is stored, so later changes to ``data`` do not affect it.
        """
        self.reference_data = data.copy()

    def detect_drift(self, current_data: pd.DataFrame,
                     features: Optional[List[str]] = None) -> Dict:
        """
        Detect drift in current data vs reference

        Args:
            current_data: Current data to check for drift
            features: List of features to check (default: all numeric
                columns of ``current_data``). Features missing from either
                frame, or with fewer than 10 non-null values in either frame,
                are skipped silently.

        Returns:
            Drift detection results. If there is no reference data or
            ``current_data`` is empty, returns only ``{"drift_detected": False,
            "error": ...}`` and nothing is recorded in history. Otherwise the
            dict contains:
              - "timestamp": ISO-8601 local time of the run
              - "features_checked": the requested feature names (as a list)
              - "drift_detected": True if any feature drifted
              - "feature_drifts": per-feature statistics, or {"error": msg}
                if computing a feature's statistics raised
              - "overall_drift_score": mean KS p-value over the features that
                produced one (lower means stronger evidence of drift); absent
                when no feature produced a p-value
        """
        if self.reference_data is None or len(self.reference_data) == 0:
            return {
                "drift_detected": False,
                "error": "No reference data available"
            }

        if len(current_data) == 0:
            return {
                "drift_detected": False,
                "error": "No current data available"
            }

        # Default to numeric features
        if features is None:
            features = current_data.select_dtypes(include=[np.number]).columns.tolist()

        drift_results = {
            "timestamp": datetime.now().isoformat(),
            # Copy so later mutation of the caller's list cannot alter this record,
            # and so an Index/tuple argument is stored as a JSON-friendly list.
            "features_checked": list(features),
            "drift_detected": False,
            "feature_drifts": {}
        }

        for feature in features:
            if feature not in current_data.columns or feature not in self.reference_data.columns:
                continue

            ref_values = self.reference_data[feature].dropna()
            current_values = current_data[feature].dropna()

            # Too few samples for a meaningful KS comparison.
            if len(ref_values) < 10 or len(current_values) < 10:
                continue

            try:
                feature_drift = self._compare_feature(ref_values, current_values)
            except Exception as e:
                # Best-effort per feature: one bad column (e.g. non-numeric
                # data passed explicitly) is recorded, not fatal to the run.
                drift_results["feature_drifts"][feature] = {
                    "error": str(e)
                }
                continue

            drift_results["feature_drifts"][feature] = feature_drift
            if feature_drift["drift_detected"]:
                drift_results["drift_detected"] = True

        # Overall score is the mean p-value; errored features have none.
        p_values = [
            f["p_value"]
            for f in drift_results["feature_drifts"].values()
            if "p_value" in f
        ]
        if p_values:
            drift_results["overall_drift_score"] = float(np.mean(p_values))

        self._record(drift_results)

        return drift_results

    def _compare_feature(self, ref_values: pd.Series, current_values: pd.Series) -> Dict:
        """Compute KS-test and mean/std shift statistics for one feature."""
        # Kolmogorov-Smirnov test for distribution drift
        ks_result = stats.ks_2samp(ref_values, current_values)
        ks_statistic = float(ks_result.statistic)
        p_value = float(ks_result.pvalue)

        ref_mean = float(ref_values.mean())
        current_mean = float(current_values.mean())
        ref_std = float(ref_values.std())
        current_std = float(current_values.std())

        # Calculate mean shift; divide by |mean| so the percentage stays
        # non-negative when the reference mean is negative.
        mean_shift = abs(current_mean - ref_mean)
        mean_shift_pct = (mean_shift / abs(ref_mean)) * 100 if ref_mean != 0 else 0.0

        # Calculate std shift (std is non-negative, no abs needed).
        std_shift = abs(current_std - ref_std)
        std_shift_pct = (std_shift / ref_std) * 100 if ref_std != 0 else 0.0

        return {
            "ks_statistic": ks_statistic,
            "p_value": p_value,
            # Plain Python bool (not numpy.bool_) so results are JSON-serializable.
            "drift_detected": bool(p_value < self.drift_threshold),
            "mean_shift": float(mean_shift),
            "mean_shift_pct": float(mean_shift_pct),
            "std_shift": float(std_shift),
            "std_shift_pct": float(std_shift_pct),
            "reference_mean": ref_mean,
            "current_mean": current_mean,
            "reference_std": ref_std,
            "current_std": current_std
        }

    def _record(self, drift_results: Dict) -> None:
        """Append a result to drift_history, keeping only the newest max_history."""
        self.drift_history.append(drift_results)
        excess = len(self.drift_history) - self.max_history
        if excess > 0:
            self.drift_history = self.drift_history[excess:]

    def get_drift_history(self, days: int = 7) -> List[Dict]:
        """
        Get drift history for last N days

        Args:
            days: Number of days to retrieve

        Returns:
            List of drift detection results whose timestamp is within the
            last ``days`` days (compared against local ``datetime.now()``).
        """
        cutoff = datetime.now() - timedelta(days=days)
        return [
            d for d in self.drift_history
            if datetime.fromisoformat(d["timestamp"]) >= cutoff
        ]