Spaces:
Running
Running
| from fastapi import APIRouter, HTTPException | |
| from pathlib import Path | |
| import sys | |
| sys.path.insert(0, str(Path(__file__).parent.parent.parent)) | |
| from monitoring.data_drift.drift_detector import DriftDetector | |
| from monitoring.model_monitoring.prediction_logger import PredictionLogger | |
| from monitoring.model_monitoring.performance_tracker import PerformanceTracker | |
| from monitoring.dashboards.generate_reports import MonitoringReportGenerator | |
| from app.utils.metrics import get_metrics | |
| import pandas as pd | |
| router = APIRouter(prefix="/monitoring", tags=["monitoring"]) | |
| # Initialize monitoring components | |
| MONITORING_DIR = Path("monitoring") | |
| prediction_logger = PredictionLogger(MONITORING_DIR / "predictions") | |
| performance_tracker = PerformanceTracker(MONITORING_DIR / "metrics") | |
| report_generator = MonitoringReportGenerator(MONITORING_DIR / "reports") | |
| async def metrics(): | |
| """Prometheus metrics endpoint""" | |
| return get_metrics() | |
| async def check_drift(): | |
| """Check for data drift""" | |
| try: | |
| # Load reference and current data | |
| reference_path = Path("artifacts/data_transformation/train.csv") | |
| if not reference_path.exists(): | |
| raise HTTPException(status_code=404, detail="Reference data not found") | |
| # Get recent predictions | |
| predictions_df = prediction_logger.get_predictions_df() | |
| if predictions_df.empty: | |
| return {"status": "no_data", "message": "No recent predictions to check"} | |
| reference_data = pd.read_csv(reference_path).sample(n=min(1000, len(predictions_df))) | |
| # For this example, we'll skip drift detection if no input data | |
| return { | |
| "status": "healthy", | |
| "drift_detected": False, | |
| "message": "Drift detection available with sufficient data" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_performance_summary(): | |
| """Get performance metrics summary""" | |
| try: | |
| summary = performance_tracker.get_metrics_summary() | |
| if not summary: | |
| return {"status": "no_data", "message": "No performance data available"} | |
| return { | |
| "status": "success", | |
| "summary": summary, | |
| "recent_metrics": performance_tracker.get_recent_metrics(n=5) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_daily_report(): | |
| """Get daily monitoring report""" | |
| try: | |
| predictions_df = prediction_logger.get_predictions_df() | |
| drift_report = {"drift_detected": False, "drifted_features": []} | |
| performance_metrics = performance_tracker.get_metrics_summary() | |
| report = report_generator.generate_daily_report( | |
| predictions_df=predictions_df, | |
| drift_report=drift_report, | |
| performance_metrics=performance_metrics | |
| ) | |
| return report | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_weekly_summary(): | |
| """Get weekly monitoring summary""" | |
| try: | |
| summary = report_generator.get_weekly_summary() | |
| return summary | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |