| """
|
| System monitoring and health check service
|
| """
|
| import psutil
|
| import platform
|
| from datetime import datetime, timedelta
|
| from typing import Dict, List
|
| from .models import SystemHealth, Alert, db
|
|
|
class MonitoringService:
    """Sample host metrics, persist them, and manage threshold alerts.

    Metrics are read with ``psutil`` and stored as ``SystemHealth`` rows;
    alerts are stored as ``Alert`` rows. All persistence goes through
    ``db.session``.
    """

    # (SystemHealth attribute / threshold key, alert title, message label).
    # Drives _check_alerts so each metric is checked by the same code path.
    _METRIC_ALERTS = (
        ('cpu_usage', 'High CPU Usage', 'CPU'),
        ('memory_usage', 'High Memory Usage', 'Memory'),
        ('disk_usage', 'High Disk Usage', 'Disk'),
    )

    def __init__(self):
        # Values at or above these limits trigger an alert in _check_alerts.
        # 'failed_login_attempts' is not read anywhere in this class.
        self.alert_thresholds = {
            'cpu_usage': 90,
            'memory_usage': 90,
            'disk_usage': 90,
            'failed_login_attempts': 10,
        }

    def _commit(self) -> None:
        """Commit the current session, rolling back if the commit fails.

        Raises:
            Exception: whatever ``db.session.commit()`` raised, re-raised
                after the rollback so callers still see the failure.
        """
        try:
            db.session.commit()
        except Exception:
            # Without a rollback the shared session stays in a failed state
            # and every later query on it errors out as well.
            db.session.rollback()
            raise

    def get_system_health(self) -> Dict:
        """Sample current metrics, store them, and return them as a dict.

        Side effects: inserts one ``SystemHealth`` row and may insert
        ``Alert`` rows via ``_check_alerts``.

        Returns:
            Dict with the keys ``cpu_usage``, ``memory_usage``,
            ``disk_usage``, ``memory_total``, ``memory_available``,
            ``disk_total``, ``disk_free``, ``system_time`` (ISO-8601 string
            from ``datetime.utcnow()``) and ``system_uptime``.
        """
        # interval=1 makes psutil block for about a second while it measures.
        cpu_usage = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        health = SystemHealth(
            cpu_usage=cpu_usage,
            memory_usage=memory.percent,
            disk_usage=disk.percent,
            timestamp=datetime.utcnow(),
        )
        db.session.add(health)
        self._commit()

        self._check_alerts(health)

        return {
            'cpu_usage': cpu_usage,
            'memory_usage': memory.percent,
            'disk_usage': disk.percent,
            'memory_total': memory.total,
            'memory_available': memory.available,
            'disk_total': disk.total,
            'disk_free': disk.free,
            'system_time': datetime.utcnow().isoformat(),
            'system_uptime': self.get_system_uptime(),
        }

    def get_system_uptime(self) -> str:
        """Return the time since boot as text such as ``"3d 4h 12m 5s"``."""
        import time

        # Both values are epoch seconds, so the difference is unaffected by
        # the local timezone or a DST change between boot and now.
        elapsed = timedelta(seconds=time.time() - psutil.boot_time())
        return self._format_uptime(elapsed)

    @staticmethod
    def _format_uptime(uptime: timedelta) -> str:
        """Render a duration as space-separated ``d``/``h``/``m``/``s`` parts.

        Zero-valued units are omitted; a zero or negative duration yields
        ``"0s"``. Sub-second precision is dropped.
        """
        # A clock stepped backwards can make the delta negative; timedelta
        # would then report days=-1 with a large positive seconds field.
        if uptime < timedelta(0):
            uptime = timedelta(0)

        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        units = ((uptime.days, 'd'), (hours, 'h'), (minutes, 'm'), (seconds, 's'))

        parts = [f"{value}{suffix}" for value, suffix in units if value > 0]
        return " ".join(parts) or "0s"

    def _check_alerts(self, health: "SystemHealth") -> None:
        """Create a 'warning' alert for each metric at or above its threshold.

        Args:
            health: the sample whose ``cpu_usage``, ``memory_usage`` and
                ``disk_usage`` attributes are compared to
                ``self.alert_thresholds``.
        """
        for metric, title, label in self._METRIC_ALERTS:
            value = getattr(health, metric)
            if value >= self.alert_thresholds[metric]:
                self._create_alert(
                    title,
                    f'{label} usage is at {value}%',
                    'warning',
                )

    def _create_alert(self, title: str, message: str, severity: str) -> None:
        """Insert an active alert unless one with the same title is active.

        Args:
            title: alert title; also the key used to detect duplicates.
            message: text stored on the new alert.
            severity: severity label stored on the new alert.
        """
        existing_alert = Alert.query.filter_by(
            title=title,
            status='active',
        ).first()
        if existing_alert:
            # Keep the existing row so repeated samples do not pile up alerts.
            return

        alert = Alert(
            title=title,
            message=message,
            severity=severity,
            status='active',
            created_at=datetime.utcnow(),
        )
        db.session.add(alert)
        self._commit()

    def get_active_alerts(self) -> "List[Alert]":
        """Return every alert whose status is ``'active'``."""
        return Alert.query.filter_by(status='active').all()

    def resolve_alert(self, alert_id: int) -> None:
        """Mark the alert with ``alert_id`` as resolved.

        Does nothing when no such alert exists, or when it is already
        resolved so the original ``resolved_at`` is preserved.
        """
        alert = Alert.query.get(alert_id)
        if alert is None or alert.status == 'resolved':
            return

        alert.status = 'resolved'
        alert.resolved_at = datetime.utcnow()
        self._commit()

    def restart_services(self) -> None:
        """Restart VPN services after configuration change.

        Placeholder: currently does nothing and returns ``None``.
        """
        # TODO: implement the restart; callers currently get a silent no-op.
|
|
|
# Module-level instance, created once when this module is first imported.
monitoring_service: MonitoringService = MonitoringService()
|
|
|