# zenith-backend/app/infrastructure/monitoring_alerting.py
# fix(backend): fix port and health check robustness
# (commit d29a5a0, verified; author: teoat)
"""
Advanced Monitoring and Alerting System
"""
import asyncio
import json
import logging
import os
import smtplib
import ssl
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from enum import Enum
from typing import Any
import requests
logger = logging.getLogger(__name__)
class AlertLevel(Enum):
    """Severity of an alert, from informational to critical."""

    INFO = "info"          # routine, low-urgency notification
    WARNING = "warning"    # degraded but operational
    ERROR = "error"        # a failure occurred
    CRITICAL = "critical"  # requires immediate attention
class AlertChannel(Enum):
    """Delivery channel through which an alert can be sent."""

    EMAIL = "email"          # SMTP email to admin recipients
    SLACK = "slack"          # Slack incoming webhook
    WEBHOOK = "webhook"      # generic HTTP webhook
    SMS = "sms"              # SMS provider API
    DASHBOARD = "dashboard"  # internal dashboard display
class AlertConfig:
    """Alerting configuration loaded from environment variables at construction time."""

    def __init__(self):
        # SMTP (email) delivery settings.
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
        self.smtp_username = os.getenv("SMTP_USERNAME", "")
        self.smtp_password = os.getenv("SMTP_PASSWORD", "")
        self.smtp_use_tls = os.getenv("SMTP_USE_TLS", "true").lower() == "true"
        # Fix: strip whitespace and drop empty entries so values such as
        # "a@x.com, b@y.com" or a trailing comma parse into clean addresses
        # (the original kept " b@y.com" and could emit "" entries).
        self.admin_emails = [
            email.strip()
            for email in os.getenv("ADMIN_EMAILS", "").split(",")
            if email.strip()
        ]
        # Outbound notification endpoints.
        self.slack_webhook = os.getenv("SLACK_WEBHOOK_URL", "")
        self.webhook_url = os.getenv("ALERT_WEBHOOK_URL", "")
        self.sms_api_key = os.getenv("SMS_API_KEY", "")
        self.sms_api_url = os.getenv("SMS_API_URL", "")
        # Throttling / retention.
        self.alert_cooldown = int(os.getenv("ALERT_COOLDOWN", "300"))  # 5 minutes
        self.alert_history_limit = int(os.getenv("ALERT_HISTORY_LIMIT", "1000"))
        self.dashboard_api_url = os.getenv("DASHBOARD_API_URL", "")

    def get_email_recipients(self, level: "AlertLevel") -> list[str]:
        """Return email recipients for *level*.

        WARNING/ERROR/CRITICAL go to every admin; INFO only to addresses that
        look operational ("ops" or "admin" in the address).
        """
        if level == AlertLevel.INFO:
            return [email for email in self.admin_emails if "ops" in email or "admin" in email]
        # CRITICAL, ERROR and WARNING all notify the full admin list.
        return self.admin_emails

    def should_send_alert(self, alert_key: str, level: "AlertLevel") -> bool:
        """Check if alert should be sent (cooldown).

        Placeholder: cooldown tracking would live in Redis or a database;
        for now every alert is allowed through.
        """
        return True
class AlertManager:
    """Advanced alerting and notification system.

    Evaluates metric snapshots against a static rule set and fans alerts out
    to email, Slack, webhook, and SMS channels. History is kept in memory and,
    when a Redis client is supplied, mirrored to Redis.
    """

    def __init__(self, redis_client=None):
        # Optional async Redis client (history, cooldown keys, app metrics);
        # every Redis access below is guarded by `if self.redis`.
        self.redis = redis_client
        self.config = AlertConfig()
        # In-memory history fallback; bounded in send_alert().
        self.alert_history: list[dict[str, Any]] = []
        self.alert_rules = self._load_alert_rules()
        # Background monitoring loop state (see start_monitoring()).
        self.is_running = False
        self.monitoring_task = None

    def _load_alert_rules(self) -> list[dict[str, Any]]:
        """Load alerting rules from configuration.

        Each rule carries a metric condition, an AlertLevel, the delivery
        channels, and a per-rule cooldown in seconds.

        NOTE(review): each condition declares a "duration", but
        _check_alert_condition() evaluates a single sample and never enforces
        it — confirm whether sustained-duration checking was intended.
        """
        return [
            {
                "name": "high_cpu_usage",
                "description": "High CPU usage detected",
                "condition": {
                    "metric": "cpu_percent",
                    "operator": ">",
                    "threshold": 80,
                    "duration": 300,  # 5 minutes
                },
                "level": AlertLevel.WARNING,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "critical_cpu_usage",
                "description": "Critical CPU usage detected",
                "condition": {
                    "metric": "cpu_percent",
                    "operator": ">",
                    "threshold": 95,
                    "duration": 120,  # 2 minutes
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 300,
            },
            {
                "name": "high_memory_usage",
                "description": "High memory usage detected",
                "condition": {"metric": "memory_percent", "operator": ">", "threshold": 85, "duration": 300},
                "level": AlertLevel.WARNING,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "critical_memory_usage",
                "description": "Critical memory usage detected",
                "condition": {"metric": "memory_percent", "operator": ">", "threshold": 95, "duration": 120},
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 300,
            },
            {
                "name": "service_down",
                "description": "Service is down",
                "condition": {
                    "metric": "service_status",
                    "operator": "==",
                    "threshold": "down",
                    "duration": 60,  # 1 minute
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 120,
            },
            {
                "name": "high_error_rate",
                "description": "High error rate detected",
                "condition": {
                    "metric": "error_rate_percent",
                    "operator": ">",
                    "threshold": 5,
                    "duration": 300,  # 5 minutes
                },
                "level": AlertLevel.ERROR,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK],
                "cooldown": 600,
            },
            {
                "name": "database_connection_failure",
                "description": "Database connection failure",
                "condition": {"metric": "database_status", "operator": "==", "threshold": "error", "duration": 60},
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 120,
            },
            {
                "name": "security_breach",
                "description": "Security breach detected",
                "condition": {
                    "metric": "security_alert",
                    "operator": "==",
                    "threshold": "true",
                    "duration": 1,  # Immediate
                },
                "level": AlertLevel.CRITICAL,
                "channels": [AlertChannel.EMAIL, AlertChannel.SLACK, AlertChannel.SMS],
                "cooldown": 60,
            },
        ]

    def _check_alert_condition(self, rule: dict[str, Any], metrics: dict[str, Any]) -> bool:
        """Return True when a rule's single-sample condition matches *metrics*.

        Missing metrics never match. Ordered operators assume the metric and
        threshold are comparable (both numeric or both strings); a mismatch
        would raise TypeError in the caller.
        """
        condition = rule["condition"]
        metric = metrics.get(condition["metric"])
        if metric is None:
            return False
        # Simple condition checking
        if condition["operator"] == ">":
            return metric > condition["threshold"]
        elif condition["operator"] == ">=":
            return metric >= condition["threshold"]
        elif condition["operator"] == "<":
            return metric < condition["threshold"]
        elif condition["operator"] == "<=":
            return metric <= condition["threshold"]
        elif condition["operator"] == "==":
            return metric == condition["threshold"]
        elif condition["operator"] == "!=":
            return metric != condition["threshold"]
        # Unknown operator: treat as non-matching.
        return False

    async def _send_email_alert(self, alert: dict[str, Any]) -> bool:
        """Send email alert.

        Returns False (never raises) when credentials/recipients are missing
        or SMTP delivery fails.
        """
        try:
            if not self.config.smtp_username or not self.config.smtp_password:
                logger.error("SMTP credentials not configured")
                return False
            recipients = self.config.get_email_recipients(alert["level"])
            if not recipients:
                return False
            # Create email message
            msg = MIMEMultipart()
            msg["From"] = self.config.smtp_username
            msg["To"] = ", ".join(recipients)
            # NOTE(review): evaluate_metrics_and_alert() passes alert["level"]
            # as an AlertLevel enum, which has no .upper(); this raises
            # AttributeError unless callers pass a plain string — confirm the
            # intended type of alert["level"].
            msg["Subject"] = f"[{alert['level'].upper()}] {alert['title']} - Zenith Platform"
            # Email body
            body = f"""
Alert: {alert["title"]}
Level: {alert["level"].upper()}
Time: {alert.get("timestamp", "Unknown")}
Description:
{alert["description"]}
Details:
{json.dumps(alert.get("details", {}), indent=2)}
Zenith Platform Alerting System
"""
            msg.attach(MIMEText(body, "plain"))
            # Send email
            # NOTE(review): smtplib is blocking; this stalls the event loop
            # for the duration of the SMTP session. Consider an executor.
            server = smtplib.SMTP(self.config.smtp_server, self.config.smtp_port)
            if self.config.smtp_use_tls:
                context = ssl.create_default_context()
                server.starttls(context=context)
            # Redundant with the credential guard above, but harmless.
            if self.config.smtp_username and self.config.smtp_password:
                server.login(self.config.smtp_username, self.config.smtp_password)
            server.send_message(msg)
            # NOTE(review): quit() is skipped if send_message raises; a
            # `with smtplib.SMTP(...)` block would guarantee cleanup.
            server.quit()
            logger.info(f"Email alert sent for: {alert['title']}")
            return True
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")
            return False

    async def _send_slack_alert(self, alert: dict[str, Any]) -> bool:
        """Send Slack webhook alert. Returns False on any failure."""
        try:
            if not self.config.slack_webhook:
                logger.warning("Slack webhook URL not configured")
                return False
            # Prepare Slack message
            # Attachment color keyed by AlertLevel enum member; unknown levels
            # fall back to grey.
            color = {
                AlertLevel.INFO: "#36a64f",
                AlertLevel.WARNING: "#ff9500",
                AlertLevel.ERROR: "#ff0000",
                AlertLevel.CRITICAL: "#8b0000",
            }.get(alert["level"], "#808080")
            payload = {
                # NOTE(review): .upper() assumes a string level, while the
                # color lookup above assumes an AlertLevel enum — both cannot
                # hold for the same alert dict; confirm the intended type.
                "text": f"{alert['level'].upper()}: {alert['title']}",
                "attachments": [
                    {
                        "color": color,
                        "title": alert["title"],
                        "text": alert["description"],
                        "fields": [
                            {"title": "Level", "value": alert["level"].upper(), "short": True},
                            {"title": "Time", "value": alert.get("timestamp", "Unknown"), "short": True},
                            {
                                "title": "Details",
                                "value": json.dumps(alert.get("details", {}), indent=2),
                                "short": False,
                            },
                        ],
                        "footer": "Zenith Platform Alerting System",
                        # The webhook is known non-empty here (early return
                        # above), so this condition is always true.
                        "ts": int(time.time()) if self.config.slack_webhook else None,
                    }
                ],
            }
            # Send webhook
            # NOTE(review): requests is blocking inside an async method; this
            # stalls the event loop for up to the 10s timeout.
            response = requests.post(self.config.slack_webhook, json=payload, timeout=10)
            if response.status_code == 200:
                logger.info(f"Slack alert sent for: {alert['title']}")
                return True
            else:
                logger.error(f"Failed to send Slack alert: {response.status_code}")
                return False
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")
            return False

    async def _send_webhook_alert(self, alert: dict[str, Any]) -> bool:
        """Send generic HTTP webhook alert. Returns False on any failure."""
        try:
            if not self.config.webhook_url:
                logger.warning("Webhook URL not configured")
                return False
            # NOTE(review): if alert still contains enum members (level,
            # channels) or the full rule dict, JSON serialization of this
            # payload fails and the except below returns False — confirm
            # alerts are normalized to plain types first.
            payload = {"alert": alert, "timestamp": datetime.utcnow().isoformat(), "source": "zenith_platform"}
            # NOTE(review): blocking requests call inside an async method.
            response = requests.post(
                self.config.webhook_url, json=payload, timeout=10, headers={"Content-Type": "application/json"}
            )
            if response.status_code == 200:
                logger.info(f"Webhook alert sent for: {alert['title']}")
                return True
            else:
                logger.error(f"Failed to send webhook alert: {response.status_code}")
                return False
        except Exception as e:
            logger.error(f"Failed to send webhook alert: {e}")
            return False

    async def _send_sms_alert(self, alert: dict[str, Any]) -> bool:
        """Send SMS alert (placeholder).

        Only logs what would be sent; real provider integration is TODO.
        NOTE(review): reuses the *email* recipient list as SMS recipients —
        presumably phone numbers were intended; verify before wiring a provider.
        """
        try:
            if not self.config.sms_api_key or not self.config.sms_api_url:
                logger.warning("SMS API not configured")
                return False
            recipients = self.config.get_email_recipients(alert["level"])
            if not recipients:
                return False
            # This would integrate with your SMS provider
            # For now, just log the SMS
            logger.info(f"SMS alert would be sent to {recipients}: {alert['title']} - {alert['description']}")
            return True
        except Exception as e:
            logger.error(f"Failed to send SMS alert: {e}")
            return False

    async def send_alert(self, alert: dict[str, Any]) -> bool:
        """Send *alert* through its configured channels and record history.

        Returns True if at least one channel delivered (or the alert is in
        cooldown); False on zero deliveries or an unexpected error.
        """
        try:
            # Check cooldown
            alert_key = f"{alert['rule_name']}_{alert['level']}"
            if not self.config.should_send_alert(alert_key, alert["level"]):
                logger.info(f"Alert {alert['title']} is in cooldown period")
                return True
            # Send to all configured channels; count per-channel successes.
            channels = alert.get("channels", [])
            success_count = 0
            for channel in channels:
                if channel == AlertChannel.EMAIL:
                    success = await self._send_email_alert(alert)
                    if success:
                        success_count += 1
                elif channel == AlertChannel.SLACK:
                    success = await self._send_slack_alert(alert)
                    if success:
                        success_count += 1
                elif channel == AlertChannel.WEBHOOK:
                    success = await self._send_webhook_alert(alert)
                    if success:
                        success_count += 1
                elif channel == AlertChannel.SMS:
                    success = await self._send_sms_alert(alert)
                    if success:
                        success_count += 1
            # Store alert in history (mutates the caller's dict in place).
            alert["sent_at"] = datetime.utcnow().isoformat()
            alert["success_count"] = success_count
            self.alert_history.append(alert)
            # Keep only recent alerts
            if len(self.alert_history) > self.config.alert_history_limit:
                self.alert_history = self.alert_history[-self.config.alert_history_limit :]
            # Store in Redis
            # NOTE(review): alerts built by evaluate_metrics_and_alert()
            # contain AlertLevel/AlertChannel enums and the whole rule dict;
            # json.dumps() cannot serialize those, so this branch raises and
            # the method returns False via the except below — confirm alerts
            # are converted to plain types before persisting.
            if self.redis:
                await self.redis.lpush("alert_history", json.dumps(alert))
                # ltrim(0, N) keeps N+1 entries (bounds are inclusive).
                await self.redis.ltrim("alert_history", 0, self.config.alert_history_limit)
                await self.redis.setex(f"alert_{alert_key}", self.config.alert_cooldown, json.dumps(alert))
            logger.info(f"Alert sent via {success_count}/{len(channels)} channels: {alert['title']}")
            return success_count > 0
        except Exception as e:
            logger.error(f"Error sending alert: {e}")
            return False

    async def evaluate_metrics_and_alert(self, metrics: dict[str, Any]) -> int:
        """Evaluate *metrics* against every rule; send an alert per match.

        Returns the number of alerts successfully sent.
        """
        alerts_sent = 0
        for rule in self.alert_rules:
            if self._check_alert_condition(rule, metrics):
                # Build the alert payload; note "level"/"channels" are enum
                # members and "rule" embeds the full rule dict.
                alert = {
                    "rule_name": rule["name"],
                    "title": rule["description"],
                    "level": rule["level"],
                    "description": f"Alert condition met: {rule['description']}",
                    "details": metrics,
                    "timestamp": datetime.utcnow().isoformat(),
                    "channels": rule["channels"],
                    "rule": rule,
                }
                if await self.send_alert(alert):
                    alerts_sent += 1
                logger.warning(f"Alert triggered: {rule['name']} - {rule['description']}")
        return alerts_sent

    async def get_alert_history(self, limit: int = 50) -> list[dict[str, Any]]:
        """Return up to roughly *limit* recent alerts.

        Prefers Redis history when available, otherwise the in-memory list.
        NOTE(review): lrange(0, limit) is inclusive, so Redis returns
        limit + 1 entries — off by one versus the in-memory slice.
        """
        try:
            if self.redis:
                alert_history = await self.redis.lrange("alert_history", 0, limit)
                history = [json.loads(alert) for alert in alert_history if alert]
                return history
            else:
                return self.alert_history[-limit:]
        except Exception as e:
            logger.error(f"Error getting alert history: {e}")
            return []

    async def get_alert_statistics(self) -> dict[str, Any]:
        """Aggregate alert history into counts, per-channel totals, and config status.

        Returns {"error": ...} instead of raising on failure.
        """
        try:
            # Calculate statistics from history
            recent_alerts = await self.get_alert_history(1000)
            if not recent_alerts:
                return {"error": "No alert history"}
            # Count alerts by level
            # NOTE(review): keys here may be AlertLevel/AlertChannel enums when
            # history comes from the in-memory list, but strings when loaded
            # from Redis JSON — the two sources don't aggregate consistently.
            level_counts = {}
            rule_counts = {}
            channel_success = {}
            for alert in recent_alerts:
                level = alert["level"]
                rule = alert["rule_name"]
                success_count = alert.get("success_count", 0)
                level_counts[level] = level_counts.get(level, 0) + 1
                rule_counts[rule] = rule_counts.get(rule, 0) + 1
                # Channel success tracking: the alert-wide success_count is
                # credited to every channel of the alert (not per-channel).
                for channel in alert.get("channels", []):
                    if channel not in channel_success:
                        channel_success[channel] = channel_success.get(channel, 0)
                    channel_success[channel] += success_count
            stats = {
                "total_alerts": len(recent_alerts),
                "alerts_by_level": level_counts,
                "alerts_by_rule": rule_counts,
                # Each channel's share (%) of all recorded successes.
                "channel_success_rate": {
                    channel: (channel_success.get(channel, 0) / sum(channel_success.values()) * 100)
                    if sum(channel_success.values()) > 0
                    else 0
                    for channel in set([c for alert in recent_alerts for c in alert.get("channels", [])])
                },
                "recent_alerts": recent_alerts[-20:],
                "configuration": {
                    "smtp_configured": bool(self.config.smtp_username and self.config.smtp_password),
                    "slack_configured": bool(self.config.slack_webhook),
                    "webhook_configured": bool(self.config.webhook_url),
                    "sms_configured": bool(self.config.sms_api_key),
                },
            }
            # Store statistics in Redis
            # NOTE(review): json.dumps fails here if stats contains enum keys
            # (see above); the except below then returns {"error": ...}.
            if self.redis:
                await self.redis.setex(
                    "alert_statistics",
                    300,  # 5 minutes TTL
                    json.dumps(stats),
                )
            return stats
        except Exception as e:
            logger.error(f"Error getting alert statistics: {e}")
            return {"error": str(e)}

    async def start_monitoring(self):
        """Start continuous monitoring and alerting.

        Spawns a background task (requires a running event loop) that samples
        system/application metrics every ~minute and evaluates alert rules.
        """

        async def monitoring_loop():
            # Runs until stop_monitoring() clears is_running / cancels us.
            while self.is_running:
                try:
                    # This would integrate with your metrics collection system
                    # For now, simulate with system metrics
                    import psutil

                    # NOTE(review): cpu_percent(interval=5) sleeps
                    # synchronously, blocking the event loop ~5s per cycle.
                    current_metrics = {
                        "cpu_percent": psutil.cpu_percent(interval=5),
                        "memory_percent": psutil.virtual_memory().percent,
                        "disk_usage_percent": psutil.disk_usage("/").percent,
                        "load_average": psutil.getloadavg()[0],
                        "timestamp": datetime.utcnow().isoformat(),
                    }
                    # Get application metrics from Redis (if available)
                    app_metrics = {}
                    if self.redis:
                        # Get recent application metrics
                        # NOTE(review): redis lrange requires start AND end
                        # indices; this two-argument call raises TypeError
                        # (swallowed by the except below). Likely intended:
                        # lrange("app_response_times", -100, -1).
                        app_response_times = await self.redis.lrange("app_response_times", -100)
                        if app_response_times:
                            times = [float(rt) for rt in app_response_times]
                            app_metrics["avg_response_time_ms"] = sum(times) / len(times)
                            # Proxy error rate: share of samples slower than 1s.
                            app_metrics["error_rate_percent"] = (
                                len([rt for rt in app_response_times if float(rt) > 1000])
                                / len(app_response_times)
                                * 100
                            )
                        # An explicit error counter overrides the proxy above.
                        error_count = await self.redis.get("app_error_count")
                        if error_count:
                            app_metrics["error_rate_percent"] = float(error_count)
                        # Service status
                        service_status = await self.redis.get("service_status")
                        if service_status:
                            app_metrics["service_status"] = json.loads(service_status)
                    current_metrics.update(app_metrics)
                    # Evaluate and send alerts
                    alerts_sent = await self.evaluate_metrics_and_alert(current_metrics)
                    if alerts_sent > 0:
                        logger.info(f"Sent {alerts_sent} alerts during monitoring cycle")
                    await asyncio.sleep(60)  # Check every minute
                except Exception as e:
                    logger.error(f"Monitoring loop error: {e}")
                    await asyncio.sleep(60)

        self.is_running = True
        self.monitoring_task = asyncio.create_task(monitoring_loop())
        logger.info("Advanced monitoring and alerting system started")

    async def stop_monitoring(self):
        """Stop monitoring.

        NOTE(review): the cancelled task is not awaited, so cancellation may
        still be in flight when this coroutine returns.
        """
        self.is_running = False
        if self.monitoring_task:
            self.monitoring_task.cancel()
        logger.info("Advanced monitoring and alerting system stopped")
class MetricsCollector:
    """Metrics collection for monitoring.

    Gathers host-level metrics via psutil and application metrics from Redis,
    optionally persisting snapshots and time series back to Redis.
    """

    def __init__(self, redis_client=None):
        # Optional async Redis client used to persist metric snapshots.
        self.redis = redis_client

    async def collect_system_metrics(self) -> dict[str, Any]:
        """Collect host-level system metrics via psutil.

        Returns:
            Nested metrics dict, or {} when psutil is unavailable or
            collection fails (errors are logged, never raised).
        """
        try:
            import psutil

            # System metrics
            cpu_info = psutil.cpu_percent(interval=1, percpu=True)
            memory_info = psutil.virtual_memory()
            disk_info = psutil.disk_usage("/")
            network_info = psutil.net_io_counters()
            load_avg = psutil.getloadavg()
            # Current-process information
            process = psutil.Process()
            process_info = process.memory_info()
            process_cpu = process.cpu_percent()
            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "system": {
                    "cpu_percent": psutil.cpu_percent(interval=1),
                    "cpu_per_core": list(cpu_info),
                    "memory": {
                        "total": memory_info.total,
                        "available": memory_info.available,
                        "percent": memory_info.percent,
                        "used": memory_info.used,
                        "free": memory_info.free,
                    },
                    "disk": {
                        "total": disk_info.total,
                        "used": disk_info.used,
                        "free": disk_info.free,
                        "percent": disk_info.percent,
                    },
                    "network": {
                        "bytes_sent": network_info.bytes_sent,
                        "bytes_recv": network_info.bytes_recv,
                        "packets_sent": network_info.packets_sent,
                        "packets_recv": network_info.packets_recv,
                    },
                    "load_average": load_avg,
                    "process": {
                        "memory_mb": process_info.rss / (1024**2),
                        "cpu_percent": process_cpu,
                        # Fix: num_threads is a method; the original stored the
                        # bound method object, which also broke json.dumps below.
                        "num_threads": process.num_threads(),
                        # Fix: create_time() returns a POSIX timestamp (float),
                        # which has no .isoformat(); the original raised
                        # AttributeError and this method always returned {}.
                        "create_time": datetime.fromtimestamp(process.create_time()).isoformat(),
                    },
                },
            }
            # Store snapshot + bounded time series in Redis when available.
            if self.redis:
                await self.redis.setex(
                    "system_metrics",
                    300,  # 5 minutes TTL
                    json.dumps(metrics),
                )
                await self.redis.lpush("system_metrics_timeseries", json.dumps(metrics))
                await self.redis.ltrim("system_metrics_timeseries", 0, 1000)
            return metrics
        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")
            return {}

    async def collect_application_metrics(self) -> dict[str, Any]:
        """Collect application-specific metrics from Redis counters.

        Returns:
            Metrics dict, or {} when no Redis client is configured or
            collection fails (errors are logged, never raised).
        """
        # Fix: without Redis the original fell through with an unbound local
        # `metrics`; return an explicit empty snapshot instead.
        if not self.redis:
            return {}
        try:
            # Request/error counters
            request_count = await self.redis.get("total_requests")
            error_count = await self.redis.get("total_errors")
            # Fix: lrange requires explicit start/end indices; the original
            # two-argument call raised TypeError on every invocation.
            response_times = await self.redis.lrange("app_response_times", -100, -1)
            metrics = {
                "timestamp": datetime.utcnow().isoformat(),
                "application": {
                    "total_requests": int(request_count) if request_count else 0,
                    "total_errors": int(error_count) if error_count else 0,
                    "error_rate_percent": (int(error_count) / int(request_count) * 100)
                    if (request_count and int(request_count) > 0)
                    else 0,
                    "avg_response_time_ms": sum([float(rt) for rt in response_times]) / len(response_times)
                    if response_times
                    else 0,
                    # 95th percentile by sorted index (int(n * 0.95) < n for n >= 1).
                    "p95_response_time_ms": sorted([float(rt) for rt in response_times])[
                        int(len(response_times) * 0.95)
                    ]
                    if response_times
                    else 0,
                    "active_connections": await self.redis.get("active_connections") or 0,
                },
            }
            # Store snapshot + bounded time series in Redis.
            await self.redis.setex(
                "application_metrics",
                300,  # 5 minutes TTL
                json.dumps(metrics),
            )
            await self.redis.lpush("application_metrics_timeseries", json.dumps(metrics))
            await self.redis.ltrim("application_metrics_timeseries", 0, 1000)
            return metrics
        except Exception as e:
            logger.error(f"Error collecting application metrics: {e}")
            return {}
# Global instances
# Module-level singletons, constructed WITHOUT a Redis client; see
# initialize_monitoring_alerting() for the Redis-backed setup path.
alert_manager = AlertManager()
metrics_collector = MetricsCollector()
# Initialization function
async def initialize_monitoring_alerting(redis_client=None):
    """Initialize and start the monitoring/alerting system.

    Args:
        redis_client: optional async Redis client used for alert history,
            cooldown keys, and metric storage.

    Returns:
        dict with the active "alert_manager" and "metrics_collector".
    """
    # Fix: the original assigned to a LOCAL `alert_manager`, shadowing the
    # module-level singleton — importers of `alert_manager` kept the
    # Redis-less instance. Rebind the globals so the whole module (and any
    # `from module import alert_manager` done after init) sees the
    # Redis-backed instances.
    global alert_manager, metrics_collector
    alert_manager = AlertManager(redis_client)
    metrics_collector = MetricsCollector(redis_client)
    await alert_manager.start_monitoring()
    logger.info("Advanced monitoring and alerting system initialized")
    return {"alert_manager": alert_manager, "metrics_collector": metrics_collector}