import time
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional

import numpy as np
import psutil
import GPUtil

logger = logging.getLogger(__name__)


class SystemEvaluator:
    """Measures system resource usage, latency, and throughput."""

    def __init__(self):
        self.monitoring = False
        self.metrics = []
        self.monitor_thread = None

    def start_monitoring(self):
        """Start background system monitoring."""
        self.monitoring = True
        self.metrics = []
        # Daemon thread so a forgotten stop_monitoring() cannot block interpreter exit
        self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitor_thread.start()
        logger.info("Started system monitoring")

    def stop_monitoring(self):
        """Stop background system monitoring."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        logger.info("Stopped system monitoring")

    def _monitor_system(self):
        """Sample system resources roughly once per second."""
        while self.monitoring:
            try:
                # cpu_percent(interval=1) blocks for one second, which also paces the loop
                cpu_percent = psutil.cpu_percent(interval=1)

                # Memory usage
                memory = psutil.virtual_memory()
                memory_percent = memory.percent
                memory_used_gb = memory.used / (1024 ** 3)

                # GPU usage (if available)
                gpu_metrics = self._get_gpu_metrics()

                # Disk usage
                disk = psutil.disk_usage('/')
                disk_percent = disk.percent

                metric = {
                    'timestamp': time.time(),
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory_percent,
                    'memory_used_gb': memory_used_gb,
                    'disk_percent': disk_percent,
                    **gpu_metrics
                }
                self.metrics.append(metric)
            except Exception as e:
                logger.error(f"Error monitoring system: {e}")
                time.sleep(1)  # Back off before retrying after an error

    def _get_gpu_metrics(self) -> Dict[str, Any]:
        """Get metrics for the first GPU, or zeros if no GPU is available."""
        try:
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu = gpus[0]  # Use first GPU
                return {
                    'gpu_utilization': gpu.load * 100,
                    'gpu_memory_used': gpu.memoryUsed,
                    'gpu_memory_total': gpu.memoryTotal,
                    'gpu_memory_percent': (gpu.memoryUsed / gpu.memoryTotal) * 100,
                    'gpu_temperature': gpu.temperature
                }
        except Exception as e:
            logger.debug(f"GPU metrics unavailable: {e}")
        return {
            'gpu_utilization': 0,
            'gpu_memory_used': 0,
            'gpu_memory_total': 0,
            'gpu_memory_percent': 0,
            'gpu_temperature': 0
        }

    def measure_throughput(self, func, args_list: List[tuple],
                           max_workers: int = 4) -> Dict[str, Any]:
        """Measure throughput of a function under concurrent load."""
        start_time = time.time()

        # Execute the calls concurrently with the given worker count
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, *args) for args in args_list]
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Error in throughput measurement: {e}")

        total_time = time.time() - start_time
        # Queries per second; guard against a zero-duration run
        throughput = len(results) / total_time if total_time > 0 else 0.0

        return {
            'total_queries': len(args_list),
            'successful_queries': len(results),
            'total_time': total_time,
            'throughput_qps': throughput,
            'avg_time_per_query': total_time / len(args_list) if args_list else 0
        }

    def measure_latency(self, func, args: tuple, num_runs: int = 10) -> Dict[str, Any]:
        """Measure per-call latency of a function over repeated runs."""
        latencies = []
        for _ in range(num_runs):
            start_time = time.time()
            try:
                func(*args)
                latencies.append(time.time() - start_time)
            except Exception as e:
                logger.error(f"Error in latency measurement: {e}")
                latencies.append(float('inf'))

        # Drop failed runs, which were recorded as infinite latency
        latencies = [l for l in latencies if l != float('inf')]

        if not latencies:
            return {
                'avg_latency': 0,
                'p50_latency': 0,
                'p95_latency': 0,
                'p99_latency': 0,
                'min_latency': 0,
                'max_latency': 0,
                'std_latency': 0
            }

        latencies = np.array(latencies)
        return {
            'avg_latency': np.mean(latencies),
            'p50_latency': np.percentile(latencies, 50),
            'p95_latency': np.percentile(latencies, 95),
            'p99_latency': np.percentile(latencies, 99),
            'min_latency': np.min(latencies),
            'max_latency': np.max(latencies),
            'std_latency': np.std(latencies)
        }

    def measure_batch_latency(self, func, args_list: List[tuple],
                              batch_sizes: Optional[List[int]] = None) -> Dict[str, Any]:
        """Measure latency and throughput for different batch sizes."""
        # Avoid a mutable default argument
        if batch_sizes is None:
            batch_sizes = [1, 4, 8, 16]

        results = {}
        for batch_size in batch_sizes:
            batch_latencies = []

            # Process the workload in batches of the given size
            for i in range(0, len(args_list), batch_size):
                batch_args = args_list[i:i + batch_size]
                start_time = time.time()
                try:
                    for args in batch_args:
                        func(*args)
                    batch_latencies.append(time.time() - start_time)
                except Exception as e:
                    logger.error(f"Error in batch latency measurement: {e}")

            if batch_latencies:
                results[f'batch_size_{batch_size}'] = {
                    'avg_latency': np.mean(batch_latencies),
                    'p95_latency': np.percentile(batch_latencies, 95),
                    'throughput': batch_size / np.mean(batch_latencies)
                }

        return results

    def get_system_stats(self) -> Dict[str, Any]:
        """Summarize the statistics collected while monitoring was active."""
        if not self.metrics:
            return {}

        # Aggregate statistics from the monitoring samples
        cpu_values = [m['cpu_percent'] for m in self.metrics]
        memory_values = [m['memory_percent'] for m in self.metrics]
        gpu_values = [m.get('gpu_utilization', 0) for m in self.metrics]

        return {
            # One sample per second, so the sample count approximates seconds monitored
            'monitoring_duration': len(self.metrics),
            'cpu': {
                'avg': np.mean(cpu_values),
                'max': np.max(cpu_values),
                'min': np.min(cpu_values),
                'std': np.std(cpu_values)
            },
            'memory': {
                'avg': np.mean(memory_values),
                'max': np.max(memory_values),
                'min': np.min(memory_values),
                'std': np.std(memory_values)
            },
            'gpu': {
                'avg': np.mean(gpu_values),
                'max': np.max(gpu_values),
                'min': np.min(gpu_values),
                'std': np.std(gpu_values)
            }
        }

    def evaluate_retrieval_performance(self, retriever, queries: List[str],
                                       k: int = 10) -> Dict[str, Any]:
        """Evaluate retrieval latency and throughput."""
        latency_stats = self.measure_latency(
            retriever.retrieve_single,
            (queries[0], k),
            num_runs=5
        )

        throughput_stats = self.measure_throughput(
            retriever.retrieve_single,
            [(query, k) for query in queries[:10]],  # Limit for throughput test
            max_workers=4
        )

        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }

    def evaluate_generation_performance(self, generator, questions: List[str],
                                        passages_list: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
        """Evaluate generation latency and throughput."""
        latency_stats = self.measure_latency(
            generator.generate_with_strategy,
            (questions[0], passages_list[0]),
            num_runs=5
        )

        throughput_stats = self.measure_throughput(
            generator.generate_with_strategy,
            list(zip(questions[:5], passages_list[:5])),  # Limit for throughput test
            max_workers=2
        )

        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }

    def evaluate_end_to_end_performance(self, rag_system,
                                        queries: List[str]) -> Dict[str, Any]:
        """Evaluate end-to-end RAG latency and throughput."""
        latency_stats = self.measure_latency(
            rag_system.query,
            (queries[0],),
            num_runs=5
        )

        throughput_stats = self.measure_throughput(
            rag_system.query,
            [(query,) for query in queries[:10]],  # Limit for throughput test
            max_workers=2
        )

        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }
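

if __name__ == "__main__":
    # Minimal usage sketch, not part of the evaluator itself. The sleep-based
    # _dummy_query below is a hypothetical stand-in for a real retriever,
    # generator, or RAG call; swap in any callable with the same shape.
    logging.basicConfig(level=logging.INFO)

    def _dummy_query(query: str) -> str:
        time.sleep(0.1)  # simulate ~100 ms of work per query
        return f"result for {query}"

    evaluator = SystemEvaluator()
    evaluator.start_monitoring()

    latency = evaluator.measure_latency(_dummy_query, ("warmup query",), num_runs=10)
    throughput = evaluator.measure_throughput(
        _dummy_query,
        [(f"query {i}",) for i in range(20)],
        max_workers=4
    )

    evaluator.stop_monitoring()

    print("latency:", latency)
    print("throughput:", throughput)
    # May be empty if the run finished before the first one-second sample landed
    print("system stats:", evaluator.get_system_stats())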