import json
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
|
|
@dataclass
class ForecastResult:
    """Outcome of forecasting a single metric for one service.

    Instances are plain value objects: the dataclass-generated ``__init__``
    takes the fields in declaration order, and the last two are optional.
    """

    # Name of the forecast metric; the engine in this file emits
    # "latency", "error_rate", "cpu_util" and "memory_util".
    metric: str
    # Forecast value for the metric (same unit as the recorded samples).
    predicted_value: float
    # Heuristic confidence score; the engine in this file never emits a
    # negative value.
    confidence: float
    # Direction label; the engine emits "increasing", "decreasing" or "stable".
    trend: str
    # Estimated time until a critical threshold is reached, when one could be
    # computed; the engine only fills this in for the latency forecast.
    time_to_threshold: Optional[timedelta] = None
    # Severity label; the engine emits "low", "medium", "high" or "critical".
    risk_level: str = "low"
|
|
class SimplePredictiveEngine:
    """Lightweight forecasting engine optimized for Hugging Face Spaces.

    Uses statistical methods (a linear fit, exponential smoothing and moving
    averages) instead of heavy ML models. Telemetry is kept per service in a
    bounded in-memory history, and the most recent forecast for every
    ``(service, metric)`` pair is kept in ``prediction_cache``.
    """

    # Minimum number of telemetry points a service needs before forecasting.
    MIN_HISTORY_POINTS = 10
    # Latency (ms) treated as the critical threshold for time-to-threshold.
    CRITICAL_LATENCY_MS = 500
    # Predicted latency (ms) above which a rising trend is rated "high".
    HIGH_LATENCY_MS = 300

    def __init__(self, history_window: int = 50):
        """Create an engine keeping at most ``history_window`` points per service."""
        self.history_window = history_window
        self.service_history: Dict[str, List] = {}
        self.prediction_cache: Dict[str, "ForecastResult"] = {}

    def add_telemetry(self, service: str, event_data: Dict):
        """Append one telemetry point for ``service`` and trim old points.

        Args:
            service: Key under which the history is stored.
            event_data: Mapping read with ``.get``; the keys used are
                ``latency_p99``, ``error_rate`` and ``throughput`` (each
                defaulting to 0 when absent) plus ``cpu_util`` and
                ``memory_util`` (stored as ``None`` when absent).
        """
        history = self.service_history.setdefault(service, [])
        history.append({
            'timestamp': datetime.now(),
            'latency': event_data.get('latency_p99', 0),
            'error_rate': event_data.get('error_rate', 0),
            'throughput': event_data.get('throughput', 0),
            'cpu_util': event_data.get('cpu_util'),
            'memory_util': event_data.get('memory_util'),
        })

        # Trim in place so the stored list keeps its identity; dropping the
        # whole excess also copes with history_window being lowered later.
        excess = len(history) - self.history_window
        if excess > 0:
            del history[:excess]

    def forecast_service_health(self, service: str, lookahead_minutes: int = 15) -> "List[ForecastResult]":
        """Forecast latency, error rate and resource utilization for ``service``.

        Args:
            service: Service whose recorded history is used.
            lookahead_minutes: Horizon passed to the individual forecasters.

        Returns:
            The forecasts that could be computed; an empty list when the
            service is unknown or has fewer than ``MIN_HISTORY_POINTS`` points.
            Every returned forecast is also stored in ``prediction_cache``
            under ``"<service>_<metric>"``.
        """
        history = self.service_history.get(service, [])
        if len(history) < self.MIN_HISTORY_POINTS:
            return []

        forecasts = []
        for forecaster in (self._forecast_latency, self._forecast_error_rate):
            result = forecaster(history, lookahead_minutes)
            if result is not None:
                forecasts.append(result)
        forecasts.extend(self._forecast_resources(history, lookahead_minutes))

        for forecast in forecasts:
            self.prediction_cache[f"{service}_{forecast.metric}"] = forecast

        return forecasts

    @staticmethod
    def _estimate_time_to_threshold(current: float, predicted: float,
                                    threshold: float,
                                    lookahead_minutes: float) -> Optional[timedelta]:
        """Extrapolate how long until ``threshold`` is reached.

        The change from ``current`` to ``predicted`` is treated as the growth
        over one ``lookahead_minutes`` interval and extended linearly.

        Returns:
            ``None`` when the value is not growing, when the prediction is
            already at or above the threshold, when an input is NaN, or when
            the estimate does not fit in a ``timedelta``.
        """
        step = predicted - current
        remaining = threshold - predicted
        # Written as a positive test so NaN inputs also fall through to None.
        if not (step > 0 and remaining > 0):
            return None
        try:
            return timedelta(minutes=float(lookahead_minutes * remaining / step))
        except (OverflowError, ValueError):
            # A near-zero step yields an estimate too large to represent.
            return None

    @staticmethod
    def _utilization_risk(value: float) -> str:
        """Map a utilization fraction to a risk label (0.7 / 0.8 / 0.9 cut-offs)."""
        if value > 0.9:
            return "critical"
        if value > 0.8:
            return "high"
        if value > 0.7:
            return "medium"
        return "low"

    def _forecast_latency(self, history: List, lookahead_minutes: int) -> "Optional[ForecastResult]":
        """Forecast latency using linear regression and trend analysis.

        A straight line is fitted to the last 20 latency samples and evaluated
        one sample past the end. ``lookahead_minutes`` only scales the
        time-to-threshold estimate; it does not move the predicted point.

        Returns:
            The forecast, or ``None`` when there are fewer than 5 samples, a
            sample is not a finite number, or the computation fails.
        """
        try:
            latencies = np.asarray([point['latency'] for point in history[-20:]], dtype=float)
            if latencies.size < 5:
                return None
            # dtype=float turns None into NaN; refuse to fit a line through it.
            if not np.all(np.isfinite(latencies)):
                return None

            x = np.arange(latencies.size)
            slope, intercept = np.polyfit(x, latencies, 1)
            predicted_latency = float(slope * latencies.size + intercept)

            # Confidence shrinks as the scatter around the fitted line grows
            # relative to the mean latency (mean floored at 1).
            residuals = latencies - (slope * x + intercept)
            confidence = float(max(0.0, 1 - np.std(residuals) / max(1.0, np.mean(latencies))))

            if slope > 5:
                trend = "increasing"
                risk = "high" if predicted_latency > self.HIGH_LATENCY_MS else "medium"
            elif slope < -2:
                trend = "decreasing"
                risk = "low"
            else:
                trend = "stable"
                risk = "low"

            time_to_critical = None
            if slope > 0:
                # The helper guards against a zero or negative step between
                # the last sample and the prediction, which previously raised
                # inside timedelta or produced a negative duration.
                time_to_critical = self._estimate_time_to_threshold(
                    current=float(latencies[-1]),
                    predicted=predicted_latency,
                    threshold=self.CRITICAL_LATENCY_MS,
                    lookahead_minutes=lookahead_minutes,
                )

            return ForecastResult(
                metric="latency",
                predicted_value=predicted_latency,
                confidence=confidence,
                trend=trend,
                time_to_threshold=time_to_critical,
                risk_level=risk,
            )

        except Exception as e:
            # Best-effort: a failed forecast must not break the other metrics.
            print(f"Latency forecast error: {e}")
            return None

    def _forecast_error_rate(self, history: List, lookahead_minutes: int) -> "Optional[ForecastResult]":
        """Forecast error rate using exponential smoothing.

        The last 15 samples are smoothed with ``alpha = 0.3``; the trend
        compares the mean of the last three samples with the three before.
        ``lookahead_minutes`` is accepted for a uniform signature but unused.

        Returns:
            The forecast, or ``None`` when there are fewer than 5 samples or
            the computation fails.
        """
        try:
            error_rates = [point['error_rate'] for point in history[-15:]]
            if len(error_rates) < 5:
                return None

            alpha = 0.3
            smoothed = error_rates[0]
            for rate in error_rates[1:]:
                smoothed = alpha * rate + (1 - alpha) * smoothed
            predicted_rate = float(smoothed)

            recent_trend = np.mean(error_rates[-3:]) - np.mean(error_rates[-6:-3])
            if recent_trend > 0.02:
                trend = "increasing"
                risk = "high" if predicted_rate > 0.1 else "medium"
            elif recent_trend < -0.01:
                trend = "decreasing"
                risk = "low"
            else:
                trend = "stable"
                risk = "low"

            # Relative spread of the samples; the mean is floored at 0.01 so
            # an all-zero series does not divide by zero.
            confidence = float(max(0.0, 1 - np.std(error_rates) / max(0.01, np.mean(error_rates))))

            return ForecastResult(
                metric="error_rate",
                predicted_value=predicted_rate,
                confidence=confidence,
                trend=trend,
                risk_level=risk,
            )

        except Exception as e:
            # Best-effort: a failed forecast must not break the other metrics.
            print(f"Error rate forecast error: {e}")
            return None

    def _forecast_utilization(self, history: List, metric: str, label: str) -> "Optional[ForecastResult]":
        """Forecast one utilization metric as the mean of its last 5 samples.

        Args:
            history: Telemetry points for one service.
            metric: Key of the sample to read (``"cpu_util"``/``"memory_util"``).
            label: Prefix used in the printed error message.

        Returns:
            The forecast, or ``None`` when fewer than 5 non-``None`` samples
            exist or the computation fails.
        """
        values = [point[metric] for point in history if point.get(metric) is not None]
        if len(values) < 5:
            return None

        try:
            predicted = float(np.mean(values[-5:]))
            # With fewer than 6 samples there is no earlier window to compare
            # against; report "stable" rather than averaging an empty slice
            # (which yields NaN and a RuntimeWarning).
            previous_window = values[-10:-5]
            rising = bool(previous_window) and values[-1] > np.mean(previous_window)

            return ForecastResult(
                metric=metric,
                predicted_value=predicted,
                confidence=0.7,
                trend="increasing" if rising else "stable",
                risk_level=self._utilization_risk(predicted),
            )
        except Exception as e:
            # Best-effort: a failed forecast must not break the other metrics.
            print(f"{label} forecast error: {e}")
            return None

    def _forecast_resources(self, history: List, lookahead_minutes: int) -> "List[ForecastResult]":
        """Forecast CPU and memory utilization.

        ``lookahead_minutes`` is accepted for a uniform signature but unused.

        Returns:
            Zero, one or two forecasts, CPU first.
        """
        forecasts = []
        for metric, label in (("cpu_util", "CPU"), ("memory_util", "Memory")):
            result = self._forecast_utilization(history, metric, label)
            if result is not None:
                forecasts.append(result)
        return forecasts

    def get_predictive_insights(self, service: str) -> Dict[str, Any]:
        """Generate actionable insights from forecasts.

        Runs ``forecast_service_health`` with its default horizon and turns
        every forecast rated "high" or "critical" into a warning plus
        recommendations.

        Returns:
            A dict with the keys ``service``, ``forecasts`` (one dict per
            forecast), ``warnings`` and ``recommendations`` (at most three
            entries each, recommendations de-duplicated in order),
            ``critical_risk_count`` and ``forecast_timestamp`` (ISO string).
        """
        forecasts = self.forecast_service_health(service)
        critical_risks = [f for f in forecasts if f.risk_level in ("high", "critical")]

        # Named "alerts" so the imported "warnings" module is not shadowed.
        alerts: List[str] = []
        recommendations: List[str] = []

        for forecast in critical_risks:
            value = forecast.predicted_value
            if forecast.metric == "latency":
                alerts.append(f"📈 Latency expected to reach {value:.0f}ms")
                if forecast.time_to_threshold:
                    minutes = int(forecast.time_to_threshold.total_seconds() / 60)
                    recommendations.append(f"⏰ Critical latency (~500ms) in ~{minutes} minutes")
                recommendations.append("🔧 Consider scaling or optimizing dependencies")
            elif forecast.metric == "error_rate":
                alerts.append(f"🚨 Errors expected to reach {value*100:.1f}%")
                recommendations.append("🐛 Investigate recent deployments or dependency issues")
            elif forecast.metric == "cpu_util":
                alerts.append(f"🔥 CPU expected at {value*100:.1f}%")
                recommendations.append("⚡ Consider scaling compute resources")
            elif forecast.metric == "memory_util":
                alerts.append(f"💾 Memory expected at {value*100:.1f}%")
                recommendations.append("🧹 Check for memory leaks or optimize usage")

        return {
            'service': service,
            # Copy each attribute dict so callers cannot mutate cached forecasts.
            'forecasts': [dict(vars(f)) for f in forecasts],
            'warnings': alerts[:3],
            'recommendations': list(dict.fromkeys(recommendations))[:3],
            'critical_risk_count': len(critical_risks),
            'forecast_timestamp': datetime.now().isoformat(),
        }