# Investment_Assistant/src/monitoring/monitoring_service.py
# Egeekle's picture
# Add MLOps, RAG, monitoring, and utility dependencies to requirements.txt
# 7a658e1
"""
Monitoring Service
Main service for monitoring model performance and drift
"""
from typing import Dict, Optional
from datetime import datetime
import pandas as pd
from .drift_detector import DriftDetector
from .metrics_collector import MetricsCollector
class MonitoringService:
    """
    Monitoring Service combining drift detection and metrics collection.

    Wraps a ``DriftDetector`` and a ``MetricsCollector`` and keeps a bounded,
    in-memory list of alerts raised while monitoring predictions.

    Class-level tunables (override on a subclass or on an instance):
        MISSING_PCT_THRESHOLD: A column whose missing-value percentage is
            strictly greater than this is reported as a quality issue.
        DRIFT_WARNING_THRESHOLD: Strictly more drift alerts than this in the
            retrieved drift history sets the health status to "warning".
        MAX_ALERTS: Maximum number of alerts kept in ``alert_history``.
    """

    MISSING_PCT_THRESHOLD = 10.0
    DRIFT_WARNING_THRESHOLD = 5
    MAX_ALERTS = 100

    def __init__(self, drift_threshold: float = 0.15, metrics_dir: str = "metrics"):
        """
        Initialize Monitoring Service.

        Args:
            drift_threshold: Drift detection threshold, forwarded to
                ``DriftDetector``.
            metrics_dir: Directory for metrics storage, forwarded to
                ``MetricsCollector``.
        """
        self.drift_detector = DriftDetector(drift_threshold=drift_threshold)
        self.metrics_collector = MetricsCollector(metrics_dir=metrics_dir)
        self.alert_history = []

    def check_data_quality(self, data: pd.DataFrame) -> Dict:
        """
        Check data quality.

        Reports per-column missing-value counts and percentages, per-column
        dtypes, and the number of duplicated rows. Columns above
        ``MISSING_PCT_THRESHOLD`` and any duplicated rows are listed as
        human-readable strings under ``"quality_issues"``.

        Args:
            data: DataFrame to check.

        Returns:
            Data quality report with the keys ``timestamp``, ``total_rows``,
            ``total_columns``, ``missing_values``, ``data_types``,
            ``quality_issues`` and ``duplicate_rows``.
        """
        row_count = len(data)
        report = {
            "timestamp": datetime.now().isoformat(),
            "total_rows": row_count,
            "total_columns": len(data.columns),
            "missing_values": {},
            "data_types": {},
            "quality_issues": [],
        }

        # A single vectorized pass yields the NaN count of every column.
        for col, missing_count in data.isna().sum().items():
            # Guard the division: an empty frame has 0% missing by definition.
            missing_pct = (missing_count / row_count) * 100 if row_count > 0 else 0
            report["missing_values"][col] = {
                "count": int(missing_count),
                "percentage": float(missing_pct),
            }
            if missing_pct > self.MISSING_PCT_THRESHOLD:
                report["quality_issues"].append(
                    f"Column {col} has {missing_pct:.1f}% missing values"
                )

        # Data types, stored as strings so the report stays serializable.
        report["data_types"] = {
            col: str(dtype) for col, dtype in data.dtypes.items()
        }

        # Duplicates are reported after the missing-value issues.
        duplicate_count = int(data.duplicated().sum())
        if duplicate_count > 0:
            report["quality_issues"].append(
                f"Found {duplicate_count} duplicate rows"
            )
        report["duplicate_rows"] = duplicate_count

        return report

    def monitor_prediction(self, symbol: str, strategy_type: str,
                           prediction: Dict, market_data: pd.DataFrame):
        """
        Monitor a prediction with drift detection.

        The prediction is always recorded. Drift detection only runs once a
        reference baseline has been set on the drift detector; a detected
        drift is stored as a "drift" alert.

        Args:
            symbol: Asset symbol.
            strategy_type: Strategy type.
            prediction: Prediction dictionary.
            market_data: Current market data.
        """
        # Record prediction
        self.metrics_collector.record_prediction(symbol, strategy_type, prediction)

        # Without reference data there is nothing to compare against.
        if self.drift_detector.reference_data is None:
            return

        drift_result = self.drift_detector.detect_drift(market_data)
        if drift_result.get("drift_detected"):
            self._create_alert("drift", {
                "symbol": symbol,
                "strategy_type": strategy_type,
                "drift_result": drift_result,
            })

    def initialize_reference_baseline(self, data: pd.DataFrame):
        """
        Initialize reference baseline for drift detection.

        Args:
            data: Historical reference data.
        """
        self.drift_detector.update_reference(data)

    def get_health_report(self) -> Dict:
        """
        Get overall system health report.

        Status is "healthy" by default, "monitoring" when at least one drift
        alert is present in the retrieved drift history, and "warning" when
        there are more than ``DRIFT_WARNING_THRESHOLD`` of them. A
        ``"message"`` key is only present for the two non-healthy states.

        Returns:
            Health report.
        """
        recent_metrics = self.metrics_collector.get_recent_metrics(hours=24)
        recent_drift = self.drift_detector.get_drift_history(days=7)

        # Count drift alerts
        drift_alert_count = sum(
            1 for entry in recent_drift if entry.get("drift_detected", False)
        )

        # NOTE(review): the collector is not visible here; a falsy result
        # (e.g. None) is treated as "no stats" so the merge below cannot fail.
        performance_stats = self.metrics_collector.calculate_performance_stats() or {}

        health_report = {
            "timestamp": datetime.now().isoformat(),
            "status": "healthy",
            "metrics": {
                "predictions_last_24h": len(recent_metrics),
                "drift_detections_last_7d": len(recent_drift),
                "drift_alerts_last_7d": drift_alert_count,
                **performance_stats,
            },
            "alerts": len(self.alert_history),
        }

        # Determine status
        if drift_alert_count > self.DRIFT_WARNING_THRESHOLD:
            health_report["status"] = "warning"
            health_report["message"] = "Multiple drift alerts detected"
        elif drift_alert_count > 0:
            health_report["status"] = "monitoring"
            health_report["message"] = "Some drift detected, monitoring closely"

        return health_report

    def _create_alert(self, alert_type: str, details: Dict, severity: str = "medium"):
        """
        Create an alert and append it to ``alert_history``.

        Args:
            alert_type: Type of alert (drift, performance, etc.).
            details: Alert details.
            severity: Severity label stored on the alert.
        """
        self.alert_history.append({
            "timestamp": datetime.now().isoformat(),
            "type": alert_type,
            "details": details,
            "severity": severity,
        })

        # Keep only the newest MAX_ALERTS entries. Trimming in place keeps
        # any external reference to the list valid, and computing the
        # overflow explicitly stays correct for any cap value.
        overflow = len(self.alert_history) - self.MAX_ALERTS
        if overflow > 0:
            del self.alert_history[:overflow]