AutoML_MLOps_PipeLine / app /routers /monitoring.py
Abeshith's picture
Added Monitoring Stages
b53ee19
from fastapi import APIRouter, HTTPException
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from monitoring.data_drift.drift_detector import DriftDetector
from monitoring.model_monitoring.prediction_logger import PredictionLogger
from monitoring.model_monitoring.performance_tracker import PerformanceTracker
from monitoring.dashboards.generate_reports import MonitoringReportGenerator
from app.utils.metrics import get_metrics
import pandas as pd
router = APIRouter(prefix="/monitoring", tags=["monitoring"])
# Initialize monitoring components
MONITORING_DIR = Path("monitoring")
prediction_logger = PredictionLogger(MONITORING_DIR / "predictions")
performance_tracker = PerformanceTracker(MONITORING_DIR / "metrics")
report_generator = MonitoringReportGenerator(MONITORING_DIR / "reports")
@router.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
return get_metrics()
@router.get("/health/drift")
async def check_drift():
"""Check for data drift"""
try:
# Load reference and current data
reference_path = Path("artifacts/data_transformation/train.csv")
if not reference_path.exists():
raise HTTPException(status_code=404, detail="Reference data not found")
# Get recent predictions
predictions_df = prediction_logger.get_predictions_df()
if predictions_df.empty:
return {"status": "no_data", "message": "No recent predictions to check"}
reference_data = pd.read_csv(reference_path).sample(n=min(1000, len(predictions_df)))
# For this example, we'll skip drift detection if no input data
return {
"status": "healthy",
"drift_detected": False,
"message": "Drift detection available with sufficient data"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/performance/summary")
async def get_performance_summary():
"""Get performance metrics summary"""
try:
summary = performance_tracker.get_metrics_summary()
if not summary:
return {"status": "no_data", "message": "No performance data available"}
return {
"status": "success",
"summary": summary,
"recent_metrics": performance_tracker.get_recent_metrics(n=5)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/reports/daily")
async def get_daily_report():
"""Get daily monitoring report"""
try:
predictions_df = prediction_logger.get_predictions_df()
drift_report = {"drift_detected": False, "drifted_features": []}
performance_metrics = performance_tracker.get_metrics_summary()
report = report_generator.generate_daily_report(
predictions_df=predictions_df,
drift_report=drift_report,
performance_metrics=performance_metrics
)
return report
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/reports/weekly")
async def get_weekly_summary():
"""Get weekly monitoring summary"""
try:
summary = report_generator.get_weekly_summary()
return summary
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))