petter2025 committed on
Commit
905f518
·
verified ·
1 Parent(s): 36fa36c

Create predictive_models.py

Browse files
Files changed (1) hide show
  1. predictive_models.py +261 -0
predictive_models.py ADDED
@@ -0,0 +1,261 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
8
+
9
@dataclass
class ForecastResult:
    """A single metric forecast produced by the predictive engine."""

    metric: str                                    # e.g. "latency", "error_rate", "cpu_util", "memory_util"
    predicted_value: float                         # forecast value in the metric's native unit
    confidence: float                              # 0..1; higher means the forecast is more reliable
    trend: str                                     # "increasing", "decreasing", "stable"
    time_to_threshold: Optional[timedelta] = None  # ETA to a critical level, when one is computable
    risk_level: str = "low"                        # low, medium, high, critical
17
+
18
class SimplePredictiveEngine:
    """
    Lightweight forecasting engine optimized for Hugging Face Spaces.

    Uses cheap statistical methods (linear trend fits, exponential smoothing,
    moving averages) instead of heavy ML models, so it runs on CPU-only hosts
    with negligible latency.
    """

    def __init__(self, history_window: int = 50):
        """
        Args:
            history_window: maximum number of telemetry points kept per service.
        """
        self.history_window = history_window
        # service name -> chronological list of telemetry point dicts
        self.service_history: Dict[str, List] = {}
        # "<service>_<metric>" -> most recent ForecastResult for that metric
        self.prediction_cache: Dict[str, ForecastResult] = {}

    def add_telemetry(self, service: str, event_data: Dict):
        """Record one telemetry sample for *service* and trim old history."""
        if service not in self.service_history:
            self.service_history[service] = []

        # Snapshot the key metrics with a capture timestamp. Missing rate
        # metrics default to 0; missing resource metrics stay None so they
        # can be filtered out later instead of skewing the averages.
        telemetry_point = {
            'timestamp': datetime.now(),
            'latency': event_data.get('latency_p99', 0),
            'error_rate': event_data.get('error_rate', 0),
            'throughput': event_data.get('throughput', 0),
            'cpu_util': event_data.get('cpu_util'),
            'memory_util': event_data.get('memory_util')
        }
        self.service_history[service].append(telemetry_point)

        # Keep only the most recent history_window points. A single slice
        # deletion replaces the original repeated pop(0) (O(n) per call).
        if len(self.service_history[service]) > self.history_window:
            del self.service_history[service][:-self.history_window]

    def forecast_service_health(self, service: str, lookahead_minutes: int = 15) -> List[ForecastResult]:
        """Forecast health metrics for *service*.

        Returns an empty list when the service is unknown or has fewer than
        10 samples; otherwise one ForecastResult per forecastable metric.
        """
        if service not in self.service_history or len(self.service_history[service]) < 10:
            return []

        history = self.service_history[service]
        forecasts = []

        latency_forecast = self._forecast_latency(history, lookahead_minutes)
        if latency_forecast:
            forecasts.append(latency_forecast)

        error_forecast = self._forecast_error_rate(history, lookahead_minutes)
        if error_forecast:
            forecasts.append(error_forecast)

        forecasts.extend(self._forecast_resources(history, lookahead_minutes))

        # Cache the latest forecast per (service, metric) pair.
        for forecast in forecasts:
            self.prediction_cache[f"{service}_{forecast.metric}"] = forecast

        return forecasts

    def _forecast_latency(self, history: List, lookahead_minutes: int) -> Optional[ForecastResult]:
        """Forecast latency with a linear trend fit over the last 20 samples."""
        try:
            latencies = [point['latency'] for point in history[-20:]]
            if len(latencies) < 5:
                return None

            # Least-squares linear trend; extrapolate one step ahead.
            x = np.arange(len(latencies))
            slope, intercept = np.polyfit(x, latencies, 1)
            predicted_latency = slope * len(latencies) + intercept

            # Confidence shrinks as the fit residuals grow relative to the mean.
            residuals = np.asarray(latencies) - (slope * x + intercept)
            confidence = max(0, 1 - (np.std(residuals) / max(1, np.mean(latencies))))

            # Thresholds are in ms-per-interval / ms (heuristic constants).
            if slope > 5:
                trend = "increasing"
                risk = "high" if predicted_latency > 300 else "medium"
            elif slope < -2:
                trend = "decreasing"
                risk = "low"
            else:
                trend = "stable"
                risk = "low"

            # Time to reach the critical threshold (500 ms). BUGFIX: the
            # original divided by (predicted - last) unguarded, which can be
            # zero (ZeroDivisionError) or negative (nonsensical negative ETA).
            time_to_critical = None
            if slope > 0 and predicted_latency < 500:
                step = predicted_latency - latencies[-1]
                if step > 0:
                    time_to_critical = timedelta(
                        minutes=lookahead_minutes * (500 - predicted_latency) / step
                    )

            return ForecastResult(
                metric="latency",
                predicted_value=predicted_latency,
                confidence=confidence,
                trend=trend,
                time_to_threshold=time_to_critical,
                risk_level=risk
            )

        except Exception as e:
            print(f"Latency forecast error: {e}")
            return None

    def _forecast_error_rate(self, history: List, lookahead_minutes: int) -> Optional[ForecastResult]:
        """Forecast error rate via exponential smoothing of the last 15 samples."""
        try:
            error_rates = [point['error_rate'] for point in history[-15:]]
            if len(error_rates) < 5:
                return None

            # Exponential smoothing: each new sample weighted by alpha.
            alpha = 0.3
            predicted_rate = error_rates[0]
            for rate in error_rates[1:]:
                predicted_rate = alpha * rate + (1 - alpha) * predicted_rate

            # Trend: last 3 samples vs the 3 before them (slice is non-empty
            # because we require at least 5 samples above).
            recent_trend = np.mean(error_rates[-3:]) - np.mean(error_rates[-6:-3])

            if recent_trend > 0.02:
                trend = "increasing"
                risk = "high" if predicted_rate > 0.1 else "medium"
            elif recent_trend < -0.01:
                trend = "decreasing"
                risk = "low"
            else:
                trend = "stable"
                risk = "low"

            # Confidence shrinks with volatility relative to the mean rate.
            confidence = max(0, 1 - (np.std(error_rates) / max(0.01, np.mean(error_rates))))

            return ForecastResult(
                metric="error_rate",
                predicted_value=predicted_rate,
                confidence=confidence,
                trend=trend,
                risk_level=risk
            )

        except Exception as e:
            print(f"Error rate forecast error: {e}")
            return None

    def _forecast_resources(self, history: List, lookahead_minutes: int) -> List[ForecastResult]:
        """Forecast CPU and memory utilization (moving-average heuristic)."""
        forecasts = []
        # The CPU and memory paths were duplicated in the original; both now
        # share one helper.
        for metric in ("cpu_util", "memory_util"):
            values = [point[metric] for point in history if point.get(metric) is not None]
            result = self._forecast_utilization(metric, values)
            if result:
                forecasts.append(result)
        return forecasts

    def _forecast_utilization(self, metric: str, values: List[float]) -> Optional[ForecastResult]:
        """Forecast a single 0..1 utilization metric from its recent samples.

        Returns None when fewer than 5 samples exist or the forecast fails.
        """
        if len(values) < 5:
            return None
        try:
            predicted = float(np.mean(values[-5:]))  # simple moving average

            # BUGFIX: with fewer than 6 samples values[-10:-5] is empty and
            # np.mean(empty) is NaN (with a RuntimeWarning); treat that case
            # as "stable" instead.
            baseline = values[-10:-5]
            trend = "increasing" if baseline and values[-1] > np.mean(baseline) else "stable"

            risk = "low"
            if predicted > 0.8:
                risk = "critical" if predicted > 0.9 else "high"
            elif predicted > 0.7:
                risk = "medium"

            return ForecastResult(
                metric=metric,
                predicted_value=predicted,
                confidence=0.7,  # moderate fixed confidence for resource metrics
                trend=trend,
                risk_level=risk
            )
        except Exception as e:
            print(f"{metric} forecast error: {e}")
            return None

    def get_predictive_insights(self, service: str) -> Dict[str, Any]:
        """Generate actionable warnings and recommendations from forecasts.

        Returns a dict with the service name, raw forecasts, top-3 warnings,
        top-3 unique recommendations, the count of high/critical risks, and
        an ISO timestamp. (Annotation fixed from `any` to `typing.Any`.)
        """
        forecasts = self.forecast_service_health(service)

        critical_risks = [f for f in forecasts if f.risk_level in ("high", "critical")]
        # Renamed from `warnings`: the original shadowed the imported module.
        warning_msgs: List[str] = []
        recommendations: List[str] = []

        # critical_risks is already filtered to high/critical, so the original
        # per-branch risk_level re-checks were redundant and are dropped.
        for forecast in critical_risks:
            if forecast.metric == "latency":
                warning_msgs.append(f"📈 Latency expected to reach {forecast.predicted_value:.0f}ms")
                if forecast.time_to_threshold:
                    minutes = int(forecast.time_to_threshold.total_seconds() / 60)
                    recommendations.append(f"⏰ Critical latency (~500ms) in ~{minutes} minutes")
                recommendations.append("🔧 Consider scaling or optimizing dependencies")

            elif forecast.metric == "error_rate":
                warning_msgs.append(f"🚨 Errors expected to reach {forecast.predicted_value*100:.1f}%")
                recommendations.append("🐛 Investigate recent deployments or dependency issues")

            elif forecast.metric == "cpu_util":
                warning_msgs.append(f"🔥 CPU expected at {forecast.predicted_value*100:.1f}%")
                recommendations.append("⚡ Consider scaling compute resources")

            elif forecast.metric == "memory_util":
                warning_msgs.append(f"💾 Memory expected at {forecast.predicted_value*100:.1f}%")
                recommendations.append("🧹 Check for memory leaks or optimize usage")

        return {
            'service': service,
            'forecasts': [f.__dict__ for f in forecasts],
            'warnings': warning_msgs[:3],  # top 3 warnings
            'recommendations': list(dict.fromkeys(recommendations))[:3],  # unique top 3
            'critical_risk_count': len(critical_risks),
            'forecast_timestamp': datetime.now().isoformat()
        }