Spaces:
Sleeping
Sleeping
| """ | |
| System performance metrics implementation. | |
| Implements latency, throughput, error rate, and other system performance | |
| metrics for RAG system evaluation. | |
| """ | |
| import statistics | |
| import time | |
| from typing import Dict, List, Optional | |
class LatencyTracker:
    """Track and analyze response latencies.

    Latencies are stored in seconds. Interval timing uses
    ``time.perf_counter()`` (a monotonic clock) rather than ``time.time()``
    so wall-clock adjustments (e.g. NTP steps) cannot produce negative or
    skewed measurements.
    """

    def __init__(self):
        # Completed latency measurements, in seconds.
        self.latencies: List[float] = []
        # request_id -> perf_counter() timestamp captured by start_timing().
        self.start_times: Dict[str, float] = {}

    def start_timing(self, request_id: str) -> None:
        """Start timing a request."""
        self.start_times[request_id] = time.perf_counter()

    def end_timing(self, request_id: str) -> Optional[float]:
        """End timing a request and return its latency in seconds.

        Returns:
            The measured latency, or None if no matching start_timing()
            call was recorded for *request_id*.
        """
        # EAFP: pop atomically instead of check-then-delete (two lookups).
        start = self.start_times.pop(request_id, None)
        if start is None:
            return None
        latency = time.perf_counter() - start
        self.latencies.append(latency)
        return latency

    def add_measurement(self, latency: float) -> None:
        """Add a latency measurement (in seconds) directly."""
        self.latencies.append(latency)

    def get_average(self) -> float:
        """Get average latency, or 0.0 when no measurements exist."""
        return statistics.mean(self.latencies) if self.latencies else 0.0

    def get_latency_stats(self) -> Dict[str, float]:
        """Get latency statistics.

        Returns:
            Dict with nearest-rank percentiles ("p50", "p95", "p99") plus
            "mean", "min" and "max". All values are 0.0 when no
            measurements have been recorded.
        """
        if not self.latencies:
            return {
                "p50": 0.0,
                "p95": 0.0,
                "p99": 0.0,
                "mean": 0.0,
                "min": 0.0,
                "max": 0.0,
            }
        sorted_latencies = sorted(self.latencies)
        n = len(sorted_latencies)

        def percentile(p: float) -> float:
            # Clamp so the index is always valid, even for tiny samples.
            return sorted_latencies[min(int(n * p), n - 1)]

        return {
            "p50": percentile(0.5),
            "p95": percentile(0.95),
            "p99": percentile(0.99),
            "mean": statistics.mean(self.latencies),
            "min": sorted_latencies[0],
            "max": sorted_latencies[-1],
        }

    def reset(self) -> None:
        """Reset all tracked latencies and in-flight timers."""
        self.latencies.clear()
        self.start_times.clear()
class ThroughputTracker:
    """Track and analyze system throughput over a sliding time window."""

    def __init__(self, window_size: int = 60):
        """
        Args:
            window_size: Sliding window length in seconds.
        """
        self.window_size = window_size  # seconds
        # time.monotonic() timestamps of recorded requests. A monotonic
        # clock is required: with time.time(), a backwards wall-clock step
        # would silently drop or double-count requests in the window.
        self.request_timestamps: List[float] = []

    def record_request(self) -> None:
        """Record a new request timestamp and prune entries outside the window."""
        current_time = time.monotonic()
        cutoff_time = current_time - self.window_size
        # Clean old timestamps outside window before appending.
        self.request_timestamps = [ts for ts in self.request_timestamps if ts > cutoff_time]
        self.request_timestamps.append(current_time)

    def add_request(self) -> None:
        """Alias for record_request for compatibility."""
        self.record_request()

    def get_throughput(self) -> float:
        """Get current requests per second within the window.

        Returns:
            Requests-per-second averaged over the window, or 0.0 when
            fewer than two requests have been recorded.
        """
        if len(self.request_timestamps) <= 1:
            return 0.0
        cutoff_time = time.monotonic() - self.window_size
        # Count only requests still inside the window.
        recent_requests = sum(1 for ts in self.request_timestamps if ts > cutoff_time)
        return recent_requests / self.window_size

    def reset(self) -> None:
        """Reset all tracked requests."""
        self.request_timestamps.clear()
class ErrorTracker:
    """Track and analyze system errors."""

    def __init__(self):
        # Running totals; error_types maps an error-type label to its count.
        self.total_requests = 0
        self.error_count = 0
        self.error_types: Dict[str, int] = {}

    def record_request(self, success: bool, error_type: Optional[str] = None) -> None:
        """Record a request outcome."""
        self.total_requests += 1
        if success:
            return
        self.error_count += 1
        if error_type:
            current = self.error_types.get(error_type, 0)
            self.error_types[error_type] = current + 1

    def add_request(self) -> None:
        """Add a successful request for compatibility."""
        self.record_request(success=True)

    def add_error(self, error_type: Optional[str] = None) -> None:
        """Add an error for compatibility."""
        self.record_request(success=False, error_type=error_type)

    def get_error_rate(self) -> float:
        """Get current error rate (0.0 to 1.0)."""
        if self.total_requests:
            return self.error_count / self.total_requests
        return 0.0

    def get_error_breakdown(self) -> Dict[str, float]:
        """Get breakdown of error types as fractions of all errors."""
        if not self.error_count:
            return {}
        total_errors = self.error_count
        return {name: count / total_errors for name, count in self.error_types.items()}

    def reset(self) -> None:
        """Reset all tracked errors."""
        self.total_requests = 0
        self.error_count = 0
        self.error_types.clear()
def calculate_system_metrics(
    latency_tracker: LatencyTracker,
    throughput_tracker: ThroughputTracker,
    error_tracker: ErrorTracker,
) -> Dict[str, float]:
    """
    Calculate comprehensive system performance metrics.

    Args:
        latency_tracker: Latency tracking instance
        throughput_tracker: Throughput tracking instance
        error_tracker: Error tracking instance

    Returns:
        Dictionary containing all system metrics
    """
    stats = latency_tracker.get_latency_stats()

    # Latency metrics, prefixed for a flat namespace.
    metrics = {
        f"latency_{key}": stats[key]
        for key in ("p50", "p95", "p99", "mean", "min", "max")
    }
    # Throughput metrics
    metrics["throughput_rps"] = throughput_tracker.get_throughput()
    # Error metrics (counts coerced to float for a uniform value type).
    metrics["error_rate"] = error_tracker.get_error_rate()
    metrics["total_requests"] = float(error_tracker.total_requests)
    metrics["error_count"] = float(error_tracker.error_count)
    return metrics