Spaces:
Runtime error
Runtime error
| """ | |
| System monitoring and health check service | |
| """ | |
| import psutil | |
| import platform | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List | |
| from .models import SystemHealth, Alert, db | |
| class MonitoringService: | |
| def __init__(self): | |
| self.alert_thresholds = { | |
| 'cpu_usage': 90, | |
| 'memory_usage': 90, | |
| 'disk_usage': 90, | |
| 'failed_login_attempts': 10 | |
| } | |
| def get_system_health(self) -> Dict: | |
| """Get current system health metrics""" | |
| cpu_usage = psutil.cpu_percent(interval=1) | |
| memory = psutil.virtual_memory() | |
| disk = psutil.disk_usage('/') | |
| health = SystemHealth( | |
| cpu_usage=cpu_usage, | |
| memory_usage=memory.percent, | |
| disk_usage=disk.percent, | |
| timestamp=datetime.utcnow() | |
| ) | |
| db.session.add(health) | |
| db.session.commit() | |
| # Check for alerts | |
| self._check_alerts(health) | |
| return { | |
| 'cpu_usage': cpu_usage, | |
| 'memory_usage': memory.percent, | |
| 'disk_usage': disk.percent, | |
| 'memory_total': memory.total, | |
| 'memory_available': memory.available, | |
| 'disk_total': disk.total, | |
| 'disk_free': disk.free, | |
| 'system_time': datetime.utcnow().isoformat(), | |
| 'system_uptime': self.get_system_uptime() | |
| } | |
| def get_system_uptime(self) -> str: | |
| """Get system uptime in human readable format""" | |
| uptime = datetime.now() - datetime.fromtimestamp(psutil.boot_time()) | |
| days = uptime.days | |
| hours, remainder = divmod(uptime.seconds, 3600) | |
| minutes, seconds = divmod(remainder, 60) | |
| parts = [] | |
| if days > 0: | |
| parts.append(f"{days}d") | |
| if hours > 0: | |
| parts.append(f"{hours}h") | |
| if minutes > 0: | |
| parts.append(f"{minutes}m") | |
| if seconds > 0 or not parts: | |
| parts.append(f"{seconds}s") | |
| return " ".join(parts) | |
| def _check_alerts(self, health: SystemHealth) -> None: | |
| """Check system health metrics and create alerts if needed""" | |
| # CPU Usage Alert | |
| if health.cpu_usage >= self.alert_thresholds['cpu_usage']: | |
| self._create_alert( | |
| 'High CPU Usage', | |
| f'CPU usage is at {health.cpu_usage}%', | |
| 'warning' | |
| ) | |
| # Memory Usage Alert | |
| if health.memory_usage >= self.alert_thresholds['memory_usage']: | |
| self._create_alert( | |
| 'High Memory Usage', | |
| f'Memory usage is at {health.memory_usage}%', | |
| 'warning' | |
| ) | |
| # Disk Usage Alert | |
| if health.disk_usage >= self.alert_thresholds['disk_usage']: | |
| self._create_alert( | |
| 'High Disk Usage', | |
| f'Disk usage is at {health.disk_usage}%', | |
| 'warning' | |
| ) | |
| def _create_alert(self, title: str, message: str, severity: str) -> None: | |
| """Create a new alert if one doesn't already exist""" | |
| # Check if a similar active alert exists | |
| existing_alert = Alert.query.filter_by( | |
| title=title, | |
| status='active' | |
| ).first() | |
| if not existing_alert: | |
| alert = Alert( | |
| title=title, | |
| message=message, | |
| severity=severity, | |
| status='active', | |
| created_at=datetime.utcnow() | |
| ) | |
| db.session.add(alert) | |
| db.session.commit() | |
| def get_active_alerts(self) -> List[Alert]: | |
| """Get list of active alerts""" | |
| return Alert.query.filter_by(status='active').all() | |
| def resolve_alert(self, alert_id: int) -> None: | |
| """Mark an alert as resolved""" | |
| alert = Alert.query.get(alert_id) | |
| if alert: | |
| alert.status = 'resolved' | |
| alert.resolved_at = datetime.utcnow() | |
| db.session.commit() | |
| def restart_services(self) -> None: | |
| """Restart VPN services after configuration change""" | |
| # TODO: Implement service restart logic | |
| pass | |
| monitoring_service = MonitoringService() | |