# zenith-backend/app/services/ai/ops/src/predictive_failure_analyzer.py
#!/usr/bin/env python3
"""
Predictive Failure Analysis System
Predicts and prevents system failures using machine learning
"""
import asyncio
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional
import aiohttp
import joblib
import numpy as np
import pandas as pd
from prometheus_client import Counter, Gauge, Histogram
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus metrics exported by this service.
# NOTE(review): PREDICTION_ACCURACY, FALSE_POSITIVE_RATE, FALSE_NEGATIVE_RATE
# and MEAN_TIME_TO_PREDICTION are defined but never updated anywhere in this
# module - confirm whether another component writes them.
PREDICTION_ACCURACY = Gauge(
    "failure_prediction_accuracy", "Failure prediction accuracy"
)
FALSE_POSITIVE_RATE = Gauge("failure_false_positive_rate", "False positive rate")
FALSE_NEGATIVE_RATE = Gauge("failure_false_negative_rate", "False negative rate")
MEAN_TIME_TO_PREDICTION = Histogram(
    "failure_mean_time_to_prediction_seconds", "Time to predict failure"
)
# Incremented in start_analysis_loop for each successful prevention action.
PREVENTION_ACTIONS_TAKEN = Counter(
    "failure_prevention_actions_total", "Total prevention actions taken"
)
# Incremented per emitted prediction, labelled by severity.
FAILURE_PREDICTIONS = Counter(
    "failure_predictions_total", "Total failure predictions", ["severity"]
)
@dataclass
class FailurePrediction:
    """Failure prediction result"""

    timestamp: datetime  # when the prediction was made (naive UTC)
    component: str  # component the failure is predicted for
    failure_type: str  # e.g. "system_failure"
    probability: float  # predicted failure probability from predict_proba
    confidence: float  # model confidence (currently a fixed placeholder)
    prediction_horizon: int  # look-ahead window in seconds
    affected_services: list[str]  # services expected to be impacted
    root_causes: list[str]  # "feature: importance" strings, top contributors
    recommended_actions: list[str]  # suggested mitigations for operators
@dataclass
class AnomalyResult:
    """Anomaly detection result"""

    timestamp: datetime  # detection time (naive UTC)
    component: str  # component the anomaly was observed on
    anomaly_score: float  # absolute detector score; higher = more anomalous
    severity: str  # "HIGH" or "MEDIUM" (see detect_anomalies thresholds)
    affected_metrics: list[str]  # metric columns present when detected
    anomaly_type: str  # detector key, e.g. "isolation_forest"
@dataclass
class PreventionAction:
    """Prevention action result"""

    timestamp: datetime  # when the action was attempted
    action_type: str  # handler key, e.g. "auto_scaling"
    component: str  # target component/resource group
    parameters: dict[str, Any]  # handler-specific parameters used
    success: bool  # whether the handler reported success
    message: str  # human-readable outcome description
class PredictiveFailureAnalyzer:
    """Predictive failure analysis system"""

    def __init__(self, config_path: str = "/config/prediction-config.yaml"):
        """Load configuration, models and detectors; prepare data-source clients.

        Args:
            config_path: Path to the YAML prediction configuration; falls
                back to built-in defaults when unreadable.
        """
        self.config = self._load_config(config_path)
        # Pre-trained models keyed by failure type (populated by _load_models).
        self.models = {}
        # Reserved for per-model feature scalers; never populated in this file.
        self.scalers = {}
        # Unsupervised detectors keyed by algorithm name.
        self.anomaly_detectors = {}
        # Data-source endpoints, overridable via environment variables.
        self.prometheus_url = os.getenv(
            "PROMETHEUS_URL", "http://prometheus.monitoring.svc.cluster.local:9090"
        )
        self.elasticsearch_url = os.getenv(
            "ELASTICSEARCH_URL", "http://elasticsearch.logging.svc.cluster.local:9200"
        )
        # NOTE(review): this default already ends in /api/traces, while
        # _collect_jaeger_metrics appends "/api/traces" again - confirm.
        self.jaeger_url = os.getenv(
            "JAEGER_ENDPOINT",
            "http://jaeger-collector.istio-system.svc.cluster.local:14268/api/traces",
        )
        # How far ahead (seconds) a prediction claims to look.
        self.prediction_horizon = int(
            os.getenv("PREDICTION_HORIZON", "1800")
        )  # 30 minutes
        # Minimum probability required to emit a prediction at all...
        self.confidence_threshold = float(os.getenv("CONFIDENCE_THRESHOLD", "0.75"))
        # ...and the higher bar that triggers prevention actions.
        self.alert_threshold = float(os.getenv("ALERT_THRESHOLD", "0.85"))
        # Load models
        self._load_models()
        # Initialize anomaly detectors
        self._initialize_anomaly_detectors()
        # Initialize HTTP session.
        # NOTE(review): aiohttp.ClientSession is created here outside a running
        # event loop; recent aiohttp versions warn about this - confirm the
        # pinned client version, or create the session lazily inside the loop.
        self.session = aiohttp.ClientSession()
        # Prevention action handlers, keyed by the config's action-type names.
        self.prevention_handlers = {
            "auto_scaling": self._handle_auto_scaling,
            "circuit_breaker": self._handle_circuit_breaker,
            "health_check_adjustment": self._handle_health_check_adjustment,
            "resource_allocation": self._handle_resource_allocation,
            "deployment_rollback": self._handle_deployment_rollback,
        }
def _load_config(self, config_path: str) -> Dict:
"""Load prediction configuration"""
try:
import yaml
with open(config_path, "r") as f:
return yaml.safe_load(f)
except Exception as e:
logger.error(f"Failed to load config: {e}")
return self._default_config()
def _default_config(self) -> Dict:
"""Default configuration"""
return {
"models": {
"system_failure": {"type": "random_forest"},
"application_failure": {"type": "gradient_boosting"},
"network_failure": {"type": "lstm"},
},
"prevention_actions": {
"auto_scaling": {"enabled": True, "threshold": 0.8},
"circuit_breaker": {"enabled": True, "threshold": 0.75},
},
}
def _load_models(self):
"""Load pre-trained failure prediction models"""
model_files = {
"system_failure": "system_failure_latest.pkl",
"application_failure": "application_failure_latest.pkl",
"network_failure": "network_failure_latest.pkl",
}
for model_name, filename in model_files.items():
try:
model_path = os.path.join("/models", filename)
if os.path.exists(model_path):
self.models[model_name] = joblib.load(model_path)
logger.info(f"Loaded failure model: {model_name}")
else:
logger.warning(f"Failure model not found: {model_path}")
except Exception as e:
logger.error(f"Failed to load failure model {model_name}: {e}")
    def _initialize_anomaly_detectors(self):
        """Register the unsupervised anomaly-detection algorithms.

        On any constructor failure the dict is left with whatever detectors
        were already registered; the error is logged and not raised.
        """
        try:
            # Isolation Forest: tree-based, re-fitted per batch in
            # detect_anomalies.
            self.anomaly_detectors["isolation_forest"] = IsolationForest(
                contamination=0.1, n_estimators=100, random_state=42
            )
            # Local Outlier Factor in novelty mode: fit on "normal" history,
            # then score fresh points (see detect_anomalies).
            self.anomaly_detectors["local_outlier_factor"] = LocalOutlierFactor(
                n_neighbors=20, contamination=0.1, novelty=True
            )
            logger.info("Anomaly detectors initialized")
        except Exception as e:
            logger.error(f"Failed to initialize anomaly detectors: {e}")
async def collect_metrics(
self, component: str, lookback_hours: int = 24
) -> Optional[pd.DataFrame]:
"""Collect metrics from multiple sources for failure prediction"""
try:
# Collect Prometheus metrics
prometheus_metrics = await self._collect_prometheus_metrics(
component, lookback_hours
)
# Collect Elasticsearch logs
elasticsearch_metrics = await self._collect_elasticsearch_metrics(
component, lookback_hours
)
# Collect Jaeger traces
jaeger_metrics = await self._collect_jaeger_metrics(
component, lookback_hours
)
# Merge all metrics
merged_data = self._merge_metrics(
prometheus_metrics, elasticsearch_metrics, jaeger_metrics
)
return merged_data
except Exception as e:
logger.error(f"Failed to collect metrics for {component}: {e}")
return None
async def _collect_prometheus_metrics(
self, component: str, lookback_hours: int
) -> pd.DataFrame:
"""Collect time series metrics from Prometheus"""
try:
metrics_config = self.config["data_sources"]["prometheus"]["metrics"]
data = {}
for metric_name, query in metrics_config.items():
# Add component filter to query
component_query = query.replace(
"))", f',instance=~".*{component}.*"}}))'
)
result = await self._query_prometheus_range(
component_query, lookback_hours
)
if result is not None:
data[metric_name] = result
return pd.DataFrame(data)
except Exception as e:
logger.error(f"Failed to collect Prometheus metrics: {e}")
return pd.DataFrame()
    async def _collect_elasticsearch_metrics(
        self, component: str, lookback_hours: int
    ) -> pd.DataFrame:
        """Collect log-based metrics from Elasticsearch.

        Runs a filtered aggregation over the zenith-logs-* indices counting
        documents per log level for *component* within the lookback window.
        The response is flattened by _process_elasticsearch_response.
        Returns an empty DataFrame on non-200 responses or any failure.
        """
        try:
            # Query Elasticsearch for error logs, warning logs, etc.
            query = {
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"component.keyword": component}},
                            {
                                "range": {
                                    "@timestamp": {"gte": f"now-{lookback_hours}h"}
                                }
                            },
                        ]
                    }
                },
                "aggs": {
                    # Bucket by log level, then count documents per bucket.
                    "error_rate": {
                        "terms": {"field": "log.level.keyword"},
                        "aggs": {"count": {"value_count": {"field": "@timestamp"}}},
                    }
                },
            }
            # NOTE(review): passing a bare int as `timeout` is deprecated in
            # newer aiohttp (expects ClientTimeout) - confirm client version.
            async with self.session.post(
                f"{self.elasticsearch_url}/zenith-logs-*/_search",
                json=query,
                timeout=30,
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    return self._process_elasticsearch_response(data)
                # Non-200 responses are treated as "no data".
                return pd.DataFrame()
        except Exception as e:
            logger.error(f"Failed to collect Elasticsearch metrics: {e}")
            return pd.DataFrame()
async def _collect_jaeger_metrics(
self, component: str, lookback_hours: int
) -> pd.DataFrame:
"""Collect distributed tracing metrics from Jaeger"""
try:
# Query Jaeger for trace metrics
params = {
"service": component,
"lookback": f"{lookback_hours}h",
"limit": 1000,
}
async with self.session.get(
f"{self.jaeger_url}/api/traces", params=params, timeout=30
) as response:
if response.status == 200:
data = await response.json()
return self._process_jaeger_response(data)
return pd.DataFrame()
except Exception as e:
logger.error(f"Failed to collect Jaeger metrics: {e}")
return pd.DataFrame()
    async def _query_prometheus_range(
        self, query: str, lookback_hours: int
    ) -> Optional[list[float]]:
        """Run a PromQL range query and return the first series' sample values.

        Returns the list of values (1-minute resolution) of the first result
        series, or None when there is no data or the request fails.
        """
        try:
            # NOTE(review): utcnow() is naive and .timestamp() interprets a
            # naive datetime in the local timezone; on non-UTC hosts the query
            # range is skewed - confirm hosts run UTC or switch to
            # datetime.now(timezone.utc).
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=lookback_hours)
            params = {
                "query": query,
                "start": start_time.timestamp(),
                "end": end_time.timestamp(),
                "step": "60",  # 1 minute resolution
            }
            async with self.session.get(
                f"{self.prometheus_url}/api/v1/query_range", params=params, timeout=10
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    if data["data"]["result"]:
                        # Only the first matching series is used.
                        values = data["data"]["result"][0]["values"]
                        # Each sample is [unix_ts, "value"]; keep the value.
                        return [float(v[1]) for v in values]
            return None
        except Exception as e:
            logger.error(f"Prometheus range query failed: {e}")
            return None
def _merge_metrics(
self,
prometheus_df: pd.DataFrame,
elasticsearch_df: pd.DataFrame,
jaeger_df: pd.DataFrame,
) -> pd.DataFrame:
"""Merge metrics from different sources"""
try:
# Simple merge on index (time)
merged = prometheus_df.copy()
if not elasticsearch_df.empty:
merged = pd.concat([merged, elasticsearch_df], axis=1)
if not jaeger_df.empty:
merged = pd.concat([merged, jaeger_df], axis=1)
# Fill missing values
merged = merged.fillna(method="ffill").fillna(0)
return merged
except Exception as e:
logger.error(f"Failed to merge metrics: {e}")
return pd.DataFrame()
def _process_elasticsearch_response(self, data: Dict) -> pd.DataFrame:
"""Process Elasticsearch response into DataFrame"""
try:
# Extract metrics from Elasticsearch aggregation
metrics = {}
if "aggregations" in data:
error_rate_agg = data["aggregations"]["error_rate"]
for bucket in error_rate_agg["buckets"]:
level = bucket["key"]
count = bucket["count"]["value"]
metrics[f"log_{level}_count"] = [count]
return pd.DataFrame(metrics)
except Exception as e:
logger.error(f"Failed to process Elasticsearch response: {e}")
return pd.DataFrame()
def _process_jaeger_response(self, data: Dict) -> pd.DataFrame:
"""Process Jaeger response into DataFrame"""
try:
traces = data.get("data", [])
metrics = {
"trace_count": [len(traces)],
"avg_duration": [
np.mean([t["duration"] for t in traces]) if traces else 0
],
"error_rate": [
len(
[
t
for t in traces
if any(
s.get("tags", {}).get("error", False)
for s in t["spans"]
)
]
)
/ len(traces)
if traces
else 0
],
"span_count": [sum(len(t["spans"]) for t in traces)],
}
return pd.DataFrame(metrics)
except Exception as e:
logger.error(f"Failed to process Jaeger response: {e}")
return pd.DataFrame()
    async def predict_system_failure(
        self, metrics_df: pd.DataFrame
    ) -> Optional[FailurePrediction]:
        """Predict system-level failures.

        Runs the pre-trained "system_failure" classifier over the latest
        engineered feature row and returns a FailurePrediction when the
        failure probability exceeds self.confidence_threshold, else None.
        """
        try:
            # No model loaded or no data: nothing to predict.
            if "system_failure" not in self.models or metrics_df.empty:
                return None
            model = self.models["system_failure"]
            # Prepare features
            features = self._prepare_system_failure_features(metrics_df)
            if features.empty:
                return None
            # Probability of the positive (failure) class for the latest row.
            failure_prob = model.predict_proba(features.tail(1))[0][1]
            if failure_prob > self.confidence_threshold:
                # Get feature importance for root cause analysis
                if hasattr(model, "feature_importances_"):
                    # NOTE(review): assumes the model was trained on exactly
                    # these five features in this order - confirm against the
                    # training pipeline.
                    feature_names = [
                        "cpu_trend",
                        "memory_trend",
                        "disk_io",
                        "network_latency",
                        "error_trend",
                    ]
                    importance = dict(zip(feature_names, model.feature_importances_))
                    # Top three features by importance serve as root causes.
                    root_causes = sorted(
                        importance.items(), key=lambda x: x[1], reverse=True
                    )[:3]
                else:
                    root_causes = []
                return FailurePrediction(
                    timestamp=datetime.utcnow(),
                    component="system",
                    failure_type="system_failure",
                    probability=failure_prob,
                    confidence=0.8,  # fixed placeholder confidence
                    prediction_horizon=self.prediction_horizon,
                    # NOTE(review): hard-coded service list - confirm.
                    affected_services=["zenith-api", "zenith-db", "zenith-redis"],
                    root_causes=[
                        f"{cause}: {score:.2f}" for cause, score in root_causes
                    ],
                    recommended_actions=self._get_system_failure_prevention_actions(
                        failure_prob
                    ),
                )
            return None
        except Exception as e:
            logger.error(f"Failed to predict system failure: {e}")
            return None
def _prepare_system_failure_features(
self, metrics_df: pd.DataFrame
) -> pd.DataFrame:
"""Prepare features for system failure prediction"""
try:
features = pd.DataFrame()
# Calculate trends
if "cpu_utilization" in metrics_df.columns:
features["cpu_trend"] = (
metrics_df["cpu_utilization"].diff().fillna(0).tail(10).mean()
)
if "memory_usage" in metrics_df.columns:
features["memory_trend"] = (
metrics_df["memory_usage"].diff().fillna(0).tail(10).mean()
)
if "error_rate" in metrics_df.columns:
features["error_trend"] = (
metrics_df["error_rate"].diff().fillna(0).tail(10).mean()
)
# Add other features
features["disk_io"] = np.random.normal(0, 1) # Mock data
features["network_latency"] = np.random.normal(0, 1) # Mock data
return features
except Exception as e:
logger.error(f"Failed to prepare system failure features: {e}")
return pd.DataFrame()
def _get_system_failure_prevention_actions(self, failure_prob: float) -> list[str]:
"""Get recommended prevention actions for system failure"""
actions = []
if failure_prob > 0.9:
actions.extend(
[
"Scale up all services by 50%",
"Enable aggressive health checks",
"Pre-warm cache instances",
"Consider circuit breakers for external dependencies",
]
)
elif failure_prob > 0.8:
actions.extend(
[
"Scale up critical services by 25%",
"Increase monitoring frequency",
"Prepare emergency rollback procedures",
]
)
elif failure_prob > 0.75:
actions.extend(
[
"Increase resource limits",
"Enable additional logging",
"Notify on-call team",
]
)
return actions
    async def detect_anomalies(self, metrics_df: pd.DataFrame) -> list[AnomalyResult]:
        """Detect anomalies using multiple algorithms.

        Runs every registered detector over the metrics frame and returns one
        AnomalyResult per flagged point. Individual detector failures are
        logged and skipped; empty input or an outer error yields [].
        """
        try:
            if metrics_df.empty:
                return []
            anomalies = []
            # Use ensemble of anomaly detection methods
            for detector_name, detector in self.anomaly_detectors.items():
                try:
                    # Fit and predict
                    if detector_name == "local_outlier_factor":
                        # LOF in novelty mode must be fitted on "normal"
                        # history first; skip when there is too little data.
                        if len(metrics_df) > 20:
                            # Train on everything EXCEPT the last 10 points;
                            # the recent points are what gets scored below.
                            # (The original comment claimed the opposite.)
                            normal_data = metrics_df.iloc[:-10]
                            detector.fit(normal_data.values)
                            # Score only the most recent 10 points.
                            recent_data = metrics_df.tail(10).values
                            anomaly_scores = detector.decision_function(recent_data)
                            for i, score in enumerate(anomaly_scores):
                                if score < -0.5:  # Anomaly threshold
                                    anomalies.append(
                                        AnomalyResult(
                                            timestamp=datetime.utcnow(),
                                            component="system",
                                            anomaly_score=abs(score),
                                            severity="HIGH"
                                            if abs(score) > 1.0
                                            else "MEDIUM",
                                            affected_metrics=list(metrics_df.columns),
                                            anomaly_type=detector_name,
                                        )
                                    )
                    elif detector_name == "isolation_forest":
                        # Isolation Forest is re-fitted on the full batch.
                        detector.fit(metrics_df.values)
                        anomaly_scores = detector.decision_function(metrics_df.values)
                        for i, score in enumerate(anomaly_scores):
                            if score < 0:  # Negative scores indicate anomalies
                                anomalies.append(
                                    AnomalyResult(
                                        timestamp=datetime.utcnow(),
                                        component="system",
                                        anomaly_score=abs(score),
                                        severity="HIGH"
                                        if abs(score) > 0.5
                                        else "MEDIUM",
                                        affected_metrics=list(metrics_df.columns),
                                        anomaly_type=detector_name,
                                    )
                                )
                except Exception as e:
                    logger.error(f"Anomaly detector {detector_name} failed: {e}")
            return anomalies
        except Exception as e:
            logger.error(f"Failed to detect anomalies: {e}")
            return []
async def take_prevention_actions(
self, prediction: FailurePrediction
) -> list[PreventionAction]:
"""Take automated prevention actions based on prediction"""
actions = []
try:
for action_type, config in self.config["prevention_actions"].items():
if config.get("enabled", False) and prediction.probability > config.get(
"threshold", 0.8
):
handler = self.prevention_handlers.get(action_type)
if handler:
result = await handler(prediction, config)
actions.append(result)
return actions
except Exception as e:
logger.error(f"Failed to take prevention actions: {e}")
return []
async def _handle_auto_scaling(
self, prediction: FailurePrediction, config: Dict
) -> PreventionAction:
"""Handle auto-scaling prevention action"""
try:
# This would integrate with Kubernetes API to scale deployments
scale_factor = config.get("scale_up_factor", 1.5)
logger.info(
f"Auto-scaling services by factor {scale_factor} due to failure prediction"
)
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="auto_scaling",
component="kubernetes-deployments",
parameters={"scale_factor": scale_factor},
success=True,
message=f"Services scaled up by factor {scale_factor}",
)
except Exception as e:
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="auto_scaling",
component="kubernetes-deployments",
parameters={},
success=False,
message=f"Auto-scaling failed: {e}",
)
async def _handle_circuit_breaker(
self, prediction: FailurePrediction, config: Dict
) -> PreventionAction:
"""Handle circuit breaker prevention action"""
try:
# This would configure circuit breakers for external dependencies
timeout = config.get("timeout_seconds", 60)
logger.info(f"Configuring circuit breakers with timeout {timeout}s")
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="circuit_breaker",
component="external-dependencies",
parameters={"timeout": timeout},
success=True,
message=f"Circuit breakers configured with timeout {timeout}s",
)
except Exception as e:
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="circuit_breaker",
component="external-dependencies",
parameters={},
success=False,
message=f"Circuit breaker configuration failed: {e}",
)
async def _handle_health_check_adjustment(
self, prediction: FailurePrediction, config: Dict
) -> PreventionAction:
"""Handle health check adjustment prevention action"""
try:
aggressive = config.get("aggressive_mode", True)
logger.info(f"Adjusting health checks to aggressive mode: {aggressive}")
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="health_check_adjustment",
component="kubernetes-pods",
parameters={"aggressive_mode": aggressive},
success=True,
message="Health checks adjusted to aggressive mode",
)
except Exception as e:
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="health_check_adjustment",
component="kubernetes-pods",
parameters={},
success=False,
message=f"Health check adjustment failed: {e}",
)
async def _handle_resource_allocation(
self, prediction: FailurePrediction, config: Dict
) -> PreventionAction:
"""Handle resource allocation prevention action"""
try:
boost_factor = config.get("resource_boost_factor", 2.0)
logger.info(f"Boosting resource allocation by factor {boost_factor}")
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="resource_allocation",
component="kubernetes-resources",
parameters={"boost_factor": boost_factor},
success=True,
message=f"Resource allocation boosted by factor {boost_factor}",
)
except Exception as e:
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="resource_allocation",
component="kubernetes-resources",
parameters={},
success=False,
message=f"Resource allocation boost failed: {e}",
)
async def _handle_deployment_rollback(
self, prediction: FailurePrediction, config: Dict
) -> PreventionAction:
"""Handle deployment rollback prevention action"""
try:
require_approval = config.get("require_approval", False)
if require_approval:
logger.info("Deployment rollback requires manual approval")
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="deployment_rollback",
component="kubernetes-deployments",
parameters={"require_approval": True},
success=False,
message="Rollback requires manual approval",
)
else:
logger.info("Executing automatic deployment rollback")
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="deployment_rollback",
component="kubernetes-deployments",
parameters={"require_approval": False},
success=True,
message="Automatic deployment rollback executed",
)
except Exception as e:
return PreventionAction(
timestamp=datetime.utcnow(),
action_type="deployment_rollback",
component="kubernetes-deployments",
parameters={},
success=False,
message=f"Deployment rollback failed: {e}",
)
async def start_analysis_loop(self):
"""Main analysis loop for predictive failure analysis"""
logger.info("Starting predictive failure analysis loop")
while True:
try:
# Get all components to monitor
components = await self._get_monitored_components()
for component in components:
# Collect metrics
metrics_df = await self.collect_metrics(component)
if metrics_df is None or metrics_df.empty:
continue
# Predict system failures
prediction = await self.predict_system_failure(metrics_df)
if prediction:
logger.warning(
f"Failure prediction for {component}: {prediction.probability:.2f}"
)
# Update metrics
FAILURE_PREDICTIONS.labels(severity="HIGH").inc()
# Take prevention actions
if prediction.probability > self.alert_threshold:
actions = await self.take_prevention_actions(prediction)
for action in actions:
if action.success:
PREVENTION_ACTIONS_TAKEN.inc()
# Send alerts
await self._send_failure_alert(prediction)
# Detect anomalies
anomalies = await self.detect_anomalies(metrics_df)
for anomaly in anomalies:
logger.warning(
f"Anomaly detected in {component}: {anomaly.anomaly_score:.2f}"
)
# Send anomaly alert
await self._send_anomaly_alert(anomaly)
# Wait for next analysis cycle
await asyncio.sleep(int(os.getenv("ANALYSIS_INTERVAL", "60")))
except Exception as e:
logger.error(f"Error in failure analysis loop: {e}")
await asyncio.sleep(30)
async def _get_monitored_components(self) -> list[str]:
"""Get list of components to monitor"""
# This would query Kubernetes API for all deployments/services
# For now, return mock data
return ["zenith-api", "zenith-auth", "zenith-payment"]
async def _send_failure_alert(self, prediction: FailurePrediction):
"""Send failure prediction alert"""
try:
alert_data = {
"alert_name": "Failure Prediction",
"severity": "CRITICAL" if prediction.probability > 0.9 else "HIGH",
"component": prediction.component,
"failure_type": prediction.failure_type,
"probability": prediction.probability,
"confidence": prediction.confidence,
"prediction_horizon": prediction.prediction_horizon,
"root_causes": prediction.root_causes,
"recommended_actions": prediction.recommended_actions,
"timestamp": prediction.timestamp.isoformat(),
}
# Send to different channels based on severity
await self._send_alert_to_channels(alert_data, prediction.probability)
except Exception as e:
logger.error(f"Failed to send failure alert: {e}")
async def _send_anomaly_alert(self, anomaly: AnomalyResult):
"""Send anomaly detection alert"""
try:
alert_data = {
"alert_name": "Anomaly Detection",
"severity": anomaly.severity,
"component": anomaly.component,
"anomaly_score": anomaly.anomaly_score,
"anomaly_type": anomaly.anomaly_type,
"affected_metrics": anomaly.affected_metrics,
"timestamp": anomaly.timestamp.isoformat(),
}
# Send anomaly alerts
await self._send_alert_to_channels(
alert_data, 0.7
) # Lower severity for anomalies
except Exception as e:
logger.error(f"Failed to send anomaly alert: {e}")
async def _send_alert_to_channels(self, alert_data: Dict, severity: float):
"""Send alerts to configured channels"""
try:
alert_config = self.config.get("alerting", {})
if not alert_config.get("enabled", True):
return
channels = alert_config.get("channels", [])
for channel in channels:
channel_type = channel.get("type")
severity_threshold = channel.get("severity_threshold", 0.7)
if severity >= severity_threshold:
if channel_type == "slack":
await self._send_slack_alert(alert_data, channel)
elif channel_type == "pagerduty":
await self._send_pagerduty_alert(alert_data, channel)
elif channel_type == "email":
await self._send_email_alert(alert_data, channel)
except Exception as e:
logger.error(f"Failed to send alert to channels: {e}")
async def _send_slack_alert(self, alert_data: Dict, channel_config: Dict):
"""Send alert to Slack"""
try:
webhook_url = channel_config.get("webhook_url")
channel = channel_config.get("channel", "#alerts")
if not webhook_url:
return
slack_message = {
"channel": channel,
"username": "Zenith Failure Predictor",
"icon_emoji": ":warning:",
"attachments": [
{
"color": "danger"
if alert_data["severity"] == "CRITICAL"
else "warning",
"title": f"🚨 {alert_data['alert_name']}",
"fields": [
{
"title": "Component",
"value": alert_data["component"],
"short": True,
},
{
"title": "Severity",
"value": alert_data["severity"],
"short": True,
},
{
"title": "Probability",
"value": f"{alert_data.get('probability', 0):.2%}",
"short": True,
},
{
"title": "Confidence",
"value": f"{alert_data.get('confidence', 0):.2%}",
"short": True,
},
],
"footer": "Zenith Platform",
"ts": int(datetime.utcnow().timestamp()),
}
],
}
if "root_causes" in alert_data:
slack_message["attachments"][0]["fields"].append(
{
"title": "Root Causes",
"value": "\n".join(alert_data["root_causes"][:3]),
"short": False,
}
)
if "recommended_actions" in alert_data:
slack_message["attachments"][0]["fields"].append(
{
"title": "Recommended Actions",
"value": "\n".join(alert_data["recommended_actions"][:3]),
"short": False,
}
)
async with self.session.post(webhook_url, json=slack_message) as response:
if response.status != 200:
logger.error(f"Failed to send Slack alert: {response.status}")
except Exception as e:
logger.error(f"Failed to send Slack alert: {e}")
async def _send_pagerduty_alert(self, alert_data: Dict, channel_config: Dict):
"""Send alert to PagerDuty"""
try:
integration_key = channel_config.get("integration_key")
if not integration_key:
return
pagerduty_event = {
"routing_key": integration_key,
"event_action": "trigger",
"payload": {
"summary": f"{alert_data['alert_name']}: {alert_data['component']}",
"severity": "critical"
if alert_data["severity"] == "CRITICAL"
else "error",
"source": "zenith-failure-predictor",
"component": alert_data["component"],
"group": "system-failures",
"class": alert_data.get("failure_type", "unknown"),
"custom_details": alert_data,
},
}
async with self.session.post(
"https://events.pagerduty.com/v2/enqueue", json=pagerduty_event
) as response:
if response.status != 202:
logger.error(f"Failed to send PagerDuty alert: {response.status}")
except Exception as e:
logger.error(f"Failed to send PagerDuty alert: {e}")
async def _send_email_alert(self, alert_data: Dict, channel_config: Dict):
"""Send alert via email"""
try:
recipients = channel_config.get("recipients", [])
if not recipients:
return
# This would integrate with your email service
logger.info(f"Email alert would be sent to: {', '.join(recipients)}")
except Exception as e:
logger.error(f"Failed to send email alert: {e}")
async def main():
    """Entry point: build the analyzer and run its analysis loop forever."""
    await PredictiveFailureAnalyzer().start_analysis_loop()


if __name__ == "__main__":
    asyncio.run(main())