File size: 4,394 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""

System monitoring and health check service

"""
import psutil
import platform
from datetime import datetime, timedelta
from typing import Dict, List
from .models import SystemHealth, Alert, db

class MonitoringService:
    def __init__(self):
        self.alert_thresholds = {
            'cpu_usage': 90,
            'memory_usage': 90,
            'disk_usage': 90,
            'failed_login_attempts': 10
        }
    
    def get_system_health(self) -> Dict:
        """Get current system health metrics"""
        cpu_usage = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        
        health = SystemHealth(
            cpu_usage=cpu_usage,
            memory_usage=memory.percent,
            disk_usage=disk.percent,
            timestamp=datetime.utcnow()
        )
        db.session.add(health)
        db.session.commit()
        
        # Check for alerts
        self._check_alerts(health)
        
        return {
            'cpu_usage': cpu_usage,
            'memory_usage': memory.percent,
            'disk_usage': disk.percent,
            'memory_total': memory.total,
            'memory_available': memory.available,
            'disk_total': disk.total,
            'disk_free': disk.free,
            'system_time': datetime.utcnow().isoformat(),
            'system_uptime': self.get_system_uptime()
        }
    
    def get_system_uptime(self) -> str:
        """Get system uptime in human readable format"""
        uptime = datetime.now() - datetime.fromtimestamp(psutil.boot_time())
        days = uptime.days
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        
        parts = []
        if days > 0:
            parts.append(f"{days}d")
        if hours > 0:
            parts.append(f"{hours}h")
        if minutes > 0:
            parts.append(f"{minutes}m")
        if seconds > 0 or not parts:
            parts.append(f"{seconds}s")
        
        return " ".join(parts)
    
    def _check_alerts(self, health: SystemHealth) -> None:
        """Check system health metrics and create alerts if needed"""
        # CPU Usage Alert
        if health.cpu_usage >= self.alert_thresholds['cpu_usage']:
            self._create_alert(
                'High CPU Usage',
                f'CPU usage is at {health.cpu_usage}%',
                'warning'
            )
        
        # Memory Usage Alert
        if health.memory_usage >= self.alert_thresholds['memory_usage']:
            self._create_alert(
                'High Memory Usage',
                f'Memory usage is at {health.memory_usage}%',
                'warning'
            )
        
        # Disk Usage Alert
        if health.disk_usage >= self.alert_thresholds['disk_usage']:
            self._create_alert(
                'High Disk Usage',
                f'Disk usage is at {health.disk_usage}%',
                'warning'
            )
    
    def _create_alert(self, title: str, message: str, severity: str) -> None:
        """Create a new alert if one doesn't already exist"""
        # Check if a similar active alert exists
        existing_alert = Alert.query.filter_by(
            title=title,
            status='active'
        ).first()
        
        if not existing_alert:
            alert = Alert(
                title=title,
                message=message,
                severity=severity,
                status='active',
                created_at=datetime.utcnow()
            )
            db.session.add(alert)
            db.session.commit()
    
    def get_active_alerts(self) -> List[Alert]:
        """Get list of active alerts"""
        return Alert.query.filter_by(status='active').all()
    
    def resolve_alert(self, alert_id: int) -> None:
        """Mark an alert as resolved"""
        alert = Alert.query.get(alert_id)
        if alert:
            alert.status = 'resolved'
            alert.resolved_at = datetime.utcnow()
            db.session.commit()
    
    def restart_services(self) -> None:
        """Restart VPN services after configuration change"""
        # TODO: Implement service restart logic
        pass

monitoring_service = MonitoringService()