Spaces:
Sleeping
Sleeping
| import time | |
| import psutil | |
| import GPUtil | |
| from typing import List, Dict, Any, Optional | |
| import numpy as np | |
| import logging | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| logger = logging.getLogger(__name__) | |
class SystemEvaluator:
    """Measure system resource usage, latency, and throughput for RAG components.

    Typical use: wrap a workload in start_monitoring()/stop_monitoring(), then
    read get_system_stats() for resource summaries. The measure_* helpers time
    arbitrary callables; the evaluate_* helpers combine them for a retriever,
    a generator, or a full RAG pipeline.
    """

    def __init__(self):
        self.monitoring = False      # flag polled by the background monitor loop
        self.metrics = []            # one dict per ~1 s sample (see _monitor_system)
        self.monitor_thread = None   # background sampling thread, if running

    def start_monitoring(self):
        """Start the background monitoring thread, discarding any prior samples."""
        self.monitoring = True
        self.metrics = []
        # daemon=True: a forgotten stop_monitoring() must not block interpreter exit.
        self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitor_thread.start()
        logger.info("Started system monitoring")

    def stop_monitoring(self):
        """Signal the monitor loop to stop and wait for the thread to finish."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
            self.monitor_thread = None
        logger.info("Stopped system monitoring")

    def _monitor_system(self):
        """Sample CPU/memory/GPU/disk roughly once per second while monitoring is on."""
        while self.monitoring:
            try:
                # cpu_percent(interval=1) blocks for the 1 s sampling window, so it
                # also paces the loop; no extra sleep is needed on the success path.
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
                gpu_metrics = self._get_gpu_metrics()
                disk = psutil.disk_usage('/')
                self.metrics.append({
                    'timestamp': time.time(),  # wall-clock, for correlating with logs
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory.percent,
                    'memory_used_gb': memory.used / (1024 ** 3),
                    'disk_percent': disk.percent,
                    **gpu_metrics,
                })
            except Exception as e:
                logger.error(f"Error monitoring system: {e}")
                time.sleep(1)  # back off so a persistent failure can't spin hot

    def _get_gpu_metrics(self) -> Dict[str, Any]:
        """Return metrics for the first GPU, or all-zero values when unavailable.

        Only the first GPU reported by GPUtil is used; multi-GPU hosts are
        summarized by device 0 only.
        """
        try:
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu = gpus[0]
                return {
                    'gpu_utilization': gpu.load * 100,
                    'gpu_memory_used': gpu.memoryUsed,
                    'gpu_memory_total': gpu.memoryTotal,
                    'gpu_memory_percent': (gpu.memoryUsed / gpu.memoryTotal) * 100,
                    'gpu_temperature': gpu.temperature,
                }
        except Exception as e:
            # Was a bare `except:`; narrow it and surface why GPU data is missing.
            logger.debug(f"GPU metrics unavailable: {e}")
        return {
            'gpu_utilization': 0,
            'gpu_memory_used': 0,
            'gpu_memory_total': 0,
            'gpu_memory_percent': 0,
            'gpu_temperature': 0,
        }

    def measure_throughput(self, func, args_list: List[tuple],
                           max_workers: int = 4) -> Dict[str, Any]:
        """Run func over args_list in a thread pool and report queries/second.

        Args:
            func: callable invoked as func(*args) for each tuple in args_list.
            args_list: one argument tuple per query.
            max_workers: thread-pool size.

        Returns:
            Dict with total/successful query counts, elapsed time, QPS, and
            average time per submitted query. Failed calls are logged and
            counted as unsuccessful.
        """
        # perf_counter is monotonic and high-resolution; time.time() can measure
        # zero elapsed for fast funcs (→ ZeroDivisionError below).
        start_time = time.perf_counter()
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, *args) for args in args_list]
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Error in throughput measurement: {e}")
        total_time = time.perf_counter() - start_time
        throughput = len(results) / total_time if total_time > 0 else 0.0
        return {
            'total_queries': len(args_list),
            'successful_queries': len(results),
            'total_time': total_time,
            'throughput_qps': throughput,
            'avg_time_per_query': total_time / len(args_list) if args_list else 0,
        }

    def measure_latency(self, func, args: tuple, num_runs: int = 10) -> Dict[str, Any]:
        """Call func(*args) num_runs times and report latency statistics (seconds).

        Failed runs are logged and excluded from the statistics; if every run
        fails, all statistics are 0.
        """
        latencies = []
        for _ in range(num_runs):
            start_time = time.perf_counter()
            try:
                func(*args)
                latencies.append(time.perf_counter() - start_time)
            except Exception as e:
                # Simply skip failed runs (the old inf-sentinel + filter was a no-op).
                logger.error(f"Error in latency measurement: {e}")
        if not latencies:
            return {key: 0 for key in (
                'avg_latency', 'p50_latency', 'p95_latency', 'p99_latency',
                'min_latency', 'max_latency', 'std_latency')}
        arr = np.asarray(latencies)
        # float() casts keep the result JSON-serializable (no numpy scalars).
        return {
            'avg_latency': float(np.mean(arr)),
            'p50_latency': float(np.percentile(arr, 50)),
            'p95_latency': float(np.percentile(arr, 95)),
            'p99_latency': float(np.percentile(arr, 99)),
            'min_latency': float(np.min(arr)),
            'max_latency': float(np.max(arr)),
            'std_latency': float(np.std(arr)),
        }

    def measure_batch_latency(self, func, args_list: List[tuple],
                              batch_sizes: Optional[List[int]] = None) -> Dict[str, Any]:
        """Measure sequential per-batch latency for several batch sizes.

        Args:
            func: callable invoked as func(*args) for each tuple.
            args_list: argument tuples to process.
            batch_sizes: batch sizes to test; defaults to [1, 4, 8, 16].

        Returns:
            Mapping 'batch_size_<n>' -> {avg_latency, p95_latency, throughput}.
        """
        if batch_sizes is None:  # avoid the mutable-default-argument pitfall
            batch_sizes = [1, 4, 8, 16]
        results = {}
        for batch_size in batch_sizes:
            batch_latencies = []
            for i in range(0, len(args_list), batch_size):
                batch_args = args_list[i:i + batch_size]
                start_time = time.perf_counter()
                try:
                    for args in batch_args:
                        func(*args)
                    batch_latencies.append(time.perf_counter() - start_time)
                except Exception as e:
                    logger.error(f"Error in batch latency measurement: {e}")
            if batch_latencies:
                avg = float(np.mean(batch_latencies))
                results[f'batch_size_{batch_size}'] = {
                    'avg_latency': avg,
                    'p95_latency': float(np.percentile(batch_latencies, 95)),
                    # Guard: trivially fast funcs can measure ~0 average latency.
                    'throughput': batch_size / avg if avg > 0 else 0.0,
                }
        return results

    def get_system_stats(self) -> Dict[str, Any]:
        """Summarize CPU/memory/GPU usage collected by the monitoring thread.

        Returns {} when no samples have been collected.
        """
        # Snapshot so the monitor thread appending concurrently can't change
        # list length between the three per-resource passes below.
        metrics = list(self.metrics)
        if not metrics:
            return {}

        def _summarize(values):
            # Shared summary shape for every resource.
            return {
                'avg': float(np.mean(values)),
                'max': float(np.max(values)),
                'min': float(np.min(values)),
                'std': float(np.std(values)),
            }

        return {
            # NOTE: sample count, not seconds (samples are ~1 s apart).
            'monitoring_duration': len(metrics),
            'cpu': _summarize([m['cpu_percent'] for m in metrics]),
            'memory': _summarize([m['memory_percent'] for m in metrics]),
            'gpu': _summarize([m.get('gpu_utilization', 0) for m in metrics]),
        }

    def evaluate_retrieval_performance(self, retriever, queries: List[str],
                                       k: int = 10) -> Dict[str, Any]:
        """Measure latency and throughput of retriever.retrieve_single.

        Raises:
            ValueError: if queries is empty.
        """
        if not queries:
            raise ValueError("queries must be non-empty")
        latency_stats = self.measure_latency(
            retriever.retrieve_single, (queries[0], k), num_runs=5)
        throughput_stats = self.measure_throughput(
            retriever.retrieve_single,
            [(query, k) for query in queries[:10]],  # cap throughput-test size
            max_workers=4)
        return {'latency': latency_stats, 'throughput': throughput_stats}

    def evaluate_generation_performance(self, generator, questions: List[str],
                                        passages_list: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
        """Measure latency and throughput of generator.generate_with_strategy.

        Raises:
            ValueError: if questions or passages_list is empty.
        """
        if not questions or not passages_list:
            raise ValueError("questions and passages_list must be non-empty")
        latency_stats = self.measure_latency(
            generator.generate_with_strategy, (questions[0], passages_list[0]), num_runs=5)
        throughput_stats = self.measure_throughput(
            generator.generate_with_strategy,
            list(zip(questions[:5], passages_list[:5])),  # cap throughput-test size
            max_workers=2)
        return {'latency': latency_stats, 'throughput': throughput_stats}

    def evaluate_end_to_end_performance(self, rag_system, queries: List[str]) -> Dict[str, Any]:
        """Measure latency and throughput of the full rag_system.query pipeline.

        Raises:
            ValueError: if queries is empty.
        """
        if not queries:
            raise ValueError("queries must be non-empty")
        latency_stats = self.measure_latency(
            rag_system.query, (queries[0],), num_runs=5)
        throughput_stats = self.measure_throughput(
            rag_system.query,
            [(query,) for query in queries[:10]],  # cap throughput-test size
            max_workers=2)
        return {'latency': latency_stats, 'throughput': throughput_stats}