Spaces:
Running
Running
| """End-to-end demo: runs the full self-healing ML lifecycle in-process. | |
| No API server required. Demonstrates: | |
| 1. Data generation | |
| 2. Initial model training | |
| 3. Drift injection | |
| 4. Drift detection (PSI + KS) | |
| 5. Root-cause analysis | |
| 6. Retraining trigger evaluation | |
| 7. Challenger training and promotion | |
| Usage: | |
| python scripts/demo.py | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import sys | |
| import time | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) | |
| import numpy as np | |
| import pandas as pd | |
| from src.data.generator import TaxiDataGenerator | |
| from src.data.drift_simulator import DriftSimulator | |
| from src.data.preprocessing import Preprocessor | |
| from src.models.trainer import ModelTrainer | |
| from src.models.registry import ModelRegistry | |
| from src.models.evaluator import ModelEvaluator | |
| from src.monitoring.drift_detector import DriftDetector | |
| from src.monitoring.performance_monitor import PerformanceMonitor | |
| from src.monitoring.root_cause_analyzer import RootCauseAnalyzer | |
| from src.retraining.trigger import RetrainingTrigger | |
| from src.retraining.pipeline import RetrainingPipeline | |
| from src.utils.logging_config import get_logger | |
# Script-level logger.
# NOTE(review): not referenced by main() as shown; kept in case other code uses it.
log = get_logger("demo")
# Horizontal rule (60 dashes) printed around the banner and every section heading.
SEP = 60 * "-"
def section(title: str) -> None:
    """Print a section heading: a blank line, then *title* framed by SEP rules."""
    banner = (f"\n{SEP}", f" {title}", SEP)
    print(*banner, sep="\n")
def _score_predictions(monitor, prefix: str, predictions, actuals):
    """Feed (prediction, actual) pairs into *monitor* and return its metrics.

    Each pair is logged under the request id ``f"{prefix}_{i}"``: first as a
    prediction with empty features, then with its ground truth.
    Returns whatever ``monitor.compute_metrics()`` returns. The callers treat
    that as a mapping with an ``"rmse"`` key, or a falsy value when nothing
    could be computed.
    """
    for i, (pred, actual) in enumerate(zip(predictions, actuals)):
        rid = f"{prefix}_{i}"
        monitor.log_prediction(rid, float(pred), {})
        monitor.log_ground_truth(rid, float(actual))
    return monitor.compute_metrics()


def main() -> None:
    """Run the end-to-end self-healing ML demo in-process and print each step.

    Steps:
      1. Generate training and reference data.
      2. Train a champion model and save it to the registry.
      3. Set up monitoring and score the champion on fresh, undrifted data.
      4. Inject sudden drift into a live batch and score the champion on it.
      5. Detect feature and performance drift.
      6. Run root-cause analysis.
      7. Ask the retraining trigger whether to retrain.
      8. Retrain on the drifted batch. The challenger is evaluated on a
         held-out slice of that same batch, so promotion is judged on the
         distribution the model will actually serve.
      9. Print an example API-style response.
    """
    print(SEP)
    print(" SELF-HEALING ML SYSTEM — DEMO")
    print(SEP)

    gen = TaxiDataGenerator(random_seed=42)
    simulator = DriftSimulator(random_seed=99)
    preprocessor = Preprocessor()
    # Resolve the feature list once; every frame below is aligned to it.
    feature_names = preprocessor.feature_names()

    section("STEP 1: Generate Training Data")
    train_df = gen.generate(n_samples=8000)
    ref_df = gen.generate_reference(n_samples=2000)
    print(f" Training samples : {len(train_df):,}")
    print(f" Reference samples: {len(ref_df):,}")

    section("STEP 2: Train Initial Champion Model")
    trainer = ModelTrainer()
    result = trainer.train(train_df, run_name="demo_champion")
    champion_model = result["model"]
    champion_metrics = result["metrics"]
    baseline_rmse = champion_metrics["rmse"]
    print(f" RMSE : {champion_metrics['rmse']}")
    print(f" MAE : {champion_metrics['mae']}")
    print(f" R2 : {champion_metrics['r2']}")
    print(f" run_id: {result['run_id']}")
    registry = ModelRegistry()
    registry.save_champion(champion_model, {"metrics": champion_metrics, "run_id": result["run_id"]})

    section("STEP 3: Set Up Monitoring")
    detector = DriftDetector(reference_df=ref_df)
    monitor = PerformanceMonitor()
    rca = RootCauseAnalyzer(model=champion_model, feature_names=feature_names)
    monitor.set_baseline_rmse(baseline_rmse)
    print(f" Baseline RMSE: {baseline_rmse}")
    pre_drift_df = gen.generate(n_samples=500)
    X_pre, y_pre = preprocessor.transform_with_target(pre_drift_df)
    pre_metrics = _score_predictions(monitor, "pre", champion_model.predict(X_pre), y_pre)
    print(f" Pre-drift RMSE: {pre_metrics['rmse'] if pre_metrics else 'N/A'}")

    section("STEP 4: Inject Drift")
    live_df = gen.generate(n_samples=1000)
    drifted_df = simulator.apply(
        live_df[feature_names],
        drift_type="sudden",
        severity=1.5,
        step=500,
        total_steps=1000,
    )
    print(" Drift type: sudden")
    for feat in ["trip_distance", "pickup_hour", "passenger_count"]:
        if feat in live_df.columns and feat in drifted_df.columns:
            orig_mean = live_df[feat].mean()
            drift_mean = drifted_df[feat].mean()
            print(f" {feat:<25} mean: {orig_mean:.3f} -> {drift_mean:.3f} (delta={drift_mean - orig_mean:+.3f})")
    # Align drifted features to the model's expected columns (missing -> 0).
    # Ground truth comes from the undrifted live batch: the simulator only
    # perturbed the feature columns it was given.
    # NOTE(review): this path bypasses preprocessor.transform_with_target, unlike
    # the pre-drift path above. Confirm the raw features match the model input.
    X_drifted = drifted_df.reindex(columns=feature_names, fill_value=0)
    y_drifted = live_df["trip_duration_min"].values
    monitor2 = PerformanceMonitor()
    monitor2.set_baseline_rmse(baseline_rmse)
    post_metrics = _score_predictions(
        monitor2, "post", champion_model.predict(X_drifted), y_drifted
    )
    print(f"\n Post-drift RMSE: {post_metrics['rmse'] if post_metrics else 'N/A'}")

    section("STEP 5: Drift Detection")
    feature_drift_report = detector.detect_feature_drift(X_drifted, features=feature_names)
    perf_drift_report = detector.detect_performance_drift(
        # 999 is a placeholder used when no post-drift metrics could be computed.
        recent_rmse=post_metrics["rmse"] if post_metrics else 999,
        baseline_rmse=baseline_rmse,
    )
    print(f" Feature drift detected : {feature_drift_report['drift_detected']}")
    print(f" Drifted features : {feature_drift_report['drifted_features']}")
    print(f" Performance degraded : {perf_drift_report['drift_detected']}")
    # Relative RMSE change in percent; None when post-drift metrics are missing.
    pct: float | None = None
    if post_metrics:
        pct = (post_metrics["rmse"] - baseline_rmse) / baseline_rmse * 100
        print(f" Performance drop : {pct:.1f}%")

    section("STEP 6: Root-Cause Analysis")
    rca_result = rca.analyze(feature_drift_report)
    print(f" Primary cause : {rca_result['primary_cause']}")
    print(f" Action : {rca_result['action_recommended']}")
    print(f" Explanation:\n {rca_result['explanation'][:300]}")

    section("STEP 7: Retraining Decision")
    trigger = RetrainingTrigger()
    decision = trigger.should_retrain(
        feature_drift_report=feature_drift_report,
        performance_report=perf_drift_report,
        samples_since_last_retrain=2000,
    )
    print(f" should_retrain : {decision['should_retrain']}")
    print(f" Reasons : {decision['reasons']}")
    print(f" Blocking : {decision['blocking_reasons']}")

    section("STEP 8: Retrain and Promote")
    retrain_df = X_drifted.copy()
    retrain_df["trip_duration_min"] = y_drifted
    retrain_df = retrain_df.reset_index(drop=True)
    # Evaluate champion vs. challenger on held-out *drifted* rows, in the same
    # column layout as the training frame. Pre-drift data would judge the
    # challenger on a distribution it is no longer meant to serve.
    eval_df = retrain_df.sample(200, random_state=42)
    train_part = retrain_df.drop(index=eval_df.index)
    pipeline = RetrainingPipeline()
    retrain_result = pipeline.run(
        training_df=train_part,
        eval_df=eval_df,
        rca_report=rca_result,
        tags={"trigger": "demo"},
    )
    print(f" Promoted : {retrain_result['promoted']}")
    print(f" Improvement : {retrain_result['improvement_pct']:.2f}%")
    print(f" Champion RMSE : {retrain_result['champion_metrics'].get('rmse', 'N/A')}")
    print(f" Challenger RMSE : {retrain_result['challenger_metrics'].get('rmse', 'N/A')}")

    section("STEP 9: Example API Response")
    api_response = {
        "drift_detected": feature_drift_report["drift_detected"],
        "root_cause": feature_drift_report["drifted_features"][:3],
        "performance_drop": f"{pct:.1f}%" if pct is not None else "N/A",
        "action": rca_result["action_recommended"],
        "rca_details": rca_result["root_causes"][:3],
        "retrain_triggered": decision["should_retrain"],
        "model_promoted": retrain_result["promoted"],
        "new_model_improvement": f"{retrain_result['improvement_pct']:.2f}%",
    }
    print(json.dumps(api_response, indent=2, default=str))

    print(f"\n{SEP}")
    print(" DEMO COMPLETE")
    print(" uvicorn app:app --reload")
    print(" python -m streamlit run dashboard/app.py")
    print(SEP)


if __name__ == "__main__":
    main()