| """ |
| Monitoring & Alerting System |
| - System health monitoring |
| - Performance metrics |
| - Uptime tracking |
| """ |
|
|
| import sqlite3 |
| import json |
| import os |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import Dict, List, Any, Optional |
| import psutil |
| import threading |
| import time |
|
|
# SQLite file backing every monitoring table below; its directory is taken
# from the OUTPUT_DIR environment variable (defaults to ./output).
MONITOR_DB = Path(os.environ.get('OUTPUT_DIR', './output')) / 'monitoring.db'
|
|
def init_monitoring_db():
    """Create the monitoring database schema if it does not already exist.

    Idempotent (CREATE TABLE IF NOT EXISTS).  Tables created:
      - health_checks:  per-service probe results (written by HealthChecker)
      - system_metrics: CPU / memory / disk samples (written by SystemMetrics)
      - alerts:         threshold violations (written by AlertManager)
      - uptime_log:     service status history (written by UptimeTracker)
    """
    # sqlite3 cannot create missing parent directories, so make sure the
    # OUTPUT_DIR location exists before connecting.
    MONITOR_DB.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(MONITOR_DB))
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS health_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            response_time_ms FLOAT,
            details TEXT
        )""")

        conn.execute("""CREATE TABLE IF NOT EXISTS system_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            cpu_percent FLOAT,
            memory_percent FLOAT,
            disk_percent FLOAT,
            active_connections INTEGER,
            request_count INTEGER
        )""")

        conn.execute("""CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            alert_type TEXT,
            severity TEXT,
            message TEXT,
            resolved BOOLEAN DEFAULT 0,
            resolved_at TIMESTAMP
        )""")

        conn.execute("""CREATE TABLE IF NOT EXISTS uptime_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            downtime_seconds INTEGER
        )""")

        conn.commit()
    finally:
        # Close even if a CREATE fails; the original leaked the handle.
        conn.close()
|
|
# Create the schema as an import-time side effect so every helper below can
# assume the tables exist.
init_monitoring_db()
|
|
class HealthChecker:
    """Health probes for the services that make up the system.

    Every ``check_*`` method returns a plain dict containing at least a
    ``service`` and a ``status`` key ('healthy' / 'degraded' / 'unhealthy' /
    'unavailable').  Failures are reported inside the dict instead of being
    raised, so callers never need their own try/except.
    """

    @staticmethod
    def check_api() -> Dict[str, Any]:
        """Probe the local HTTP API and report its status and latency."""
        try:
            # Imported lazily so the module loads even without `requests`.
            import requests
            start = time.time()
            resp = requests.get('http://localhost:7860/health', timeout=5)
            response_time = (time.time() - start) * 1000

            # Any non-200 answer counts as degraded rather than unhealthy.
            status = 'healthy' if resp.status_code == 200 else 'degraded'
            return {
                'service': 'api',
                'status': status,
                'response_time_ms': response_time,
                'status_code': resp.status_code
            }
        except Exception as e:
            return {
                'service': 'api',
                'status': 'unhealthy',
                'error': str(e)
            }

    @staticmethod
    def check_database() -> Dict[str, Any]:
        """Probe the monitoring SQLite database with a trivial query."""
        try:
            start = time.time()
            conn = sqlite3.connect(str(MONITOR_DB))
            try:
                conn.execute('SELECT 1')
            finally:
                conn.close()
            response_time = (time.time() - start) * 1000

            return {
                'service': 'database',
                'status': 'healthy',
                'response_time_ms': response_time
            }
        except Exception as e:
            return {
                'service': 'database',
                'status': 'unhealthy',
                'error': str(e)
            }

    @staticmethod
    def check_redis() -> Dict[str, Any]:
        """Ping Redis via the project's cache manager, if configured."""
        try:
            from server.cache_manager import redis_client, REDIS_AVAILABLE

            if not REDIS_AVAILABLE:
                # Not an error: Redis is an optional dependency.
                return {
                    'service': 'redis',
                    'status': 'unavailable',
                    'message': 'Redis not configured'
                }

            start = time.time()
            redis_client.ping()
            response_time = (time.time() - start) * 1000

            return {
                'service': 'redis',
                'status': 'healthy',
                'response_time_ms': response_time
            }
        except Exception as e:
            return {
                'service': 'redis',
                'status': 'unhealthy',
                'error': str(e)
            }

    @staticmethod
    def check_celery() -> Dict[str, Any]:
        """Check that at least one Celery worker is reachable."""
        try:
            from server.task_queue import get_worker_stats

            stats = get_worker_stats()
            if stats:
                return {
                    'service': 'celery',
                    'status': 'healthy',
                    'workers': len(stats)
                }
            # Broker reachable but no workers: degraded, not unhealthy.
            return {
                'service': 'celery',
                'status': 'degraded',
                'message': 'No workers available'
            }
        except Exception as e:
            return {
                'service': 'celery',
                'status': 'unhealthy',
                'error': str(e)
            }

    @staticmethod
    def check_all() -> Dict[str, Any]:
        """Run every probe, persist the results, and return them by service."""
        checks = {
            'api': HealthChecker.check_api(),
            'database': HealthChecker.check_database(),
            'redis': HealthChecker.check_redis(),
            'celery': HealthChecker.check_celery(),
        }

        # try/finally guarantees the connection is closed even when an
        # INSERT fails; the original leaked the handle in that case.
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            for service, result in checks.items():
                conn.execute("""INSERT INTO health_checks
                    (service, status, response_time_ms, details)
                    VALUES (?, ?, ?, ?)""",
                    (service, result.get('status'), result.get('response_time_ms'), json.dumps(result)))
            conn.commit()
        finally:
            conn.close()

        return checks
|
|
class SystemMetrics:
    """Collect and summarize host-level resource metrics via psutil."""

    @staticmethod
    def collect() -> Dict[str, Any]:
        """Sample CPU/memory/disk, persist a row, and return the sample.

        Returns ``{'error': str}`` instead of raising when sampling or
        persistence fails.
        """
        try:
            # cpu_percent(interval=1) blocks ~1s to measure a real interval.
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            metrics = {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_available_mb': memory.available / (1024 * 1024),
                'disk_percent': disk.percent,
                'disk_free_gb': disk.free / (1024 * 1024 * 1024),
                # NOTE(review): naive UTC kept for output compatibility;
                # consider datetime.now(timezone.utc) file-wide later.
                'timestamp': datetime.utcnow().isoformat()
            }

            # Only the three percentages are persisted; the derived values
            # above are returned to the caller but not stored.
            conn = sqlite3.connect(str(MONITOR_DB))
            try:
                conn.execute("""INSERT INTO system_metrics
                    (cpu_percent, memory_percent, disk_percent)
                    VALUES (?, ?, ?)""",
                    (cpu_percent, memory.percent, disk.percent))
                conn.commit()
            finally:
                conn.close()

            return metrics
        except Exception as e:
            return {'error': str(e)}

    @staticmethod
    def get_average(hours: int = 1) -> Dict[str, float]:
        """Return average CPU/memory/disk percentages over the last *hours*.

        Averages are 0 when no samples exist in the window (SQL AVG yields
        NULL, coalesced with ``or 0``).
        """
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            row = conn.execute("""SELECT
                AVG(cpu_percent) as avg_cpu,
                AVG(memory_percent) as avg_memory,
                AVG(disk_percent) as avg_disk
                FROM system_metrics
                WHERE timestamp > datetime('now', '-' || ? || ' hours')""",
                (hours,)).fetchone()
        finally:
            # Close even if the query raises; the original leaked the handle.
            conn.close()

        if row:
            return {
                'avg_cpu_percent': row[0] or 0,
                'avg_memory_percent': row[1] or 0,
                'avg_disk_percent': row[2] or 0,
                'period_hours': hours
            }
        return {}
|
|
class AlertManager:
    """Create, list, and resolve threshold-based alerts."""

    # Percent (or ms) levels above which an alert row is written.
    THRESHOLDS = {
        'cpu_percent': 80,
        'memory_percent': 85,
        'disk_percent': 90,
        'response_time_ms': 5000,
    }

    @staticmethod
    def check_thresholds():
        """Sample current metrics, persist any violations, return them.

        Returns a list of ``{'type', 'severity', 'message'}`` dicts (empty
        when everything is within limits).
        """
        metrics = SystemMetrics.collect()
        alerts = []

        # .get(..., 0) keeps this safe when collect() returned {'error': ...}.
        if metrics.get('cpu_percent', 0) > AlertManager.THRESHOLDS['cpu_percent']:
            alerts.append({
                'type': 'HIGH_CPU',
                'severity': 'warning',
                'message': f"CPU usage at {metrics['cpu_percent']:.1f}%"
            })

        if metrics.get('memory_percent', 0) > AlertManager.THRESHOLDS['memory_percent']:
            alerts.append({
                'type': 'HIGH_MEMORY',
                'severity': 'warning',
                'message': f"Memory usage at {metrics['memory_percent']:.1f}%"
            })

        if metrics.get('disk_percent', 0) > AlertManager.THRESHOLDS['disk_percent']:
            alerts.append({
                'type': 'HIGH_DISK',
                'severity': 'critical',
                'message': f"Disk usage at {metrics['disk_percent']:.1f}%"
            })

        # try/finally closes the connection even if an INSERT fails.
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            for alert in alerts:
                conn.execute("""INSERT INTO alerts
                    (alert_type, severity, message)
                    VALUES (?, ?, ?)""",
                    (alert['type'], alert['severity'], alert['message']))
            conn.commit()
        finally:
            conn.close()

        return alerts

    @staticmethod
    def get_active_alerts() -> List[Dict[str, Any]]:
        """Return up to 50 unresolved alerts, newest first, as dicts."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute("""SELECT * FROM alerts
                WHERE resolved = 0
                ORDER BY timestamp DESC
                LIMIT 50""").fetchall()
        finally:
            conn.close()
        return [dict(r) for r in rows]

    @staticmethod
    def resolve_alert(alert_id: int):
        """Mark the alert with *alert_id* as resolved, stamping the time."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.execute("""UPDATE alerts
                SET resolved = 1, resolved_at = CURRENT_TIMESTAMP
                WHERE id = ?""", (alert_id,))
            conn.commit()
        finally:
            conn.close()
|
class UptimeTracker:
    """Record service status samples and compute uptime percentages."""

    @staticmethod
    def log_status(service: str, status: str):
        """Append one status sample for *service* to uptime_log."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.execute("""INSERT INTO uptime_log
                (service, status)
                VALUES (?, ?)""",
                (service, status))
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def get_uptime(service: str, hours: int = 24) -> Dict[str, Any]:
        """Return uptime stats for *service* over the last *hours*.

        Uptime is the fraction of logged samples whose status is 'healthy'.
        With no samples in the window, reports 100% (benefit of the doubt)
        with zero counts.
        """
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            total = conn.execute("""SELECT COUNT(*) FROM uptime_log
                WHERE service = ? AND timestamp > datetime('now', '-' || ? || ' hours')""",
                (service, hours)).fetchone()[0]

            healthy = conn.execute("""SELECT COUNT(*) FROM uptime_log
                WHERE service = ? AND status = 'healthy'
                AND timestamp > datetime('now', '-' || ? || ' hours')""",
                (service, hours)).fetchone()[0]
        finally:
            conn.close()

        if total == 0:
            # Same keys as the populated case so callers get one shape.
            return {
                'service': service,
                'uptime_percent': 100,
                'healthy_checks': 0,
                'total_checks': 0,
                'period_hours': hours
            }

        uptime_percent = (healthy / total) * 100
        return {
            'service': service,
            'uptime_percent': uptime_percent,
            'healthy_checks': healthy,
            'total_checks': total,
            'period_hours': hours
        }

    @staticmethod
    def get_all_uptime(hours: int = 24) -> List[Dict[str, Any]]:
        """Return get_uptime() stats for every service ever logged."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            services = conn.execute("""SELECT DISTINCT service FROM uptime_log""").fetchall()
        finally:
            conn.close()

        return [UptimeTracker.get_uptime(service, hours)
                for (service,) in services]
|
|
class MonitoringDaemon:
    """Background thread that periodically samples metrics, evaluates
    alert thresholds, and logs per-service health/uptime.

    Public attributes (unchanged from the original interface):
      interval -- seconds between monitoring cycles
      running  -- True while the daemon is active
      thread   -- the worker Thread, or None before start()
    """

    def __init__(self, interval: int = 60):
        self.interval = interval
        self.running = False
        self.thread = None
        # Event lets stop() interrupt the inter-cycle sleep immediately;
        # a plain time.sleep(interval) could block shutdown for up to
        # `interval` seconds against the 5s join timeout below.
        self._stop_event = threading.Event()

    def start(self):
        """Start the daemon thread; no-op if already running."""
        if self.running:
            return

        self.running = True
        self._stop_event.clear()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print("✓ Monitoring daemon started")

    def stop(self):
        """Signal the daemon to stop and wait (bounded) for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        print("✓ Monitoring daemon stopped")

    def _run(self):
        """Main loop: collect metrics, check thresholds, log uptime, sleep."""
        while self.running and not self._stop_event.is_set():
            try:
                SystemMetrics.collect()
                AlertManager.check_thresholds()

                checks = HealthChecker.check_all()
                for service, result in checks.items():
                    status = result.get('status', 'unknown')
                    UptimeTracker.log_status(service, status)
            except Exception as e:
                # Keep the daemon alive through transient failures.
                print(f"Monitoring error: {e}")
            # Returns early when stop() sets the event.
            self._stop_event.wait(self.interval)
|
| |
# Module-level singleton driven by start_monitoring()/stop_monitoring().
_daemon = MonitoringDaemon()
|
|
def start_monitoring():
    """Start the module-level monitoring daemon (no-op if already running)."""
    _daemon.start()
|
|
def stop_monitoring():
    """Stop the module-level monitoring daemon, waiting briefly for its thread."""
    _daemon.stop()
|
|
def get_system_status() -> Dict[str, Any]:
    """Return one timestamped snapshot covering health probes, resource
    metrics, unresolved alerts, and 24-hour uptime for every service."""
    # Gather each section first, then assemble the snapshot dict.
    health = HealthChecker.check_all()
    metrics = SystemMetrics.collect()
    alerts = AlertManager.get_active_alerts()
    uptime = UptimeTracker.get_all_uptime(24)

    snapshot = {'timestamp': datetime.utcnow().isoformat()}
    snapshot['health'] = health
    snapshot['metrics'] = metrics
    snapshot['alerts'] = alerts
    snapshot['uptime'] = uptime
    return snapshot
|
|