# NOTE: hosting-page scrape artifact ("Spaces: Paused" status banner) — not part of the module.
| """ | |
| Advanced Monitoring and Alerting System | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import smtplib | |
| import ssl | |
| import time | |
| from datetime import datetime | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from enum import Enum | |
| from typing import Any | |
| import requests | |
| logger = logging.getLogger(__name__) | |
class AlertLevel(str, Enum):
    """Alert severity levels.

    Mixes in ``str`` so members compare equal to their plain string values,
    serialize cleanly with ``json.dumps``, and support ``str`` methods such
    as ``upper()`` used by the alert senders.
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
class AlertChannel(str, Enum):
    """Delivery channels an alert can be routed to.

    Mixes in ``str`` so channel members survive ``json.dumps`` when alerts
    are persisted to history/Redis.
    """

    EMAIL = "email"
    SLACK = "slack"
    WEBHOOK = "webhook"
    SMS = "sms"
    DASHBOARD = "dashboard"
class AlertConfig:
    """Alerting configuration loaded from environment variables."""

    def __init__(self):
        # SMTP (email) delivery settings.
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
        self.smtp_username = os.getenv("SMTP_USERNAME", "")
        self.smtp_password = os.getenv("SMTP_PASSWORD", "")
        self.smtp_use_tls = os.getenv("SMTP_USE_TLS", "true").lower() == "true"
        # Comma-separated recipient list; strip whitespace and drop empty
        # entries so values like "a@x.com, b@x.com," parse cleanly.
        self.admin_emails = [
            email.strip()
            for email in os.getenv("ADMIN_EMAILS", "").split(",")
            if email.strip()
        ]
        # Channel endpoints.
        self.slack_webhook = os.getenv("SLACK_WEBHOOK_URL", "")
        self.webhook_url = os.getenv("ALERT_WEBHOOK_URL", "")
        self.sms_api_key = os.getenv("SMS_API_KEY", "")
        self.sms_api_url = os.getenv("SMS_API_URL", "")
        # Throttling / history bounds.
        self.alert_cooldown = int(os.getenv("ALERT_COOLDOWN", "300"))  # seconds (5 minutes)
        self.alert_history_limit = int(os.getenv("ALERT_HISTORY_LIMIT", "1000"))
        self.dashboard_api_url = os.getenv("DASHBOARD_API_URL", "")

    def get_email_recipients(self, level: "AlertLevel") -> list[str]:
        """Return the email recipients for *level*.

        WARNING, ERROR and CRITICAL alerts notify the full admin list;
        INFO (and any unrecognized level) is restricted to addresses
        containing "ops" or "admin".
        """
        if level in (AlertLevel.CRITICAL, AlertLevel.ERROR, AlertLevel.WARNING):
            # Return a copy so callers cannot mutate the configured list.
            return list(self.admin_emails)
        return [email for email in self.admin_emails if "ops" in email or "admin" in email]

    def should_send_alert(self, alert_key: str, level: "AlertLevel") -> bool:
        """Check whether the alert identified by *alert_key* may be sent.

        Cooldown tracking is not implemented yet (it would live in Redis
        or a database); every alert is currently allowed through.
        """
        return True
class AlertManager:
    """Advanced alerting and notification system.

    Evaluates metric snapshots against a static rule set and dispatches
    triggered alerts to the configured channels (email, Slack, generic
    webhook, SMS).  Sent alerts are kept in a bounded in-memory history and
    mirrored to Redis when an async Redis client is supplied.
    """

    def __init__(self, redis_client=None):
        self.redis = redis_client  # optional async Redis client
        self.config = AlertConfig()
        self.alert_history = []  # recent alerts, bounded by config.alert_history_limit
        self.alert_rules = self._load_alert_rules()
        self.is_running = False
        self.monitoring_task = None  # asyncio.Task running the monitoring loop

    @staticmethod
    def _level_str(level) -> str:
        """Normalize an alert level (Enum member or plain string) to a string.

        Rule levels are ``AlertLevel`` members while alerts read back from
        Redis carry plain strings; senders must handle both.
        """
        return level.value if isinstance(level, Enum) else str(level)

    def _load_alert_rules(self) -> list[dict[str, Any]]:
        """Load alerting rules from configuration.

        NOTE(review): each rule's ``condition.duration`` (seconds the
        condition should hold) is recorded but not yet enforced by
        ``_check_alert_condition`` — conditions currently fire immediately.
        """
        return [
            {
                "name": "high_cpu_usage",
                "description": "High CPU usage detected",
                "condition": {
                    "metric": "cpu_percent",
                    "operator": ">",
                    "threshold": 80,
                    "duration": 300,  # 5 minutes
                },
                "level": AlertLevel.WARNING,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "critical_cpu_usage",
                "description": "Critical CPU usage detected",
                "condition": {
                    "metric": "cpu_percent",
                    "operator": ">",
                    "threshold": 95,
                    "duration": 120,  # 2 minutes
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 300,
            },
            {
                "name": "high_memory_usage",
                "description": "High memory usage detected",
                "condition": {"metric": "memory_percent", "operator": ">", "threshold": 85, "duration": 300},
                "level": AlertLevel.WARNING,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "critical_memory_usage",
                "description": "Critical memory usage detected",
                "condition": {"metric": "memory_percent", "operator": ">", "threshold": 95, "duration": 120},
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 300,
            },
            {
                "name": "service_down",
                "description": "Service is down",
                "condition": {
                    "metric": "service_status",
                    "operator": "==",
                    "threshold": "down",
                    "duration": 60,  # 1 minute
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 120,
            },
            {
                "name": "high_error_rate",
                "description": "High error rate detected",
                "condition": {
                    "metric": "error_rate_percent",
                    "operator": ">",
                    "threshold": 5,
                    "duration": 300,  # 5 minutes
                },
                "level": AlertLevel.ERROR,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "database_connection_failure",
                "description": "Database connection failure",
                "condition": {"metric": "database_status", "operator": "==", "threshold": "error", "duration": 60},
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 120,
            },
            {
                "name": "security_breach",
                "description": "Security breach detected",
                "condition": {
                    "metric": "security_alert",
                    "operator": "==",
                    "threshold": "true",
                    "duration": 1,  # Immediate
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 60,
            },
        ]

    def _check_alert_condition(self, rule: dict[str, Any], metrics: dict[str, Any]) -> bool:
        """Return True when *rule*'s condition is satisfied by *metrics*.

        Missing metrics, unknown operators, and type-incompatible
        comparisons (e.g. a string metric against a numeric threshold)
        all evaluate to False instead of raising, so one malformed metric
        cannot break the monitoring loop.
        """
        condition = rule["condition"]
        value = metrics.get(condition["metric"])
        if value is None:
            return False
        threshold = condition["threshold"]
        op = condition["operator"]
        try:
            if op == ">":
                return value > threshold
            if op == ">=":
                return value >= threshold
            if op == "<":
                return value < threshold
            if op == "<=":
                return value <= threshold
            if op == "==":
                return value == threshold
            if op == "!=":
                return value != threshold
        except TypeError:
            # Incomparable types (e.g. str vs int) — treat as "not met".
            return False
        return False

    def _smtp_send(self, msg) -> None:
        """Blocking SMTP delivery helper; run in a worker thread.

        Uses a context manager so the connection is closed even when
        login/send fails (the previous version leaked it on error).
        """
        with smtplib.SMTP(self.config.smtp_server, self.config.smtp_port) as server:
            if self.config.smtp_use_tls:
                server.starttls(context=ssl.create_default_context())
            if self.config.smtp_username and self.config.smtp_password:
                server.login(self.config.smtp_username, self.config.smtp_password)
            server.send_message(msg)

    async def _send_email_alert(self, alert: dict[str, Any]) -> bool:
        """Send *alert* by email over SMTP.

        Returns True on success; False when SMTP is unconfigured, there
        are no recipients, or delivery fails.
        """
        try:
            if not self.config.smtp_username or not self.config.smtp_password:
                logger.error("SMTP credentials not configured")
                return False
            recipients = self.config.get_email_recipients(alert["level"])
            if not recipients:
                return False
            level = self._level_str(alert["level"])
            # Build the message.
            msg = MIMEMultipart()
            msg["From"] = self.config.smtp_username
            msg["To"] = ", ".join(recipients)
            msg["Subject"] = f"[{level.upper()}] {alert['title']} - Zenith Platform"
            body = f"""
Alert: {alert["title"]}
Level: {level.upper()}
Time: {alert.get("timestamp", "Unknown")}
Description:
{alert["description"]}
Details:
{json.dumps(alert.get("details", {}), indent=2, default=str)}
Zenith Platform Alerting System
"""
            msg.attach(MIMEText(body, "plain"))
            # smtplib is blocking; keep the event loop responsive.
            await asyncio.to_thread(self._smtp_send, msg)
            logger.info(f"Email alert sent for: {alert['title']}")
            return True
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")
            return False

    async def _send_slack_alert(self, alert: dict[str, Any]) -> bool:
        """Post *alert* to the configured Slack incoming webhook."""
        try:
            if not self.config.slack_webhook:
                logger.warning("Slack webhook URL not configured")
                return False
            level = self._level_str(alert["level"])
            # Attachment colour keyed by severity; grey for unknown levels.
            color = {
                "info": "#36a64f",
                "warning": "#ff9500",
                "error": "#ff0000",
                "critical": "#8b0000",
            }.get(level, "#808080")
            payload = {
                "text": f"{level.upper()}: {alert['title']}",
                "attachments": [
                    {
                        "color": color,
                        "title": alert["title"],
                        "text": alert["description"],
                        "fields": [
                            {"title": "Level", "value": level.upper(), "short": True},
                            {"title": "Time", "value": alert.get("timestamp", "Unknown"), "short": True},
                            {
                                "title": "Details",
                                "value": json.dumps(alert.get("details", {}), indent=2, default=str),
                                "short": False,
                            },
                        ],
                        "footer": "Zenith Platform Alerting System",
                        "ts": int(time.time()),
                    }
                ],
            }
            # requests is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                requests.post, self.config.slack_webhook, json=payload, timeout=10
            )
            if response.status_code == 200:
                logger.info(f"Slack alert sent for: {alert['title']}")
                return True
            logger.error(f"Failed to send Slack alert: {response.status_code}")
            return False
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")
            return False

    async def _send_webhook_alert(self, alert: dict[str, Any]) -> bool:
        """POST *alert* as JSON to the configured generic webhook."""
        try:
            if not self.config.webhook_url:
                logger.warning("Webhook URL not configured")
                return False
            payload = {"alert": alert, "timestamp": datetime.utcnow().isoformat(), "source": "zenith_platform"}
            # Serialize explicitly: the alert may contain Enum members that
            # requests' json= encoding cannot handle; default=str is deliberate.
            body = json.dumps(payload, default=str)
            response = await asyncio.to_thread(
                requests.post,
                self.config.webhook_url,
                data=body,
                timeout=10,
                headers={"Content-Type": "application/json"},
            )
            if response.status_code == 200:
                logger.info(f"Webhook alert sent for: {alert['title']}")
                return True
            logger.error(f"Failed to send webhook alert: {response.status_code}")
            return False
        except Exception as e:
            logger.error(f"Failed to send webhook alert: {e}")
            return False

    async def _send_sms_alert(self, alert: dict[str, Any]) -> bool:
        """Send an SMS alert (placeholder — logs instead of sending).

        TODO: integrate a real SMS provider; recipients currently reuse the
        email recipient list as a stand-in for phone numbers.
        """
        try:
            if not self.config.sms_api_key or not self.config.sms_api_url:
                logger.warning("SMS API not configured")
                return False
            recipients = self.config.get_email_recipients(alert["level"])
            if not recipients:
                return False
            logger.info(f"SMS alert would be sent to {recipients}: {alert['title']} - {alert['description']}")
            return True
        except Exception as e:
            logger.error(f"Failed to send SMS alert: {e}")
            return False

    async def send_alert(self, alert: dict[str, Any]) -> bool:
        """Dispatch *alert* to every channel listed in ``alert["channels"]``.

        Returns True if at least one channel delivered.  The alert is
        appended to the in-memory history (and Redis, when configured)
        regardless of delivery outcome.
        """
        try:
            # Cooldown key is rule + normalized level string.
            alert_key = f"{alert['rule_name']}_{self._level_str(alert['level'])}"
            if not self.config.should_send_alert(alert_key, alert["level"]):
                logger.info(f"Alert {alert['title']} is in cooldown period")
                return True
            # Dispatch table instead of an if/elif chain; unknown channels
            # (e.g. DASHBOARD, not yet implemented) are skipped.
            senders = {
                AlertChannel.EMAIL: self._send_email_alert,
                AlertChannel.SLACK: self._send_slack_alert,
                AlertChannel.WEBHOOK: self._send_webhook_alert,
                AlertChannel.SMS: self._send_sms_alert,
            }
            channels = alert.get("channels", [])
            success_count = 0
            for channel in channels:
                sender = senders.get(channel)
                if sender is not None and await sender(alert):
                    success_count += 1
            # Record the outcome in history.
            alert["sent_at"] = datetime.utcnow().isoformat()
            alert["success_count"] = success_count
            self.alert_history.append(alert)
            if len(self.alert_history) > self.config.alert_history_limit:
                self.alert_history = self.alert_history[-self.config.alert_history_limit:]
            if self.redis:
                # default=str makes Enum members (level/channels) and the
                # embedded rule JSON-serializable.
                serialized = json.dumps(alert, default=str)
                await self.redis.lpush("alert_history", serialized)
                # LTRIM end index is inclusive, hence limit - 1.
                await self.redis.ltrim("alert_history", 0, self.config.alert_history_limit - 1)
                await self.redis.setex(f"alert_{alert_key}", self.config.alert_cooldown, serialized)
            logger.info(f"Alert sent via {success_count}/{len(channels)} channels: {alert['title']}")
            return success_count > 0
        except Exception as e:
            logger.error(f"Error sending alert: {e}")
            return False

    async def evaluate_metrics_and_alert(self, metrics: dict[str, Any]) -> int:
        """Evaluate *metrics* against every rule; send alerts for matches.

        Returns the number of alerts successfully sent.
        """
        alerts_sent = 0
        for rule in self.alert_rules:
            if not self._check_alert_condition(rule, metrics):
                continue
            alert = {
                "rule_name": rule["name"],
                "title": rule["description"],
                "level": rule["level"],
                "description": f"Alert condition met: {rule['description']}",
                "details": metrics,
                "timestamp": datetime.utcnow().isoformat(),
                "channels": rule["channels"],
                "rule": rule,
            }
            if await self.send_alert(alert):
                alerts_sent += 1
            logger.warning(f"Alert triggered: {rule['name']} - {rule['description']}")
        return alerts_sent

    async def get_alert_history(self, limit: int = 50) -> list[dict[str, Any]]:
        """Return up to *limit* recent alerts (from Redis when available)."""
        try:
            if self.redis:
                # LRANGE end index is inclusive, hence limit - 1.
                raw = await self.redis.lrange("alert_history", 0, limit - 1)
                return [json.loads(item) for item in raw if item]
            return self.alert_history[-limit:]
        except Exception as e:
            logger.error(f"Error getting alert history: {e}")
            return []

    async def get_alert_statistics(self) -> dict[str, Any]:
        """Aggregate alert counts by level/rule and per-channel success share.

        Returns {"error": ...} instead of raising so dashboard callers never
        crash on an empty or malformed history.
        """
        try:
            recent_alerts = await self.get_alert_history(1000)
            if not recent_alerts:
                return {"error": "No alert history"}
            level_counts = {}
            rule_counts = {}
            channel_success = {}
            for alert in recent_alerts:
                level = self._level_str(alert["level"])
                success_count = alert.get("success_count", 0)
                level_counts[level] = level_counts.get(level, 0) + 1
                rule = alert["rule_name"]
                rule_counts[rule] = rule_counts.get(rule, 0) + 1
                for channel in alert.get("channels", []):
                    channel_success[channel] = channel_success.get(channel, 0) + success_count
            # Hoist the total: the original recomputed sum() per channel.
            total_success = sum(channel_success.values())
            stats = {
                "total_alerts": len(recent_alerts),
                "alerts_by_level": level_counts,
                "alerts_by_rule": rule_counts,
                "channel_success_rate": {
                    channel: (count / total_success * 100) if total_success > 0 else 0
                    for channel, count in channel_success.items()
                },
                "recent_alerts": recent_alerts[-20:],
                "configuration": {
                    "smtp_configured": bool(self.config.smtp_username and self.config.smtp_password),
                    "slack_configured": bool(self.config.slack_webhook),
                    "webhook_configured": bool(self.config.webhook_url),
                    "sms_configured": bool(self.config.sms_api_key),
                },
            }
            if self.redis:
                await self.redis.setex(
                    "alert_statistics",
                    300,  # 5 minutes TTL
                    json.dumps(stats, default=str),
                )
            return stats
        except Exception as e:
            logger.error(f"Error getting alert statistics: {e}")
            return {"error": str(e)}

    async def start_monitoring(self):
        """Start the continuous monitoring/alerting background task."""

        async def monitoring_loop():
            while self.is_running:
                try:
                    # This would integrate with a real metrics pipeline; for
                    # now sample system metrics directly via psutil.
                    import psutil

                    current_metrics = {
                        # cpu_percent(interval=5) blocks for 5s; run it in a
                        # thread so the event loop stays responsive.
                        "cpu_percent": await asyncio.to_thread(psutil.cpu_percent, 5),
                        "memory_percent": psutil.virtual_memory().percent,
                        "disk_usage_percent": psutil.disk_usage("/").percent,
                        "load_average": psutil.getloadavg()[0],
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                    app_metrics = {}
                    if self.redis:
                        # Last 100 samples; LRANGE requires both start and end.
                        app_response_times = await self.redis.lrange("app_response_times", -100, -1)
                        if app_response_times:
                            times = [float(rt) for rt in app_response_times]
                            app_metrics["avg_response_time_ms"] = sum(times) / len(times)
                            # Proxy error rate: share of samples slower than 1s.
                            app_metrics["error_rate_percent"] = (
                                len([t for t in times if t > 1000]) / len(times) * 100
                            )
                        error_count = await self.redis.get("app_error_count")
                        if error_count:
                            # An explicit error counter overrides the latency proxy.
                            app_metrics["error_rate_percent"] = float(error_count)
                        service_status = await self.redis.get("service_status")
                        if service_status:
                            app_metrics["service_status"] = json.loads(service_status)
                    current_metrics.update(app_metrics)
                    alerts_sent = await self.evaluate_metrics_and_alert(current_metrics)
                    if alerts_sent > 0:
                        logger.info(f"Sent {alerts_sent} alerts during monitoring cycle")
                    await asyncio.sleep(60)  # check once a minute
                except asyncio.CancelledError:
                    raise  # let stop_monitoring() cancel us promptly
                except Exception as e:
                    logger.error(f"Monitoring loop error: {e}")
                    await asyncio.sleep(60)

        self.is_running = True
        self.monitoring_task = asyncio.create_task(monitoring_loop())
        logger.info("Advanced monitoring and alerting system started")

    async def stop_monitoring(self):
        """Stop the monitoring loop and wait for the task to finish."""
        self.is_running = False
        if self.monitoring_task:
            self.monitoring_task.cancel()
            try:
                # Await so cancellation actually completes before returning.
                await self.monitoring_task
            except asyncio.CancelledError:
                pass
            self.monitoring_task = None
        logger.info("Advanced monitoring and alerting system stopped")
class MetricsCollector:
    """Collects system- and application-level metrics for monitoring."""

    def __init__(self, redis_client=None):
        self.redis = redis_client  # optional async Redis client for persistence

    async def collect_system_metrics(self) -> dict[str, Any]:
        """Collect host and process metrics via psutil.

        Returns a nested metrics dict ({} on failure).  When Redis is
        configured, also caches the snapshot and appends it to a bounded
        time series.
        """
        try:
            import psutil  # local import: optional dependency

            cpu_info = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage("/")
            network_info = psutil.net_io_counters()
            load_avg = psutil.getloadavg()
            process = psutil.Process()
            process_info = process.memory_info()
            process_cpu = process.cpu_percent()
            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "system": {
                    "cpu_percent": psutil.cpu_percent(interval=1),
                    "cpu_per_core": list(cpu_info),
                    "memory": {
                        "total": memory_info.total,
                        "available": memory_info.available,
                        "percent": memory_info.percent,
                        "used": memory_info.used,
                        "free": memory_info.free,
                    },
                    "disk": {
                        "total": disk_info.total,
                        "used": disk_info.used,
                        "free": disk_info.free,
                        "percent": disk_info.percent,
                    },
                    "network": {
                        "bytes_sent": network_info.bytes_sent,
                        "bytes_recv": network_info.bytes_recv,
                        "packets_sent": network_info.packets_sent,
                        "packets_recv": network_info.packets_recv,
                    },
                    "load_average": load_avg,
                    "process": {
                        "memory_mb": process_info.rss / (1024**2),
                        "cpu_percent": process_cpu,
                        # num_threads is a method — the original stored the
                        # bound method object, which broke JSON serialization.
                        "num_threads": process.num_threads(),
                        # create_time() returns a POSIX timestamp (float),
                        # not a datetime; convert before formatting.
                        "create_time": datetime.fromtimestamp(process.create_time()).isoformat(),
                    },
                },
            }
            if self.redis:
                await self.redis.setex(
                    "system_metrics",
                    300,  # 5 minutes TTL
                    json.dumps(metrics),
                )
                # Bounded time series for trend views.
                await self.redis.lpush("system_metrics_timeseries", json.dumps(metrics))
                await self.redis.ltrim("system_metrics_timeseries", 0, 1000)
            return metrics
        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")
            return {}

    async def collect_application_metrics(self) -> dict[str, Any]:
        """Collect application-level metrics (requests, errors, latency).

        All data comes from Redis counters/lists; returns {} when Redis is
        not configured or on failure.
        """
        try:
            if not self.redis:
                # Nothing to collect without Redis (the original fell through
                # to an UnboundLocalError here; return explicitly instead).
                return {}
            request_count = await self.redis.get("total_requests")
            error_count = await self.redis.get("total_errors")
            # Last 100 latency samples; LRANGE requires both start and end.
            response_times = await self.redis.lrange("app_response_times", -100, -1)
            samples = [float(rt) for rt in response_times] if response_times else []
            total_requests = int(request_count) if request_count else 0
            total_errors = int(error_count) if error_count else 0
            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "application": {
                    "total_requests": total_requests,
                    "total_errors": total_errors,
                    "error_rate_percent": (total_errors / total_requests * 100) if total_requests > 0 else 0,
                    "avg_response_time_ms": sum(samples) / len(samples) if samples else 0,
                    "p95_response_time_ms": sorted(samples)[int(len(samples) * 0.95)] if samples else 0,
                    # NOTE(review): assumes the Redis client decodes responses
                    # to str (decode_responses=True) — raw bytes would not be
                    # JSON-serializable below; confirm client configuration.
                    "active_connections": await self.redis.get("active_connections") or 0,
                },
            }
            await self.redis.setex(
                "application_metrics",
                300,  # 5 minutes TTL
                json.dumps(metrics),
            )
            # Bounded time series for trend views.
            await self.redis.lpush("application_metrics_timeseries", json.dumps(metrics))
            await self.redis.ltrim("application_metrics_timeseries", 0, 1000)
            return metrics
        except Exception as e:
            logger.error(f"Error collecting application metrics: {e}")
            return {}
# Global singletons used by the rest of the application.
alert_manager = AlertManager()
metrics_collector = MetricsCollector()


async def initialize_monitoring_alerting(redis_client=None):
    """Initialize the monitoring and alerting system.

    Rebinds the module-level ``alert_manager`` singleton to a Redis-backed
    instance and starts its monitoring loop.  Returns both singletons for
    convenience.
    """
    # Rebind the module-level singleton — a plain local assignment would
    # shadow it and leave importers holding the Redis-less instance.
    global alert_manager
    alert_manager = AlertManager(redis_client)
    await alert_manager.start_monitoring()
    logger.info("Advanced monitoring and alerting system initialized")
    return {"alert_manager": alert_manager, "metrics_collector": metrics_collector}