"""System monitoring and health check service."""

import platform
from datetime import datetime, timedelta
from typing import Dict, List

import psutil

from .models import SystemHealth, Alert, db


class MonitoringService:
    """Collect host health metrics, persist them and manage related alerts.

    Metrics are read through ``psutil``; snapshots and alerts are stored via
    the ``SystemHealth`` and ``Alert`` models using ``db.session``.
    """

    # One entry per monitored metric:
    # (SystemHealth attribute / threshold key, alert title, message template).
    _ALERT_RULES = (
        ('cpu_usage', 'High CPU Usage', 'CPU usage is at {}%'),
        ('memory_usage', 'High Memory Usage', 'Memory usage is at {}%'),
        ('disk_usage', 'High Disk Usage', 'Disk usage is at {}%'),
    )

    def __init__(self):
        # A metric whose value is greater than or equal to its threshold
        # raises an alert (see _check_alerts).
        # NOTE(review): 'failed_login_attempts' is not read anywhere in this
        # module; presumably it is consumed elsewhere - confirm.
        self.alert_thresholds = {
            'cpu_usage': 90,
            'memory_usage': 90,
            'disk_usage': 90,
            'failed_login_attempts': 10
        }

    @staticmethod
    def _commit() -> None:
        """Commit the current session, rolling back if the commit fails.

        Without the rollback a failed commit leaves the shared session in a
        state where every later operation raises as well. The original
        exception is always re-raised so callers still see the failure.
        """
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    def get_system_health(self) -> Dict:
        """Sample current metrics, persist a snapshot and return a summary.

        CPU usage is sampled with ``interval=1``, so this call blocks for
        about one second. Disk figures are those of the ``'/'`` path.

        Side effects: a ``SystemHealth`` row is added and committed, and
        alerts may be created for metrics at or above their threshold.

        Returns:
            A dict with the keys ``cpu_usage``, ``memory_usage``,
            ``disk_usage``, ``memory_total``, ``memory_available``,
            ``disk_total``, ``disk_free``, ``system_time`` (ISO-8601 string
            of the naive UTC time) and ``system_uptime``.
        """
        cpu_usage = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        health = SystemHealth(
            cpu_usage=cpu_usage,
            memory_usage=memory.percent,
            disk_usage=disk.percent,
            timestamp=datetime.utcnow()
        )
        db.session.add(health)
        self._commit()

        # Check for alerts
        self._check_alerts(health)

        return {
            'cpu_usage': cpu_usage,
            'memory_usage': memory.percent,
            'disk_usage': disk.percent,
            'memory_total': memory.total,
            'memory_available': memory.available,
            'disk_total': disk.total,
            'disk_free': disk.free,
            'system_time': datetime.utcnow().isoformat(),
            'system_uptime': self.get_system_uptime()
        }

    def get_system_uptime(self) -> str:
        """Return the time since boot as a string such as ``"3d 4h 12m 9s"``.

        Zero-valued units are omitted; an uptime of zero is reported as
        ``"0s"``.
        """
        # Both instants are taken in UTC: subtracting local wall-clock times
        # is off by an hour whenever a DST change happened since boot.
        booted_at = datetime.utcfromtimestamp(psutil.boot_time())
        # Clamp to zero in case the clock was set back after boot; a negative
        # timedelta would otherwise produce a misleading string.
        uptime = max(datetime.utcnow() - booted_at, timedelta(0))

        days = uptime.days
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = []
        if days > 0:
            parts.append(f"{days}d")
        if hours > 0:
            parts.append(f"{hours}h")
        if minutes > 0:
            parts.append(f"{minutes}m")
        # Always emit something, even for a zero uptime.
        if seconds > 0 or not parts:
            parts.append(f"{seconds}s")

        return " ".join(parts)

    def _check_alerts(self, health: SystemHealth) -> None:
        """Create a warning alert for each metric at or above its threshold.

        Args:
            health: Snapshot whose ``cpu_usage``, ``memory_usage`` and
                ``disk_usage`` attributes are compared with
                ``self.alert_thresholds``.
        """
        for metric, title, template in self._ALERT_RULES:
            value = getattr(health, metric)
            if value >= self.alert_thresholds[metric]:
                self._create_alert(title, template.format(value), 'warning')

    def _create_alert(self, title: str, message: str, severity: str) -> None:
        """Create a new active alert unless one with the same title exists.

        Args:
            title: Alert title; also the key used to detect duplicates.
            message: Human-readable description stored on the alert.
            severity: Severity label stored on the alert.
        """
        # Check if a similar active alert exists
        existing_alert = Alert.query.filter_by(
            title=title,
            status='active'
        ).first()
        if existing_alert:
            return

        alert = Alert(
            title=title,
            message=message,
            severity=severity,
            status='active',
            created_at=datetime.utcnow()
        )
        db.session.add(alert)
        self._commit()

    def get_active_alerts(self) -> List[Alert]:
        """Return all alerts whose status is ``'active'``."""
        return Alert.query.filter_by(status='active').all()

    def resolve_alert(self, alert_id: int) -> None:
        """Mark the alert with the given id as resolved.

        Does nothing when no alert with that id is found.
        """
        alert = Alert.query.get(alert_id)
        if alert:
            alert.status = 'resolved'
            alert.resolved_at = datetime.utcnow()
            self._commit()

    def restart_services(self) -> None:
        """Restart VPN services after configuration change.

        Not implemented yet: the method currently does nothing.
        """
        # TODO: Implement service restart logic
        pass


# Shared module-level instance.
monitoring_service = MonitoringService()