#!/usr/bin/env python3
"""
Latency Monitoring and Testing Utilities

Comprehensive tools for monitoring, testing, and benchmarking latency
optimizations in the RAG pipeline.
"""

import json
import logging
import statistics
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import asdict, dataclass
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


@dataclass
class LatencyBenchmarkResult:
    """Results from latency benchmark tests."""

    test_name: str
    total_requests: int
    successful_requests: int
    failed_requests: int

    # Timing statistics
    mean_latency: float
    median_latency: float
    p95_latency: float
    p99_latency: float
    min_latency: float
    max_latency: float

    # Optimization metrics
    cache_hit_rate: float
    compression_rate: float
    optimization_savings: float

    # Performance tiers
    fast_responses: int  # < 1s
    normal_responses: int  # 1-3s
    slow_responses: int  # >= 3s

    # Test metadata
    test_duration: float
    timestamp: float


class LatencyMonitor:
    """Real-time latency monitoring and alerting."""

    def __init__(self, alert_threshold: float = 5.0, warning_threshold: float = 3.0, sample_window: int = 100):
        """
        Initialize the latency monitor.

        Args:
            alert_threshold: Latency threshold for alerts (seconds)
            warning_threshold: Latency threshold for warnings (seconds)
            sample_window: Number of recent samples to keep for analysis
        """
        self.alert_threshold = alert_threshold
        self.warning_threshold = warning_threshold
        self.sample_window = sample_window

        # Sliding window of recent latencies
        self._latency_samples: List[float] = []
        self._alert_count = 0
        self._warning_count = 0
        self._total_requests = 0

        # Performance tracking
        self._start_time = time.time()
        self._optimization_metrics = {
            "cache_hits": 0,
            "cache_misses": 0,
            "compression_savings": 0.0,
            "fast_responses": 0,
            "normal_responses": 0,
            "slow_responses": 0,
        }

        logger.info(f"LatencyMonitor initialized (warn: {warning_threshold}s, alert: {alert_threshold}s)")

    def record_request(
        self, latency: float, cache_hit: bool = False, compressed: bool = False, compression_savings: float = 0.0
    ):
        """
        Record a request for latency monitoring.

        Args:
            latency: Request latency in seconds
            cache_hit: Whether the request was served from cache
            compressed: Whether context compression was used
            compression_savings: Amount of compression savings in characters
        """
        self._total_requests += 1

        # Add to the sliding window, evicting the oldest sample when full
        self._latency_samples.append(latency)
        if len(self._latency_samples) > self.sample_window:
            self._latency_samples.pop(0)

        # Update optimization metrics
        if cache_hit:
            self._optimization_metrics["cache_hits"] += 1
        else:
            self._optimization_metrics["cache_misses"] += 1

        if compressed:
            self._optimization_metrics["compression_savings"] += compression_savings

        # Performance tier tracking
        if latency < 1.0:
            self._optimization_metrics["fast_responses"] += 1
        elif latency < 3.0:
            self._optimization_metrics["normal_responses"] += 1
        else:
            self._optimization_metrics["slow_responses"] += 1

        # Check thresholds
        if latency >= self.alert_threshold:
            self._alert_count += 1
            logger.error(f"🚨 LATENCY ALERT: {latency:.2f}s (threshold: {self.alert_threshold}s)")
        elif latency >= self.warning_threshold:
            self._warning_count += 1
            logger.warning(f"⚠️ LATENCY WARNING: {latency:.2f}s (threshold: {self.warning_threshold}s)")

    def get_current_stats(self) -> Dict[str, Any]:
        """Get current monitoring statistics."""
        if not self._latency_samples:
            return {"status": "no_data"}

        samples = self._latency_samples.copy()
        cache_lookups = self._optimization_metrics["cache_hits"] + self._optimization_metrics["cache_misses"]

        return {
            "total_requests": self._total_requests,
            "sample_count": len(samples),
            "uptime": time.time() - self._start_time,
            # Latency statistics
            "current_mean": statistics.mean(samples),
            "current_median": statistics.median(samples),
            "current_p95": sorted(samples)[int(len(samples) * 0.95)],
            "min_latency": min(samples),
            "max_latency": max(samples),
            # Alert statistics
            "alert_count": self._alert_count,
            "warning_count": self._warning_count,
            "alert_rate": self._alert_count / self._total_requests if self._total_requests > 0 else 0,
            # Optimization statistics
            "cache_hit_rate": self._optimization_metrics["cache_hits"] / cache_lookups if cache_lookups > 0 else 0,
            "compression_savings": self._optimization_metrics["compression_savings"],
            "performance_distribution": {
                "fast": self._optimization_metrics["fast_responses"],
                "normal": self._optimization_metrics["normal_responses"],
                "slow": self._optimization_metrics["slow_responses"],
            },
        }

    def is_healthy(self) -> bool:
        """Check if current performance is healthy."""
        if not self._latency_samples:
            return True  # No data yet

        recent_samples = self._latency_samples[-10:]  # Last 10 requests
        recent_mean = statistics.mean(recent_samples)
        recent_p95 = sorted(recent_samples)[int(len(recent_samples) * 0.95)]

        # Healthy if recent performance is good
        return recent_mean < self.warning_threshold and recent_p95 < self.alert_threshold

    def reset_stats(self):
        """Reset monitoring statistics."""
        self._latency_samples.clear()
        self._alert_count = 0
        self._warning_count = 0
        self._total_requests = 0
        self._start_time = time.time()
        self._optimization_metrics = {
            "cache_hits": 0,
            "cache_misses": 0,
            "compression_savings": 0.0,
            "fast_responses": 0,
            "normal_responses": 0,
            "slow_responses": 0,
        }
        logger.info("LatencyMonitor statistics reset")


class LatencyBenchmark:
    """Comprehensive latency benchmarking and testing."""

    def __init__(self, rag_pipeline=None):
        """
        Initialize the benchmark runner.

        Args:
            rag_pipeline: RAG pipeline instance to benchmark
        """
        self.rag_pipeline = rag_pipeline
        self.monitor = LatencyMonitor()

    def run_single_query_benchmark(self, query: str, iterations: int = 10, warm_up: int = 2) -> Dict[str, Any]:
        """
        Benchmark a single query with multiple iterations.

        Args:
            query: Query to benchmark
            iterations: Number of benchmark iterations
            warm_up: Number of warm-up iterations (not counted)

        Returns:
            Benchmark results dictionary
        """
        logger.info(f"Running single query benchmark: '{query[:50]}...' ({iterations} iterations)")

        # Warm-up iterations
        if warm_up > 0:
            logger.debug(f"Running {warm_up} warm-up iterations...")
            for _ in range(warm_up):
                try:
                    if self.rag_pipeline:
                        self.rag_pipeline.generate_answer(query)
                    else:
                        time.sleep(0.1)  # Mock processing
                except Exception as e:
                    logger.warning(f"Warm-up iteration failed: {e}")

        # Actual benchmark iterations
        latencies = []
        cache_hits = 0
        compressions = 0
        failures = 0

        start_time = time.time()
        for i in range(iterations):
            try:
                iter_start = time.time()
                if self.rag_pipeline:
                    response = self.rag_pipeline.generate_answer(query)
                    # Extract optimization metadata if available
                    if hasattr(response, "cache_hit") and response.cache_hit:
                        cache_hits += 1
                    if hasattr(response, "context_compressed") and response.context_compressed:
                        compressions += 1
                else:
                    # Mock processing with some variation
                    time.sleep(0.5 + (i % 3) * 0.1)

                latency = time.time() - iter_start
                latencies.append(latency)

                # Record in monitor
                self.monitor.record_request(
                    latency=latency,
                    cache_hit=(i > 0 and i % 3 == 0),  # Mock cache hits
                    compressed=(i % 2 == 0),  # Mock compression
                    compression_savings=100.0 if i % 2 == 0 else 0.0,
                )

                logger.debug(f"Iteration {i + 1}/{iterations}: {latency:.3f}s")
            except Exception as e:
                failures += 1
                logger.error(f"Benchmark iteration {i + 1} failed: {e}")

        total_time = time.time() - start_time

        if not latencies:
            return {"error": "No successful iterations"}

        # Calculate statistics
        latencies.sort()
        return {
            "query": query,
            "iterations": iterations,
            "successful_iterations": len(latencies),
            "failed_iterations": failures,
            "total_time": total_time,
            # Latency statistics
            "mean_latency": statistics.mean(latencies),
            "median_latency": statistics.median(latencies),
            "p95_latency": latencies[int(len(latencies) * 0.95)],
            "p99_latency": latencies[int(len(latencies) * 0.99)],
            "min_latency": min(latencies),
            "max_latency": max(latencies),
            # Optimization statistics
            "cache_hit_rate": cache_hits / len(latencies),
            "compression_rate": compressions / len(latencies),
            # Raw data
            "latencies": latencies,
        }

    def run_multi_query_benchmark(
        self, queries: List[str], concurrent_users: int = 1, iterations_per_query: int = 5
    ) -> LatencyBenchmarkResult:
        """
        Benchmark multiple queries with optional concurrency.

        Args:
            queries: List of queries to benchmark
            concurrent_users: Number of concurrent users to simulate
            iterations_per_query: Iterations per query

        Returns:
            LatencyBenchmarkResult with comprehensive statistics
        """
        logger.info(
            f"Running multi-query benchmark: {len(queries)} queries, "
            f"{concurrent_users} concurrent users, {iterations_per_query} iterations each"
        )

        all_latencies = []
        successful_requests = 0
        failed_requests = 0
        cache_hits = 0
        compressions = 0

        start_time = time.time()

        if concurrent_users == 1:
            # Sequential execution
            for query in queries:
                result = self.run_single_query_benchmark(query, iterations_per_query, warm_up=0)
                if "latencies" in result:
                    all_latencies.extend(result["latencies"])
                    successful_requests += result["successful_iterations"]
                    failed_requests += result["failed_iterations"]
                    cache_hits += int(result["cache_hit_rate"] * result["successful_iterations"])
                    compressions += int(result["compression_rate"] * result["successful_iterations"])
        else:
            # Concurrent execution
            with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
                # Submit all query-iteration combinations
                futures = []
                for query in queries:
                    for _ in range(iterations_per_query):
                        future = executor.submit(self._execute_single_query, query)
                        futures.append(future)

                # Collect results
                for future in as_completed(futures):
                    try:
                        result = future.result(timeout=30)
                        all_latencies.append(result["latency"])
                        successful_requests += 1
                        if result.get("cache_hit"):
                            cache_hits += 1
                        if result.get("compressed"):
                            compressions += 1
                    except Exception as e:
                        failed_requests += 1
                        logger.error(f"Concurrent benchmark task failed: {e}")

        total_time = time.time() - start_time

        if not all_latencies:
            logger.error("No successful requests in benchmark")
            return LatencyBenchmarkResult(
                test_name="multi_query_benchmark",
                total_requests=0,
                successful_requests=0,
                failed_requests=failed_requests,
                mean_latency=0,
                median_latency=0,
                p95_latency=0,
                p99_latency=0,
                min_latency=0,
                max_latency=0,
                cache_hit_rate=0,
                compression_rate=0,
                optimization_savings=0,
                fast_responses=0,
                normal_responses=0,
                slow_responses=0,
                test_duration=total_time,
                timestamp=time.time(),
            )

        # Calculate statistics
        all_latencies.sort()

        # Performance tier classification
        fast_responses = sum(1 for lat in all_latencies if lat < 1.0)
        normal_responses = sum(1 for lat in all_latencies if 1.0 <= lat < 3.0)
        slow_responses = sum(1 for lat in all_latencies if lat >= 3.0)

        return LatencyBenchmarkResult(
            test_name="multi_query_benchmark",
            total_requests=successful_requests + failed_requests,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            # Timing statistics
            mean_latency=statistics.mean(all_latencies),
            median_latency=statistics.median(all_latencies),
            p95_latency=all_latencies[int(len(all_latencies) * 0.95)],
            p99_latency=all_latencies[int(len(all_latencies) * 0.99)],
            min_latency=min(all_latencies),
            max_latency=max(all_latencies),
            # Optimization metrics
            cache_hit_rate=cache_hits / successful_requests if successful_requests > 0 else 0,
            compression_rate=compressions / successful_requests if successful_requests > 0 else 0,
            optimization_savings=0.0,  # Would need to be derived from actual savings data
            # Performance tiers
            fast_responses=fast_responses,
            normal_responses=normal_responses,
            slow_responses=slow_responses,
            # Test metadata
            test_duration=total_time,
            timestamp=time.time(),
        )

    def _execute_single_query(self, query: str) -> Dict[str, Any]:
        """Execute a single query and return timing/optimization data."""
        start_time = time.time()
        try:
            if self.rag_pipeline:
                response = self.rag_pipeline.generate_answer(query)
                result = {
                    "latency": time.time() - start_time,
                    "success": True,
                    "cache_hit": getattr(response, "cache_hit", False),
                    "compressed": getattr(response, "context_compressed", False),
                }
            else:
                # Mock execution
                time.sleep(0.5)
                result = {
                    "latency": time.time() - start_time,
                    "success": True,
                    "cache_hit": False,
                    "compressed": False,
                }
            return result
        except Exception as e:
            return {
                "latency": time.time() - start_time,
                "success": False,
                "error": str(e),
                "cache_hit": False,
                "compressed": False,
            }

    def save_benchmark_results(self, results: LatencyBenchmarkResult, output_file: str):
        """Save benchmark results to a JSON file."""
        results_dict = asdict(results)
        with open(output_file, "w") as f:
            json.dump(results_dict, f, indent=2)
        logger.info(f"Benchmark results saved to {output_file}")

    def load_benchmark_results(self, input_file: str) -> LatencyBenchmarkResult:
        """Load benchmark results from a JSON file."""
        with open(input_file, "r") as f:
            data = json.load(f)
        return LatencyBenchmarkResult(**data)

    def compare_benchmark_results(self, baseline_file: str, current_file: str) -> Dict[str, Any]:
        """
        Compare two benchmark results to measure improvement.

        Args:
            baseline_file: Path to baseline benchmark results
            current_file: Path to current benchmark results

        Returns:
            Comparison analysis
        """
        baseline = self.load_benchmark_results(baseline_file)
        current = self.load_benchmark_results(current_file)

        # Calculate improvements
        latency_improvement = (
            (baseline.mean_latency - current.mean_latency) / baseline.mean_latency * 100
            if baseline.mean_latency > 0
            else 0
        )
        p95_improvement = (
            (baseline.p95_latency - current.p95_latency) / baseline.p95_latency * 100
            if baseline.p95_latency > 0
            else 0
        )
        cache_improvement = current.cache_hit_rate - baseline.cache_hit_rate

        baseline_fast_rate = (
            baseline.fast_responses / baseline.successful_requests if baseline.successful_requests > 0 else 0
        )
        current_fast_rate = (
            current.fast_responses / current.successful_requests if current.successful_requests > 0 else 0
        )

        return {
            "baseline_timestamp": baseline.timestamp,
            "current_timestamp": current.timestamp,
            "latency_analysis": {
                "baseline_mean": baseline.mean_latency,
                "current_mean": current.mean_latency,
                "improvement_percent": latency_improvement,
                "is_improvement": latency_improvement > 0,
            },
            "p95_analysis": {
                "baseline_p95": baseline.p95_latency,
                "current_p95": current.p95_latency,
                "improvement_percent": p95_improvement,
                "is_improvement": p95_improvement > 0,
            },
            "cache_analysis": {
                "baseline_cache_rate": baseline.cache_hit_rate,
                "current_cache_rate": current.cache_hit_rate,
                "improvement": cache_improvement,
                "is_improvement": cache_improvement > 0,
            },
            "performance_distribution": {
                "baseline_fast_rate": baseline_fast_rate,
                "current_fast_rate": current_fast_rate,
                "fast_response_improvement": current_fast_rate - baseline_fast_rate,
            },
            "summary": {
                "overall_improvement": latency_improvement > 5 and p95_improvement > 5,
                "significant_improvement": latency_improvement > 20 or p95_improvement > 20,
                "recommendation": self._get_improvement_recommendation(
                    latency_improvement, p95_improvement, cache_improvement
                ),
            },
        }

    def _get_improvement_recommendation(
        self, latency_improvement: float, p95_improvement: float, cache_improvement: float
    ) -> str:
        """Generate improvement recommendations based on results."""
        if latency_improvement > 20 and p95_improvement > 20:
            return "Excellent improvement! Optimizations are working very well."
        elif latency_improvement > 10 and p95_improvement > 10:
            return "Good improvement. Consider additional optimizations for further gains."
        elif latency_improvement > 0 and p95_improvement > 0:
            return "Modest improvement. May need more aggressive optimization strategies."
        elif cache_improvement > 0.2:
            return "Cache improvements detected. Focus on cache hit rate optimization."
        else:
            return (
                "No significant improvement detected. Review optimization strategies "
                "and consider profiling for bottlenecks."
            )


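# Example end-to-end usage of LatencyBenchmark (a minimal sketch; `my_pipeline`
# stands in for any object exposing a `generate_answer(query)` method, and the
# file names are illustrative):
#
#     benchmark = LatencyBenchmark(rag_pipeline=my_pipeline)
#     single = benchmark.run_single_query_benchmark("What is the vacation policy?", iterations=10)
#     result = benchmark.run_multi_query_benchmark(create_sample_benchmark_queries(), concurrent_users=4)
#     benchmark.save_benchmark_results(result, "current_run.json")
#     comparison = benchmark.compare_benchmark_results("baseline_run.json", "current_run.json")
#     print(comparison["summary"]["recommendation"])

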
def create_sample_benchmark_queries() -> List[str]:
    """Create a set of sample queries for benchmarking."""
    return [
        "What is the vacation policy?",
        "How much PTO do I get?",
        "Can I work remotely?",
        "What are the sick leave policies?",
        "How do I request time off?",
        "What is the bereavement leave policy?",
        "Are there any holiday policies?",
        "What about maternity leave?",
        "How does PTO accrual work?",
        "What is the remote work policy?",
    ]


def run_quick_latency_test(rag_pipeline=None) -> Dict[str, Any]:
    """Run a quick latency test for immediate feedback."""
    logger.info("Running quick latency test...")

    benchmark = LatencyBenchmark(rag_pipeline)
    queries = create_sample_benchmark_queries()[:3]  # Use first 3 queries

    results = benchmark.run_multi_query_benchmark(queries=queries, concurrent_users=1, iterations_per_query=3)

    summary = {
        "test_type": "quick_latency_test",
        "queries_tested": len(queries),
        "total_requests": results.total_requests,
        "success_rate": results.successful_requests / results.total_requests if results.total_requests > 0 else 0,
        "mean_latency": results.mean_latency,
        "p95_latency": results.p95_latency,
        "cache_hit_rate": results.cache_hit_rate,
        "performance_grade": _grade_performance(results.mean_latency, results.p95_latency),
        "recommendations": _get_quick_recommendations(results),
    }

    logger.info(
        f"Quick test complete: {summary['performance_grade']} "
        f"(mean: {results.mean_latency:.2f}s, p95: {results.p95_latency:.2f}s)"
    )
    return summary


def _grade_performance(mean_latency: float, p95_latency: float) -> str:
    """Grade performance based on latency metrics."""
    if mean_latency < 1.0 and p95_latency < 2.0:
        return "A+ (Excellent)"
    elif mean_latency < 2.0 and p95_latency < 3.0:
        return "A (Very Good)"
    elif mean_latency < 3.0 and p95_latency < 5.0:
        return "B (Good)"
    elif mean_latency < 5.0 and p95_latency < 8.0:
        return "C (Acceptable)"
    else:
        return "D (Needs Improvement)"


def _get_quick_recommendations(results: LatencyBenchmarkResult) -> List[str]:
    """Generate quick recommendations based on test results."""
    recommendations = []

    if results.mean_latency > 3.0:
        recommendations.append("Mean latency is high - consider enabling response caching")
    if results.p95_latency > 5.0:
        recommendations.append("P95 latency is concerning - investigate LLM API performance")
    if results.cache_hit_rate < 0.1:
        recommendations.append("Low cache hit rate - review caching strategy")
    # Guard against division by zero when every request failed
    if results.successful_requests > 0 and results.fast_responses / results.successful_requests < 0.5:
        recommendations.append("Too few fast responses - enable context compression")

    if not recommendations:
        recommendations.append("Performance looks good - monitor for consistency")

    return recommendations


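if __name__ == "__main__":
    # Minimal self-check (a sketch): run the quick benchmark against the built-in
    # mock path (no RAG pipeline attached), so the reported latencies reflect the
    # simulated sleeps rather than real model calls.
    logging.basicConfig(level=logging.INFO)
    print(json.dumps(run_quick_latency_test(), indent=2))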