# safe_rag/eval/eval_system.py
import time
import psutil
import GPUtil
from typing import List, Dict, Any, Optional
import numpy as np
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger(__name__)

class SystemEvaluator:
    """Monitors host resource usage and measures latency/throughput of callables."""

    def __init__(self):
        self.monitoring = False
        self.metrics = []
        self.monitor_thread = None

    def start_monitoring(self):
        """Start system monitoring in a background daemon thread."""
        self.monitoring = True
        self.metrics = []
        # daemon=True so a forgotten monitor thread cannot keep the process alive
        self.monitor_thread = threading.Thread(target=self._monitor_system, daemon=True)
        self.monitor_thread.start()
        logger.info("Started system monitoring")

    def stop_monitoring(self):
        """Stop system monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        logger.info("Stopped system monitoring")

    def _monitor_system(self):
        """Sample system resources in a loop until monitoring is stopped."""
        while self.monitoring:
            try:
                # CPU usage (blocks for the 1-second sampling interval)
                cpu_percent = psutil.cpu_percent(interval=1)
                # Memory usage
                memory = psutil.virtual_memory()
                memory_percent = memory.percent
                memory_used_gb = memory.used / (1024 ** 3)
                # GPU usage (zeros if no GPU is available)
                gpu_metrics = self._get_gpu_metrics()
                # Disk usage
                disk = psutil.disk_usage('/')
                disk_percent = disk.percent
                metric = {
                    'timestamp': time.time(),
                    'cpu_percent': cpu_percent,
                    'memory_percent': memory_percent,
                    'memory_used_gb': memory_used_gb,
                    'disk_percent': disk_percent,
                    **gpu_metrics
                }
                self.metrics.append(metric)
            except Exception as e:
                logger.error(f"Error monitoring system: {e}")
            time.sleep(1)  # Combined with the 1 s CPU interval, roughly one sample every 2 s

    def _get_gpu_metrics(self) -> Dict[str, Any]:
        """Get GPU metrics, or zeros when no GPU is available."""
        try:
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu = gpus[0]  # Use first GPU
                return {
                    'gpu_utilization': gpu.load * 100,
                    'gpu_memory_used': gpu.memoryUsed,
                    'gpu_memory_total': gpu.memoryTotal,
                    'gpu_memory_percent': (gpu.memoryUsed / gpu.memoryTotal) * 100,
                    'gpu_temperature': gpu.temperature
                }
        except Exception as e:
            logger.debug(f"GPU metrics unavailable: {e}")
        return {
            'gpu_utilization': 0,
            'gpu_memory_used': 0,
            'gpu_memory_total': 0,
            'gpu_memory_percent': 0,
            'gpu_temperature': 0
        }

    def measure_throughput(self, func, args_list: List[tuple],
                           max_workers: int = 4) -> Dict[str, Any]:
        """Measure throughput of a function under concurrent execution."""
        start_time = time.time()
        # Submit all calls to a thread pool and collect results as they finish
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, *args) for args in args_list]
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Error in throughput measurement: {e}")
        total_time = time.time() - start_time
        throughput = len(results) / total_time if total_time > 0 else 0.0  # queries per second
        return {
            'total_queries': len(args_list),
            'successful_queries': len(results),
            'total_time': total_time,
            'throughput_qps': throughput,
            'avg_time_per_query': total_time / len(args_list) if args_list else 0
        }
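
    # Usage sketch (hypothetical callable `embed`; any function under test works):
    #   evaluator = SystemEvaluator()
    #   stats = evaluator.measure_throughput(embed, [(t,) for t in texts], max_workers=4)
    #   print(f"{stats['throughput_qps']:.1f} queries/s")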

    def measure_latency(self, func, args: tuple, num_runs: int = 10) -> Dict[str, Any]:
        """Measure latency of a function over repeated runs."""
        latencies = []
        for _ in range(num_runs):
            start_time = time.time()
            try:
                func(*args)
                latencies.append(time.time() - start_time)
            except Exception as e:
                # Failed runs are logged and excluded from the statistics
                logger.error(f"Error in latency measurement: {e}")
        if not latencies:
            return {
                'avg_latency': 0,
                'p50_latency': 0,
                'p95_latency': 0,
                'p99_latency': 0,
                'min_latency': 0,
                'max_latency': 0,
                'std_latency': 0
            }
        latencies = np.array(latencies)
        return {
            'avg_latency': np.mean(latencies),
            'p50_latency': np.percentile(latencies, 50),
            'p95_latency': np.percentile(latencies, 95),
            'p99_latency': np.percentile(latencies, 99),
            'min_latency': np.min(latencies),
            'max_latency': np.max(latencies),
            'std_latency': np.std(latencies)
        }
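
    # Usage sketch (same hypothetical `embed` callable):
    #   stats = evaluator.measure_latency(embed, ("example text",), num_runs=20)
    #   print(f"p95 latency: {stats['p95_latency'] * 1000:.1f} ms")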

    def measure_batch_latency(self, func, args_list: List[tuple],
                              batch_sizes: Optional[List[int]] = None) -> Dict[str, Any]:
        """Measure latency for different batch sizes."""
        if batch_sizes is None:  # avoid a mutable default argument
            batch_sizes = [1, 4, 8, 16]
        results = {}
        for batch_size in batch_sizes:
            batch_latencies = []
            # Process the argument list in consecutive batches
            for i in range(0, len(args_list), batch_size):
                batch_args = args_list[i:i + batch_size]
                start_time = time.time()
                try:
                    for args in batch_args:
                        func(*args)
                    batch_latencies.append(time.time() - start_time)
                except Exception as e:
                    logger.error(f"Error in batch latency measurement: {e}")
            if batch_latencies:
                results[f'batch_size_{batch_size}'] = {
                    'avg_latency': np.mean(batch_latencies),
                    'p95_latency': np.percentile(batch_latencies, 95),
                    'throughput': batch_size / np.mean(batch_latencies)
                }
        return results
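
    # Usage sketch: compare per-item throughput across batch sizes.
    #   batch_stats = evaluator.measure_batch_latency(embed, [(t,) for t in texts])
    #   for name, s in batch_stats.items():
    #       print(name, f"{s['throughput']:.1f} items/s")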

    def get_system_stats(self) -> Dict[str, Any]:
        """Get summary statistics over the collected monitoring samples."""
        if not self.metrics:
            return {}
        # Calculate statistics from monitoring data
        cpu_values = [m['cpu_percent'] for m in self.metrics]
        memory_values = [m['memory_percent'] for m in self.metrics]
        gpu_values = [m.get('gpu_utilization', 0) for m in self.metrics]
        return {
            # Sample count (roughly one sample every 2 s), not wall-clock seconds
            'monitoring_duration': len(self.metrics),
            'cpu': {
                'avg': np.mean(cpu_values),
                'max': np.max(cpu_values),
                'min': np.min(cpu_values),
                'std': np.std(cpu_values)
            },
            'memory': {
                'avg': np.mean(memory_values),
                'max': np.max(memory_values),
                'min': np.min(memory_values),
                'std': np.std(memory_values)
            },
            'gpu': {
                'avg': np.mean(gpu_values),
                'max': np.max(gpu_values),
                'min': np.min(gpu_values),
                'std': np.std(gpu_values)
            }
        }

    def evaluate_retrieval_performance(self, retriever, queries: List[str],
                                       k: int = 10) -> Dict[str, Any]:
        """Evaluate retrieval performance"""
        # Measure latency
        latency_stats = self.measure_latency(
            retriever.retrieve_single,
            (queries[0], k),
            num_runs=5
        )
        # Measure throughput
        throughput_stats = self.measure_throughput(
            retriever.retrieve_single,
            [(query, k) for query in queries[:10]],  # Limit for throughput test
            max_workers=4
        )
        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }
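
    # Usage sketch (assumes a retriever exposing `retrieve_single(query, k)`, as
    # this method already does):
    #   perf = evaluator.evaluate_retrieval_performance(retriever, queries, k=10)
    #   print(perf['latency']['p95_latency'], perf['throughput']['throughput_qps'])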

    def evaluate_generation_performance(self, generator, questions: List[str],
                                        passages_list: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
        """Evaluate generation performance"""
        # Measure latency
        latency_stats = self.measure_latency(
            generator.generate_with_strategy,
            (questions[0], passages_list[0]),
            num_runs=5
        )
        # Measure throughput
        throughput_stats = self.measure_throughput(
            generator.generate_with_strategy,
            list(zip(questions[:5], passages_list[:5])),  # Limit for throughput test
            max_workers=2
        )
        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }

    def evaluate_end_to_end_performance(self, rag_system, queries: List[str]) -> Dict[str, Any]:
        """Evaluate end-to-end RAG performance"""
        # Measure latency
        latency_stats = self.measure_latency(
            rag_system.query,
            (queries[0],),
            num_runs=5
        )
        # Measure throughput
        throughput_stats = self.measure_throughput(
            rag_system.query,
            [(query,) for query in queries[:10]],  # Limit for throughput test
            max_workers=2
        )
        return {
            'latency': latency_stats,
            'throughput': throughput_stats
        }
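

if __name__ == "__main__":
    # Minimal self-contained demo: a sketch only. The real SafeRAG retriever and
    # generator are not imported here; a sleep-based stub stands in for them.
    logging.basicConfig(level=logging.INFO)

    def dummy_query(text: str) -> str:
        time.sleep(0.05)  # simulate ~50 ms of work per call
        return text.upper()

    evaluator = SystemEvaluator()
    evaluator.start_monitoring()

    latency = evaluator.measure_latency(dummy_query, ("hello",), num_runs=10)
    throughput = evaluator.measure_throughput(
        dummy_query, [(f"query {i}",) for i in range(20)], max_workers=4
    )

    evaluator.stop_monitoring()

    print(f"avg latency: {latency['avg_latency'] * 1000:.1f} ms")
    print(f"throughput: {throughput['throughput_qps']:.1f} qps")
    print(f"system stats: {evaluator.get_system_stats()}")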