# server/monitoring.py — Moharek GEO Platform (deploy commit a74b879)
"""
Monitoring & Alerting System
- System health monitoring
- Performance metrics
- Uptime tracking
"""
import sqlite3
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional
import psutil
import threading
import time
MONITOR_DB = Path(os.environ.get('OUTPUT_DIR', './output')) / 'monitoring.db'


def init_monitoring_db():
    """Create the monitoring database and its tables if they do not exist.

    Tables:
        health_checks  - per-service health probe results
        system_metrics - CPU / memory / disk samples
        alerts         - threshold-violation alerts with resolution state
        uptime_log     - per-service status log used for uptime percentages
    """
    # sqlite3.connect() does not create intermediate directories; without this
    # it raises OperationalError when OUTPUT_DIR does not exist yet.
    MONITOR_DB.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(MONITOR_DB))
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS health_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            response_time_ms FLOAT,
            details TEXT
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS system_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            cpu_percent FLOAT,
            memory_percent FLOAT,
            disk_percent FLOAT,
            active_connections INTEGER,
            request_count INTEGER
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            alert_type TEXT,
            severity TEXT,
            message TEXT,
            resolved BOOLEAN DEFAULT 0,
            resolved_at TIMESTAMP
        )""")
        conn.execute("""CREATE TABLE IF NOT EXISTS uptime_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            service TEXT,
            status TEXT,
            downtime_seconds INTEGER
        )""")
        conn.commit()
    finally:
        conn.close()  # close even if a CREATE fails, so the handle never leaks


# Build the schema at import time so every consumer can assume it exists.
init_monitoring_db()
class HealthChecker:
    """Health probes for the platform's services.

    Every check returns a JSON-serializable dict containing at least
    'service' and 'status' ('healthy' / 'degraded' / 'unhealthy' /
    'unavailable'); check_all() also persists each result to the
    health_checks table.
    """
    @staticmethod
    def check_api() -> Dict[str, Any]:
        """Probe the local HTTP API's /health endpoint and time the round trip."""
        try:
            import requests  # lazy import: a missing dependency only fails this check
            start = time.time()
            resp = requests.get('http://localhost:7860/health', timeout=5)
            response_time = (time.time() - start) * 1000
            status = 'healthy' if resp.status_code == 200 else 'degraded'
            return {
                'service': 'api',
                'status': status,
                'response_time_ms': response_time,
                'status_code': resp.status_code
            }
        except Exception as e:
            # Timeouts, refused connections and import errors all mean unhealthy.
            return {
                'service': 'api',
                'status': 'unhealthy',
                'error': str(e)
            }
    @staticmethod
    def check_database() -> Dict[str, Any]:
        """Verify the SQLite monitoring database answers a trivial query."""
        try:
            start = time.time()
            conn = sqlite3.connect(str(MONITOR_DB))
            try:
                conn.execute('SELECT 1')
            finally:
                conn.close()  # never leak the handle, even if the query raises
            response_time = (time.time() - start) * 1000
            return {
                'service': 'database',
                'status': 'healthy',
                'response_time_ms': response_time
            }
        except Exception as e:
            return {
                'service': 'database',
                'status': 'unhealthy',
                'error': str(e)
            }
    @staticmethod
    def check_redis() -> Dict[str, Any]:
        """Ping Redis via the shared cache client, if one is configured."""
        try:
            from server.cache_manager import redis_client, REDIS_AVAILABLE
            if not REDIS_AVAILABLE:
                # Not an error: Redis is optional and simply absent.
                return {
                    'service': 'redis',
                    'status': 'unavailable',
                    'message': 'Redis not configured'
                }
            start = time.time()
            redis_client.ping()
            response_time = (time.time() - start) * 1000
            return {
                'service': 'redis',
                'status': 'healthy',
                'response_time_ms': response_time
            }
        except Exception as e:
            return {
                'service': 'redis',
                'status': 'unhealthy',
                'error': str(e)
            }
    @staticmethod
    def check_celery() -> Dict[str, Any]:
        """Ask Celery for worker stats; 'degraded' means no workers responded."""
        try:
            from server.task_queue import get_worker_stats
            stats = get_worker_stats()
            if stats:
                return {
                    'service': 'celery',
                    'status': 'healthy',
                    'workers': len(stats)
                }
            return {
                'service': 'celery',
                'status': 'degraded',
                'message': 'No workers available'
            }
        except Exception as e:
            return {
                'service': 'celery',
                'status': 'unhealthy',
                'error': str(e)
            }
    @staticmethod
    def check_all() -> Dict[str, Any]:
        """Run every probe, log each result to health_checks, and return them keyed by service."""
        checks = {
            'api': HealthChecker.check_api(),
            'database': HealthChecker.check_database(),
            'redis': HealthChecker.check_redis(),
            'celery': HealthChecker.check_celery(),
        }
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            for service, result in checks.items():
                conn.execute("""INSERT INTO health_checks
                    (service, status, response_time_ms, details)
                    VALUES (?, ?, ?, ?)""",
                    (service, result.get('status'), result.get('response_time_ms'), json.dumps(result)))
            conn.commit()
        finally:
            conn.close()  # close even when an INSERT fails
        return checks
class SystemMetrics:
    """Collect and aggregate host-level CPU / memory / disk metrics."""
    @staticmethod
    def collect() -> Dict[str, Any]:
        """Sample system metrics via psutil and persist them to system_metrics.

        Returns:
            The sample as a dict, or {'error': str} when sampling or the
            database write fails.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)  # blocks ~1s to measure
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            metrics = {
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_available_mb': memory.available / (1024 * 1024),
                'disk_percent': disk.percent,
                'disk_free_gb': disk.free / (1024 * 1024 * 1024),
                'timestamp': datetime.utcnow().isoformat()
            }
            conn = sqlite3.connect(str(MONITOR_DB))
            try:
                conn.execute("""INSERT INTO system_metrics
                    (cpu_percent, memory_percent, disk_percent)
                    VALUES (?, ?, ?)""",
                    (cpu_percent, memory.percent, disk.percent))
                conn.commit()
            finally:
                conn.close()  # never leak the handle, even if the INSERT fails
            return metrics
        except Exception as e:
            return {'error': str(e)}
    @staticmethod
    def get_average(hours: int = 1) -> Dict[str, float]:
        """Return average CPU / memory / disk percentages over the last *hours* hours."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            row = conn.execute("""SELECT
                AVG(cpu_percent) as avg_cpu,
                AVG(memory_percent) as avg_memory,
                AVG(disk_percent) as avg_disk
                FROM system_metrics
                WHERE timestamp > datetime('now', '-' || ? || ' hours')""",
                (hours,)).fetchone()
        finally:
            conn.close()
        if row:
            # AVG() yields NULL (None) when no rows matched; report 0 instead.
            return {
                'avg_cpu_percent': row[0] or 0,
                'avg_memory_percent': row[1] or 0,
                'avg_disk_percent': row[2] or 0,
                'period_hours': hours
            }
        return {}
class AlertManager:
    """Raise, list, and resolve threshold-based alerts."""
    # Alerting thresholds: percent for resources, milliseconds for latency.
    THRESHOLDS = {
        'cpu_percent': 80,
        'memory_percent': 85,
        'disk_percent': 90,
        'response_time_ms': 5000,
    }
    @staticmethod
    def check_thresholds():
        """Sample current metrics, persist any threshold violations, and return them."""
        metrics = SystemMetrics.collect()
        alerts = []
        # metrics.get(..., 0) keeps this safe when collect() returned {'error': ...}.
        if metrics.get('cpu_percent', 0) > AlertManager.THRESHOLDS['cpu_percent']:
            alerts.append({
                'type': 'HIGH_CPU',
                'severity': 'warning',
                'message': f"CPU usage at {metrics['cpu_percent']:.1f}%"
            })
        if metrics.get('memory_percent', 0) > AlertManager.THRESHOLDS['memory_percent']:
            alerts.append({
                'type': 'HIGH_MEMORY',
                'severity': 'warning',
                'message': f"Memory usage at {metrics['memory_percent']:.1f}%"
            })
        if metrics.get('disk_percent', 0) > AlertManager.THRESHOLDS['disk_percent']:
            alerts.append({
                'type': 'HIGH_DISK',
                'severity': 'critical',
                'message': f"Disk usage at {metrics['disk_percent']:.1f}%"
            })
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            for alert in alerts:
                conn.execute("""INSERT INTO alerts
                    (alert_type, severity, message)
                    VALUES (?, ?, ?)""",
                    (alert['type'], alert['severity'], alert['message']))
            conn.commit()
        finally:
            conn.close()  # never leak the handle, even if an INSERT fails
        return alerts
    @staticmethod
    def get_active_alerts() -> List[Dict[str, Any]]:
        """Return up to 50 unresolved alerts, newest first, as plain dicts."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute("""SELECT * FROM alerts
                WHERE resolved = 0
                ORDER BY timestamp DESC
                LIMIT 50""").fetchall()
        finally:
            conn.close()
        return [dict(r) for r in rows]
    @staticmethod
    def resolve_alert(alert_id: int):
        """Mark the alert with *alert_id* as resolved, stamping resolved_at."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.execute("""UPDATE alerts
                SET resolved = 1, resolved_at = CURRENT_TIMESTAMP
                WHERE id = ?""", (alert_id,))
            conn.commit()
        finally:
            conn.close()
class UptimeTracker:
    """Track per-service status history and derive uptime percentages."""
    @staticmethod
    def log_status(service: str, status: str):
        """Append one status observation for *service* to uptime_log."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            conn.execute("""INSERT INTO uptime_log
                (service, status)
                VALUES (?, ?)""",
                (service, status))
            conn.commit()
        finally:
            conn.close()  # never leak the handle, even if the INSERT fails
    @staticmethod
    def get_uptime(service: str, hours: int = 24) -> Dict[str, Any]:
        """Return the percentage of 'healthy' observations over the last *hours* hours."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            total = conn.execute("""SELECT COUNT(*) FROM uptime_log
                WHERE service = ? AND timestamp > datetime('now', '-' || ? || ' hours')""",
                (service, hours)).fetchone()[0]
            healthy = conn.execute("""SELECT COUNT(*) FROM uptime_log
                WHERE service = ? AND status = 'healthy'
                AND timestamp > datetime('now', '-' || ? || ' hours')""",
                (service, hours)).fetchone()[0]
        finally:
            conn.close()
        if total == 0:
            # No observations yet: assume 100% and keep the same key set as
            # the populated branch so callers can index uniformly.
            return {
                'service': service,
                'uptime_percent': 100,
                'healthy_checks': 0,
                'total_checks': 0,
                'period_hours': hours
            }
        uptime_percent = (healthy / total) * 100
        return {
            'service': service,
            'uptime_percent': uptime_percent,
            'healthy_checks': healthy,
            'total_checks': total,
            'period_hours': hours
        }
    @staticmethod
    def get_all_uptime(hours: int = 24) -> List[Dict[str, Any]]:
        """Return get_uptime() results for every service ever logged."""
        conn = sqlite3.connect(str(MONITOR_DB))
        try:
            services = conn.execute("""SELECT DISTINCT service FROM uptime_log""").fetchall()
        finally:
            conn.close()
        return [UptimeTracker.get_uptime(service, hours) for (service,) in services]
class MonitoringDaemon:
    """Background thread that periodically samples metrics, checks alert
    thresholds, runs health checks, and logs service uptime."""
    def __init__(self, interval: int = 60):
        self.interval = interval  # seconds between monitoring passes
        self.running = False
        self.thread = None
        # Event makes the inter-cycle wait interruptible, so stop() does not
        # have to block for up to `interval` seconds waiting on time.sleep().
        self._stop_event = threading.Event()
    def start(self):
        """Start the daemon thread (no-op if it is already running)."""
        if self.running:
            return
        self.running = True
        self._stop_event.clear()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()
        print("✓ Monitoring daemon started")
    def stop(self):
        """Signal the loop to exit and wait up to 5s for the thread to finish."""
        self.running = False
        self._stop_event.set()  # wake the loop immediately if it is waiting
        if self.thread:
            self.thread.join(timeout=5)
        print("✓ Monitoring daemon stopped")
    def _run(self):
        """Main loop: metrics -> threshold alerts -> health checks -> uptime log."""
        while self.running:
            try:
                SystemMetrics.collect()
                AlertManager.check_thresholds()
                checks = HealthChecker.check_all()
                for service, result in checks.items():
                    UptimeTracker.log_status(service, result.get('status', 'unknown'))
            except Exception as e:
                # Keep the daemon alive on any single-cycle failure.
                print(f"Monitoring error: {e}")
            # Interruptible sleep: returns early once stop() sets the event.
            self._stop_event.wait(self.interval)
# Global daemon instance shared by the module-level start/stop helpers below.
_daemon = MonitoringDaemon()
def start_monitoring():
    """Start the shared background monitoring daemon (no-op if already running)."""
    _daemon.start()
def stop_monitoring():
    """Stop the shared background monitoring daemon, waiting briefly for its thread."""
    _daemon.stop()
def get_system_status() -> Dict[str, Any]:
    """Assemble a full snapshot: health checks, metrics, active alerts, uptime."""
    status: Dict[str, Any] = {'timestamp': datetime.utcnow().isoformat()}
    status['health'] = HealthChecker.check_all()
    status['metrics'] = SystemMetrics.collect()
    status['alerts'] = AlertManager.get_active_alerts()
    status['uptime'] = UptimeTracker.get_all_uptime(24)
    return status