Spaces:
Build error
Build error
| """ | |
| Monitoring Service | |
| Main service for monitoring model performance and drift | |
| """ | |
| from typing import Dict, Optional | |
| from datetime import datetime | |
| import pandas as pd | |
| from .drift_detector import DriftDetector | |
| from .metrics_collector import MetricsCollector | |
class MonitoringService:
    """
    Monitoring Service combining drift detection and metrics collection.

    Wraps a ``DriftDetector`` and a ``MetricsCollector`` and keeps a bounded,
    in-memory history of alerts raised while monitoring.
    """

    # Maximum number of alerts retained in ``alert_history``; the oldest
    # alerts are discarded first once the cap is exceeded.
    MAX_ALERT_HISTORY = 100

    def __init__(self, drift_threshold: float = 0.15, metrics_dir: str = "metrics"):
        """
        Initialize Monitoring Service.

        Args:
            drift_threshold: Drift detection threshold, forwarded to
                ``DriftDetector(drift_threshold=...)``.
            metrics_dir: Directory for metrics storage, forwarded to
                ``MetricsCollector(metrics_dir=...)``.
        """
        self.drift_detector = DriftDetector(drift_threshold=drift_threshold)
        self.metrics_collector = MetricsCollector(metrics_dir=metrics_dir)
        # Alerts in creation order (oldest first), capped at MAX_ALERT_HISTORY.
        self.alert_history = []

    def check_data_quality(self, data: pd.DataFrame, *,
                           missing_threshold: float = 10.0) -> Dict:
        """
        Check data quality.

        Args:
            data: DataFrame to check.
            missing_threshold: A column whose missing-value percentage is
                strictly greater than this value (0-100 scale) is reported
                in ``quality_issues``. Defaults to 10.0.

        Returns:
            Data quality report with keys ``timestamp``, ``total_rows``,
            ``total_columns``, ``missing_values`` (per column: ``count`` and
            ``percentage``), ``data_types`` (per column dtype as a string),
            ``quality_issues`` (human-readable messages) and
            ``duplicate_rows``.
        """
        n_rows = len(data)
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_rows": n_rows,
            "total_columns": len(data.columns),
            "missing_values": {},
            "data_types": {},
            "quality_issues": []
        }

        # Null counts for every column in one vectorized pass. Iterating the
        # resulting Series (instead of indexing ``data[col]``) also works when
        # the frame has duplicate column labels, where ``data[col]`` would
        # return a DataFrame and the scalar conversions below would fail.
        for col, missing_count in data.isna().sum().items():
            missing_pct = (missing_count / n_rows) * 100 if n_rows > 0 else 0
            report["missing_values"][col] = {
                "count": int(missing_count),
                "percentage": float(missing_pct)
            }
            if missing_pct > missing_threshold:
                report["quality_issues"].append(
                    f"Column {col} has {missing_pct:.1f}% missing values"
                )

        # Data types, stringified so the report is JSON-friendly.
        for col, dtype in data.dtypes.items():
            report["data_types"][col] = str(dtype)

        # Fully duplicated rows (every column equal to an earlier row).
        duplicate_count = int(data.duplicated().sum())
        if duplicate_count > 0:
            report["quality_issues"].append(
                f"Found {duplicate_count} duplicate rows"
            )
        report["duplicate_rows"] = duplicate_count

        return report

    def monitor_prediction(self, symbol: str, strategy_type: str,
                           prediction: Dict, market_data: pd.DataFrame) -> None:
        """
        Monitor a prediction with drift detection.

        The prediction is always recorded via the metrics collector. Drift
        detection runs only once a reference baseline exists, that is, when
        ``drift_detector.reference_data`` is not None. A ``"drift"`` alert is
        created when the detector reports ``drift_detected``.

        Args:
            symbol: Asset symbol.
            strategy_type: Strategy type.
            prediction: Prediction dictionary.
            market_data: Current market data to compare against the baseline.
        """
        self.metrics_collector.record_prediction(symbol, strategy_type, prediction)

        if self.drift_detector.reference_data is not None:
            drift_result = self.drift_detector.detect_drift(market_data)
            if drift_result.get("drift_detected"):
                self._create_alert("drift", {
                    "symbol": symbol,
                    "strategy_type": strategy_type,
                    "drift_result": drift_result
                })

    def initialize_reference_baseline(self, data: pd.DataFrame) -> None:
        """
        Initialize reference baseline for drift detection.

        Args:
            data: Historical reference data, passed to
                ``drift_detector.update_reference``.
        """
        self.drift_detector.update_reference(data)

    def get_health_report(self) -> Dict:
        """
        Get overall system health report.

        Status is ``"warning"`` when more than 5 drift alerts occurred in the
        last 7 days, ``"monitoring"`` when at least one occurred, and
        ``"healthy"`` otherwise.

        Returns:
            Health report with ``timestamp``, ``status``, optional
            ``message``, ``metrics`` (counts plus any performance stats) and
            ``alerts`` (number of alerts currently held in history).
        """
        recent_metrics = self.metrics_collector.get_recent_metrics(hours=24)
        recent_drift = self.drift_detector.get_drift_history(days=7)

        # Only history entries that actually flagged drift count as alerts.
        drift_alerts = [
            d for d in recent_drift if d.get("drift_detected", False)
        ]

        # NOTE(review): assumed to return a dict; None is tolerated so an
        # empty collector does not break the report.
        performance_stats = self.metrics_collector.calculate_performance_stats()

        health_report = {
            "timestamp": datetime.now().isoformat(),
            "status": "healthy",
            "metrics": {
                "predictions_last_24h": len(recent_metrics),
                # Every drift-history entry, whether or not it flagged drift.
                "drift_detections_last_7d": len(recent_drift),
                "drift_alerts_last_7d": len(drift_alerts),
                **(performance_stats or {})
            },
            "alerts": len(self.alert_history)
        }

        if len(drift_alerts) > 5:
            health_report["status"] = "warning"
            health_report["message"] = "Multiple drift alerts detected"
        elif len(drift_alerts) > 0:
            health_report["status"] = "monitoring"
            health_report["message"] = "Some drift detected, monitoring closely"

        return health_report

    def _create_alert(self, alert_type: str, details: Dict) -> None:
        """
        Create an alert and append it to ``alert_history``.

        Args:
            alert_type: Type of alert (drift, performance, etc.).
            details: Alert details.
        """
        alert = {
            "timestamp": datetime.now().isoformat(),
            "type": alert_type,
            "details": details,
            "severity": "medium"
        }
        self.alert_history.append(alert)

        # Keep only the most recent MAX_ALERT_HISTORY alerts. Trimming in
        # place stays correct for any non-negative cap, including 0.
        excess = len(self.alert_history) - self.MAX_ALERT_HISTORY
        if excess > 0:
            del self.alert_history[:excess]