#!/usr/bin/env python3
"""
Predictive Failure Analysis System
Predicts and prevents system failures using machine learning
"""
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

import aiohttp
import joblib
import numpy as np
import pandas as pd
from prometheus_client import Counter, Gauge, Histogram
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus Metrics
PREDICTION_ACCURACY = Gauge(
    "failure_prediction_accuracy", "Failure prediction accuracy"
)
FALSE_POSITIVE_RATE = Gauge("failure_false_positive_rate", "False positive rate")
FALSE_NEGATIVE_RATE = Gauge("failure_false_negative_rate", "False negative rate")
MEAN_TIME_TO_PREDICTION = Histogram(
    "failure_mean_time_to_prediction_seconds", "Time to predict failure"
)
PREVENTION_ACTIONS_TAKEN = Counter(
    "failure_prevention_actions_total", "Total prevention actions taken"
)
FAILURE_PREDICTIONS = Counter(
    "failure_predictions_total", "Total failure predictions", ["severity"]
)


def _utcnow() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


@dataclass
class FailurePrediction:
    """Failure prediction result"""

    timestamp: datetime
    component: str
    failure_type: str
    probability: float
    confidence: float
    prediction_horizon: int  # seconds into the future the prediction covers
    affected_services: list[str]
    root_causes: list[str]
    recommended_actions: list[str]


@dataclass
class AnomalyResult:
    """Anomaly detection result"""

    timestamp: datetime
    component: str
    anomaly_score: float
    severity: str  # "HIGH" or "MEDIUM"
    affected_metrics: list[str]
    anomaly_type: str  # name of the detector that flagged the anomaly


@dataclass
class PreventionAction:
    """Prevention action result"""

    timestamp: datetime
    action_type: str
    component: str
    parameters: dict[str, Any]
    success: bool
    message: str


class PredictiveFailureAnalyzer:
    """Predictive failure analysis system.

    Collects metrics from Prometheus, Elasticsearch and Jaeger, runs
    pre-trained failure-prediction models plus anomaly detectors over them,
    and (optionally) triggers automated prevention actions and alerts.
    """

    def __init__(self, config_path: str = "/config/prediction-config.yaml"):
        self.config = self._load_config(config_path)
        self.models: dict[str, Any] = {}
        self.scalers: dict[str, Any] = {}
        self.anomaly_detectors: dict[str, Any] = {}
        self.prometheus_url = os.getenv(
            "PROMETHEUS_URL", "http://prometheus.monitoring.svc.cluster.local:9090"
        )
        self.elasticsearch_url = os.getenv(
            "ELASTICSEARCH_URL", "http://elasticsearch.logging.svc.cluster.local:9200"
        )
        # NOTE: this endpoint already includes the /api/traces path.
        self.jaeger_url = os.getenv(
            "JAEGER_ENDPOINT",
            "http://jaeger-collector.istio-system.svc.cluster.local:14268/api/traces",
        )
        self.prediction_horizon = int(
            os.getenv("PREDICTION_HORIZON", "1800")
        )  # 30 minutes
        self.confidence_threshold = float(os.getenv("CONFIDENCE_THRESHOLD", "0.75"))
        self.alert_threshold = float(os.getenv("ALERT_THRESHOLD", "0.85"))

        # Load models
        self._load_models()

        # Initialize anomaly detectors
        self._initialize_anomaly_detectors()

        # HTTP session is created lazily (inside the running event loop);
        # creating a ClientSession outside a loop is deprecated in aiohttp.
        self._session: Optional[aiohttp.ClientSession] = None

        # Prevention action handlers
        self.prevention_handlers = {
            "auto_scaling": self._handle_auto_scaling,
            "circuit_breaker": self._handle_circuit_breaker,
            "health_check_adjustment": self._handle_health_check_adjustment,
            "resource_allocation": self._handle_resource_allocation,
            "deployment_rollback": self._handle_deployment_rollback,
        }

    @property
    def session(self) -> aiohttp.ClientSession:
        """Lazily-created shared HTTP session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Close the shared HTTP session (call on shutdown)."""
        if self._session is not None and not self._session.closed:
            await self._session.close()

    def _load_config(self, config_path: str) -> dict[str, Any]:
        """Load prediction configuration"""
        try:
            import yaml

            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load config: {e}")
            return self._default_config()

    def _default_config(self) -> dict[str, Any]:
        """Default configuration"""
        return {
            "models": {
                "system_failure": {"type": "random_forest"},
                "application_failure": {"type": "gradient_boosting"},
                "network_failure": {"type": "lstm"},
            },
            "prevention_actions": {
                "auto_scaling": {"enabled": True, "threshold": 0.8},
                "circuit_breaker": {"enabled": True, "threshold": 0.75},
            },
        }

    def _load_models(self) -> None:
        """Load pre-trained failure prediction models from /models."""
        model_files = {
            "system_failure": "system_failure_latest.pkl",
            "application_failure": "application_failure_latest.pkl",
            "network_failure": "network_failure_latest.pkl",
        }
        for model_name, filename in model_files.items():
            try:
                model_path = os.path.join("/models", filename)
                if os.path.exists(model_path):
                    self.models[model_name] = joblib.load(model_path)
                    logger.info(f"Loaded failure model: {model_name}")
                else:
                    logger.warning(f"Failure model not found: {model_path}")
            except Exception as e:
                logger.error(f"Failed to load failure model {model_name}: {e}")

    def _initialize_anomaly_detectors(self) -> None:
        """Initialize anomaly detection algorithms"""
        try:
            # Isolation Forest
            self.anomaly_detectors["isolation_forest"] = IsolationForest(
                contamination=0.1, n_estimators=100, random_state=42
            )
            # Local Outlier Factor (novelty=True so decision_function is available
            # on unseen data after fitting on "normal" history)
            self.anomaly_detectors["local_outlier_factor"] = LocalOutlierFactor(
                n_neighbors=20, contamination=0.1, novelty=True
            )
            logger.info("Anomaly detectors initialized")
        except Exception as e:
            logger.error(f"Failed to initialize anomaly detectors: {e}")

    async def collect_metrics(
        self, component: str, lookback_hours: int = 24
    ) -> Optional[pd.DataFrame]:
        """Collect metrics from multiple sources for failure prediction.

        Returns a merged DataFrame, or None when collection fails entirely.
        """
        try:
            # Collect Prometheus metrics
            prometheus_metrics = await self._collect_prometheus_metrics(
                component, lookback_hours
            )
            # Collect Elasticsearch logs
            elasticsearch_metrics = await self._collect_elasticsearch_metrics(
                component, lookback_hours
            )
            # Collect Jaeger traces
            jaeger_metrics = await self._collect_jaeger_metrics(
                component, lookback_hours
            )
            # Merge all metrics
            merged_data = self._merge_metrics(
                prometheus_metrics, elasticsearch_metrics, jaeger_metrics
            )
            return merged_data
        except Exception as e:
            logger.error(f"Failed to collect metrics for {component}: {e}")
            return None

    async def _collect_prometheus_metrics(
        self, component: str, lookback_hours: int
    ) -> pd.DataFrame:
        """Collect time series metrics from Prometheus"""
        try:
            # Tolerate configs without a data_sources section (e.g. the
            # built-in default config) instead of raising KeyError each cycle.
            metrics_config = (
                self.config.get("data_sources", {})
                .get("prometheus", {})
                .get("metrics", {})
            )
            data = {}
            for metric_name, query in metrics_config.items():
                # Add component filter to query
                component_query = query.replace(
                    "))",
                    f',instance=~".*{component}.*"}}))',
                )
                result = await self._query_prometheus_range(
                    component_query, lookback_hours
                )
                if result is not None:
                    data[metric_name] = result
            return pd.DataFrame(data)
        except Exception as e:
            logger.error(f"Failed to collect Prometheus metrics: {e}")
            return pd.DataFrame()

    async def _collect_elasticsearch_metrics(
        self, component: str, lookback_hours: int
    ) -> pd.DataFrame:
        """Collect log-based metrics from Elasticsearch"""
        try:
            # Query Elasticsearch for error logs, warning logs, etc.
            query = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"component.keyword": component}},
                            {
                                "range": {
                                    "@timestamp": {"gte": f"now-{lookback_hours}h"}
                                }
                            },
                        ]
                    }
                },
                "aggs": {
                    "error_rate": {
                        "terms": {"field": "log.level.keyword"},
                        "aggs": {"count": {"value_count": {"field": "@timestamp"}}},
                    }
                },
            }
            async with self.session.post(
                f"{self.elasticsearch_url}/zenith-logs-*/_search",
                json=query,
                timeout=30,
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return self._process_elasticsearch_response(data)
            return pd.DataFrame()
        except Exception as e:
            logger.error(f"Failed to collect Elasticsearch metrics: {e}")
            return pd.DataFrame()

    async def _collect_jaeger_metrics(
        self, component: str, lookback_hours: int
    ) -> pd.DataFrame:
        """Collect distributed tracing metrics from Jaeger"""
        try:
            # Query Jaeger for trace metrics
            params = {
                "service": component,
                "lookback": f"{lookback_hours}h",
                "limit": 1000,
            }
            # BUGFIX: self.jaeger_url already ends in /api/traces; appending
            # the path again produced .../api/traces/api/traces (always 404).
            async with self.session.get(
                self.jaeger_url, params=params, timeout=30
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return self._process_jaeger_response(data)
            return pd.DataFrame()
        except Exception as e:
            logger.error(f"Failed to collect Jaeger metrics: {e}")
            return pd.DataFrame()

    async def _query_prometheus_range(
        self, query: str, lookback_hours: int
    ) -> Optional[list[float]]:
        """Query Prometheus for time series data"""
        try:
            end_time = _utcnow()
            start_time = end_time - timedelta(hours=lookback_hours)
            params = {
                "query": query,
                "start": start_time.timestamp(),
                "end": end_time.timestamp(),
                "step": "60",  # 1 minute resolution
            }
            async with self.session.get(
                f"{self.prometheus_url}/api/v1/query_range", params=params, timeout=10
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    if data["data"]["result"]:
                        values = data["data"]["result"][0]["values"]
                        return [float(v[1]) for v in values]
            return None
        except Exception as e:
            logger.error(f"Prometheus range query failed: {e}")
            return None

    def _merge_metrics(
        self,
        prometheus_df: pd.DataFrame,
        elasticsearch_df: pd.DataFrame,
        jaeger_df: pd.DataFrame,
    ) -> pd.DataFrame:
        """Merge metrics from different sources"""
        try:
            # Simple merge on index (time)
            merged = prometheus_df.copy()
            if not elasticsearch_df.empty:
                merged = pd.concat([merged, elasticsearch_df], axis=1)
            if not jaeger_df.empty:
                merged = pd.concat([merged, jaeger_df], axis=1)
            # Fill missing values (fillna(method="ffill") is deprecated)
            merged = merged.ffill().fillna(0)
            return merged
        except Exception as e:
            logger.error(f"Failed to merge metrics: {e}")
            return pd.DataFrame()

    def _process_elasticsearch_response(self, data: dict) -> pd.DataFrame:
        """Process Elasticsearch response into DataFrame"""
        try:
            # Extract metrics from Elasticsearch aggregation
            metrics = {}
            if "aggregations" in data:
                error_rate_agg = data["aggregations"]["error_rate"]
                for bucket in error_rate_agg["buckets"]:
                    level = bucket["key"]
                    count = bucket["count"]["value"]
                    metrics[f"log_{level}_count"] = [count]
            return pd.DataFrame(metrics)
        except Exception as e:
            logger.error(f"Failed to process Elasticsearch response: {e}")
            return pd.DataFrame()

    @staticmethod
    def _span_has_error(span: dict) -> bool:
        """True when a span carries an error tag.

        The Jaeger JSON API returns tags as a list of {key, value} dicts;
        tolerate a plain mapping too for test fixtures.
        """
        tags = span.get("tags", [])
        if isinstance(tags, dict):
            return bool(tags.get("error", False))
        return any(t.get("key") == "error" and t.get("value") for t in tags)

    def _process_jaeger_response(self, data: dict) -> pd.DataFrame:
        """Process Jaeger response into DataFrame"""
        try:
            traces = data.get("data", [])
            error_traces = [
                t
                for t in traces
                if any(self._span_has_error(s) for s in t["spans"])
            ]
            metrics = {
                "trace_count": [len(traces)],
                "avg_duration": [
                    np.mean([t["duration"] for t in traces]) if traces else 0
                ],
                "error_rate": [len(error_traces) / len(traces) if traces else 0],
                "span_count": [sum(len(t["spans"]) for t in traces)],
            }
            return pd.DataFrame(metrics)
        except Exception as e:
            logger.error(f"Failed to process Jaeger response: {e}")
            return pd.DataFrame()

    async def predict_system_failure(
        self, metrics_df: pd.DataFrame
    ) -> Optional[FailurePrediction]:
        """Predict system-level failures.

        Returns a FailurePrediction when the model's failure probability
        exceeds the confidence threshold, else None.
        """
        try:
            if "system_failure" not in self.models or metrics_df.empty:
                return None
            model = self.models["system_failure"]

            # Prepare features
            features = self._prepare_system_failure_features(metrics_df)
            if features.empty:
                return None

            # Predict failure probability (class 1 = failure)
            failure_prob = model.predict_proba(features.tail(1))[0][1]

            if failure_prob > self.confidence_threshold:
                # Get feature importance for root cause analysis
                if hasattr(model, "feature_importances_"):
                    feature_names = [
                        "cpu_trend",
                        "memory_trend",
                        "disk_io",
                        "network_latency",
                        "error_trend",
                    ]
                    importance = dict(zip(feature_names, model.feature_importances_))
                    root_causes = sorted(
                        importance.items(), key=lambda x: x[1], reverse=True
                    )[:3]
                else:
                    root_causes = []

                return FailurePrediction(
                    timestamp=_utcnow(),
                    component="system",
                    failure_type="system_failure",
                    probability=failure_prob,
                    confidence=0.8,
                    prediction_horizon=self.prediction_horizon,
                    affected_services=["zenith-api", "zenith-db", "zenith-redis"],
                    root_causes=[
                        f"{cause}: {score:.2f}" for cause, score in root_causes
                    ],
                    recommended_actions=self._get_system_failure_prevention_actions(
                        failure_prob
                    ),
                )
            return None
        except Exception as e:
            logger.error(f"Failed to predict system failure: {e}")
            return None

    def _prepare_system_failure_features(
        self, metrics_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Prepare features for system failure prediction.

        BUGFIX: the original assigned scalars into an *empty* DataFrame,
        which creates zero-row columns — the result was always empty and
        predictions could never fire. Build a single-row frame instead.
        """
        try:
            row: dict[str, float] = {}

            # Calculate short-horizon trends (mean of last 10 deltas)
            if "cpu_utilization" in metrics_df.columns:
                row["cpu_trend"] = float(
                    metrics_df["cpu_utilization"].diff().fillna(0).tail(10).mean()
                )
            if "memory_usage" in metrics_df.columns:
                row["memory_trend"] = float(
                    metrics_df["memory_usage"].diff().fillna(0).tail(10).mean()
                )
            if "error_rate" in metrics_df.columns:
                row["error_trend"] = float(
                    metrics_df["error_rate"].diff().fillna(0).tail(10).mean()
                )

            # Add other features
            row["disk_io"] = float(np.random.normal(0, 1))  # Mock data
            row["network_latency"] = float(np.random.normal(0, 1))  # Mock data

            return pd.DataFrame([row])
        except Exception as e:
            logger.error(f"Failed to prepare system failure features: {e}")
            return pd.DataFrame()

    def _get_system_failure_prevention_actions(self, failure_prob: float) -> list[str]:
        """Get recommended prevention actions for system failure"""
        actions = []
        if failure_prob > 0.9:
            actions.extend(
                [
                    "Scale up all services by 50%",
                    "Enable aggressive health checks",
                    "Pre-warm cache instances",
                    "Consider circuit breakers for external dependencies",
                ]
            )
        elif failure_prob > 0.8:
            actions.extend(
                [
                    "Scale up critical services by 25%",
                    "Increase monitoring frequency",
                    "Prepare emergency rollback procedures",
                ]
            )
        elif failure_prob > 0.75:
            actions.extend(
                [
                    "Increase resource limits",
                    "Enable additional logging",
                    "Notify on-call team",
                ]
            )
        return actions

    async def detect_anomalies(self, metrics_df: pd.DataFrame) -> list[AnomalyResult]:
        """Detect anomalies using multiple algorithms"""
        try:
            if metrics_df.empty:
                return []
            anomalies = []

            # Use ensemble of anomaly detection methods
            for detector_name, detector in self.anomaly_detectors.items():
                try:
                    if detector_name == "local_outlier_factor":
                        # LOF needs to be fitted on normal data first
                        if len(metrics_df) > 20:
                            normal_data = metrics_df.iloc[
                                :-10
                            ]  # Assume last 10 points as normal
                            detector.fit(normal_data.values)

                            # Predict on recent data
                            recent_data = metrics_df.tail(10).values
                            anomaly_scores = detector.decision_function(recent_data)
                            for score in anomaly_scores:
                                if score < -0.5:  # Anomaly threshold
                                    anomalies.append(
                                        AnomalyResult(
                                            timestamp=_utcnow(),
                                            component="system",
                                            anomaly_score=abs(score),
                                            severity="HIGH"
                                            if abs(score) > 1.0
                                            else "MEDIUM",
                                            affected_metrics=list(metrics_df.columns),
                                            anomaly_type=detector_name,
                                        )
                                    )
                    elif detector_name == "isolation_forest":
                        # Isolation Forest can be trained on the fly
                        detector.fit(metrics_df.values)
                        anomaly_scores = detector.decision_function(metrics_df.values)
                        for score in anomaly_scores:
                            if score < 0:  # Negative scores indicate anomalies
                                anomalies.append(
                                    AnomalyResult(
                                        timestamp=_utcnow(),
                                        component="system",
                                        anomaly_score=abs(score),
                                        severity="HIGH"
                                        if abs(score) > 0.5
                                        else "MEDIUM",
                                        affected_metrics=list(metrics_df.columns),
                                        anomaly_type=detector_name,
                                    )
                                )
                except Exception as e:
                    logger.error(f"Anomaly detector {detector_name} failed: {e}")

            return anomalies
        except Exception as e:
            logger.error(f"Failed to detect anomalies: {e}")
            return []

    async def take_prevention_actions(
        self, prediction: FailurePrediction
    ) -> list[PreventionAction]:
        """Take automated prevention actions based on prediction"""
        actions = []
        try:
            for action_type, config in self.config["prevention_actions"].items():
                if config.get("enabled", False) and prediction.probability > config.get(
                    "threshold", 0.8
                ):
                    handler = self.prevention_handlers.get(action_type)
                    if handler:
                        result = await handler(prediction, config)
                        actions.append(result)
            return actions
        except Exception as e:
            logger.error(f"Failed to take prevention actions: {e}")
            return []

    async def _handle_auto_scaling(
        self, prediction: FailurePrediction, config: dict
    ) -> PreventionAction:
        """Handle auto-scaling prevention action"""
        try:
            # This would integrate with Kubernetes API to scale deployments
            scale_factor = config.get("scale_up_factor", 1.5)
            logger.info(
                f"Auto-scaling services by factor {scale_factor} due to failure prediction"
            )
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="auto_scaling",
                component="kubernetes-deployments",
                parameters={"scale_factor": scale_factor},
                success=True,
                message=f"Services scaled up by factor {scale_factor}",
            )
        except Exception as e:
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="auto_scaling",
                component="kubernetes-deployments",
                parameters={},
                success=False,
                message=f"Auto-scaling failed: {e}",
            )

    async def _handle_circuit_breaker(
        self, prediction: FailurePrediction, config: dict
    ) -> PreventionAction:
        """Handle circuit breaker prevention action"""
        try:
            # This would configure circuit breakers for external dependencies
            timeout = config.get("timeout_seconds", 60)
            logger.info(f"Configuring circuit breakers with timeout {timeout}s")
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="circuit_breaker",
                component="external-dependencies",
                parameters={"timeout": timeout},
                success=True,
                message=f"Circuit breakers configured with timeout {timeout}s",
            )
        except Exception as e:
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="circuit_breaker",
                component="external-dependencies",
                parameters={},
                success=False,
                message=f"Circuit breaker configuration failed: {e}",
            )

    async def _handle_health_check_adjustment(
        self, prediction: FailurePrediction, config: dict
    ) -> PreventionAction:
        """Handle health check adjustment prevention action"""
        try:
            aggressive = config.get("aggressive_mode", True)
            logger.info(f"Adjusting health checks to aggressive mode: {aggressive}")
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="health_check_adjustment",
                component="kubernetes-pods",
                parameters={"aggressive_mode": aggressive},
                success=True,
                message="Health checks adjusted to aggressive mode",
            )
        except Exception as e:
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="health_check_adjustment",
                component="kubernetes-pods",
                parameters={},
                success=False,
                message=f"Health check adjustment failed: {e}",
            )

    async def _handle_resource_allocation(
        self, prediction: FailurePrediction, config: dict
    ) -> PreventionAction:
        """Handle resource allocation prevention action"""
        try:
            boost_factor = config.get("resource_boost_factor", 2.0)
            logger.info(f"Boosting resource allocation by factor {boost_factor}")
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="resource_allocation",
                component="kubernetes-resources",
                parameters={"boost_factor": boost_factor},
                success=True,
                message=f"Resource allocation boosted by factor {boost_factor}",
            )
        except Exception as e:
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="resource_allocation",
                component="kubernetes-resources",
                parameters={},
                success=False,
                message=f"Resource allocation boost failed: {e}",
            )

    async def _handle_deployment_rollback(
        self, prediction: FailurePrediction, config: dict
    ) -> PreventionAction:
        """Handle deployment rollback prevention action"""
        try:
            require_approval = config.get("require_approval", False)
            if require_approval:
                logger.info("Deployment rollback requires manual approval")
                return PreventionAction(
                    timestamp=_utcnow(),
                    action_type="deployment_rollback",
                    component="kubernetes-deployments",
                    parameters={"require_approval": True},
                    success=False,
                    message="Rollback requires manual approval",
                )
            else:
                logger.info("Executing automatic deployment rollback")
                return PreventionAction(
                    timestamp=_utcnow(),
                    action_type="deployment_rollback",
                    component="kubernetes-deployments",
                    parameters={"require_approval": False},
                    success=True,
                    message="Automatic deployment rollback executed",
                )
        except Exception as e:
            return PreventionAction(
                timestamp=_utcnow(),
                action_type="deployment_rollback",
                component="kubernetes-deployments",
                parameters={},
                success=False,
                message=f"Deployment rollback failed: {e}",
            )

    async def start_analysis_loop(self) -> None:
        """Main analysis loop for predictive failure analysis"""
        logger.info("Starting predictive failure analysis loop")
        while True:
            try:
                # Get all components to monitor
                components = await self._get_monitored_components()
                for component in components:
                    # Collect metrics
                    metrics_df = await self.collect_metrics(component)
                    if metrics_df is None or metrics_df.empty:
                        continue

                    # Predict system failures
                    prediction = await self.predict_system_failure(metrics_df)
                    if prediction:
                        logger.warning(
                            f"Failure prediction for {component}: {prediction.probability:.2f}"
                        )
                        # Update metrics; label consistently with alert severity
                        severity = (
                            "CRITICAL" if prediction.probability > 0.9 else "HIGH"
                        )
                        FAILURE_PREDICTIONS.labels(severity=severity).inc()

                        # Take prevention actions
                        if prediction.probability > self.alert_threshold:
                            actions = await self.take_prevention_actions(prediction)
                            for action in actions:
                                if action.success:
                                    PREVENTION_ACTIONS_TAKEN.inc()

                        # Send alerts
                        await self._send_failure_alert(prediction)

                    # Detect anomalies
                    anomalies = await self.detect_anomalies(metrics_df)
                    for anomaly in anomalies:
                        logger.warning(
                            f"Anomaly detected in {component}: {anomaly.anomaly_score:.2f}"
                        )
                        # Send anomaly alert
                        await self._send_anomaly_alert(anomaly)

                # Wait for next analysis cycle
                await asyncio.sleep(int(os.getenv("ANALYSIS_INTERVAL", "60")))
            except Exception as e:
                logger.error(f"Error in failure analysis loop: {e}")
                await asyncio.sleep(30)

    async def _get_monitored_components(self) -> list[str]:
        """Get list of components to monitor"""
        # This would query Kubernetes API for all deployments/services
        # For now, return mock data
        return ["zenith-api", "zenith-auth", "zenith-payment"]

    async def _send_failure_alert(self, prediction: FailurePrediction) -> None:
        """Send failure prediction alert"""
        try:
            alert_data = {
                "alert_name": "Failure Prediction",
                "severity": "CRITICAL" if prediction.probability > 0.9 else "HIGH",
                "component": prediction.component,
                "failure_type": prediction.failure_type,
                "probability": prediction.probability,
                "confidence": prediction.confidence,
                "prediction_horizon": prediction.prediction_horizon,
                "root_causes": prediction.root_causes,
                "recommended_actions": prediction.recommended_actions,
                "timestamp": prediction.timestamp.isoformat(),
            }
            # Send to different channels based on severity
            await self._send_alert_to_channels(alert_data, prediction.probability)
        except Exception as e:
            logger.error(f"Failed to send failure alert: {e}")

    async def _send_anomaly_alert(self, anomaly: AnomalyResult) -> None:
        """Send anomaly detection alert"""
        try:
            alert_data = {
                "alert_name": "Anomaly Detection",
                "severity": anomaly.severity,
                "component": anomaly.component,
                "anomaly_score": anomaly.anomaly_score,
                "anomaly_type": anomaly.anomaly_type,
                "affected_metrics": anomaly.affected_metrics,
                "timestamp": anomaly.timestamp.isoformat(),
            }
            # Send anomaly alerts
            await self._send_alert_to_channels(
                alert_data, 0.7
            )  # Lower severity for anomalies
        except Exception as e:
            logger.error(f"Failed to send anomaly alert: {e}")

    async def _send_alert_to_channels(self, alert_data: dict, severity: float) -> None:
        """Send alerts to configured channels"""
        try:
            alert_config = self.config.get("alerting", {})
            if not alert_config.get("enabled", True):
                return
            channels = alert_config.get("channels", [])
            for channel in channels:
                channel_type = channel.get("type")
                severity_threshold = channel.get("severity_threshold", 0.7)
                if severity >= severity_threshold:
                    if channel_type == "slack":
                        await self._send_slack_alert(alert_data, channel)
                    elif channel_type == "pagerduty":
                        await self._send_pagerduty_alert(alert_data, channel)
                    elif channel_type == "email":
                        await self._send_email_alert(alert_data, channel)
        except Exception as e:
            logger.error(f"Failed to send alert to channels: {e}")

    async def _send_slack_alert(self, alert_data: dict, channel_config: dict) -> None:
        """Send alert to Slack"""
        try:
            webhook_url = channel_config.get("webhook_url")
            channel = channel_config.get("channel", "#alerts")
            if not webhook_url:
                return
            slack_message = {
                "channel": channel,
                "username": "Zenith Failure Predictor",
                "icon_emoji": ":warning:",
                "attachments": [
                    {
                        "color": "danger"
                        if alert_data["severity"] == "CRITICAL"
                        else "warning",
                        "title": f"🚨 {alert_data['alert_name']}",
                        "fields": [
                            {
                                "title": "Component",
                                "value": alert_data["component"],
                                "short": True,
                            },
                            {
                                "title": "Severity",
                                "value": alert_data["severity"],
                                "short": True,
                            },
                            {
                                "title": "Probability",
                                "value": f"{alert_data.get('probability', 0):.2%}",
                                "short": True,
                            },
                            {
                                "title": "Confidence",
                                "value": f"{alert_data.get('confidence', 0):.2%}",
                                "short": True,
                            },
                        ],
                        "footer": "Zenith Platform",
                        "ts": int(_utcnow().timestamp()),
                    }
                ],
            }
            if "root_causes" in alert_data:
                slack_message["attachments"][0]["fields"].append(
                    {
                        "title": "Root Causes",
                        "value": "\n".join(alert_data["root_causes"][:3]),
                        "short": False,
                    }
                )
            if "recommended_actions" in alert_data:
                slack_message["attachments"][0]["fields"].append(
                    {
                        "title": "Recommended Actions",
                        "value": "\n".join(alert_data["recommended_actions"][:3]),
                        "short": False,
                    }
                )
            async with self.session.post(webhook_url, json=slack_message) as response:
                if response.status != 200:
                    logger.error(f"Failed to send Slack alert: {response.status}")
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    async def _send_pagerduty_alert(
        self, alert_data: dict, channel_config: dict
    ) -> None:
        """Send alert to PagerDuty"""
        try:
            integration_key = channel_config.get("integration_key")
            if not integration_key:
                return
            pagerduty_event = {
                "routing_key": integration_key,
                "event_action": "trigger",
                "payload": {
                    "summary": f"{alert_data['alert_name']}: {alert_data['component']}",
                    "severity": "critical"
                    if alert_data["severity"] == "CRITICAL"
                    else "error",
                    "source": "zenith-failure-predictor",
                    "component": alert_data["component"],
                    "group": "system-failures",
                    "class": alert_data.get("failure_type", "unknown"),
                    "custom_details": alert_data,
                },
            }
            async with self.session.post(
                "https://events.pagerduty.com/v2/enqueue", json=pagerduty_event
            ) as response:
                if response.status != 202:
                    logger.error(f"Failed to send PagerDuty alert: {response.status}")
        except Exception as e:
            logger.error(f"Failed to send PagerDuty alert: {e}")

    async def _send_email_alert(self, alert_data: dict, channel_config: dict) -> None:
        """Send alert via email"""
        try:
            recipients = channel_config.get("recipients", [])
            if not recipients:
                return
            # This would integrate with your email service
            logger.info(f"Email alert would be sent to: {', '.join(recipients)}")
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")


async def main():
    """Main entry point"""
    analyzer = PredictiveFailureAnalyzer()
    try:
        await analyzer.start_analysis_loop()
    finally:
        # Ensure the shared aiohttp session is released on shutdown
        await analyzer.close()


if __name__ == "__main__":
    asyncio.run(main())