"""
Data Drift Detection using Scipy KS Test.
Detects distribution shifts between baseline and new data.
"""
| |
|
| | import pickle |
| | import json |
| | import requests |
| | import numpy as np |
| | import pandas as pd |
| | from pathlib import Path |
| | from datetime import datetime |
| | from scipy.stats import ks_2samp |
| | from typing import Dict, Tuple |
| |
|
| | |
| | PROJECT_ROOT = Path(__file__).parent.parent.parent.parent |
| | BASELINE_DIR = Path(__file__).parent.parent / "baseline" |
| | REPORTS_DIR = Path(__file__).parent.parent / "reports" |
| | REPORTS_DIR.mkdir(parents=True, exist_ok=True) |
| |
|
| | PUSHGATEWAY_URL = "http://localhost:9091" |
| | P_VALUE_THRESHOLD = 0.05 |
| |
|
| |
|
| | def load_baseline() -> np.ndarray: |
| | """Load reference/baseline data.""" |
| | baseline_path = BASELINE_DIR / "reference_data.pkl" |
| | |
| | if not baseline_path.exists(): |
| | raise FileNotFoundError( |
| | f"Baseline data not found at {baseline_path}\n" |
| | f"Run `python prepare_baseline.py` first!" |
| | ) |
| | |
| | with open(baseline_path, 'rb') as f: |
| | X_baseline = pickle.load(f) |
| | |
| | print(f"Loaded baseline data: {X_baseline.shape}") |
| | return X_baseline |
| |
|
| |
|
| | def load_new_data() -> np.ndarray: |
| | """ |
| | Load new/production data to check for drift. |
| | |
| | In production, this would fetch from: |
| | - Database |
| | - S3 bucket |
| | - API logs |
| | - Data lake |
| | |
| | For now, simulate or load from file. |
| | """ |
| | |
| | |
| | data_path = PROJECT_ROOT / "data" / "test.csv" |
| | if data_path.exists(): |
| | df = pd.read_csv(data_path) |
| | |
| | feature_columns = [col for col in df.columns if col not in ['label', 'id', 'timestamp']] |
| | X_new = df[feature_columns].values[:500] |
| | print(f"Loaded new data from file: {X_new.shape}") |
| | return X_new |
| | |
| | |
| | print("Simulating new data (no test file found)") |
| | X_baseline = load_baseline() |
| | |
| | X_new = X_baseline[:500] + np.random.normal(0, 0.1, (500, X_baseline.shape[1])) |
| | return X_new |
| |
|
| |
|
| | def run_drift_detection(X_baseline: np.ndarray, X_new: np.ndarray) -> Dict: |
| | """ |
| | Run Kolmogorov-Smirnov drift detection using scipy. |
| | |
| | Args: |
| | X_baseline: Reference data |
| | X_new: New data to check |
| | |
| | Returns: |
| | Drift detection results |
| | """ |
| | print("\n" + "=" * 60) |
| | print("Running Drift Detection (Kolmogorov-Smirnov Test)") |
| | print("=" * 60) |
| | |
| | |
| | p_values = [] |
| | distances = [] |
| | |
| | for i in range(X_baseline.shape[1]): |
| | statistic, p_value = ks_2samp(X_baseline[:, i], X_new[:, i]) |
| | p_values.append(p_value) |
| | distances.append(statistic) |
| | |
| | |
| | min_p_value = np.min(p_values) |
| | max_distance = np.max(distances) |
| | |
| | |
| | adjusted_threshold = P_VALUE_THRESHOLD / X_baseline.shape[1] |
| | drift_detected = min_p_value < adjusted_threshold |
| | |
| | |
| | results = { |
| | "timestamp": datetime.now().isoformat(), |
| | "drift_detected": int(drift_detected), |
| | "p_value": float(min_p_value), |
| | "threshold": adjusted_threshold, |
| | "distance": float(max_distance), |
| | "baseline_samples": X_baseline.shape[0], |
| | "new_samples": X_new.shape[0], |
| | "num_features": X_baseline.shape[1] |
| | } |
| | |
| | |
| | print(f"\nResults:") |
| | print(f" Drift Detected: {'YES' if results['drift_detected'] else 'NO'}") |
| | print(f" P-Value: {results['p_value']:.6f} (adjusted threshold: {adjusted_threshold:.6f})") |
| | print(f" Distance: {results['distance']:.6f}") |
| | print(f" Baseline: {X_baseline.shape[0]} samples") |
| | print(f" New Data: {X_new.shape[0]} samples") |
| | |
| | return results |
| |
|
| |
|
| | def save_report(results: Dict): |
| | """Save drift detection report to file.""" |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | report_path = REPORTS_DIR / f"drift_report_{timestamp}.json" |
| | |
| | with open(report_path, 'w') as f: |
| | json.dump(results, f, indent=2) |
| | |
| | print(f"\nReport saved to: {report_path}") |
| |
|
| |
|
| | def push_to_prometheus(results: Dict): |
| | """ |
| | Push drift metrics to Prometheus via Pushgateway. |
| | |
| | This allows Prometheus to scrape short-lived job metrics. |
| | """ |
| | metrics = f"""# TYPE drift_detected gauge |
| | # HELP drift_detected Whether data drift was detected (1=yes, 0=no) |
| | drift_detected {results['drift_detected']} |
| | |
| | # TYPE drift_p_value gauge |
| | # HELP drift_p_value P-value from drift detection test |
| | drift_p_value {results['p_value']} |
| | |
| | # TYPE drift_distance gauge |
| | # HELP drift_distance Statistical distance between distributions |
| | drift_distance {results['distance']} |
| | |
| | # TYPE drift_check_timestamp gauge |
| | # HELP drift_check_timestamp Unix timestamp of last drift check |
| | drift_check_timestamp {datetime.now().timestamp()} |
| | """ |
| | |
| | try: |
| | response = requests.post( |
| | f"{PUSHGATEWAY_URL}/metrics/job/drift_detection/instance/hopcroft", |
| | data=metrics, |
| | headers={'Content-Type': 'text/plain'} |
| | ) |
| | response.raise_for_status() |
| | print(f"Metrics pushed to Pushgateway at {PUSHGATEWAY_URL}") |
| | except requests.exceptions.RequestException as e: |
| | print(f"Failed to push to Pushgateway: {e}") |
| | print(f" Make sure Pushgateway is running: docker compose ps pushgateway") |
| |
|
| |
|
| | def main(): |
| | """Main execution.""" |
| | print("\n" + "=" * 60) |
| | print("Hopcroft Data Drift Detection") |
| | print("=" * 60) |
| | |
| | try: |
| | |
| | X_baseline = load_baseline() |
| | X_new = load_new_data() |
| | |
| | |
| | results = run_drift_detection(X_baseline, X_new) |
| | |
| | |
| | save_report(results) |
| | |
| | |
| | push_to_prometheus(results) |
| | |
| | print("\n" + "=" * 60) |
| | print("Drift Detection Complete!") |
| | print("=" * 60) |
| | |
| | if results['drift_detected']: |
| | print("\nWARNING: Data drift detected!") |
| | print(f" P-value: {results['p_value']:.6f} < {P_VALUE_THRESHOLD}") |
| | return 1 |
| | else: |
| | print("\nNo significant drift detected") |
| | return 0 |
| | |
| | except Exception as e: |
| | print(f"\nError: {e}") |
| | import traceback |
| | traceback.print_exc() |
| | return 1 |
| |
|
| |
|
| | if __name__ == "__main__": |
| | exit(main()) |