#!/usr/bin/env python3
"""
Performance Monitor

Monitors system performance metrics for the NZ Legislation Loophole Analysis application.
Tracks memory usage, CPU utilization, processing times, and other performance indicators.
"""

import time
import threading
import psutil
from typing import Dict, Any, Optional, List
from collections import deque
import streamlit as st

class PerformanceMonitor:
    """Thread-safe monitor of process-level performance metrics.

    A daemon thread samples memory usage, CPU utilization and thread
    count from ``psutil`` roughly once per second.  Callers record
    processing timings and chunk throughput explicitly through the
    public ``start_processing_timer`` / ``end_processing_timer`` /
    ``record_chunk_processing`` methods.  All shared state is guarded
    by ``self.lock``.
    """

    def __init__(self, max_history: int = 1000):
        """
        Initialize the monitor and start the background sampling thread.

        Args:
            max_history: Maximum number of historical data points to keep
                for each metric series (memory, CPU, processing times).
        """
        self.max_history = max_history
        # RLock so a method holding the lock may call another public
        # method (e.g. get_stats) without deadlocking.
        self.lock = threading.RLock()

        # Historical data storage: bounded deques of (timestamp, value)
        # pairs so memory use stays constant over long runs.
        self.memory_history = deque(maxlen=max_history)
        self.cpu_history = deque(maxlen=max_history)
        self.processing_times = deque(maxlen=max_history)

        # Latest snapshot of every tracked metric; guarded by self.lock.
        self.current_metrics = {
            'memory_usage_mb': 0,
            'memory_percent': 0,
            'cpu_percent': 0,
            'active_threads': 0,
            'processing_time_avg': 0,
            'processing_time_max': 0,
            'processing_time_min': 0,
            'total_processed_chunks': 0,
            'chunks_per_second': 0
        }

        # Timing state for the processing timer and throughput tracking.
        self.processing_start_time: Optional[float] = None
        self.last_chunk_time = time.time()

        # Reuse a single Process handle instead of constructing one on
        # every sample (psutil.Process() does a pid lookup each time).
        self._process = psutil.Process()

        # Daemon thread so the monitor never keeps the interpreter alive.
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def _monitor_loop(self):
        """Background sampling loop; exits when ``self.monitoring`` is False."""
        while self.monitoring:
            try:
                self._update_metrics()
                time.sleep(1)  # Update every second
            except Exception as e:
                # Keep the monitor alive on transient psutil failures.
                print(f"Performance monitoring error: {e}")
                time.sleep(5)  # Wait longer on error

    def _update_metrics(self):
        """Sample process metrics and append them to the history series."""
        process = self._process

        # Sample OUTSIDE the lock: cpu_percent(interval=0.1) blocks for
        # 0.1 s and must not stall every other lock holder while it waits.
        memory_info = process.memory_info()
        memory_usage_mb = memory_info.rss / 1024 / 1024  # bytes -> MiB
        memory_percent = process.memory_percent()
        cpu_percent = process.cpu_percent(interval=0.1)
        active_threads = len(process.threads())

        with self.lock:
            # Update current metrics
            self.current_metrics.update({
                'memory_usage_mb': memory_usage_mb,
                'memory_percent': memory_percent,
                'cpu_percent': cpu_percent,
                'active_threads': active_threads
            })

            # Store historical data
            current_time = time.time()
            self.memory_history.append((current_time, memory_usage_mb))
            self.cpu_history.append((current_time, cpu_percent))

    def start_processing_timer(self):
        """Start timing a processing operation."""
        self.processing_start_time = time.time()

    def end_processing_timer(self) -> float:
        """End timing and return the elapsed time in seconds.

        Returns 0 when no timer was started (or it was already ended).
        """
        if self.processing_start_time is None:
            return 0

        elapsed = time.time() - self.processing_start_time
        self.processing_start_time = None

        with self.lock:
            self.processing_times.append(elapsed)

            # Refresh the aggregate timing statistics.  The deque is
            # non-empty here because we just appended to it.
            self.current_metrics['processing_time_avg'] = sum(self.processing_times) / len(self.processing_times)
            self.current_metrics['processing_time_max'] = max(self.processing_times)
            self.current_metrics['processing_time_min'] = min(self.processing_times)

        return elapsed

    def record_chunk_processing(self):
        """Record that a chunk has been processed and update throughput."""
        current_time = time.time()

        with self.lock:
            self.current_metrics['total_processed_chunks'] += 1

            # Exponential moving average (0.9 decay) of the instantaneous
            # chunks-per-second rate to smooth out jitter.
            time_diff = current_time - self.last_chunk_time
            if time_diff > 0:
                current_cps = 1.0 / time_diff
                self.current_metrics['chunks_per_second'] = (
                    0.9 * self.current_metrics['chunks_per_second'] + 0.1 * current_cps
                )

            self.last_chunk_time = current_time

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot copy of the current performance metrics."""
        with self.lock:
            return self.current_metrics.copy()

    def get_memory_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Return (timestamp, MiB) samples newer than the time window."""
        cutoff_time = time.time() - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.memory_history if t >= cutoff_time]

    def get_cpu_history(self, time_window_seconds: int = 300) -> List[tuple]:
        """Return (timestamp, cpu%) samples newer than the time window."""
        cutoff_time = time.time() - time_window_seconds

        with self.lock:
            return [(t, v) for t, v in self.cpu_history if t >= cutoff_time]

    def get_processing_time_stats(self) -> Dict[str, Any]:
        """Return count/average/max/min/median of recorded processing times."""
        with self.lock:
            if not self.processing_times:
                return {
                    'count': 0,
                    'average': 0,
                    'maximum': 0,
                    'minimum': 0,
                    'median': 0
                }

            sorted_times = sorted(self.processing_times)

            return {
                'count': len(self.processing_times),
                'average': sum(self.processing_times) / len(self.processing_times),
                'maximum': max(self.processing_times),
                'minimum': min(self.processing_times),
                # Upper median (no interpolation for even-length series).
                'median': sorted_times[len(sorted_times) // 2]
            }

    def get_system_info(self) -> Dict[str, Any]:
        """Return static information about the host system and interpreter.

        Bug fix: ``python_implementation``, ``python_version`` and
        ``platform`` live in the stdlib ``platform`` module, not in
        psutil — the original calls raised AttributeError.
        """
        import platform as _platform  # local import; file header is untouched

        vm = psutil.virtual_memory()
        return {
            # Physical cores (may be None on exotic platforms); the
            # original used the logical count for both keys.
            'cpu_count': psutil.cpu_count(logical=False),
            'cpu_count_logical': psutil.cpu_count(logical=True),
            'total_memory_gb': vm.total / (1024**3),
            'available_memory_gb': vm.available / (1024**3),
            'python_version': f"{_platform.python_implementation()} {_platform.python_version()}",
            'platform': _platform.platform()
        }

    def reset_stats(self):
        """Reset processing statistics (history series are kept)."""
        with self.lock:
            self.processing_times.clear()
            self.current_metrics['total_processed_chunks'] = 0
            self.current_metrics['chunks_per_second'] = 0
            self.current_metrics['processing_time_avg'] = 0
            self.current_metrics['processing_time_max'] = 0
            self.current_metrics['processing_time_min'] = 0

    def cleanup(self):
        """Stop the monitoring thread and release resources."""
        self.monitoring = False
        if self.monitor_thread.is_alive():
            # Bounded join: the loop may be mid-sleep for up to 5 s.
            self.monitor_thread.join(timeout=2)

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a comprehensive performance report."""
        return {
            'current_metrics': self.get_stats(),
            'processing_stats': self.get_processing_time_stats(),
            'system_info': self.get_system_info(),
            'memory_history_count': len(self.memory_history),
            'cpu_history_count': len(self.cpu_history),
            'processing_times_count': len(self.processing_times)
        }

    def check_memory_threshold(self, threshold_mb: int) -> bool:
        """Return True if current memory usage (MiB) exceeds the threshold."""
        # Read through get_stats() so the check is lock-protected.
        return self.get_stats()['memory_usage_mb'] > threshold_mb

    def check_cpu_threshold(self, threshold_percent: float) -> bool:
        """Return True if current CPU usage (%) exceeds the threshold."""
        return self.get_stats()['cpu_percent'] > threshold_percent

    def get_recommendations(self) -> List[str]:
        """Get performance recommendations based on current metrics."""
        recommendations = []
        # Work from one locked snapshot so all checks see consistent values.
        metrics = self.get_stats()

        # Memory recommendations (thresholds in MiB).
        if metrics['memory_usage_mb'] > 7000:
            recommendations.append("High memory usage detected. Consider reducing batch size or chunk size.")
        elif metrics['memory_usage_mb'] > 5000:
            recommendations.append("Moderate memory usage. Monitor closely during processing.")

        # CPU recommendations
        if metrics['cpu_percent'] > 90:
            recommendations.append("High CPU usage. Consider reducing processing intensity.")
        elif metrics['cpu_percent'] > 70:
            recommendations.append("Moderate CPU usage. Processing is running optimally.")

        # Processing speed recommendations (average seconds per operation).
        avg_time = metrics.get('processing_time_avg', 0)
        if avg_time > 10:
            recommendations.append("Slow processing detected. Consider using a more powerful model or optimizing settings.")
        elif avg_time > 5:
            recommendations.append("Moderate processing speed. Consider increasing batch size if memory allows.")

        # Throughput recommendations
        # This would be integrated with cache manager stats
        chunks_per_second = metrics.get('chunks_per_second', 0)
        if chunks_per_second < 1:
            recommendations.append("Low processing throughput. Consider optimizing chunk size or model parameters.")

        if not recommendations:
            recommendations.append("Performance is optimal. All metrics are within normal ranges.")

        return recommendations

# Process-wide singleton monitor, created lazily on first access.
_performance_instance = None
_performance_lock = threading.Lock()

def get_performance_monitor(max_history: int = 1000) -> PerformanceMonitor:
    """Return the shared PerformanceMonitor, creating it on first use.

    Note: ``max_history`` only takes effect on the call that creates the
    singleton; subsequent calls return the existing instance unchanged.
    """
    global _performance_instance

    with _performance_lock:
        if _performance_instance is None:
            _performance_instance = PerformanceMonitor(max_history)
        return _performance_instance