""" Data Drift Detection using Scipy KS Test. Detects distribution shifts between baseline and new data. """ import pickle import json import requests import numpy as np import pandas as pd from pathlib import Path from datetime import datetime from scipy.stats import ks_2samp from typing import Dict, Tuple # Configuration PROJECT_ROOT = Path(__file__).parent.parent.parent.parent BASELINE_DIR = Path(__file__).parent.parent / "baseline" REPORTS_DIR = Path(__file__).parent.parent / "reports" REPORTS_DIR.mkdir(parents=True, exist_ok=True) PUSHGATEWAY_URL = "http://localhost:9091" P_VALUE_THRESHOLD = 0.05 # Significance level def load_baseline() -> np.ndarray: """Load reference/baseline data.""" baseline_path = BASELINE_DIR / "reference_data.pkl" if not baseline_path.exists(): raise FileNotFoundError( f"Baseline data not found at {baseline_path}\n" f"Run `python prepare_baseline.py` first!" ) with open(baseline_path, 'rb') as f: X_baseline = pickle.load(f) print(f"Loaded baseline data: {X_baseline.shape}") return X_baseline def load_new_data() -> np.ndarray: """ Load new/production data to check for drift. In production, this would fetch from: - Database - S3 bucket - API logs - Data lake For now, simulate or load from file. """ # Option 1: Load from file data_path = PROJECT_ROOT / "data" / "test.csv" if data_path.exists(): df = pd.read_csv(data_path) # Extract same features as baseline feature_columns = [col for col in df.columns if col not in ['label', 'id', 'timestamp']] X_new = df[feature_columns].values[:500] # Take 500 samples print(f"Loaded new data from file: {X_new.shape}") return X_new # Option 2: Simulate (for testing) print("Simulating new data (no test file found)") X_baseline = load_baseline() # Add slight shift to simulate drift X_new = X_baseline[:500] + np.random.normal(0, 0.1, (500, X_baseline.shape[1])) return X_new def run_drift_detection(X_baseline: np.ndarray, X_new: np.ndarray) -> Dict: """ Run Kolmogorov-Smirnov drift detection using scipy. Args: X_baseline: Reference data X_new: New data to check Returns: Drift detection results """ print("\n" + "=" * 60) print("Running Drift Detection (Kolmogorov-Smirnov Test)") print("=" * 60) # Run KS test for each feature p_values = [] distances = [] for i in range(X_baseline.shape[1]): statistic, p_value = ks_2samp(X_baseline[:, i], X_new[:, i]) p_values.append(p_value) distances.append(statistic) # Aggregate results min_p_value = np.min(p_values) max_distance = np.max(distances) # Apply Bonferroni correction for multiple testing adjusted_threshold = P_VALUE_THRESHOLD / X_baseline.shape[1] drift_detected = min_p_value < adjusted_threshold # Extract results results = { "timestamp": datetime.now().isoformat(), "drift_detected": int(drift_detected), "p_value": float(min_p_value), "threshold": adjusted_threshold, "distance": float(max_distance), "baseline_samples": X_baseline.shape[0], "new_samples": X_new.shape[0], "num_features": X_baseline.shape[1] } # Print results print(f"\nResults:") print(f" Drift Detected: {'YES' if results['drift_detected'] else 'NO'}") print(f" P-Value: {results['p_value']:.6f} (adjusted threshold: {adjusted_threshold:.6f})") print(f" Distance: {results['distance']:.6f}") print(f" Baseline: {X_baseline.shape[0]} samples") print(f" New Data: {X_new.shape[0]} samples") return results def save_report(results: Dict): """Save drift detection report to file.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_path = REPORTS_DIR / f"drift_report_{timestamp}.json" with open(report_path, 'w') as f: json.dump(results, f, indent=2) print(f"\nReport saved to: {report_path}") def push_to_prometheus(results: Dict): """ Push drift metrics to Prometheus via Pushgateway. This allows Prometheus to scrape short-lived job metrics. """ metrics = f"""# TYPE drift_detected gauge # HELP drift_detected Whether data drift was detected (1=yes, 0=no) drift_detected {results['drift_detected']} # TYPE drift_p_value gauge # HELP drift_p_value P-value from drift detection test drift_p_value {results['p_value']} # TYPE drift_distance gauge # HELP drift_distance Statistical distance between distributions drift_distance {results['distance']} # TYPE drift_check_timestamp gauge # HELP drift_check_timestamp Unix timestamp of last drift check drift_check_timestamp {datetime.now().timestamp()} """ try: response = requests.post( f"{PUSHGATEWAY_URL}/metrics/job/drift_detection/instance/hopcroft", data=metrics, headers={'Content-Type': 'text/plain'} ) response.raise_for_status() print(f"Metrics pushed to Pushgateway at {PUSHGATEWAY_URL}") except requests.exceptions.RequestException as e: print(f"Failed to push to Pushgateway: {e}") print(f" Make sure Pushgateway is running: docker compose ps pushgateway") def main(): """Main execution.""" print("\n" + "=" * 60) print("Hopcroft Data Drift Detection") print("=" * 60) try: # Load data X_baseline = load_baseline() X_new = load_new_data() # Run drift detection results = run_drift_detection(X_baseline, X_new) # Save report save_report(results) # Push to Prometheus push_to_prometheus(results) print("\n" + "=" * 60) print("Drift Detection Complete!") print("=" * 60) if results['drift_detected']: print("\nWARNING: Data drift detected!") print(f" P-value: {results['p_value']:.6f} < {P_VALUE_THRESHOLD}") return 1 else: print("\nNo significant drift detected") return 0 except Exception as e: print(f"\nError: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": exit(main())