|
|
""" |
|
|
Data Drift Detection using Scipy KS Test. |
|
|
Detects distribution shifts between baseline and new data. |
|
|
""" |
|
|
|
|
|
import pickle |
|
|
import json |
|
|
import requests |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
from scipy.stats import ks_2samp |
|
|
from typing import Dict, Tuple |
|
|
|
|
|
|
|
|
# Resolve project-relative directories from this file's location.
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent

# Directory holding the pickled reference data (see load_baseline).
BASELINE_DIR = Path(__file__).parent.parent / "baseline"

# Drift reports are written here as timestamped JSON files.
REPORTS_DIR = Path(__file__).parent.parent / "reports"
# Created eagerly at import time so save_report() can assume it exists.
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# Prometheus Pushgateway endpoint used by push_to_prometheus().
PUSHGATEWAY_URL = "http://localhost:9091"

# Base significance level for the KS test; Bonferroni-adjusted per feature
# inside run_drift_detection().
P_VALUE_THRESHOLD = 0.05
|
|
|
|
|
|
|
|
def load_baseline() -> np.ndarray:
    """Load the pickled reference/baseline feature matrix.

    Raises:
        FileNotFoundError: if the baseline pickle has not been prepared yet.
    """
    path = BASELINE_DIR / "reference_data.pkl"

    if not path.exists():
        raise FileNotFoundError(
            f"Baseline data not found at {path}\n"
            f"Run `python prepare_baseline.py` first!"
        )

    # NOTE: pickle is only safe here because the baseline file is produced
    # by our own prepare_baseline.py, not untrusted input.
    with open(path, 'rb') as fh:
        data = pickle.load(fh)

    print(f"Loaded baseline data: {data.shape}")
    return data
|
|
|
|
|
|
|
|
def load_new_data() -> np.ndarray:
    """
    Load new/production data to check for drift.

    In production, this would fetch from:
    - Database
    - S3 bucket
    - API logs
    - Data lake

    For now, simulate or load from file.
    """
    candidate = PROJECT_ROOT / "data" / "test.csv"

    if candidate.exists():
        frame = pd.read_csv(candidate)
        # Drop non-feature columns before converting to a matrix.
        excluded = {'label', 'id', 'timestamp'}
        feature_cols = [c for c in frame.columns if c not in excluded]
        sample = frame[feature_cols].values[:500]
        print(f"Loaded new data from file: {sample.shape}")
        return sample

    # Fallback: perturb the baseline with small Gaussian noise so the
    # pipeline can still run end-to-end without a test file.
    print("Simulating new data (no test file found)")
    reference = load_baseline()
    noise = np.random.normal(0, 0.1, (500, reference.shape[1]))
    return reference[:500] + noise
|
|
|
|
|
|
|
|
def run_drift_detection(X_baseline: np.ndarray, X_new: np.ndarray) -> Dict:
    """
    Run Kolmogorov-Smirnov drift detection using scipy.

    Args:
        X_baseline: Reference data
        X_new: New data to check

    Returns:
        Drift detection results
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Running Drift Detection (Kolmogorov-Smirnov Test)")
    print(banner)

    num_features = X_baseline.shape[1]

    # One two-sample KS test per feature column.
    per_feature = [ks_2samp(X_baseline[:, col], X_new[:, col]) for col in range(num_features)]
    distances = [stat for stat, _ in per_feature]
    p_values = [p for _, p in per_feature]

    min_p_value = np.min(p_values)
    max_distance = np.max(distances)

    # Bonferroni correction: testing many features inflates the family-wise
    # false-positive rate, so divide the base threshold by the feature count.
    adjusted_threshold = P_VALUE_THRESHOLD / num_features
    drift_detected = min_p_value < adjusted_threshold

    results = {
        "timestamp": datetime.now().isoformat(),
        "drift_detected": int(drift_detected),
        "p_value": float(min_p_value),
        "threshold": adjusted_threshold,
        "distance": float(max_distance),
        "baseline_samples": X_baseline.shape[0],
        "new_samples": X_new.shape[0],
        "num_features": num_features
    }

    print(f"\nResults:")
    print(f" Drift Detected: {'YES' if results['drift_detected'] else 'NO'}")
    print(f" P-Value: {results['p_value']:.6f} (adjusted threshold: {adjusted_threshold:.6f})")
    print(f" Distance: {results['distance']:.6f}")
    print(f" Baseline: {X_baseline.shape[0]} samples")
    print(f" New Data: {X_new.shape[0]} samples")

    return results
|
|
|
|
|
|
|
|
def save_report(results: Dict):
    """Save drift detection report to file.

    Writes a timestamped JSON file under REPORTS_DIR.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_path = REPORTS_DIR / f"drift_report_{stamp}.json"

    with open(out_path, 'w') as fh:
        json.dump(results, fh, indent=2)

    print(f"\nReport saved to: {out_path}")
|
|
|
|
|
|
|
|
def push_to_prometheus(results: Dict):
    """
    Push drift metrics to Prometheus via Pushgateway.

    This allows Prometheus to scrape short-lived job metrics.

    Args:
        results: Output of run_drift_detection(); reads the
            'drift_detected', 'p_value', and 'distance' keys.

    Failures to reach the gateway are reported but never raised — the
    drift job's exit code should reflect drift, not metrics delivery.
    """
    # Pushgateway expects the Prometheus text exposition format, including
    # a trailing newline after the final sample line.
    metrics = f"""# TYPE drift_detected gauge
# HELP drift_detected Whether data drift was detected (1=yes, 0=no)
drift_detected {results['drift_detected']}

# TYPE drift_p_value gauge
# HELP drift_p_value P-value from drift detection test
drift_p_value {results['p_value']}

# TYPE drift_distance gauge
# HELP drift_distance Statistical distance between distributions
drift_distance {results['distance']}

# TYPE drift_check_timestamp gauge
# HELP drift_check_timestamp Unix timestamp of last drift check
drift_check_timestamp {datetime.now().timestamp()}
"""

    try:
        response = requests.post(
            f"{PUSHGATEWAY_URL}/metrics/job/drift_detection/instance/hopcroft",
            data=metrics,
            headers={'Content-Type': 'text/plain'},
            # FIX: the original call had no timeout, so an unresponsive
            # Pushgateway would hang this job indefinitely.
            timeout=10,
        )
        response.raise_for_status()
        print(f"Metrics pushed to Pushgateway at {PUSHGATEWAY_URL}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to push to Pushgateway: {e}")
        print(f" Make sure Pushgateway is running: docker compose ps pushgateway")
|
|
|
|
|
|
|
|
def main():
    """Run the full drift-detection pipeline.

    Returns:
        Process exit code: 0 when no drift detected, 1 on drift or error.
    """
    print("\n" + "=" * 60)
    print("Hopcroft Data Drift Detection")
    print("=" * 60)

    try:
        # 1. Load reference and candidate data.
        X_baseline = load_baseline()
        X_new = load_new_data()

        # 2. Compare distributions feature-by-feature (KS test).
        results = run_drift_detection(X_baseline, X_new)

        # 3. Persist a JSON report for auditing.
        save_report(results)

        # 4. Publish metrics for Prometheus (best-effort).
        push_to_prometheus(results)

        print("\n" + "=" * 60)
        print("Drift Detection Complete!")
        print("=" * 60)

        if results['drift_detected']:
            print("\nWARNING: Data drift detected!")
            # FIX: report the Bonferroni-adjusted threshold actually used
            # for the decision (results['threshold']); the original printed
            # the unadjusted P_VALUE_THRESHOLD, misstating the comparison.
            print(f" P-value: {results['p_value']:.6f} < {results['threshold']:.6f}")
            return 1
        else:
            print("\nNo significant drift detected")
            return 0

    except Exception as e:
        # Top-level boundary: report and convert any failure to exit code 1.
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()
        return 1
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # FIX: use SystemExit instead of the builtin exit() helper, which is
    # installed by the `site` module for interactive use and is not
    # guaranteed to exist in all run modes (e.g. `python -S`).
    raise SystemExit(main())