JRNET / web /services /monitoring_service.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
System monitoring and health check service
"""
import psutil
import platform
from datetime import datetime, timedelta
from typing import Dict, List
from .models import SystemHealth, Alert, db
class MonitoringService:
def __init__(self):
self.alert_thresholds = {
'cpu_usage': 90,
'memory_usage': 90,
'disk_usage': 90,
'failed_login_attempts': 10
}
def get_system_health(self) -> Dict:
"""Get current system health metrics"""
cpu_usage = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
health = SystemHealth(
cpu_usage=cpu_usage,
memory_usage=memory.percent,
disk_usage=disk.percent,
timestamp=datetime.utcnow()
)
db.session.add(health)
db.session.commit()
# Check for alerts
self._check_alerts(health)
return {
'cpu_usage': cpu_usage,
'memory_usage': memory.percent,
'disk_usage': disk.percent,
'memory_total': memory.total,
'memory_available': memory.available,
'disk_total': disk.total,
'disk_free': disk.free,
'system_time': datetime.utcnow().isoformat(),
'system_uptime': self.get_system_uptime()
}
def get_system_uptime(self) -> str:
"""Get system uptime in human readable format"""
uptime = datetime.now() - datetime.fromtimestamp(psutil.boot_time())
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
if seconds > 0 or not parts:
parts.append(f"{seconds}s")
return " ".join(parts)
def _check_alerts(self, health: SystemHealth) -> None:
"""Check system health metrics and create alerts if needed"""
# CPU Usage Alert
if health.cpu_usage >= self.alert_thresholds['cpu_usage']:
self._create_alert(
'High CPU Usage',
f'CPU usage is at {health.cpu_usage}%',
'warning'
)
# Memory Usage Alert
if health.memory_usage >= self.alert_thresholds['memory_usage']:
self._create_alert(
'High Memory Usage',
f'Memory usage is at {health.memory_usage}%',
'warning'
)
# Disk Usage Alert
if health.disk_usage >= self.alert_thresholds['disk_usage']:
self._create_alert(
'High Disk Usage',
f'Disk usage is at {health.disk_usage}%',
'warning'
)
def _create_alert(self, title: str, message: str, severity: str) -> None:
"""Create a new alert if one doesn't already exist"""
# Check if a similar active alert exists
existing_alert = Alert.query.filter_by(
title=title,
status='active'
).first()
if not existing_alert:
alert = Alert(
title=title,
message=message,
severity=severity,
status='active',
created_at=datetime.utcnow()
)
db.session.add(alert)
db.session.commit()
def get_active_alerts(self) -> List[Alert]:
"""Get list of active alerts"""
return Alert.query.filter_by(status='active').all()
def resolve_alert(self, alert_id: int) -> None:
"""Mark an alert as resolved"""
alert = Alert.query.get(alert_id)
if alert:
alert.status = 'resolved'
alert.resolved_at = datetime.utcnow()
db.session.commit()
def restart_services(self) -> None:
"""Restart VPN services after configuration change"""
# TODO: Implement service restart logic
pass
monitoring_service = MonitoringService()