"""
Monitoring Service
Main service for monitoring model performance and drift
"""

from typing import Dict, Optional
from datetime import datetime
import pandas as pd

from .drift_detector import DriftDetector
from .metrics_collector import MetricsCollector


class MonitoringService:
    """
    Monitoring Service combining drift detection and metrics collection.

    Wraps a ``DriftDetector`` and a ``MetricsCollector`` and keeps a bounded,
    in-memory list of alerts (``alert_history``).
    """

    # Maximum number of alerts retained in ``alert_history`` (oldest dropped first).
    MAX_ALERT_HISTORY = 100

    def __init__(self, drift_threshold: float = 0.15, metrics_dir: str = "metrics"):
        """
        Initialize Monitoring Service

        Args:
            drift_threshold: Drift detection threshold, passed to DriftDetector
            metrics_dir: Directory for metrics storage, passed to MetricsCollector
        """
        self.drift_detector = DriftDetector(drift_threshold=drift_threshold)
        self.metrics_collector = MetricsCollector(metrics_dir=metrics_dir)
        # Alert dicts created by _create_alert, oldest first.
        self.alert_history = []

    def check_data_quality(
        self, data: pd.DataFrame, missing_threshold_pct: float = 10.0
    ) -> Dict:
        """
        Check data quality

        Builds a report with per-column missing-value counts/percentages,
        per-column dtypes, the number of fully duplicated rows, and a list of
        human-readable quality issues.

        Args:
            data: DataFrame to check
            missing_threshold_pct: A column is reported as a quality issue when
                its missing-value percentage is strictly greater than this
                (default 10.0)

        Returns:
            Data quality report with keys "timestamp" (naive local time, ISO
            format), "total_rows", "total_columns", "missing_values",
            "data_types", "quality_issues" and "duplicate_rows"
        """
        n_rows = len(data)
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_rows": n_rows,
            "total_columns": len(data.columns),
            "missing_values": {},
            "data_types": {},
            "quality_issues": [],
        }

        # Walk the columns positionally instead of via data[col]: with duplicate
        # column labels, data[col] returns a DataFrame and the int()/.dtype
        # conversions below would raise.
        missing_counts = data.isna().sum().to_numpy()
        for col, missing_count, dtype in zip(data.columns, missing_counts, data.dtypes):
            # Guard against division by zero for an empty frame.
            missing_pct = (missing_count / n_rows) * 100 if n_rows > 0 else 0
            report["missing_values"][col] = {
                "count": int(missing_count),
                "percentage": float(missing_pct),
            }
            report["data_types"][col] = str(dtype)
            if missing_pct > missing_threshold_pct:
                report["quality_issues"].append(
                    f"Column {col} has {missing_pct:.1f}% missing values"
                )

        # Duplicate labels collapse into a single key in the per-column dicts
        # above (later columns win), so surface them explicitly.
        duplicated_labels = data.columns[data.columns.duplicated()].unique()
        if len(duplicated_labels) > 0:
            report["quality_issues"].append(
                f"Duplicate column names: {list(duplicated_labels)}"
            )

        # Count rows that exactly repeat an earlier row.
        duplicate_count = int(data.duplicated().sum())
        if duplicate_count > 0:
            report["quality_issues"].append(
                f"Found {duplicate_count} duplicate rows"
            )
        report["duplicate_rows"] = duplicate_count

        return report

    def monitor_prediction(self, symbol: str, strategy_type: str,
                           prediction: Dict, market_data: pd.DataFrame):
        """
        Monitor a prediction with drift detection

        Always records the prediction with the metrics collector. If a
        reference baseline has been set (see initialize_reference_baseline),
        runs drift detection on ``market_data`` and records a "drift" alert
        when the result reports ``drift_detected``.

        Args:
            symbol: Asset symbol
            strategy_type: Strategy type
            prediction: Prediction dictionary
            market_data: Current market data
        """
        self.metrics_collector.record_prediction(symbol, strategy_type, prediction)

        # Drift can only be measured against an existing reference baseline.
        if self.drift_detector.reference_data is not None:
            drift_result = self.drift_detector.detect_drift(market_data)
            if drift_result.get("drift_detected"):
                self._create_alert("drift", {
                    "symbol": symbol,
                    "strategy_type": strategy_type,
                    "drift_result": drift_result,
                })

    def initialize_reference_baseline(self, data: pd.DataFrame):
        """
        Initialize reference baseline for drift detection

        Args:
            data: Historical reference data, passed to DriftDetector.update_reference
        """
        self.drift_detector.update_reference(data)

    def get_health_report(self) -> Dict:
        """
        Get overall system health report

        Status is "healthy" when no drift alerts are found in the 7-day drift
        history, "monitoring" when there is at least one, and "warning" when
        there are more than five. The latter two also carry a "message" key.

        Returns:
            Health report
        """
        # NOTE(review): assumes each recent metrics entry is one prediction —
        # confirm against MetricsCollector.get_recent_metrics.
        recent_metrics = self.metrics_collector.get_recent_metrics(hours=24)
        recent_drift = self.drift_detector.get_drift_history(days=7)

        # Drift checks that actually flagged drift.
        drift_alerts = [d for d in recent_drift if d.get("drift_detected", False)]

        performance_stats = self.metrics_collector.calculate_performance_stats()

        health_report = {
            "timestamp": datetime.now().isoformat(),
            "status": "healthy",
            "metrics": {
                "predictions_last_24h": len(recent_metrics),
                "drift_detections_last_7d": len(recent_drift),
                "drift_alerts_last_7d": len(drift_alerts),
                # Performance stats are merged last, so they win on key clashes.
                **performance_stats,
            },
            # Total alerts currently retained (at most MAX_ALERT_HISTORY).
            "alerts": len(self.alert_history),
        }

        if len(drift_alerts) > 5:
            health_report["status"] = "warning"
            health_report["message"] = "Multiple drift alerts detected"
        elif len(drift_alerts) > 0:
            health_report["status"] = "monitoring"
            health_report["message"] = "Some drift detected, monitoring closely"

        return health_report

    def _create_alert(self, alert_type: str, details: Dict):
        """
        Create an alert and append it to ``alert_history``.

        Only the most recent MAX_ALERT_HISTORY alerts are kept.

        Args:
            alert_type: Type of alert (drift, performance, etc.)
            details: Alert details
        """
        alert = {
            "timestamp": datetime.now().isoformat(),
            "type": alert_type,
            "details": details,
            "severity": "medium",
        }

        self.alert_history.append(alert)

        # Trim in place so any external reference to the list stays in sync.
        if len(self.alert_history) > self.MAX_ALERT_HISTORY:
            del self.alert_history[:-self.MAX_ALERT_HISTORY]