| """ |
| Concept Shift Simulator for ML pipeline testing |
| """ |
| import numpy as np |
| import pandas as pd |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import accuracy_score |
| import time |
| import json |
| from datetime import datetime |
|
|
| class ConceptShiftSimulator: |
| """Simulates concept shift (relationship changes) in ML data""" |
| |
| def __init__(self, seed=42): |
| self.seed = seed |
| np.random.seed(seed) |
| self.results = [] |
| |
| def generate_synthetic_data(self, n_samples=1000, n_features=10): |
| """Generate synthetic classification data""" |
| X = np.random.randn(n_samples, n_features) |
| |
| |
| coefficients = np.random.randn(n_features) |
| coefficients = coefficients / np.linalg.norm(coefficients) |
| |
| |
| logits = X @ coefficients |
| probabilities = 1 / (1 + np.exp(-logits)) |
| y = (probabilities > 0.5).astype(int) |
| |
| |
| noise_mask = np.random.random(n_samples) < 0.1 |
| y[noise_mask] = 1 - y[noise_mask] |
| |
| return X, y, coefficients |
| |
| def induce_concept_shift(self, X, coefficients, shift_type='abrupt', magnitude=0.3): |
| """Induce concept shift by changing feature relationships""" |
| n_samples, n_features = X.shape |
| |
| if shift_type == 'abrupt': |
| |
| new_coefficients = coefficients.copy() |
| |
| new_coefficients[:int(len(new_coefficients) * magnitude)] *= -1 |
| |
| elif shift_type == 'gradual': |
| |
| new_coefficients = coefficients.copy() |
| flip_indices = np.random.choice( |
| n_features, |
| size=int(n_features * magnitude), |
| replace=False |
| ) |
| for idx in flip_indices: |
| new_coefficients[idx] = -coefficients[idx] * magnitude |
| |
| elif shift_type == 'recurring': |
| |
| new_coefficients = coefficients.copy() |
| oscillation = np.sin(np.arange(n_features) * 0.5) * magnitude |
| new_coefficients *= (1 + oscillation) |
| |
| |
| logits = X @ new_coefficients |
| probabilities = 1 / (1 + np.exp(-logits)) |
| y_shifted = (probabilities > 0.5).astype(int) |
| |
| |
| noise_mask = np.random.random(n_samples) < 0.1 |
| y_shifted[noise_mask] = 1 - y_shifted[noise_mask] |
| |
| return X, y_shifted, new_coefficients |
| |
| def train_model(self, X_train, y_train): |
| """Train a simple classifier""" |
| model = LogisticRegression(random_state=self.seed, max_iter=1000) |
| model.fit(X_train, y_train) |
| return model |
| |
| def evaluate_shift(self, model, X_test, y_original, y_shifted): |
| """Evaluate impact of concept shift""" |
| |
| y_pred_original = model.predict(X_test) |
| y_pred_shifted = model.predict(X_test) |
| |
| |
| acc_original = accuracy_score(y_original, y_pred_original) |
| acc_shifted = accuracy_score(y_shifted, y_pred_shifted) |
| |
| accuracy_drop = acc_original - acc_shifted |
| |
| |
| drift_score = np.abs(y_pred_original - y_pred_shifted).mean() |
| |
| return { |
| 'accuracy_original': acc_original, |
| 'accuracy_shifted': acc_shifted, |
| 'accuracy_drop': accuracy_drop, |
| 'drift_score': drift_score, |
| 'shift_detected': accuracy_drop > 0.1 |
| } |
| |
| def run_experiment(self, shift_type='abrupt', magnitude=0.3): |
| """Run a complete concept shift experiment""" |
| print(f"\n🔍 Running Concept Shift Experiment:") |
| print(f" Type: {shift_type}") |
| print(f" Magnitude: {magnitude}") |
| |
| |
| X, y_original, coefficients = self.generate_synthetic_data() |
| X_train, X_test, y_train, y_test = train_test_split( |
| X, y_original, test_size=0.3, random_state=self.seed |
| ) |
| |
| |
| start_time = time.time() |
| model = self.train_model(X_train, y_train) |
| training_time = (time.time() - start_time) * 1000 |
| |
| |
| X_test_shifted, y_test_shifted, _ = self.induce_concept_shift( |
| X_test, coefficients, shift_type, magnitude |
| ) |
| |
| |
| start_time = time.time() |
| evaluation = self.evaluate_shift(model, X_test, y_test, y_test_shifted) |
| evaluation_time = (time.time() - start_time) * 1000 |
| |
| |
| result = { |
| 'shift_type': shift_type, |
| 'magnitude': magnitude, |
| 'training_time_ms': training_time, |
| 'evaluation_time_ms': evaluation_time, |
| **evaluation |
| } |
| |
| self.results.append(result) |
| |
| |
| print(f"📊 Results:") |
| print(f" Accuracy drop: {result['accuracy_drop']:.3f}") |
| print(f" Drift score: {result['drift_score']:.3f}") |
| print(f" Shift detected: {result['shift_detected']}") |
| print(f" Training time: {training_time:.2f} ms") |
| print(f" Evaluation time: {evaluation_time:.2f} ms") |
| |
| return result |
| |
| def run_comprehensive_test(self): |
| """Run multiple concept shift scenarios""" |
| print("🧪 Running comprehensive concept shift experiments...") |
| |
| scenarios = [ |
| ('abrupt', 0.1), |
| ('abrupt', 0.3), |
| ('abrupt', 0.5), |
| ('gradual', 0.2), |
| ('gradual', 0.4), |
| ('recurring', 0.3) |
| ] |
| |
| for shift_type, magnitude in scenarios: |
| try: |
| self.run_experiment(shift_type, magnitude) |
| except Exception as e: |
| print(f"❌ Error in scenario ({shift_type}, {magnitude}): {e}") |
| |
| continue |
| |
| return self.generate_summary_report() |
| |
| def generate_summary_report(self): |
| """Generate comprehensive experiment report""" |
| if not self.results: |
| print("⚠️ No results to summarize") |
| return None |
| |
| print("\n" + "="*60) |
| print("📈 CONCEPT SHIFT EXPERIMENT SUMMARY") |
| print("="*60) |
| |
| |
| detected = [r for r in self.results if r['shift_detected']] |
| not_detected = [r for r in self.results if not r['shift_detected']] |
| |
| print(f"\nDetection Performance:") |
| print(f" Shifts detected: {len(detected)}/{len(self.results)} scenarios") |
| |
| if detected: |
| avg_accuracy_drop = np.mean([r['accuracy_drop'] for r in detected]) |
| avg_drift_score = np.mean([r['drift_score'] for r in detected]) |
| avg_eval_time = np.mean([r['evaluation_time_ms'] for r in detected]) |
| |
| print(f"\n📊 Metrics (detected scenarios):") |
| print(f" Average accuracy drop: {avg_accuracy_drop:.3f}") |
| print(f" Average drift score: {avg_drift_score:.3f}") |
| print(f" Average evaluation time: {avg_eval_time:.2f} ms") |
| |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| output_file = f"concept_shift_results_{timestamp}.json" |
| |
| with open(output_file, 'w') as f: |
| json.dump({ |
| 'timestamp': datetime.now().isoformat(), |
| 'scenarios': len(self.results), |
| 'detected': len(detected), |
| 'results': self.results, |
| 'summary': { |
| 'detection_rate': len(detected)/len(self.results) if self.results else 0, |
| 'avg_accuracy_drop': float(avg_accuracy_drop) if detected else 0, |
| 'avg_drift_score': float(avg_drift_score) if detected else 0 |
| } |
| }, f, indent=2, default=str) |
| |
| print(f"\n📁 Detailed results saved to: {output_file}") |
| |
| return { |
| 'detection_rate': len(detected)/len(self.results) if self.results else 0, |
| 'avg_accuracy_drop': avg_accuracy_drop if detected else 0, |
| 'avg_drift_score': avg_drift_score if detected else 0, |
| 'output_file': output_file |
| } |
|
|
| def main(): |
| """Command-line interface""" |
| import argparse |
| |
| parser = argparse.ArgumentParser(description='Run concept shift experiments') |
| parser.add_argument('--scenario', type=str, default='comprehensive', |
| choices=['abrupt', 'gradual', 'recurring', 'comprehensive'], |
| help='Concept shift scenario to run') |
| parser.add_argument('--magnitude', type=float, default=0.3, |
| help='Shift magnitude (0.1 to 0.5)') |
| |
| args = parser.parse_args() |
| |
| simulator = ConceptShiftSimulator() |
| |
| if args.scenario == 'comprehensive': |
| print("🧪 Running comprehensive concept shift experiments...") |
| simulator.run_comprehensive_test() |
| else: |
| print(f"🧪 Running {args.scenario} concept shift experiment...") |
| simulator.run_experiment(args.scenario, args.magnitude) |
|
|
| if __name__ == "__main__": |
| main()
|
|
|