"""
System performance metrics.

Implements latency, throughput, error-rate, and related performance
metrics for RAG system evaluation.
"""
import statistics
import time
from typing import Dict, List, Optional
class LatencyTracker:
"""Track and analyze response latencies."""
def __init__(self):
self.latencies: List[float] = []
self.start_times: Dict[str, float] = {}
def start_timing(self, request_id: str) -> None:
"""Start timing a request."""
self.start_times[request_id] = time.time()
def end_timing(self, request_id: str) -> Optional[float]:
"""End timing a request and return latency."""
if request_id not in self.start_times:
return None
latency = time.time() - self.start_times[request_id]
self.latencies.append(latency)
del self.start_times[request_id]
return latency
def add_measurement(self, latency: float) -> None:
"""Add a latency measurement directly."""
self.latencies.append(latency)
def get_average(self) -> float:
"""Get average latency."""
return statistics.mean(self.latencies) if self.latencies else 0.0
def get_latency_stats(self) -> Dict[str, float]:
"""Get latency statistics."""
if not self.latencies:
return {
"p50": 0.0,
"p95": 0.0,
"p99": 0.0,
"mean": 0.0,
"min": 0.0,
"max": 0.0,
}
        # Nearest-rank percentiles over the sorted sample; indices are clamped
        # so small samples cannot index past the end of the list.
        sorted_latencies = sorted(self.latencies)
        n = len(sorted_latencies)
        return {
            "p50": sorted_latencies[min(int(n * 0.5), n - 1)],
            "p95": sorted_latencies[min(int(n * 0.95), n - 1)],
            "p99": sorted_latencies[min(int(n * 0.99), n - 1)],
            "mean": statistics.mean(sorted_latencies),
            "min": sorted_latencies[0],
            "max": sorted_latencies[-1],
        }
def reset(self) -> None:
"""Reset all tracked latencies."""
self.latencies.clear()
self.start_times.clear()
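
# Hypothetical usage sketch (not part of the original module): shows the intended
# start_timing/end_timing pairing keyed by request ID. The sleep is a stand-in
# for real request handling.
def _demo_latency_timing(tracker: LatencyTracker, request_id: str = "req-1") -> Optional[float]:
    """Time one simulated request and return its measured latency in seconds."""
    tracker.start_timing(request_id)
    time.sleep(0.01)  # placeholder for actual work being timed
    return tracker.end_timing(request_id)
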
class ThroughputTracker:
"""Track and analyze system throughput."""
def __init__(self, window_size: int = 60):
self.window_size = window_size # seconds
self.request_timestamps: List[float] = []
def record_request(self) -> None:
"""Record a new request timestamp."""
current_time = time.time()
self.request_timestamps.append(current_time)
# Clean old timestamps outside window
cutoff_time = current_time - self.window_size
self.request_timestamps = [ts for ts in self.request_timestamps if ts > cutoff_time]
def add_request(self) -> None:
"""Alias for record_request for compatibility."""
self.record_request()
def get_throughput(self) -> float:
"""Get current requests per second."""
if len(self.request_timestamps) <= 1:
return 0.0
current_time = time.time()
cutoff_time = current_time - self.window_size
# Count requests in the last window
recent_requests = sum(1 for ts in self.request_timestamps if ts > cutoff_time)
return recent_requests / self.window_size
def reset(self) -> None:
"""Reset all tracked requests."""
self.request_timestamps.clear()
class ErrorTracker:
"""Track and analyze system errors."""
def __init__(self):
self.total_requests = 0
self.error_count = 0
self.error_types: Dict[str, int] = {}
def record_request(self, success: bool, error_type: Optional[str] = None) -> None:
"""Record a request outcome."""
self.total_requests += 1
if not success:
self.error_count += 1
if error_type:
self.error_types[error_type] = self.error_types.get(error_type, 0) + 1
def add_request(self) -> None:
"""Add a successful request for compatibility."""
self.record_request(success=True)
def add_error(self, error_type: Optional[str] = None) -> None:
"""Add an error for compatibility."""
self.record_request(success=False, error_type=error_type)
def get_error_rate(self) -> float:
"""Get current error rate (0.0 to 1.0)."""
if self.total_requests == 0:
return 0.0
return self.error_count / self.total_requests
def get_error_breakdown(self) -> Dict[str, float]:
"""Get breakdown of error types."""
if self.error_count == 0:
return {}
        return {
            error_type: count / self.error_count
            for error_type, count in self.error_types.items()
        }
def reset(self) -> None:
"""Reset all tracked errors."""
self.total_requests = 0
self.error_count = 0
self.error_types.clear()
def calculate_system_metrics(
latency_tracker: LatencyTracker,
throughput_tracker: ThroughputTracker,
error_tracker: ErrorTracker,
) -> Dict[str, float]:
"""
Calculate comprehensive system performance metrics.
Args:
latency_tracker: Latency tracking instance
throughput_tracker: Throughput tracking instance
error_tracker: Error tracking instance
Returns:
Dictionary containing all system metrics
"""
latency_stats = latency_tracker.get_latency_stats()
return {
# Latency metrics
"latency_p50": latency_stats["p50"],
"latency_p95": latency_stats["p95"],
"latency_p99": latency_stats["p99"],
"latency_mean": latency_stats["mean"],
"latency_min": latency_stats["min"],
"latency_max": latency_stats["max"],
# Throughput metrics
"throughput_rps": throughput_tracker.get_throughput(),
# Error metrics
"error_rate": error_tracker.get_error_rate(),
"total_requests": float(error_tracker.total_requests),
"error_count": float(error_tracker.error_count),
}
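
# Illustrative entry point (an assumption, not part of the original module):
# exercises the three trackers with synthetic data and prints the combined
# metrics returned by calculate_system_metrics.
if __name__ == "__main__":
    latency_tracker = LatencyTracker()
    throughput_tracker = ThroughputTracker(window_size=60)
    error_tracker = ErrorTracker()

    # Simulate a handful of requests with synthetic latencies (in seconds);
    # every third request is marked as a timeout to exercise error tracking.
    for i, simulated_latency in enumerate([0.12, 0.20, 0.35, 0.08, 0.50]):
        latency_tracker.add_measurement(simulated_latency)
        throughput_tracker.record_request()
        if i % 3 == 2:
            error_tracker.record_request(success=False, error_type="timeout")
        else:
            error_tracker.record_request(success=True)

    metrics = calculate_system_metrics(latency_tracker, throughput_tracker, error_tracker)
    for name, value in sorted(metrics.items()):
        print(f"{name}: {value:.4f}")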