Spaces:
Build error
Build error
| """ | |
| Drift Detection System | |
| Detects data drift and model performance degradation | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, List, Optional | |
| from datetime import datetime, timedelta | |
| from scipy import stats | |
| import json | |
| class DriftDetector: | |
| """ | |
| Drift Detection System | |
| Detects statistical drift in data distributions | |
| """ | |
| def __init__(self, drift_threshold: float = 0.15, reference_window: int = 30): | |
| """ | |
| Initialize Drift Detector | |
| Args: | |
| drift_threshold: Threshold for detecting drift (KS test p-value) | |
| reference_window: Days to use as reference baseline | |
| """ | |
| self.drift_threshold = drift_threshold | |
| self.reference_window = reference_window | |
| self.reference_data = None | |
| self.drift_history = [] | |
| def update_reference(self, data: pd.DataFrame): | |
| """ | |
| Update reference baseline | |
| Args: | |
| data: Reference data (should be historical stable period) | |
| """ | |
| self.reference_data = data.copy() | |
| def detect_drift(self, current_data: pd.DataFrame, | |
| features: Optional[List[str]] = None) -> Dict: | |
| """ | |
| Detect drift in current data vs reference | |
| Args: | |
| current_data: Current data to check for drift | |
| features: List of features to check (default: all numeric) | |
| Returns: | |
| Drift detection results | |
| """ | |
| if self.reference_data is None or len(self.reference_data) == 0: | |
| return { | |
| "drift_detected": False, | |
| "error": "No reference data available" | |
| } | |
| if len(current_data) == 0: | |
| return { | |
| "drift_detected": False, | |
| "error": "No current data available" | |
| } | |
| # Default to numeric features | |
| if features is None: | |
| features = current_data.select_dtypes(include=[np.number]).columns.tolist() | |
| drift_results = { | |
| "timestamp": datetime.now().isoformat(), | |
| "features_checked": features, | |
| "drift_detected": False, | |
| "feature_drifts": {} | |
| } | |
| for feature in features: | |
| if feature not in current_data.columns or feature not in self.reference_data.columns: | |
| continue | |
| ref_values = self.reference_data[feature].dropna() | |
| current_values = current_data[feature].dropna() | |
| if len(ref_values) < 10 or len(current_values) < 10: | |
| continue | |
| # Kolmogorov-Smirnov test for distribution drift | |
| try: | |
| ks_statistic, p_value = stats.ks_2samp(ref_values, current_values) | |
| # Calculate mean shift | |
| mean_shift = abs(current_values.mean() - ref_values.mean()) | |
| mean_shift_pct = (mean_shift / ref_values.mean()) * 100 if ref_values.mean() != 0 else 0 | |
| # Calculate std shift | |
| std_shift = abs(current_values.std() - ref_values.std()) | |
| std_shift_pct = (std_shift / ref_values.std()) * 100 if ref_values.std() != 0 else 0 | |
| feature_drift = { | |
| "ks_statistic": float(ks_statistic), | |
| "p_value": float(p_value), | |
| "drift_detected": p_value < self.drift_threshold, | |
| "mean_shift": float(mean_shift), | |
| "mean_shift_pct": float(mean_shift_pct), | |
| "std_shift": float(std_shift), | |
| "std_shift_pct": float(std_shift_pct), | |
| "reference_mean": float(ref_values.mean()), | |
| "current_mean": float(current_values.mean()), | |
| "reference_std": float(ref_values.std()), | |
| "current_std": float(current_values.std()) | |
| } | |
| drift_results["feature_drifts"][feature] = feature_drift | |
| if feature_drift["drift_detected"]: | |
| drift_results["drift_detected"] = True | |
| except Exception as e: | |
| drift_results["feature_drifts"][feature] = { | |
| "error": str(e) | |
| } | |
| # Calculate overall drift score | |
| if drift_results["feature_drifts"]: | |
| drift_scores = [ | |
| f["p_value"] for f in drift_results["feature_drifts"].values() | |
| if "p_value" in f | |
| ] | |
| if drift_scores: | |
| drift_results["overall_drift_score"] = float(np.mean(drift_scores)) | |
| # Store in history | |
| self.drift_history.append(drift_results) | |
| # Keep last 100 detections | |
| if len(self.drift_history) > 100: | |
| self.drift_history = self.drift_history[-100:] | |
| return drift_results | |
| def get_drift_history(self, days: int = 7) -> List[Dict]: | |
| """ | |
| Get drift history for last N days | |
| Args: | |
| days: Number of days to retrieve | |
| Returns: | |
| List of drift detection results | |
| """ | |
| cutoff = datetime.now() - timedelta(days=days) | |
| return [ | |
| d for d in self.drift_history | |
| if datetime.fromisoformat(d["timestamp"]) >= cutoff | |
| ] | |