argus-mlops / scripts /demo.py
hodfa840's picture
Fix scroll reset for HF Spaces double-iframe context
1aa566a
"""End-to-end demo: runs the full self-healing ML lifecycle in-process.

No API server is required. The script demonstrates:

1. Data generation
2. Initial (champion) model training
3. Monitoring setup and a pre-drift performance baseline
4. Drift injection
5. Drift detection (PSI + KS)
6. Root-cause analysis
7. Retraining trigger evaluation
8. Challenger training and promotion
9. An example API response payload

Usage:
    python scripts/demo.py
"""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import numpy as np
import pandas as pd
from src.data.generator import TaxiDataGenerator
from src.data.drift_simulator import DriftSimulator
from src.data.preprocessing import Preprocessor
from src.models.trainer import ModelTrainer
from src.models.registry import ModelRegistry
from src.models.evaluator import ModelEvaluator
from src.monitoring.drift_detector import DriftDetector
from src.monitoring.performance_monitor import PerformanceMonitor
from src.monitoring.root_cause_analyzer import RootCauseAnalyzer
from src.retraining.trigger import RetrainingTrigger
from src.retraining.pipeline import RetrainingPipeline
from src.utils.logging_config import get_logger
# Logger for this script.
# NOTE(review): `log` is not referenced anywhere else in this script;
# either route the demo output through it or drop it.
log = get_logger("demo")
# Horizontal rule printed around the demo title and every section banner.
SEP: str = "-" * 60
def section(title: str) -> None:
    """Print a section banner: a blank line, a rule, the indented title, a rule.

    Args:
        title: Heading text; it is printed with a single leading space.
    """
    banner_lines = (f"\n{SEP}", f" {title}", SEP)
    print("\n".join(banner_lines))
def _score_predictions(predictions, actuals, baseline_rmse, id_prefix):
    """Replay predictions and their ground truth through a fresh monitor.

    Args:
        predictions: Iterable of model outputs (each castable to float).
        actuals: Iterable of true targets, aligned element-wise with
            ``predictions``.
        baseline_rmse: Value handed to ``PerformanceMonitor.set_baseline_rmse``.
        id_prefix: The i-th pair is logged under request id
            ``f"{id_prefix}_{i}"``.

    Returns:
        Whatever ``PerformanceMonitor.compute_metrics()`` returns; callers
        treat a falsy value as "no metrics available".
    """
    monitor = PerformanceMonitor()
    monitor.set_baseline_rmse(baseline_rmse)
    for i, (pred, actual) in enumerate(zip(predictions, actuals)):
        request_id = f"{id_prefix}_{i}"
        monitor.log_prediction(request_id, float(pred), {})
        monitor.log_ground_truth(request_id, float(actual))
    return monitor.compute_metrics()


def main() -> None:
    """Run every stage of the self-healing demo and print a report to stdout.

    Stages: data generation, champion training, monitoring setup, drift
    injection, drift detection, root-cause analysis, retraining decision,
    challenger retraining/promotion, and an example API response payload.

    Side effects: trains models and saves the champion through
    ``ModelRegistry.save_champion`` (where it is persisted is decided by that
    class, not visible here) and runs ``RetrainingPipeline.run``.
    """
    print(SEP)
    print(" SELF-HEALING ML SYSTEM — DEMO")
    print(SEP)
    gen = TaxiDataGenerator(random_seed=42)
    simulator = DriftSimulator(random_seed=99)
    preprocessor = Preprocessor()
    feature_names = preprocessor.feature_names()

    section("STEP 1: Generate Training Data")
    # NOTE: the order of gen.generate* calls below fixes which random draws
    # each dataset receives; keep it stable for reproducible demo output.
    train_df = gen.generate(n_samples=8000)
    ref_df = gen.generate_reference(n_samples=2000)
    print(f" Training samples : {len(train_df):,}")
    print(f" Reference samples: {len(ref_df):,}")

    section("STEP 2: Train Initial Champion Model")
    trainer = ModelTrainer()
    result = trainer.train(train_df, run_name="demo_champion")
    champion_model = result["model"]
    champion_metrics = result["metrics"]
    baseline_rmse = champion_metrics["rmse"]
    print(f" RMSE : {champion_metrics['rmse']}")
    print(f" MAE : {champion_metrics['mae']}")
    print(f" R2 : {champion_metrics['r2']}")
    print(f" run_id: {result['run_id']}")
    registry = ModelRegistry()
    registry.save_champion(champion_model, {"metrics": champion_metrics, "run_id": result["run_id"]})

    section("STEP 3: Set Up Monitoring")
    detector = DriftDetector(reference_df=ref_df)
    rca = RootCauseAnalyzer(model=champion_model, feature_names=feature_names)
    print(f" Baseline RMSE: {baseline_rmse}")
    pre_drift_df = gen.generate(n_samples=500)
    X_pre, y_pre = preprocessor.transform_with_target(pre_drift_df)
    pre_metrics = _score_predictions(champion_model.predict(X_pre), y_pre, baseline_rmse, "pre")
    print(f" Pre-drift RMSE: {pre_metrics['rmse'] if pre_metrics else 'N/A'}")

    section("STEP 4: Inject Drift")
    live_df = gen.generate(n_samples=1000)
    drifted_df = simulator.apply(
        live_df[feature_names],
        drift_type="sudden",
        severity=1.5,
        step=500,
        total_steps=1000,
    )
    print(" Drift type: sudden")
    for feat in ["trip_distance", "pickup_hour", "passenger_count"]:
        if feat in live_df.columns and feat in drifted_df.columns:
            orig_mean = live_df[feat].mean()
            drift_mean = drifted_df[feat].mean()
            print(f" {feat:<25} mean: {orig_mean:.3f} -> {drift_mean:.3f} (delta={drift_mean - orig_mean:+.3f})")
    # Features are drifted; the target is the undrifted live target.
    X_drifted = drifted_df.reindex(columns=feature_names, fill_value=0)
    y_drifted = live_df["trip_duration_min"].values
    post_metrics = _score_predictions(champion_model.predict(X_drifted), y_drifted, baseline_rmse, "post")
    print(f"\n Post-drift RMSE: {post_metrics['rmse'] if post_metrics else 'N/A'}")

    section("STEP 5: Drift Detection")
    feature_drift_report = detector.detect_feature_drift(X_drifted, features=feature_names)
    perf_drift_report = detector.detect_performance_drift(
        # 999 is an arbitrary "very bad" RMSE used only if no post-drift
        # metrics could be computed.
        recent_rmse=post_metrics["rmse"] if post_metrics else 999,
        baseline_rmse=baseline_rmse,
    )
    print(f" Feature drift detected : {feature_drift_report['drift_detected']}")
    print(f" Drifted features : {feature_drift_report['drifted_features']}")
    print(f" Performance degraded : {perf_drift_report['drift_detected']}")
    # Percentage RMSE increase vs. the champion baseline; None when unknown.
    pct = None
    if post_metrics:
        pct = (post_metrics["rmse"] - baseline_rmse) / baseline_rmse * 100
        print(f" Performance drop : {pct:.1f}%")

    section("STEP 6: Root-Cause Analysis")
    rca_result = rca.analyze(feature_drift_report)
    print(f" Primary cause : {rca_result['primary_cause']}")
    print(f" Action : {rca_result['action_recommended']}")
    print(f" Explanation:\n {rca_result['explanation'][:300]}")

    section("STEP 7: Retraining Decision")
    trigger = RetrainingTrigger()
    decision = trigger.should_retrain(
        feature_drift_report=feature_drift_report,
        performance_report=perf_drift_report,
        samples_since_last_retrain=2000,
    )
    print(f" should_retrain : {decision['should_retrain']}")
    print(f" Reasons : {decision['reasons']}")
    print(f" Blocking : {decision['blocking_reasons']}")

    section("STEP 8: Retrain and Promote")
    retrain_df = drifted_df.copy()
    retrain_df["trip_duration_min"] = y_drifted
    retrain_df = retrain_df.reindex(
        columns=feature_names + ["trip_duration_min"], fill_value=0
    ).reset_index(drop=True)  # unique RangeIndex so the holdout split below is exact
    # Champion vs. challenger must be compared on the distribution the system
    # is healing towards. Evaluating on pre-drift data (as before) rewards the
    # stale champion, so hold out part of the drifted data instead and keep it
    # out of the challenger's training set to avoid leakage.
    eval_size = 200
    eval_df = retrain_df.sample(eval_size, random_state=42)
    fit_df = retrain_df.drop(index=eval_df.index)
    pipeline = RetrainingPipeline()
    retrain_result = pipeline.run(
        training_df=fit_df,
        eval_df=eval_df,
        rca_report=rca_result,
        tags={"trigger": "demo"},
    )
    print(f" Promoted : {retrain_result['promoted']}")
    print(f" Improvement : {retrain_result['improvement_pct']:.2f}%")
    print(f" Champion RMSE : {retrain_result['champion_metrics'].get('rmse', 'N/A')}")
    print(f" Challenger RMSE : {retrain_result['challenger_metrics'].get('rmse', 'N/A')}")

    section("STEP 9: Example API Response")
    api_response = {
        "drift_detected": feature_drift_report["drift_detected"],
        "root_cause": feature_drift_report["drifted_features"][:3],
        "performance_drop": f"{pct:.1f}%" if pct is not None else "N/A",
        "action": rca_result["action_recommended"],
        "rca_details": rca_result["root_causes"][:3],
        "retrain_triggered": decision["should_retrain"],
        "model_promoted": retrain_result["promoted"],
        "new_model_improvement": f"{retrain_result['improvement_pct']:.2f}%",
    }
    # default=str makes any non-JSON-native values (e.g. numpy scalars) printable.
    print(json.dumps(api_response, indent=2, default=str))

    print(f"\n{SEP}")
    print(" DEMO COMPLETE")
    print(" uvicorn app:app --reload")
    print(" python -m streamlit run dashboard/app.py")
    print(SEP)


if __name__ == "__main__":
    main()