| """ |
| Machine Learning Models for Advanced Anomaly Detection |
| Includes ensemble methods, causal inference, and adaptive thresholds |
| """ |
|
|
| import numpy as np |
| from typing import Tuple, Optional, Dict, List |
| import logging |
| import datetime |
|
|
| |
| try: |
| from sklearn.ensemble import IsolationForest |
| from sklearn.preprocessing import StandardScaler |
| SKLEARN_AVAILABLE = True |
| except ImportError: |
| SKLEARN_AVAILABLE = False |
| logging.warning("scikit-learn not available. Using fallback detection only.") |
|
|
| try: |
| import torch |
| import torch.nn as nn |
| PYTORCH_AVAILABLE = True |
| except ImportError: |
| PYTORCH_AVAILABLE = False |
| logging.warning("PyTorch not available. LSTM detector disabled.") |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
|
|
if PYTORCH_AVAILABLE:
    class LSTMAnomalyDetector(nn.Module):
        """
        Sequence-model anomaly detector built on an LSTM backbone.

        Learns to predict the next feature vector from a window of past
        vectors; a large prediction error flags an anomaly.
        """

        def __init__(self, input_size: int = 5, hidden_size: int = 64, num_layers: int = 2):
            super().__init__()

            self.hidden_size = hidden_size
            self.num_layers = num_layers

            # Stacked LSTM over (batch, seq, feature) inputs.
            self.lstm = nn.LSTM(
                input_size=input_size,
                hidden_size=hidden_size,
                num_layers=num_layers,
                batch_first=True,
                dropout=0.2,
            )

            # Two-layer head projecting the final hidden state back into
            # the input feature space.
            self.fc1 = nn.Linear(hidden_size, 32)
            self.fc2 = nn.Linear(32, input_size)
            self.relu = nn.ReLU()

        def forward(self, x):
            """Predict the next feature vector for each sequence in the batch."""
            lstm_out, _ = self.lstm(x)

            # Keep only the representation of the final timestep.
            final_state = lstm_out[:, -1, :]

            hidden = self.relu(self.fc1(final_state))
            return self.fc2(hidden)
else:
    class LSTMAnomalyDetector:
        """Inert stand-in used when PyTorch is not installed."""

        def __init__(self, *args, **kwargs):
            logger.warning("LSTM detector not available (PyTorch not installed)")
|
|
| |
|
|
class EnsembleAnomalyDetector:
    """
    Ensemble of multiple anomaly detection algorithms for robust detection.

    Combines an Isolation Forest, a placeholder LSTM score and a z-score
    statistical test; gracefully degrades to simple threshold checks when
    the ML libraries aren't available or the ensemble isn't trained yet.
    """

    def __init__(self):
        self.isolation_forest = None   # sklearn IsolationForest, if available
        self.lstm_model = None         # LSTMAnomalyDetector, if available
        self.scaler = None             # StandardScaler for (future) LSTM inputs
        self.is_trained = False
        self.training_data = []        # list of per-sample feature vectors

        if SKLEARN_AVAILABLE:
            try:
                self.isolation_forest = IsolationForest(
                    contamination=0.1,
                    random_state=42,
                    n_estimators=100
                )
                self.scaler = StandardScaler()
                logger.info("Initialized Isolation Forest detector")
            except Exception as e:
                logger.error(f"Failed to initialize Isolation Forest: {e}")

        if PYTORCH_AVAILABLE:
            try:
                self.lstm_model = LSTMAnomalyDetector()
                logger.info("Initialized LSTM detector")
            except Exception as e:
                logger.error(f"Failed to initialize LSTM: {e}")

        logger.info(f"EnsembleAnomalyDetector initialized (sklearn={SKLEARN_AVAILABLE}, pytorch={PYTORCH_AVAILABLE})")

    def add_sample(self, features: np.ndarray) -> None:
        """
        Add a training sample; auto-trains once enough data accumulates.

        Args:
            features: numpy array of [latency, error_rate, cpu, memory, throughput]
        """
        if not isinstance(features, np.ndarray):
            features = np.array(features)

        self.training_data.append(features)

        # Train automatically the first time we reach 100 samples.
        if len(self.training_data) >= 100 and not self.is_trained:
            self.train()

    def train(self) -> None:
        """Train all available models in the ensemble."""
        if len(self.training_data) < 50:
            logger.warning(f"Insufficient data for training: {len(self.training_data)} samples (need 50+)")
            return

        try:
            X = np.array(self.training_data)

            if self.isolation_forest is not None and SKLEARN_AVAILABLE:
                self.isolation_forest.fit(X)
                logger.info(f"Trained Isolation Forest on {len(self.training_data)} samples")

            if self.lstm_model is not None and PYTORCH_AVAILABLE:
                # Fit the scaler now so LSTM training can use normalized
                # inputs once it is implemented (the transformed data itself
                # was previously computed and discarded — dead code removed).
                if self.scaler is not None:
                    self.scaler.fit(X)
                logger.info("LSTM training not yet implemented (using fallback)")

            self.is_trained = True
            logger.info(f"✅ Ensemble trained on {len(self.training_data)} samples")

        except Exception as e:
            logger.error(f"Training failed: {e}", exc_info=True)
            self.is_trained = False

    def predict_anomaly(self, features: np.ndarray) -> Tuple[bool, float, Dict]:
        """
        Predict if features represent an anomaly.

        Args:
            features: numpy array of [latency, error_rate, cpu, memory, throughput]

        Returns:
            Tuple of (is_anomaly: bool, confidence: float, explanation: dict)
        """
        if not isinstance(features, np.ndarray):
            features = np.array(features)

        # Fall back when the ensemble can't score: untrained, the Isolation
        # Forest failed to initialize, or sklearn is missing.  (The forest
        # None-check fixes a bug where a failed init raised AttributeError
        # and logged an error on every single prediction.)
        if not self.is_trained or self.isolation_forest is None or not SKLEARN_AVAILABLE:
            return self._fallback_detection(features)

        try:
            sample = features.reshape(1, -1)
            if_score = self.isolation_forest.score_samples(sample)[0]
            if_anomaly = self.isolation_forest.predict(sample)[0] == -1

            # Placeholder until LSTM training is implemented.
            lstm_score = 0.5

            stat_score = self._statistical_tests(features)

            # Simple average of the individual detector scores.
            confidence = np.mean([
                abs(if_score),
                lstm_score,
                stat_score
            ])

            is_anomaly = if_anomaly or confidence > 0.7

            explanation = {
                'isolation_forest_score': float(if_score),
                'isolation_forest_anomaly': bool(if_anomaly),
                'lstm_reconstruction_error': float(lstm_score),
                'statistical_score': float(stat_score),
                'ensemble_confidence': float(confidence),
                'primary_detector': 'isolation_forest' if if_anomaly else 'ensemble',
                'models_used': ['isolation_forest', 'statistical']
            }

            return is_anomaly, confidence, explanation

        except Exception as e:
            logger.error(f"Prediction failed, using fallback: {e}", exc_info=True)
            return self._fallback_detection(features)

    def _statistical_tests(self, features: np.ndarray) -> float:
        """
        Perform statistical tests for anomaly detection using z-scores.

        Args:
            features: Current feature values

        Returns:
            Anomaly probability (0-1); 0.5 when history is too short or
            the computation fails.
        """
        if len(self.training_data) < 10:
            return 0.5

        try:
            historical = np.array(self.training_data)
            mean = np.mean(historical, axis=0)
            std = np.std(historical, axis=0)

            # Epsilon guards against division by zero for constant features.
            z_scores = np.abs((features - mean) / (std + 1e-8))
            max_z_score = np.max(z_scores)

            # Map the worst z-score to [0, 1], saturating at 3 sigma.
            anomaly_prob = min(1.0, max_z_score / 3.0)

            return anomaly_prob

        except Exception as e:
            logger.error(f"Statistical test failed: {e}")
            return 0.5

    def _fallback_detection(self, features: np.ndarray) -> Tuple[bool, float, Dict]:
        """
        Fallback detection when ML models aren't trained or available.
        Uses simple fixed-threshold checks on each feature.

        Args:
            features: [latency, error_rate, cpu, memory, throughput]

        Returns:
            Tuple of (is_anomaly, confidence, explanation)
        """
        latency_threshold = 150
        error_rate_threshold = 0.05
        cpu_threshold = 0.8
        memory_threshold = 0.8

        # Tolerate short feature vectors by treating missing entries as 0.
        latency = features[0] if len(features) > 0 else 0
        error_rate = features[1] if len(features) > 1 else 0
        cpu = features[2] if len(features) > 2 else 0
        memory = features[3] if len(features) > 3 else 0

        is_anomaly = (
            latency > latency_threshold or
            error_rate > error_rate_threshold or
            cpu > cpu_threshold or
            memory > memory_threshold
        )

        # Coarse confidence: threshold hits are only moderately reliable.
        confidence = 0.5 if is_anomaly else 0.1

        explanation = {
            'method': 'fallback_threshold',
            'latency_exceeded': latency > latency_threshold,
            'error_rate_exceeded': error_rate > error_rate_threshold,
            'cpu_exceeded': cpu > cpu_threshold,
            'memory_exceeded': memory > memory_threshold
        }

        return is_anomaly, confidence, explanation
|
|
| |
|
|
class CausalInferenceEngine:
    """
    Bayesian causal inference for root cause analysis.
    Uses probabilistic graphical models to infer causality.
    """

    def __init__(self):
        # Cause -> observable-effect edges of the causal graph.
        self.causal_graph = {
            'database_latency': ['api_latency', 'error_rate'],
            'network_issues': ['api_latency', 'timeout_errors'],
            'memory_leak': ['memory_util', 'gc_time', 'response_time'],
            'cpu_saturation': ['cpu_util', 'queue_length', 'latency'],
            'traffic_spike': ['throughput', 'latency', 'error_rate']
        }

        # Prior belief in each root cause before observing any symptoms.
        self.prior_probabilities = {
            'database_latency': 0.3,
            'network_issues': 0.2,
            'memory_leak': 0.15,
            'cpu_saturation': 0.2,
            'traffic_spike': 0.15
        }

        logger.info("Initialized CausalInferenceEngine")

    def infer_root_cause(self, symptoms: Dict[str, float]) -> List[Tuple[str, float]]:
        """
        Use Bayesian inference to determine likely root causes.

        Args:
            symptoms: Dictionary of observed symptoms and their values
                      e.g., {'api_latency': 500, 'error_rate': 0.15, 'cpu_util': 0.9}

        Returns:
            List of (root_cause, probability) tuples sorted by probability
        """
        # Unnormalized posterior = likelihood * prior, per candidate cause.
        posteriors = {
            cause: self._calculate_likelihood(symptoms, effects) * self.prior_probabilities[cause]
            for cause, effects in self.causal_graph.items()
        }

        # Normalize so the probabilities sum to 1; if every posterior is
        # zero, fall back to a uniform distribution.
        total = sum(posteriors.values())
        if total > 0:
            posteriors = {cause: p / total for cause, p in posteriors.items()}
        else:
            uniform = 1.0 / len(posteriors)
            posteriors = {cause: uniform for cause in posteriors}

        ranked = sorted(posteriors.items(), key=lambda item: item[1], reverse=True)

        logger.info(f"Inferred root causes: {ranked[:3]}")

        return ranked

    def _calculate_likelihood(self, symptoms: Dict[str, float], effects: List[str]) -> float:
        """
        Calculate the likelihood of the symptoms given a cause.

        Args:
            symptoms: Observed symptoms
            effects: Expected effects of the cause

        Returns:
            Likelihood score (0-1): fraction of expected effects observed,
            with a small 0.1 floor so no cause is ruled out entirely.
        """
        hits = sum(effect in symptoms for effect in effects)

        if hits == 0:
            return 0.1

        return hits / len(effects)
|
|
| |
|
|
class AdaptiveThresholdLearner:
    """
    Online learning system that adapts thresholds based on historical patterns.
    Uses exponential moving averages and seasonality detection.
    """

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.historical_data: Dict[str, List[Dict]] = {}
        self.thresholds: Dict[str, Dict] = {}
        self.seasonality_patterns: Dict[str, Dict] = {}

        logger.info(f"Initialized AdaptiveThresholdLearner with window_size={window_size}")

    def update(self, metric: str, value: float, timestamp: datetime.datetime) -> None:
        """
        Record a new observation for a metric and refresh its threshold.

        Args:
            metric: Metric name (e.g., 'latency', 'error_rate')
            value: Metric value
            timestamp: Timestamp of the measurement
        """
        samples = self.historical_data.setdefault(metric, [])
        samples.append({'value': value, 'timestamp': timestamp})

        # Keep only the most recent window of observations.
        if len(samples) > self.window_size:
            del samples[0]

        self._update_threshold(metric)

    def _update_threshold(self, metric: str) -> None:
        """
        Recompute the adaptive threshold for a metric from its history.

        Args:
            metric: Metric name
        """
        samples = self.historical_data[metric]
        if len(samples) < 10:
            return  # not enough history for stable statistics

        try:
            observed = [s['value'] for s in samples]

            avg = np.mean(observed)
            spread = np.std(observed)
            p90, p95 = np.percentile(observed, [90, 95])

            # Seasonality: scale by the time slot of the newest sample.
            latest = samples[-1]['timestamp']
            multiplier = self._get_time_multiplier(latest.hour, latest.weekday())

            # Base threshold is two standard deviations above the mean.
            new_threshold = (avg + 2 * spread) * multiplier

            self.thresholds[metric] = {
                'value': new_threshold,
                'mean': avg,
                'std': spread,
                'p90': p90,
                'p95': p95,
                'last_updated': datetime.datetime.now(),
                'time_multiplier': multiplier
            }

            logger.debug(f"Updated threshold for {metric}: {new_threshold:.2f}")

        except Exception as e:
            logger.error(f"Failed to update threshold for {metric}: {e}")

    def _get_time_multiplier(self, hour: int, day_of_week: int) -> float:
        """
        Adjust threshold based on time of day and day of week.

        Args:
            hour: Hour of day (0-23)
            day_of_week: Day of week (0=Monday, 6=Sunday)

        Returns:
            1.2 during weekday business hours (9-17), otherwise 0.8.
        """
        is_weekday = day_of_week < 5
        is_business_hours = 9 <= hour <= 17
        return 1.2 if is_weekday and is_business_hours else 0.8

    def get_threshold(self, metric: str) -> Optional[float]:
        """
        Get the current adaptive threshold for a metric.

        Args:
            metric: Metric name

        Returns:
            Current threshold value or None if not available
        """
        entry = self.thresholds.get(metric)
        return None if entry is None else entry['value']

    def get_statistics(self, metric: str) -> Optional[Dict]:
        """
        Get full statistics for a metric.

        Args:
            metric: Metric name

        Returns:
            Dictionary of statistics or None
        """
        return self.thresholds.get(metric)
|
|
| |
|
|
def create_feature_vector(event) -> np.ndarray:
    """
    Convert a ReliabilityEvent into the feature vector the ML models expect.

    Args:
        event: ReliabilityEvent object

    Returns:
        numpy array of [latency, error_rate, cpu, memory, throughput];
        missing cpu/memory utilization falls back to a neutral 0.5.
    """
    cpu = 0.5 if event.cpu_util is None else event.cpu_util
    memory = 0.5 if event.memory_util is None else event.memory_util

    return np.array([
        event.latency_p99,
        event.error_rate,
        cpu,
        memory,
        event.throughput,
    ])