"""
Monitoring & Alerting System

- System health monitoring
- Performance metrics
- Uptime tracking
"""
import json
import os
import sqlite3
import threading
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

try:
    import psutil
except ImportError:
    # psutil is treated as optional, the same way requests/redis/celery are
    # imported lazily below: a missing install makes SystemMetrics.collect()
    # report an error instead of breaking every importer of this module.
    psutil = None

MONITOR_DB = Path(os.environ.get('OUTPUT_DIR', './output')) / 'monitoring.db'


@contextmanager
def _db() -> Iterator[sqlite3.Connection]:
    """Open a connection to MONITOR_DB; commit on success and always close it.

    ``sqlite3.Connection`` used directly as a context manager only manages the
    transaction, not the connection lifetime, hence this helper.
    """
    conn = sqlite3.connect(str(MONITOR_DB))
    try:
        yield conn
        conn.commit()
    finally:
        # Closing without commit discards any pending transaction on error.
        conn.close()


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    (no ``+00:00`` suffix), so consumers of the returned dicts see no change.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def init_monitoring_db():
    """Create the monitoring database, its directory and its tables if missing."""
    # sqlite3 does not create intermediate directories; without this the
    # import-time call below fails when OUTPUT_DIR does not exist yet.
    MONITOR_DB.parent.mkdir(parents=True, exist_ok=True)
    with _db() as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS health_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            response_time_ms FLOAT,
            details TEXT
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS system_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            cpu_percent FLOAT,
            memory_percent FLOAT,
            disk_percent FLOAT,
            active_connections INTEGER,
            request_count INTEGER
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            alert_type TEXT,
            severity TEXT,
            message TEXT,
            resolved BOOLEAN DEFAULT 0,
            resolved_at TIMESTAMP
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS uptime_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            downtime_seconds INTEGER
        )""")


init_monitoring_db()


class HealthChecker:
    """Health checks for the API, database, Redis and Celery.

    Each check returns a dict with at least ``service`` and ``status``; failures
    are reported as ``status='unhealthy'`` with an ``error`` message rather than
    raised.
    """

    @staticmethod
    def check_api(url: str = 'http://localhost:7860/health',
                  timeout: float = 5) -> Dict[str, Any]:
        """Probe the API health endpoint over HTTP.

        Args:
            url: Health endpoint to GET (defaults to the local API).
            timeout: Request timeout in seconds.

        Returns:
            ``healthy`` on HTTP 200, ``degraded`` on any other status code,
            ``unhealthy`` when the request (or importing ``requests``) fails.
        """
        try:
            import requests
            start = time.time()
            resp = requests.get(url, timeout=timeout)
            response_time = (time.time() - start) * 1000
            status = 'healthy' if resp.status_code == 200 else 'degraded'
            return {
                'service': 'api',
                'status': status,
                'response_time_ms': response_time,
                'status_code': resp.status_code,
            }
        except Exception as e:
            return {'service': 'api', 'status': 'unhealthy', 'error': str(e)}

    @staticmethod
    def check_database() -> Dict[str, Any]:
        """Check that the monitoring SQLite database answers a trivial query."""
        try:
            start = time.time()
            with _db() as conn:
                conn.execute('SELECT 1')
            response_time = (time.time() - start) * 1000
            return {
                'service': 'database',
                'status': 'healthy',
                'response_time_ms': response_time,
            }
        except Exception as e:
            return {'service': 'database', 'status': 'unhealthy', 'error': str(e)}

    @staticmethod
    def check_redis() -> Dict[str, Any]:
        """Ping Redis via ``server.cache_manager``.

        Returns ``unavailable`` when that module reports Redis as not configured.
        """
        try:
            from server.cache_manager import redis_client, REDIS_AVAILABLE
            if not REDIS_AVAILABLE:
                return {
                    'service': 'redis',
                    'status': 'unavailable',
                    'message': 'Redis not configured',
                }
            start = time.time()
            redis_client.ping()
            response_time = (time.time() - start) * 1000
            return {
                'service': 'redis',
                'status': 'healthy',
                'response_time_ms': response_time,
            }
        except Exception as e:
            return {'service': 'redis', 'status': 'unhealthy', 'error': str(e)}

    @staticmethod
    def check_celery() -> Dict[str, Any]:
        """Check Celery workers via ``server.task_queue.get_worker_stats``.

        ``degraded`` when the stats are empty (no workers answered).
        """
        try:
            from server.task_queue import get_worker_stats
            stats = get_worker_stats()
            if stats:
                return {'service': 'celery', 'status': 'healthy', 'workers': len(stats)}
            return {
                'service': 'celery',
                'status': 'degraded',
                'message': 'No workers available',
            }
        except Exception as e:
            return {'service': 'celery', 'status': 'unhealthy', 'error': str(e)}

    @staticmethod
    def check_all() -> Dict[str, Any]:
        """Run every health check, persist the results, and return them by service."""
        checks = {
            'api': HealthChecker.check_api(),
            'database': HealthChecker.check_database(),
            'redis': HealthChecker.check_redis(),
            'celery': HealthChecker.check_celery(),
        }
        with _db() as conn:
            conn.executemany(
                """INSERT INTO health_checks (service, status, response_time_ms, details)
                   VALUES (?, ?, ?, ?)""",
                [
                    (service, result.get('status'), result.get('response_time_ms'),
                     json.dumps(result))
                    for service, result in checks.items()
                ],
            )
        return checks


class SystemMetrics:
    """Host-level CPU / memory / disk metrics collection."""

    @staticmethod
    def collect() -> Dict[str, Any]:
        """Sample host metrics, persist them, and return them.

        Blocks for about one second while psutil samples CPU usage. On any
        failure (including psutil not being installed) returns ``{'error': msg}``.
        """
        try:
            if psutil is None:
                raise ImportError("psutil is not installed")
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            metrics = {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_available_mb': memory.available / (1024 * 1024),
                'disk_percent': disk.percent,
                'disk_free_gb': disk.free / (1024 * 1024 * 1024),
                'timestamp': _utc_now_iso(),
            }
            with _db() as conn:
                conn.execute(
                    """INSERT INTO system_metrics (cpu_percent, memory_percent, disk_percent)
                       VALUES (?, ?, ?)""",
                    (cpu_percent, memory.percent, disk.percent),
                )
            return metrics
        except Exception as e:
            return {'error': str(e)}

    @staticmethod
    def get_average(hours: int = 1) -> Dict[str, float]:
        """Average the stored metrics over the last ``hours`` hours (0 when no data)."""
        with _db() as conn:
            row = conn.execute(
                """SELECT AVG(cpu_percent) as avg_cpu,
                          AVG(memory_percent) as avg_memory,
                          AVG(disk_percent) as avg_disk
                   FROM system_metrics
                   WHERE timestamp > datetime('now', '-' || ? || ' hours')""",
                (hours,),
            ).fetchone()
        if row:
            return {
                'avg_cpu_percent': row[0] or 0,
                'avg_memory_percent': row[1] or 0,
                'avg_disk_percent': row[2] or 0,
                'period_hours': hours,
            }
        return {}


class AlertManager:
    """Threshold-based alert creation, listing and resolution."""

    THRESHOLDS = {
        'cpu_percent': 80,
        'memory_percent': 85,
        'disk_percent': 90,
        # NOTE(review): 'response_time_ms' is not evaluated by check_thresholds().
        'response_time_ms': 5000,
    }

    # (metric key, alert type, severity, message label) for each checked metric.
    _RULES = (
        ('cpu_percent', 'HIGH_CPU', 'warning', 'CPU usage'),
        ('memory_percent', 'HIGH_MEMORY', 'warning', 'Memory usage'),
        ('disk_percent', 'HIGH_DISK', 'critical', 'Disk usage'),
    )

    @staticmethod
    def check_thresholds(metrics: Optional[Dict[str, Any]] = None) -> List[Dict[str, str]]:
        """Raise (and persist) alerts for metrics above their thresholds.

        Args:
            metrics: Pre-collected metrics, e.g. the result of
                ``SystemMetrics.collect()``. When omitted, a fresh sample is
                collected, as before. Passing it avoids sampling twice.

        Returns:
            List of ``{'type', 'severity', 'message'}`` dicts for the exceeded
            thresholds; missing metrics count as 0 and never alert.
        """
        if metrics is None:
            metrics = SystemMetrics.collect()
        alerts = []
        for key, alert_type, severity, label in AlertManager._RULES:
            value = metrics.get(key, 0)
            if value > AlertManager.THRESHOLDS[key]:
                alerts.append({
                    'type': alert_type,
                    'severity': severity,
                    'message': f"{label} at {value:.1f}%",
                })
        if alerts:
            with _db() as conn:
                conn.executemany(
                    """INSERT INTO alerts (alert_type, severity, message)
                       VALUES (?, ?, ?)""",
                    [(a['type'], a['severity'], a['message']) for a in alerts],
                )
        return alerts

    @staticmethod
    def get_active_alerts() -> List[Dict[str, Any]]:
        """Return up to 50 unresolved alerts, newest first."""
        with _db() as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """SELECT * FROM alerts
                   WHERE resolved = 0
                   ORDER BY timestamp DESC, id DESC
                   LIMIT 50"""
            ).fetchall()
        # id is the tiebreak because CURRENT_TIMESTAMP only has 1-second resolution.
        return [dict(r) for r in rows]

    @staticmethod
    def resolve_alert(alert_id: int) -> bool:
        """Mark an open alert as resolved.

        Returns True if an unresolved alert with ``alert_id`` was updated.
        Already-resolved alerts keep their original ``resolved_at``.
        """
        with _db() as conn:
            cur = conn.execute(
                """UPDATE alerts SET resolved = 1, resolved_at = CURRENT_TIMESTAMP
                   WHERE id = ? AND resolved = 0""",
                (alert_id,),
            )
            return cur.rowcount > 0


class UptimeTracker:
    """Per-service uptime computed from logged status samples."""

    @staticmethod
    def log_status(service: str, status: str):
        """Record one status sample for ``service``."""
        with _db() as conn:
            conn.execute(
                """INSERT INTO uptime_log (service, status) VALUES (?, ?)""",
                (service, status),
            )

    @staticmethod
    def get_uptime(service: str, hours: int = 24) -> Dict[str, Any]:
        """Percentage of ``healthy`` samples for ``service`` in the last ``hours`` hours.

        With no samples in the window the service is reported as 100% up. The
        returned dict always has the same keys.
        """
        with _db() as conn:
            total, healthy = conn.execute(
                """SELECT COUNT(*), COALESCE(SUM(status = 'healthy'), 0)
                   FROM uptime_log
                   WHERE service = ?
                     AND timestamp > datetime('now', '-' || ? || ' hours')""",
                (service, hours),
            ).fetchone()
        return {
            'service': service,
            'uptime_percent': (healthy / total) * 100 if total else 100,
            'healthy_checks': healthy,
            'total_checks': total,
            'period_hours': hours,
        }

    @staticmethod
    def get_all_uptime(hours: int = 24) -> List[Dict[str, Any]]:
        """Uptime for every service that has ever logged a status."""
        with _db() as conn:
            services = conn.execute("""SELECT DISTINCT service FROM uptime_log""").fetchall()
        return [UptimeTracker.get_uptime(service, hours) for (service,) in services]


class MonitoringDaemon:
    """Background thread that samples metrics, checks alerts and logs health."""

    def __init__(self, interval: int = 60):
        self.interval = interval
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # Lets stop() interrupt the wait between cycles immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Start the monitoring thread (no-op if already running)."""
        if self.running:
            return
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print("✓ Monitoring daemon started")

    def stop(self):
        """Signal the monitoring thread to stop and wait up to 5 s for it."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        print("✓ Monitoring daemon stopped")

    def _tick(self):
        """Run one monitoring cycle."""
        # Collect once and reuse the sample for threshold checks, so each cycle
        # writes a single system_metrics row.
        metrics = SystemMetrics.collect()
        AlertManager.check_thresholds(metrics)
        checks = HealthChecker.check_all()
        for service, result in checks.items():
            UptimeTracker.log_status(service, result.get('status', 'unknown'))

    def _run(self):
        """Main loop: one cycle per ``interval`` seconds until stopped."""
        while self.running:
            try:
                self._tick()
            except Exception as e:
                # Keep the daemon alive across transient failures.
                print(f"Monitoring error: {e}")
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(self.interval)


# Global daemon instance
_daemon = MonitoringDaemon()


def start_monitoring():
    """Start the global monitoring daemon."""
    _daemon.start()


def stop_monitoring():
    """Stop the global monitoring daemon."""
    _daemon.stop()


def get_system_status() -> Dict[str, Any]:
    """Snapshot of health checks, current metrics, active alerts and 24 h uptime."""
    return {
        'timestamp': _utc_now_iso(),
        'health': HealthChecker.check_all(),
        'metrics': SystemMetrics.collect(),
        'alerts': AlertManager.get_active_alerts(),
        'uptime': UptimeTracker.get_all_uptime(24),
    }