import numpy as np import pandas as pd from typing import Dict, List, Optional, Tuple from datetime import datetime, timedelta import json import warnings from dataclasses import dataclass @dataclass class ForecastResult: metric: str predicted_value: float confidence: float trend: str # "increasing", "decreasing", "stable" time_to_threshold: Optional[timedelta] = None risk_level: str = "low" # low, medium, high, critical class SimplePredictiveEngine: """ Lightweight forecasting engine optimized for Hugging Face Spaces Uses statistical methods instead of heavy ML models """ def __init__(self, history_window: int = 50): self.history_window = history_window self.service_history: Dict[str, List] = {} self.prediction_cache: Dict[str, ForecastResult] = {} def add_telemetry(self, service: str, event_data: Dict): """Add telemetry data to service history""" if service not in self.service_history: self.service_history[service] = [] # Store key metrics with timestamp telemetry_point = { 'timestamp': datetime.now(), 'latency': event_data.get('latency_p99', 0), 'error_rate': event_data.get('error_rate', 0), 'throughput': event_data.get('throughput', 0), 'cpu_util': event_data.get('cpu_util'), 'memory_util': event_data.get('memory_util') } self.service_history[service].append(telemetry_point) # Keep only recent history if len(self.service_history[service]) > self.history_window: self.service_history[service].pop(0) def forecast_service_health(self, service: str, lookahead_minutes: int = 15) -> List[ForecastResult]: """Forecast service health metrics""" if service not in self.service_history or len(self.service_history[service]) < 10: return [] history = self.service_history[service] forecasts = [] # Forecast latency latency_forecast = self._forecast_latency(history, lookahead_minutes) if latency_forecast: forecasts.append(latency_forecast) # Forecast error rate error_forecast = self._forecast_error_rate(history, lookahead_minutes) if error_forecast: forecasts.append(error_forecast) # Forecast resource utilization resource_forecasts = self._forecast_resources(history, lookahead_minutes) forecasts.extend(resource_forecasts) # Cache results for forecast in forecasts: cache_key = f"{service}_{forecast.metric}" self.prediction_cache[cache_key] = forecast return forecasts def _forecast_latency(self, history: List, lookahead_minutes: int) -> Optional[ForecastResult]: """Forecast latency using linear regression and trend analysis""" try: latencies = [point['latency'] for point in history[-20:]] # Last 20 points if len(latencies) < 5: return None # Simple linear trend x = np.arange(len(latencies)) slope, intercept = np.polyfit(x, latencies, 1) # Predict next value next_x = len(latencies) predicted_latency = slope * next_x + intercept # Calculate confidence based on data quality residuals = latencies - (slope * x + intercept) confidence = max(0, 1 - (np.std(residuals) / max(1, np.mean(latencies)))) # Determine trend if slope > 5: # Increasing by more than 5ms per interval trend = "increasing" risk = "high" if predicted_latency > 300 else "medium" elif slope < -2: # Decreasing trend = "decreasing" risk = "low" else: trend = "stable" risk = "low" # Calculate time to reach critical threshold (500ms) time_to_critical = None if slope > 0 and predicted_latency < 500: time_to_critical = timedelta( minutes=lookahead_minutes * (500 - predicted_latency) / (predicted_latency - latencies[-1]) ) return ForecastResult( metric="latency", predicted_value=predicted_latency, confidence=confidence, trend=trend, time_to_threshold=time_to_critical, risk_level=risk ) except Exception as e: print(f"Latency forecast error: {e}") return None def _forecast_error_rate(self, history: List, lookahead_minutes: int) -> Optional[ForecastResult]: """Forecast error rate using exponential smoothing""" try: error_rates = [point['error_rate'] for point in history[-15:]] if len(error_rates) < 5: return None # Exponential smoothing alpha = 0.3 # Smoothing factor forecast = error_rates[0] for rate in error_rates[1:]: forecast = alpha * rate + (1 - alpha) * forecast predicted_rate = forecast # Trend analysis recent_trend = np.mean(error_rates[-3:]) - np.mean(error_rates[-6:-3]) if recent_trend > 0.02: # Increasing trend trend = "increasing" risk = "high" if predicted_rate > 0.1 else "medium" elif recent_trend < -0.01: # Decreasing trend = "decreasing" risk = "low" else: trend = "stable" risk = "low" # Confidence based on volatility confidence = max(0, 1 - (np.std(error_rates) / max(0.01, np.mean(error_rates)))) return ForecastResult( metric="error_rate", predicted_value=predicted_rate, confidence=confidence, trend=trend, risk_level=risk ) except Exception as e: print(f"Error rate forecast error: {e}") return None def _forecast_resources(self, history: List, lookahead_minutes: int) -> List[ForecastResult]: """Forecast CPU and memory utilization""" forecasts = [] # CPU forecast cpu_values = [point['cpu_util'] for point in history if point.get('cpu_util') is not None] if len(cpu_values) >= 5: try: predicted_cpu = np.mean(cpu_values[-5:]) # Simple moving average trend = "increasing" if cpu_values[-1] > np.mean(cpu_values[-10:-5]) else "stable" risk = "low" if predicted_cpu > 0.8: risk = "critical" if predicted_cpu > 0.9 else "high" elif predicted_cpu > 0.7: risk = "medium" forecasts.append(ForecastResult( metric="cpu_util", predicted_value=predicted_cpu, confidence=0.7, # Moderate confidence for resources trend=trend, risk_level=risk )) except Exception as e: print(f"CPU forecast error: {e}") # Memory forecast (similar approach) memory_values = [point['memory_util'] for point in history if point.get('memory_util') is not None] if len(memory_values) >= 5: try: predicted_memory = np.mean(memory_values[-5:]) trend = "increasing" if memory_values[-1] > np.mean(memory_values[-10:-5]) else "stable" risk = "low" if predicted_memory > 0.8: risk = "critical" if predicted_memory > 0.9 else "high" elif predicted_memory > 0.7: risk = "medium" forecasts.append(ForecastResult( metric="memory_util", predicted_value=predicted_memory, confidence=0.7, trend=trend, risk_level=risk )) except Exception as e: print(f"Memory forecast error: {e}") return forecasts def get_predictive_insights(self, service: str) -> Dict[str, any]: """Generate actionable insights from forecasts""" forecasts = self.forecast_service_health(service) critical_risks = [f for f in forecasts if f.risk_level in ["high", "critical"]] warnings = [] recommendations = [] for forecast in critical_risks: if forecast.metric == "latency" and forecast.risk_level in ["high", "critical"]: warnings.append(f"๐Ÿ“ˆ Latency expected to reach {forecast.predicted_value:.0f}ms") if forecast.time_to_threshold: minutes = int(forecast.time_to_threshold.total_seconds() / 60) recommendations.append(f"โฐ Critical latency (~500ms) in ~{minutes} minutes") recommendations.append("๐Ÿ”ง Consider scaling or optimizing dependencies") elif forecast.metric == "error_rate" and forecast.risk_level in ["high", "critical"]: warnings.append(f"๐Ÿšจ Errors expected to reach {forecast.predicted_value*100:.1f}%") recommendations.append("๐Ÿ› Investigate recent deployments or dependency issues") elif forecast.metric == "cpu_util" and forecast.risk_level in ["high", "critical"]: warnings.append(f"๐Ÿ”ฅ CPU expected at {forecast.predicted_value*100:.1f}%") recommendations.append("โšก Consider scaling compute resources") elif forecast.metric == "memory_util" and forecast.risk_level in ["high", "critical"]: warnings.append(f"๐Ÿ’พ Memory expected at {forecast.predicted_value*100:.1f}%") recommendations.append("๐Ÿงน Check for memory leaks or optimize usage") return { 'service': service, 'forecasts': [f.__dict__ for f in forecasts], 'warnings': warnings[:3], # Top 3 warnings 'recommendations': list(dict.fromkeys(recommendations))[:3], # Unique top 3 'critical_risk_count': len(critical_risks), 'forecast_timestamp': datetime.now().isoformat() }