Spaces:
Paused
Paused
#!/usr/bin/env python3
"""
Predictive Failure Analysis System
Predicts and prevents system failures using machine learning
"""
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

import aiohttp
import joblib
import numpy as np
import pandas as pd
from prometheus_client import Counter, Gauge, Histogram
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Prometheus metrics ------------------------------------------------------
# Exported by this service so the prediction pipeline itself can be monitored.
PREDICTION_ACCURACY = Gauge(
    "failure_prediction_accuracy", "Failure prediction accuracy"
)
FALSE_POSITIVE_RATE = Gauge("failure_false_positive_rate", "False positive rate")
FALSE_NEGATIVE_RATE = Gauge("failure_false_negative_rate", "False negative rate")
MEAN_TIME_TO_PREDICTION = Histogram(
    "failure_mean_time_to_prediction_seconds", "Time to predict failure"
)
PREVENTION_ACTIONS_TAKEN = Counter(
    "failure_prevention_actions_total", "Total prevention actions taken"
)
FAILURE_PREDICTIONS = Counter(
    "failure_predictions_total", "Total failure predictions", ["severity"]
)
@dataclass
class FailurePrediction:
    """Failure prediction result.

    BUG FIX: the class was missing the ``@dataclass`` decorator even though
    it is constructed with keyword arguments (and ``dataclass`` is imported),
    which made every ``FailurePrediction(...)`` call raise ``TypeError``.
    """

    timestamp: datetime  # when the prediction was produced
    component: str  # component the prediction applies to
    failure_type: str  # e.g. "system_failure"
    probability: float  # predicted failure probability in [0, 1]
    confidence: float  # model confidence in this prediction
    prediction_horizon: int  # prediction horizon in seconds
    affected_services: list[str]  # services expected to be impacted
    root_causes: list[str]  # ranked "feature: score" strings
    recommended_actions: list[str]  # suggested prevention steps
@dataclass
class AnomalyResult:
    """Anomaly detection result.

    BUG FIX: added the missing ``@dataclass`` decorator — the class is
    instantiated with keyword arguments in ``detect_anomalies`` and would
    otherwise raise ``TypeError``.
    """

    timestamp: datetime  # when the anomaly was detected
    component: str  # component the anomaly belongs to
    anomaly_score: float  # absolute detector score (higher = more anomalous)
    severity: str  # "HIGH" or "MEDIUM"
    affected_metrics: list[str]  # metric columns present when detected
    anomaly_type: str  # name of the detector that fired
@dataclass
class PreventionAction:
    """Prevention action result.

    BUG FIX: added the missing ``@dataclass`` decorator — every
    ``_handle_*`` method builds this with keyword arguments and would
    otherwise raise ``TypeError``.
    """

    timestamp: datetime  # when the action was attempted
    action_type: str  # handler key, e.g. "auto_scaling"
    component: str  # target of the action
    parameters: dict[str, Any]  # handler-specific parameters used
    success: bool  # whether the action succeeded
    message: str  # human-readable outcome description
class PredictiveFailureAnalyzer:
    """Predictive failure analysis system.

    Pulls metrics from Prometheus, Elasticsearch and Jaeger, runs
    pre-trained failure models plus anomaly detectors, and triggers
    prevention actions and alerts when failures look likely.
    """

    def __init__(self, config_path: str = "/config/prediction-config.yaml"):
        """Initialize config, endpoints, thresholds, models and detectors."""
        self.config = self._load_config(config_path)
        self.models = {}
        self.scalers = {}
        self.anomaly_detectors = {}

        # Data-source endpoints — overridable via the environment.
        self.prometheus_url = os.getenv(
            "PROMETHEUS_URL", "http://prometheus.monitoring.svc.cluster.local:9090"
        )
        self.elasticsearch_url = os.getenv(
            "ELASTICSEARCH_URL", "http://elasticsearch.logging.svc.cluster.local:9200"
        )
        self.jaeger_url = os.getenv(
            "JAEGER_ENDPOINT",
            "http://jaeger-collector.istio-system.svc.cluster.local:14268/api/traces",
        )

        # Prediction tuning knobs.
        self.prediction_horizon = int(
            os.getenv("PREDICTION_HORIZON", "1800")
        )  # seconds (30 minutes)
        self.confidence_threshold = float(os.getenv("CONFIDENCE_THRESHOLD", "0.75"))
        self.alert_threshold = float(os.getenv("ALERT_THRESHOLD", "0.85"))

        # Load persisted models and build the detector ensemble.
        self._load_models()
        self._initialize_anomaly_detectors()

        # NOTE(review): creating a ClientSession outside a running event loop
        # is discouraged by aiohttp — consider lazy creation; left unchanged
        # to preserve behavior.
        self.session = aiohttp.ClientSession()

        # Maps prevention-action names (as they appear in config) to handlers.
        self.prevention_handlers = {
            "auto_scaling": self._handle_auto_scaling,
            "circuit_breaker": self._handle_circuit_breaker,
            "health_check_adjustment": self._handle_health_check_adjustment,
            "resource_allocation": self._handle_resource_allocation,
            "deployment_rollback": self._handle_deployment_rollback,
        }
| def _load_config(self, config_path: str) -> Dict: | |
| """Load prediction configuration""" | |
| try: | |
| import yaml | |
| with open(config_path, "r") as f: | |
| return yaml.safe_load(f) | |
| except Exception as e: | |
| logger.error(f"Failed to load config: {e}") | |
| return self._default_config() | |
| def _default_config(self) -> Dict: | |
| """Default configuration""" | |
| return { | |
| "models": { | |
| "system_failure": {"type": "random_forest"}, | |
| "application_failure": {"type": "gradient_boosting"}, | |
| "network_failure": {"type": "lstm"}, | |
| }, | |
| "prevention_actions": { | |
| "auto_scaling": {"enabled": True, "threshold": 0.8}, | |
| "circuit_breaker": {"enabled": True, "threshold": 0.75}, | |
| }, | |
| } | |
| def _load_models(self): | |
| """Load pre-trained failure prediction models""" | |
| model_files = { | |
| "system_failure": "system_failure_latest.pkl", | |
| "application_failure": "application_failure_latest.pkl", | |
| "network_failure": "network_failure_latest.pkl", | |
| } | |
| for model_name, filename in model_files.items(): | |
| try: | |
| model_path = os.path.join("/models", filename) | |
| if os.path.exists(model_path): | |
| self.models[model_name] = joblib.load(model_path) | |
| logger.info(f"Loaded failure model: {model_name}") | |
| else: | |
| logger.warning(f"Failure model not found: {model_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to load failure model {model_name}: {e}") | |
| def _initialize_anomaly_detectors(self): | |
| """Initialize anomaly detection algorithms""" | |
| try: | |
| # Isolation Forest | |
| self.anomaly_detectors["isolation_forest"] = IsolationForest( | |
| contamination=0.1, n_estimators=100, random_state=42 | |
| ) | |
| # Local Outlier Factor | |
| self.anomaly_detectors["local_outlier_factor"] = LocalOutlierFactor( | |
| n_neighbors=20, contamination=0.1, novelty=True | |
| ) | |
| logger.info("Anomaly detectors initialized") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize anomaly detectors: {e}") | |
| async def collect_metrics( | |
| self, component: str, lookback_hours: int = 24 | |
| ) -> Optional[pd.DataFrame]: | |
| """Collect metrics from multiple sources for failure prediction""" | |
| try: | |
| # Collect Prometheus metrics | |
| prometheus_metrics = await self._collect_prometheus_metrics( | |
| component, lookback_hours | |
| ) | |
| # Collect Elasticsearch logs | |
| elasticsearch_metrics = await self._collect_elasticsearch_metrics( | |
| component, lookback_hours | |
| ) | |
| # Collect Jaeger traces | |
| jaeger_metrics = await self._collect_jaeger_metrics( | |
| component, lookback_hours | |
| ) | |
| # Merge all metrics | |
| merged_data = self._merge_metrics( | |
| prometheus_metrics, elasticsearch_metrics, jaeger_metrics | |
| ) | |
| return merged_data | |
| except Exception as e: | |
| logger.error(f"Failed to collect metrics for {component}: {e}") | |
| return None | |
| async def _collect_prometheus_metrics( | |
| self, component: str, lookback_hours: int | |
| ) -> pd.DataFrame: | |
| """Collect time series metrics from Prometheus""" | |
| try: | |
| metrics_config = self.config["data_sources"]["prometheus"]["metrics"] | |
| data = {} | |
| for metric_name, query in metrics_config.items(): | |
| # Add component filter to query | |
| component_query = query.replace( | |
| "))", f',instance=~".*{component}.*"}}))' | |
| ) | |
| result = await self._query_prometheus_range( | |
| component_query, lookback_hours | |
| ) | |
| if result is not None: | |
| data[metric_name] = result | |
| return pd.DataFrame(data) | |
| except Exception as e: | |
| logger.error(f"Failed to collect Prometheus metrics: {e}") | |
| return pd.DataFrame() | |
| async def _collect_elasticsearch_metrics( | |
| self, component: str, lookback_hours: int | |
| ) -> pd.DataFrame: | |
| """Collect log-based metrics from Elasticsearch""" | |
| try: | |
| # Query Elasticsearch for error logs, warning logs, etc. | |
| query = { | |
| "query": { | |
| "bool": { | |
| "must": [ | |
| {"term": {"component.keyword": component}}, | |
| { | |
| "range": { | |
| "@timestamp": {"gte": f"now-{lookback_hours}h"} | |
| } | |
| }, | |
| ] | |
| } | |
| }, | |
| "aggs": { | |
| "error_rate": { | |
| "terms": {"field": "log.level.keyword"}, | |
| "aggs": {"count": {"value_count": {"field": "@timestamp"}}}, | |
| } | |
| }, | |
| } | |
| async with self.session.post( | |
| f"{self.elasticsearch_url}/zenith-logs-*/_search", | |
| json=query, | |
| timeout=30, | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| return self._process_elasticsearch_response(data) | |
| return pd.DataFrame() | |
| except Exception as e: | |
| logger.error(f"Failed to collect Elasticsearch metrics: {e}") | |
| return pd.DataFrame() | |
| async def _collect_jaeger_metrics( | |
| self, component: str, lookback_hours: int | |
| ) -> pd.DataFrame: | |
| """Collect distributed tracing metrics from Jaeger""" | |
| try: | |
| # Query Jaeger for trace metrics | |
| params = { | |
| "service": component, | |
| "lookback": f"{lookback_hours}h", | |
| "limit": 1000, | |
| } | |
| async with self.session.get( | |
| f"{self.jaeger_url}/api/traces", params=params, timeout=30 | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| return self._process_jaeger_response(data) | |
| return pd.DataFrame() | |
| except Exception as e: | |
| logger.error(f"Failed to collect Jaeger metrics: {e}") | |
| return pd.DataFrame() | |
| async def _query_prometheus_range( | |
| self, query: str, lookback_hours: int | |
| ) -> Optional[List]: | |
| """Query Prometheus for time series data""" | |
| try: | |
| end_time = datetime.utcnow() | |
| start_time = end_time - timedelta(hours=lookback_hours) | |
| params = { | |
| "query": query, | |
| "start": start_time.timestamp(), | |
| "end": end_time.timestamp(), | |
| "step": "60", # 1 minute resolution | |
| } | |
| async with self.session.get( | |
| f"{self.prometheus_url}/api/v1/query_range", params=params, timeout=10 | |
| ) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| if data["data"]["result"]: | |
| values = data["data"]["result"][0]["values"] | |
| return [float(v[1]) for v in values] | |
| return None | |
| except Exception as e: | |
| logger.error(f"Prometheus range query failed: {e}") | |
| return None | |
| def _merge_metrics( | |
| self, | |
| prometheus_df: pd.DataFrame, | |
| elasticsearch_df: pd.DataFrame, | |
| jaeger_df: pd.DataFrame, | |
| ) -> pd.DataFrame: | |
| """Merge metrics from different sources""" | |
| try: | |
| # Simple merge on index (time) | |
| merged = prometheus_df.copy() | |
| if not elasticsearch_df.empty: | |
| merged = pd.concat([merged, elasticsearch_df], axis=1) | |
| if not jaeger_df.empty: | |
| merged = pd.concat([merged, jaeger_df], axis=1) | |
| # Fill missing values | |
| merged = merged.fillna(method="ffill").fillna(0) | |
| return merged | |
| except Exception as e: | |
| logger.error(f"Failed to merge metrics: {e}") | |
| return pd.DataFrame() | |
| def _process_elasticsearch_response(self, data: Dict) -> pd.DataFrame: | |
| """Process Elasticsearch response into DataFrame""" | |
| try: | |
| # Extract metrics from Elasticsearch aggregation | |
| metrics = {} | |
| if "aggregations" in data: | |
| error_rate_agg = data["aggregations"]["error_rate"] | |
| for bucket in error_rate_agg["buckets"]: | |
| level = bucket["key"] | |
| count = bucket["count"]["value"] | |
| metrics[f"log_{level}_count"] = [count] | |
| return pd.DataFrame(metrics) | |
| except Exception as e: | |
| logger.error(f"Failed to process Elasticsearch response: {e}") | |
| return pd.DataFrame() | |
| def _process_jaeger_response(self, data: Dict) -> pd.DataFrame: | |
| """Process Jaeger response into DataFrame""" | |
| try: | |
| traces = data.get("data", []) | |
| metrics = { | |
| "trace_count": [len(traces)], | |
| "avg_duration": [ | |
| np.mean([t["duration"] for t in traces]) if traces else 0 | |
| ], | |
| "error_rate": [ | |
| len( | |
| [ | |
| t | |
| for t in traces | |
| if any( | |
| s.get("tags", {}).get("error", False) | |
| for s in t["spans"] | |
| ) | |
| ] | |
| ) | |
| / len(traces) | |
| if traces | |
| else 0 | |
| ], | |
| "span_count": [sum(len(t["spans"]) for t in traces)], | |
| } | |
| return pd.DataFrame(metrics) | |
| except Exception as e: | |
| logger.error(f"Failed to process Jaeger response: {e}") | |
| return pd.DataFrame() | |
| async def predict_system_failure( | |
| self, metrics_df: pd.DataFrame | |
| ) -> Optional[FailurePrediction]: | |
| """Predict system-level failures""" | |
| try: | |
| if "system_failure" not in self.models or metrics_df.empty: | |
| return None | |
| model = self.models["system_failure"] | |
| # Prepare features | |
| features = self._prepare_system_failure_features(metrics_df) | |
| if features.empty: | |
| return None | |
| # Predict failure probability | |
| failure_prob = model.predict_proba(features.tail(1))[0][1] | |
| if failure_prob > self.confidence_threshold: | |
| # Get feature importance for root cause analysis | |
| if hasattr(model, "feature_importances_"): | |
| feature_names = [ | |
| "cpu_trend", | |
| "memory_trend", | |
| "disk_io", | |
| "network_latency", | |
| "error_trend", | |
| ] | |
| importance = dict(zip(feature_names, model.feature_importances_)) | |
| root_causes = sorted( | |
| importance.items(), key=lambda x: x[1], reverse=True | |
| )[:3] | |
| else: | |
| root_causes = [] | |
| return FailurePrediction( | |
| timestamp=datetime.utcnow(), | |
| component="system", | |
| failure_type="system_failure", | |
| probability=failure_prob, | |
| confidence=0.8, | |
| prediction_horizon=self.prediction_horizon, | |
| affected_services=["zenith-api", "zenith-db", "zenith-redis"], | |
| root_causes=[ | |
| f"{cause}: {score:.2f}" for cause, score in root_causes | |
| ], | |
| recommended_actions=self._get_system_failure_prevention_actions( | |
| failure_prob | |
| ), | |
| ) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Failed to predict system failure: {e}") | |
| return None | |
| def _prepare_system_failure_features( | |
| self, metrics_df: pd.DataFrame | |
| ) -> pd.DataFrame: | |
| """Prepare features for system failure prediction""" | |
| try: | |
| features = pd.DataFrame() | |
| # Calculate trends | |
| if "cpu_utilization" in metrics_df.columns: | |
| features["cpu_trend"] = ( | |
| metrics_df["cpu_utilization"].diff().fillna(0).tail(10).mean() | |
| ) | |
| if "memory_usage" in metrics_df.columns: | |
| features["memory_trend"] = ( | |
| metrics_df["memory_usage"].diff().fillna(0).tail(10).mean() | |
| ) | |
| if "error_rate" in metrics_df.columns: | |
| features["error_trend"] = ( | |
| metrics_df["error_rate"].diff().fillna(0).tail(10).mean() | |
| ) | |
| # Add other features | |
| features["disk_io"] = np.random.normal(0, 1) # Mock data | |
| features["network_latency"] = np.random.normal(0, 1) # Mock data | |
| return features | |
| except Exception as e: | |
| logger.error(f"Failed to prepare system failure features: {e}") | |
| return pd.DataFrame() | |
| def _get_system_failure_prevention_actions(self, failure_prob: float) -> list[str]: | |
| """Get recommended prevention actions for system failure""" | |
| actions = [] | |
| if failure_prob > 0.9: | |
| actions.extend( | |
| [ | |
| "Scale up all services by 50%", | |
| "Enable aggressive health checks", | |
| "Pre-warm cache instances", | |
| "Consider circuit breakers for external dependencies", | |
| ] | |
| ) | |
| elif failure_prob > 0.8: | |
| actions.extend( | |
| [ | |
| "Scale up critical services by 25%", | |
| "Increase monitoring frequency", | |
| "Prepare emergency rollback procedures", | |
| ] | |
| ) | |
| elif failure_prob > 0.75: | |
| actions.extend( | |
| [ | |
| "Increase resource limits", | |
| "Enable additional logging", | |
| "Notify on-call team", | |
| ] | |
| ) | |
| return actions | |
| async def detect_anomalies(self, metrics_df: pd.DataFrame) -> list[AnomalyResult]: | |
| """Detect anomalies using multiple algorithms""" | |
| try: | |
| if metrics_df.empty: | |
| return [] | |
| anomalies = [] | |
| # Use ensemble of anomaly detection methods | |
| for detector_name, detector in self.anomaly_detectors.items(): | |
| try: | |
| # Fit and predict | |
| if detector_name == "local_outlier_factor": | |
| # LOF needs to be fitted on normal data first | |
| if len(metrics_df) > 20: | |
| normal_data = metrics_df.iloc[ | |
| :-10 | |
| ] # Assume last 10 points as normal | |
| detector.fit(normal_data.values) | |
| # Predict on recent data | |
| recent_data = metrics_df.tail(10).values | |
| anomaly_scores = detector.decision_function(recent_data) | |
| for i, score in enumerate(anomaly_scores): | |
| if score < -0.5: # Anomaly threshold | |
| anomalies.append( | |
| AnomalyResult( | |
| timestamp=datetime.utcnow(), | |
| component="system", | |
| anomaly_score=abs(score), | |
| severity="HIGH" | |
| if abs(score) > 1.0 | |
| else "MEDIUM", | |
| affected_metrics=list(metrics_df.columns), | |
| anomaly_type=detector_name, | |
| ) | |
| ) | |
| elif detector_name == "isolation_forest": | |
| # Isolation Forest can be trained on the fly | |
| detector.fit(metrics_df.values) | |
| anomaly_scores = detector.decision_function(metrics_df.values) | |
| for i, score in enumerate(anomaly_scores): | |
| if score < 0: # Negative scores indicate anomalies | |
| anomalies.append( | |
| AnomalyResult( | |
| timestamp=datetime.utcnow(), | |
| component="system", | |
| anomaly_score=abs(score), | |
| severity="HIGH" | |
| if abs(score) > 0.5 | |
| else "MEDIUM", | |
| affected_metrics=list(metrics_df.columns), | |
| anomaly_type=detector_name, | |
| ) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Anomaly detector {detector_name} failed: {e}") | |
| return anomalies | |
| except Exception as e: | |
| logger.error(f"Failed to detect anomalies: {e}") | |
| return [] | |
| async def take_prevention_actions( | |
| self, prediction: FailurePrediction | |
| ) -> list[PreventionAction]: | |
| """Take automated prevention actions based on prediction""" | |
| actions = [] | |
| try: | |
| for action_type, config in self.config["prevention_actions"].items(): | |
| if config.get("enabled", False) and prediction.probability > config.get( | |
| "threshold", 0.8 | |
| ): | |
| handler = self.prevention_handlers.get(action_type) | |
| if handler: | |
| result = await handler(prediction, config) | |
| actions.append(result) | |
| return actions | |
| except Exception as e: | |
| logger.error(f"Failed to take prevention actions: {e}") | |
| return [] | |
| async def _handle_auto_scaling( | |
| self, prediction: FailurePrediction, config: Dict | |
| ) -> PreventionAction: | |
| """Handle auto-scaling prevention action""" | |
| try: | |
| # This would integrate with Kubernetes API to scale deployments | |
| scale_factor = config.get("scale_up_factor", 1.5) | |
| logger.info( | |
| f"Auto-scaling services by factor {scale_factor} due to failure prediction" | |
| ) | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="auto_scaling", | |
| component="kubernetes-deployments", | |
| parameters={"scale_factor": scale_factor}, | |
| success=True, | |
| message=f"Services scaled up by factor {scale_factor}", | |
| ) | |
| except Exception as e: | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="auto_scaling", | |
| component="kubernetes-deployments", | |
| parameters={}, | |
| success=False, | |
| message=f"Auto-scaling failed: {e}", | |
| ) | |
| async def _handle_circuit_breaker( | |
| self, prediction: FailurePrediction, config: Dict | |
| ) -> PreventionAction: | |
| """Handle circuit breaker prevention action""" | |
| try: | |
| # This would configure circuit breakers for external dependencies | |
| timeout = config.get("timeout_seconds", 60) | |
| logger.info(f"Configuring circuit breakers with timeout {timeout}s") | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="circuit_breaker", | |
| component="external-dependencies", | |
| parameters={"timeout": timeout}, | |
| success=True, | |
| message=f"Circuit breakers configured with timeout {timeout}s", | |
| ) | |
| except Exception as e: | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="circuit_breaker", | |
| component="external-dependencies", | |
| parameters={}, | |
| success=False, | |
| message=f"Circuit breaker configuration failed: {e}", | |
| ) | |
| async def _handle_health_check_adjustment( | |
| self, prediction: FailurePrediction, config: Dict | |
| ) -> PreventionAction: | |
| """Handle health check adjustment prevention action""" | |
| try: | |
| aggressive = config.get("aggressive_mode", True) | |
| logger.info(f"Adjusting health checks to aggressive mode: {aggressive}") | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="health_check_adjustment", | |
| component="kubernetes-pods", | |
| parameters={"aggressive_mode": aggressive}, | |
| success=True, | |
| message="Health checks adjusted to aggressive mode", | |
| ) | |
| except Exception as e: | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="health_check_adjustment", | |
| component="kubernetes-pods", | |
| parameters={}, | |
| success=False, | |
| message=f"Health check adjustment failed: {e}", | |
| ) | |
| async def _handle_resource_allocation( | |
| self, prediction: FailurePrediction, config: Dict | |
| ) -> PreventionAction: | |
| """Handle resource allocation prevention action""" | |
| try: | |
| boost_factor = config.get("resource_boost_factor", 2.0) | |
| logger.info(f"Boosting resource allocation by factor {boost_factor}") | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="resource_allocation", | |
| component="kubernetes-resources", | |
| parameters={"boost_factor": boost_factor}, | |
| success=True, | |
| message=f"Resource allocation boosted by factor {boost_factor}", | |
| ) | |
| except Exception as e: | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="resource_allocation", | |
| component="kubernetes-resources", | |
| parameters={}, | |
| success=False, | |
| message=f"Resource allocation boost failed: {e}", | |
| ) | |
| async def _handle_deployment_rollback( | |
| self, prediction: FailurePrediction, config: Dict | |
| ) -> PreventionAction: | |
| """Handle deployment rollback prevention action""" | |
| try: | |
| require_approval = config.get("require_approval", False) | |
| if require_approval: | |
| logger.info("Deployment rollback requires manual approval") | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="deployment_rollback", | |
| component="kubernetes-deployments", | |
| parameters={"require_approval": True}, | |
| success=False, | |
| message="Rollback requires manual approval", | |
| ) | |
| else: | |
| logger.info("Executing automatic deployment rollback") | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="deployment_rollback", | |
| component="kubernetes-deployments", | |
| parameters={"require_approval": False}, | |
| success=True, | |
| message="Automatic deployment rollback executed", | |
| ) | |
| except Exception as e: | |
| return PreventionAction( | |
| timestamp=datetime.utcnow(), | |
| action_type="deployment_rollback", | |
| component="kubernetes-deployments", | |
| parameters={}, | |
| success=False, | |
| message=f"Deployment rollback failed: {e}", | |
| ) | |
| async def start_analysis_loop(self): | |
| """Main analysis loop for predictive failure analysis""" | |
| logger.info("Starting predictive failure analysis loop") | |
| while True: | |
| try: | |
| # Get all components to monitor | |
| components = await self._get_monitored_components() | |
| for component in components: | |
| # Collect metrics | |
| metrics_df = await self.collect_metrics(component) | |
| if metrics_df is None or metrics_df.empty: | |
| continue | |
| # Predict system failures | |
| prediction = await self.predict_system_failure(metrics_df) | |
| if prediction: | |
| logger.warning( | |
| f"Failure prediction for {component}: {prediction.probability:.2f}" | |
| ) | |
| # Update metrics | |
| FAILURE_PREDICTIONS.labels(severity="HIGH").inc() | |
| # Take prevention actions | |
| if prediction.probability > self.alert_threshold: | |
| actions = await self.take_prevention_actions(prediction) | |
| for action in actions: | |
| if action.success: | |
| PREVENTION_ACTIONS_TAKEN.inc() | |
| # Send alerts | |
| await self._send_failure_alert(prediction) | |
| # Detect anomalies | |
| anomalies = await self.detect_anomalies(metrics_df) | |
| for anomaly in anomalies: | |
| logger.warning( | |
| f"Anomaly detected in {component}: {anomaly.anomaly_score:.2f}" | |
| ) | |
| # Send anomaly alert | |
| await self._send_anomaly_alert(anomaly) | |
| # Wait for next analysis cycle | |
| await asyncio.sleep(int(os.getenv("ANALYSIS_INTERVAL", "60"))) | |
| except Exception as e: | |
| logger.error(f"Error in failure analysis loop: {e}") | |
| await asyncio.sleep(30) | |
| async def _get_monitored_components(self) -> list[str]: | |
| """Get list of components to monitor""" | |
| # This would query Kubernetes API for all deployments/services | |
| # For now, return mock data | |
| return ["zenith-api", "zenith-auth", "zenith-payment"] | |
| async def _send_failure_alert(self, prediction: FailurePrediction): | |
| """Send failure prediction alert""" | |
| try: | |
| alert_data = { | |
| "alert_name": "Failure Prediction", | |
| "severity": "CRITICAL" if prediction.probability > 0.9 else "HIGH", | |
| "component": prediction.component, | |
| "failure_type": prediction.failure_type, | |
| "probability": prediction.probability, | |
| "confidence": prediction.confidence, | |
| "prediction_horizon": prediction.prediction_horizon, | |
| "root_causes": prediction.root_causes, | |
| "recommended_actions": prediction.recommended_actions, | |
| "timestamp": prediction.timestamp.isoformat(), | |
| } | |
| # Send to different channels based on severity | |
| await self._send_alert_to_channels(alert_data, prediction.probability) | |
| except Exception as e: | |
| logger.error(f"Failed to send failure alert: {e}") | |
| async def _send_anomaly_alert(self, anomaly: AnomalyResult): | |
| """Send anomaly detection alert""" | |
| try: | |
| alert_data = { | |
| "alert_name": "Anomaly Detection", | |
| "severity": anomaly.severity, | |
| "component": anomaly.component, | |
| "anomaly_score": anomaly.anomaly_score, | |
| "anomaly_type": anomaly.anomaly_type, | |
| "affected_metrics": anomaly.affected_metrics, | |
| "timestamp": anomaly.timestamp.isoformat(), | |
| } | |
| # Send anomaly alerts | |
| await self._send_alert_to_channels( | |
| alert_data, 0.7 | |
| ) # Lower severity for anomalies | |
| except Exception as e: | |
| logger.error(f"Failed to send anomaly alert: {e}") | |
| async def _send_alert_to_channels(self, alert_data: Dict, severity: float): | |
| """Send alerts to configured channels""" | |
| try: | |
| alert_config = self.config.get("alerting", {}) | |
| if not alert_config.get("enabled", True): | |
| return | |
| channels = alert_config.get("channels", []) | |
| for channel in channels: | |
| channel_type = channel.get("type") | |
| severity_threshold = channel.get("severity_threshold", 0.7) | |
| if severity >= severity_threshold: | |
| if channel_type == "slack": | |
| await self._send_slack_alert(alert_data, channel) | |
| elif channel_type == "pagerduty": | |
| await self._send_pagerduty_alert(alert_data, channel) | |
| elif channel_type == "email": | |
| await self._send_email_alert(alert_data, channel) | |
| except Exception as e: | |
| logger.error(f"Failed to send alert to channels: {e}") | |
| async def _send_slack_alert(self, alert_data: Dict, channel_config: Dict): | |
| """Send alert to Slack""" | |
| try: | |
| webhook_url = channel_config.get("webhook_url") | |
| channel = channel_config.get("channel", "#alerts") | |
| if not webhook_url: | |
| return | |
| slack_message = { | |
| "channel": channel, | |
| "username": "Zenith Failure Predictor", | |
| "icon_emoji": ":warning:", | |
| "attachments": [ | |
| { | |
| "color": "danger" | |
| if alert_data["severity"] == "CRITICAL" | |
| else "warning", | |
| "title": f"🚨 {alert_data['alert_name']}", | |
| "fields": [ | |
| { | |
| "title": "Component", | |
| "value": alert_data["component"], | |
| "short": True, | |
| }, | |
| { | |
| "title": "Severity", | |
| "value": alert_data["severity"], | |
| "short": True, | |
| }, | |
| { | |
| "title": "Probability", | |
| "value": f"{alert_data.get('probability', 0):.2%}", | |
| "short": True, | |
| }, | |
| { | |
| "title": "Confidence", | |
| "value": f"{alert_data.get('confidence', 0):.2%}", | |
| "short": True, | |
| }, | |
| ], | |
| "footer": "Zenith Platform", | |
| "ts": int(datetime.utcnow().timestamp()), | |
| } | |
| ], | |
| } | |
| if "root_causes" in alert_data: | |
| slack_message["attachments"][0]["fields"].append( | |
| { | |
| "title": "Root Causes", | |
| "value": "\n".join(alert_data["root_causes"][:3]), | |
| "short": False, | |
| } | |
| ) | |
| if "recommended_actions" in alert_data: | |
| slack_message["attachments"][0]["fields"].append( | |
| { | |
| "title": "Recommended Actions", | |
| "value": "\n".join(alert_data["recommended_actions"][:3]), | |
| "short": False, | |
| } | |
| ) | |
| async with self.session.post(webhook_url, json=slack_message) as response: | |
| if response.status != 200: | |
| logger.error(f"Failed to send Slack alert: {response.status}") | |
| except Exception as e: | |
| logger.error(f"Failed to send Slack alert: {e}") | |
| async def _send_pagerduty_alert(self, alert_data: Dict, channel_config: Dict): | |
| """Send alert to PagerDuty""" | |
| try: | |
| integration_key = channel_config.get("integration_key") | |
| if not integration_key: | |
| return | |
| pagerduty_event = { | |
| "routing_key": integration_key, | |
| "event_action": "trigger", | |
| "payload": { | |
| "summary": f"{alert_data['alert_name']}: {alert_data['component']}", | |
| "severity": "critical" | |
| if alert_data["severity"] == "CRITICAL" | |
| else "error", | |
| "source": "zenith-failure-predictor", | |
| "component": alert_data["component"], | |
| "group": "system-failures", | |
| "class": alert_data.get("failure_type", "unknown"), | |
| "custom_details": alert_data, | |
| }, | |
| } | |
| async with self.session.post( | |
| "https://events.pagerduty.com/v2/enqueue", json=pagerduty_event | |
| ) as response: | |
| if response.status != 202: | |
| logger.error(f"Failed to send PagerDuty alert: {response.status}") | |
| except Exception as e: | |
| logger.error(f"Failed to send PagerDuty alert: {e}") | |
| async def _send_email_alert(self, alert_data: Dict, channel_config: Dict): | |
| """Send alert via email""" | |
| try: | |
| recipients = channel_config.get("recipients", []) | |
| if not recipients: | |
| return | |
| # This would integrate with your email service | |
| logger.info(f"Email alert would be sent to: {', '.join(recipients)}") | |
| except Exception as e: | |
| logger.error(f"Failed to send email alert: {e}") | |
async def main():
    """Main entry point: run the analyzer loop until cancelled.

    BUG FIX: the aiohttp ClientSession created in ``__init__`` was never
    closed; close it on the way out so the event loop shuts down cleanly.
    """
    analyzer = PredictiveFailureAnalyzer()
    try:
        await analyzer.start_analysis_loop()
    finally:
        await analyzer.session.close()


if __name__ == "__main__":
    asyncio.run(main())