File size: 2,241 Bytes
2299bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
Monitor - logs, queue size, errors tracking.
"""
from typing import Dict, List
from datetime import datetime
from collections import deque


class SystemMonitor:
    """
    System monitor - tracks metrics, logs, and errors.
    """
    
    def __init__(self, max_log_entries: int = 1000):
        self._queue_size = 0
        self._processed_count = 0
        self._batch_count = 0
        self._error_count = 0
        self._errors: deque = deque(maxlen=100)
        self._max_log = max_log_entries
    
    def increment_queue_size(self, delta: int = 1):
        """Increment queue size counter."""
        self._queue_size += delta
    
    def decrement_queue_size(self, delta: int = 1):
        """Decrement queue size counter."""
        self._queue_size = max(0, self._queue_size - delta)
    
    def increment_processed(self, delta: int = 1):
        """Increment processed messages counter."""
        self._processed_count += delta
    
    def increment_batches(self, delta: int = 1):
        """Increment batch count."""
        self._batch_count += delta
    
    def increment_errors(self, delta: int = 1):
        """Increment error counter."""
        self._error_count += delta
    
    def log_error(self, message: str):
        """Log an error message."""
        self._errors.append({
            "timestamp": datetime.utcnow().isoformat(),
            "message": message
        })
        self.increment_errors()
    
    def get_stats(self) -> Dict:
        """Get current statistics."""
        return {
            "queue_size": self._queue_size,
            "processed_total": self._processed_count,
            "batches_processed": self._batch_count,
            "error_count": self._error_count,
            "error_rate": self._error_count / max(1, self._processed_count)
        }
    
    def get_recent_errors(self, limit: int = 10) -> List[Dict]:
        """Get recent error entries."""
        return list(self._errors)[-limit:]
    
    def reset(self):
        """Reset all counters."""
        self._queue_size = 0
        self._processed_count = 0
        self._batch_count = 0
        self._error_count = 0
        self._errors.clear()


# Global monitor instance
system_monitor = SystemMonitor()