Spaces:
Sleeping
Sleeping
File size: 7,627 Bytes
1aa566a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 | """End-to-end demo: runs the full self-healing ML lifecycle in-process.
No API server required. Demonstrates:
1. Data generation
2. Initial model training
3. Drift injection
4. Drift detection (PSI + KS)
5. Root-cause analysis
6. Retraining trigger evaluation
7. Challenger training and promotion
Usage:
python scripts/demo.py
"""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import numpy as np
import pandas as pd
from src.data.generator import TaxiDataGenerator
from src.data.drift_simulator import DriftSimulator
from src.data.preprocessing import Preprocessor
from src.models.trainer import ModelTrainer
from src.models.registry import ModelRegistry
from src.models.evaluator import ModelEvaluator
from src.monitoring.drift_detector import DriftDetector
from src.monitoring.performance_monitor import PerformanceMonitor
from src.monitoring.root_cause_analyzer import RootCauseAnalyzer
from src.retraining.trigger import RetrainingTrigger
from src.retraining.pipeline import RetrainingPipeline
from src.utils.logging_config import get_logger
log = get_logger("demo")
SEP = "-" * 60
def section(title: str) -> None:
print(f"\n{SEP}")
print(f" {title}")
print(SEP)
def main() -> None:
print(SEP)
print(" SELF-HEALING ML SYSTEM — DEMO")
print(SEP)
gen = TaxiDataGenerator(random_seed=42)
simulator = DriftSimulator(random_seed=99)
preprocessor = Preprocessor()
section("STEP 1: Generate Training Data")
train_df = gen.generate(n_samples=8000)
ref_df = gen.generate_reference(n_samples=2000)
print(f" Training samples : {len(train_df):,}")
print(f" Reference samples: {len(ref_df):,}")
section("STEP 2: Train Initial Champion Model")
trainer = ModelTrainer()
result = trainer.train(train_df, run_name="demo_champion")
champion_model = result["model"]
champion_metrics = result["metrics"]
print(f" RMSE : {champion_metrics['rmse']}")
print(f" MAE : {champion_metrics['mae']}")
print(f" R2 : {champion_metrics['r2']}")
print(f" run_id: {result['run_id']}")
registry = ModelRegistry()
registry.save_champion(champion_model, {"metrics": champion_metrics, "run_id": result["run_id"]})
section("STEP 3: Set Up Monitoring")
detector = DriftDetector(reference_df=ref_df)
monitor = PerformanceMonitor()
rca = RootCauseAnalyzer(model=champion_model, feature_names=preprocessor.feature_names())
monitor.set_baseline_rmse(champion_metrics["rmse"])
print(f" Baseline RMSE: {champion_metrics['rmse']}")
pre_drift_df = gen.generate(n_samples=500)
X_pre, y_pre = preprocessor.transform_with_target(pre_drift_df)
preds_pre = champion_model.predict(X_pre)
for i, (pred, actual) in enumerate(zip(preds_pre, y_pre)):
rid = f"pre_{i}"
monitor.log_prediction(rid, float(pred), {})
monitor.log_ground_truth(rid, float(actual))
pre_metrics = monitor.compute_metrics()
print(f" Pre-drift RMSE: {pre_metrics['rmse'] if pre_metrics else 'N/A'}")
section("STEP 4: Inject Drift")
live_df = gen.generate(n_samples=1000)
drifted_df = simulator.apply(
live_df[preprocessor.feature_names()],
drift_type="sudden",
severity=1.5,
step=500,
total_steps=1000,
)
print(" Drift type: sudden")
for feat in ["trip_distance", "pickup_hour", "passenger_count"]:
if feat in live_df.columns and feat in drifted_df.columns:
orig_mean = live_df[feat].mean()
drift_mean = drifted_df[feat].mean()
print(f" {feat:<25} mean: {orig_mean:.3f} -> {drift_mean:.3f} (delta={drift_mean - orig_mean:+.3f})")
X_drifted = drifted_df.reindex(columns=preprocessor.feature_names(), fill_value=0)
y_drifted = live_df["trip_duration_min"].values
preds_post = champion_model.predict(X_drifted)
monitor2 = PerformanceMonitor()
monitor2.set_baseline_rmse(champion_metrics["rmse"])
for i, (pred, actual) in enumerate(zip(preds_post, y_drifted)):
rid = f"post_{i}"
monitor2.log_prediction(rid, float(pred), {})
monitor2.log_ground_truth(rid, float(actual))
post_metrics = monitor2.compute_metrics()
print(f"\n Post-drift RMSE: {post_metrics['rmse'] if post_metrics else 'N/A'}")
section("STEP 5: Drift Detection")
feature_drift_report = detector.detect_feature_drift(
drifted_df.reindex(columns=preprocessor.feature_names(), fill_value=0),
features=preprocessor.feature_names(),
)
perf_drift_report = detector.detect_performance_drift(
recent_rmse=post_metrics["rmse"] if post_metrics else 999,
baseline_rmse=champion_metrics["rmse"],
)
print(f" Feature drift detected : {feature_drift_report['drift_detected']}")
print(f" Drifted features : {feature_drift_report['drifted_features']}")
print(f" Performance degraded : {perf_drift_report['drift_detected']}")
if post_metrics:
pct = (post_metrics["rmse"] - champion_metrics["rmse"]) / champion_metrics["rmse"] * 100
print(f" Performance drop : {pct:.1f}%")
section("STEP 6: Root-Cause Analysis")
rca_result = rca.analyze(feature_drift_report)
print(f" Primary cause : {rca_result['primary_cause']}")
print(f" Action : {rca_result['action_recommended']}")
print(f" Explanation:\n {rca_result['explanation'][:300]}")
section("STEP 7: Retraining Decision")
trigger = RetrainingTrigger()
decision = trigger.should_retrain(
feature_drift_report=feature_drift_report,
performance_report=perf_drift_report,
samples_since_last_retrain=2000,
)
print(f" should_retrain : {decision['should_retrain']}")
print(f" Reasons : {decision['reasons']}")
print(f" Blocking : {decision['blocking_reasons']}")
section("STEP 8: Retrain and Promote")
retrain_df = drifted_df.copy()
retrain_df["trip_duration_min"] = y_drifted
pipeline = RetrainingPipeline()
retrain_result = pipeline.run(
training_df=retrain_df.reindex(
columns=preprocessor.feature_names() + ["trip_duration_min"], fill_value=0
),
eval_df=pre_drift_df.sample(200, random_state=42),
rca_report=rca_result,
tags={"trigger": "demo"},
)
print(f" Promoted : {retrain_result['promoted']}")
print(f" Improvement : {retrain_result['improvement_pct']:.2f}%")
print(f" Champion RMSE : {retrain_result['champion_metrics'].get('rmse', 'N/A')}")
print(f" Challenger RMSE : {retrain_result['challenger_metrics'].get('rmse', 'N/A')}")
section("STEP 9: Example API Response")
api_response = {
"drift_detected": feature_drift_report["drift_detected"],
"root_cause": feature_drift_report["drifted_features"][:3],
"performance_drop": f"{pct:.1f}%" if post_metrics else "N/A",
"action": rca_result["action_recommended"],
"rca_details": rca_result["root_causes"][:3],
"retrain_triggered": decision["should_retrain"],
"model_promoted": retrain_result["promoted"],
"new_model_improvement": f"{retrain_result['improvement_pct']:.2f}%",
}
print(json.dumps(api_response, indent=2, default=str))
print(f"\n{SEP}")
print(" DEMO COMPLETE")
print(f" uvicorn app:app --reload")
print(f" python -m streamlit run dashboard/app.py")
print(SEP)
if __name__ == "__main__":
main()
|