# utils/performance_benchmark.py
"""
Comprehensive performance benchmarking system
Tracks and optimizes all components of the RAG pipeline
"""
import functools
import json
import os
import statistics
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
@dataclass
class BenchmarkResult:
    """Single benchmark measurement for one component operation.

    Fields:
        component: pipeline component name (e.g. "llm_provider").
        operation: specific operation measured within that component.
        execution_time: wall-clock duration in seconds.
        success: whether the operation completed without raising.
        error_message: exception text when success is False, else None.
        timestamp: when the measurement was taken; defaults to now().
        metadata: free-form extra details about the run.
    """
    # BUG FIX: the @dataclass decorator was missing. Without it the annotated
    # fields generate no __init__, __post_init__ never runs, and every
    # BenchmarkResult(component=..., ...) call site raises TypeError.
    component: str
    operation: str
    execution_time: float
    success: bool
    error_message: Optional[str] = None
    timestamp: Optional[datetime] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Fill in the measurement time lazily so callers may omit it.
        if self.timestamp is None:
            self.timestamp = datetime.now()
class PerformanceBenchmark:
    """
    Comprehensive performance benchmarking and optimization system
    Tracks metrics across all RAG pipeline components
    """

    def __init__(self, results_file: str = "./data/benchmark/performance_results.json"):
        # Path where measurements are persisted between runs.
        self.results_file = results_file
        # In-memory history of every BenchmarkResult recorded so far;
        # _load_results() repopulates it from disk when a file exists.
        self.results: List[BenchmarkResult] = []
        self._load_results()
| def _load_results(self): | |
| """Load previous benchmark results""" | |
| try: | |
| with open(self.results_file, 'r') as f: | |
| data = json.load(f) | |
| for item in data: | |
| item['timestamp'] = datetime.fromisoformat(item['timestamp']) | |
| self.results.append(BenchmarkResult(**item)) | |
| print(f"β Loaded {len(self.results)} benchmark results") | |
| except (FileNotFoundError, json.JSONDecodeError): | |
| self.results = [] | |
| print("π Starting with empty benchmark results") | |
| def _save_results(self): | |
| """Save benchmark results to file""" | |
| try: | |
| os.makedirs(os.path.dirname(self.results_file), exist_ok=True) | |
| with open(self.results_file, 'w') as f: | |
| json_data = [] | |
| for result in self.results: | |
| result_dict = { | |
| 'component': result.component, | |
| 'operation': result.operation, | |
| 'execution_time': result.execution_time, | |
| 'success': result.success, | |
| 'error_message': result.error_message, | |
| 'timestamp': result.timestamp.isoformat(), | |
| 'metadata': result.metadata or {} | |
| } | |
| json_data.append(result_dict) | |
| json.dump(json_data, f, indent=2) | |
| except Exception as e: | |
| print(f"β Could not save benchmark results: {e}") | |
| def measure_execution(self, component: str, operation: str): | |
| """Decorator to measure execution time of functions""" | |
| def decorator(func: Callable): | |
| def wrapper(*args, **kwargs): | |
| start_time = time.time() | |
| success = True | |
| error_message = None | |
| metadata = {} | |
| try: | |
| result = func(*args, **kwargs) | |
| metadata['result_type'] = type(result).__name__ | |
| if hasattr(result, 'keys'): | |
| metadata['result_keys'] = list(result.keys()) | |
| return result | |
| except Exception as e: | |
| success = False | |
| error_message = str(e) | |
| raise e | |
| finally: | |
| execution_time = time.time() - start_time | |
| benchmark_result = BenchmarkResult( | |
| component=component, | |
| operation=operation, | |
| execution_time=execution_time, | |
| success=success, | |
| error_message=error_message, | |
| metadata=metadata | |
| ) | |
| self.results.append(benchmark_result) | |
| self._save_results() | |
| return wrapper | |
| return decorator | |
| def benchmark_llm_providers(self, llm_providers: List, test_prompts: List[str]) -> Dict[str, Any]: | |
| """Benchmark different LLM providers""" | |
| print("π§ͺ Benchmarking LLM Providers") | |
| print("=" * 50) | |
| provider_results = {} | |
| for provider in llm_providers: | |
| provider_name = provider.get_provider_name() | |
| print(f"π¬ Testing {provider_name}...") | |
| execution_times = [] | |
| successes = 0 | |
| for i, prompt in enumerate(test_prompts): | |
| try: | |
| start_time = time.time() | |
| response = provider.generate( | |
| prompt, | |
| system_message="You are a helpful assistant.", | |
| max_tokens=100 | |
| ) | |
| execution_time = time.time() - start_time | |
| execution_times.append(execution_time) | |
| successes += 1 | |
| # Store benchmark result | |
| self.results.append(BenchmarkResult( | |
| component="llm_provider", | |
| operation=f"generate_{provider_name}", | |
| execution_time=execution_time, | |
| success=True, | |
| metadata={ | |
| 'provider': provider_name, | |
| 'prompt_length': len(prompt), | |
| 'response_length': len(response), | |
| 'prompt_index': i | |
| } | |
| )) | |
| except Exception as e: | |
| self.results.append(BenchmarkResult( | |
| component="llm_provider", | |
| operation=f"generate_{provider_name}", | |
| execution_time=0, | |
| success=False, | |
| error_message=str(e), | |
| metadata={'provider': provider_name, 'prompt_index': i} | |
| )) | |
| if execution_times: | |
| provider_results[provider_name] = { | |
| 'avg_time': statistics.mean(execution_times), | |
| 'min_time': min(execution_times), | |
| 'max_time': max(execution_times), | |
| 'std_dev': statistics.stdev(execution_times) if len(execution_times) > 1 else 0, | |
| 'success_rate': (successes / len(test_prompts)) * 100, | |
| 'total_tests': len(test_prompts) | |
| } | |
| self._save_results() | |
| return provider_results | |
| def benchmark_rag_components(self, rag_engine, test_queries: List[Dict]) -> Dict[str, Any]: | |
| """Benchmark RAG pipeline components""" | |
| print("π§ͺ Benchmarking RAG Components") | |
| print("=" * 50) | |
| component_results = {} | |
| for query_data in test_queries: | |
| query = query_data['query'] | |
| domain = query_data['domain'] | |
| print(f"π¬ Testing query: '{query}'") | |
| # Benchmark complete pipeline | |
| start_time = time.time() | |
| try: | |
| result = rag_engine.answer_research_question(query, domain) | |
| execution_time = time.time() - start_time | |
| self.results.append(BenchmarkResult( | |
| component="rag_pipeline", | |
| operation="complete_workflow", | |
| execution_time=execution_time, | |
| success=True, | |
| metadata={ | |
| 'query': query, | |
| 'domain': domain, | |
| 'papers_used': result.get('papers_used', 0), | |
| 'query_type': result.get('query_type', 'unknown') | |
| } | |
| )) | |
| # Track per-component times from analysis results | |
| analysis_results = result.get('analysis_results', {}) | |
| for component, analysis in analysis_results.items(): | |
| if isinstance(analysis, dict) and 'papers_analyzed' in analysis: | |
| component_results.setdefault(component, []).append(execution_time) | |
| except Exception as e: | |
| self.results.append(BenchmarkResult( | |
| component="rag_pipeline", | |
| operation="complete_workflow", | |
| execution_time=time.time() - start_time, | |
| success=False, | |
| error_message=str(e), | |
| metadata={'query': query, 'domain': domain} | |
| )) | |
| # Calculate component statistics | |
| stats = {} | |
| for component, times in component_results.items(): | |
| if times: | |
| stats[component] = { | |
| 'avg_time': statistics.mean(times), | |
| 'min_time': min(times), | |
| 'max_time': max(times), | |
| 'total_calls': len(times) | |
| } | |
| self._save_results() | |
| return stats | |
| def benchmark_vector_search(self, vector_store, test_queries: List[str], domains: List[str]) -> Dict[str, Any]: | |
| """Benchmark vector search performance""" | |
| print("π§ͺ Benchmarking Vector Search") | |
| print("=" * 50) | |
| search_results = {} | |
| for domain in domains: | |
| domain_times = [] | |
| for query in test_queries: | |
| start_time = time.time() | |
| try: | |
| results = vector_store.search(query=query, domain=domain, n_results=10) | |
| execution_time = time.time() - start_time | |
| domain_times.append(execution_time) | |
| self.results.append(BenchmarkResult( | |
| component="vector_search", | |
| operation=f"search_{domain}", | |
| execution_time=execution_time, | |
| success=True, | |
| metadata={ | |
| 'query': query, | |
| 'domain': domain, | |
| 'results_count': len(results), | |
| 'query_length': len(query) | |
| } | |
| )) | |
| except Exception as e: | |
| self.results.append(BenchmarkResult( | |
| component="vector_search", | |
| operation=f"search_{domain}", | |
| execution_time=time.time() - start_time, | |
| success=False, | |
| error_message=str(e), | |
| metadata={'query': query, 'domain': domain} | |
| )) | |
| if domain_times: | |
| search_results[domain] = { | |
| 'avg_time': statistics.mean(domain_times), | |
| 'min_time': min(domain_times), | |
| 'max_time': max(domain_times), | |
| 'total_searches': len(domain_times) | |
| } | |
| self._save_results() | |
| return search_results | |
| def get_performance_summary(self, time_period_hours: int = 24) -> Dict[str, Any]: | |
| """Get performance summary for recent period""" | |
| cutoff_time = datetime.now() - timedelta(hours=time_period_hours) | |
| recent_results = [r for r in self.results if r.timestamp > cutoff_time] | |
| if not recent_results: | |
| return {"message": "No recent benchmark data"} | |
| summary = { | |
| "total_benchmarks": len(recent_results), | |
| "success_rate": (sum(1 for r in recent_results if r.success) / len(recent_results)) * 100, | |
| "components": {}, | |
| "operations": {} | |
| } | |
| # Component-level statistics | |
| components = set(r.component for r in recent_results) | |
| for component in components: | |
| component_results = [r for r in recent_results if r.component == component and r.success] | |
| if component_results: | |
| times = [r.execution_time for r in component_results] | |
| summary["components"][component] = { | |
| "avg_time": statistics.mean(times), | |
| "min_time": min(times), | |
| "max_time": max(times), | |
| "total_calls": len(component_results), | |
| "success_rate": (len(component_results) / len( | |
| [r for r in recent_results if r.component == component])) * 100 | |
| } | |
| # Operation-level statistics | |
| operations = set(r.operation for r in recent_results) | |
| for operation in operations: | |
| operation_results = [r for r in recent_results if r.operation == operation and r.success] | |
| if operation_results: | |
| times = [r.execution_time for r in operation_results] | |
| summary["operations"][operation] = { | |
| "avg_time": statistics.mean(times), | |
| "min_time": min(times), | |
| "max_time": max(times), | |
| "total_calls": len(operation_results) | |
| } | |
| return summary | |
| def identify_bottlenecks(self, time_period_hours: int = 24) -> List[Dict[str, Any]]: | |
| """Identify performance bottlenecks in the system""" | |
| summary = self.get_performance_summary(time_period_hours) | |
| bottlenecks = [] | |
| # Check for slow components | |
| for component, stats in summary.get("components", {}).items(): | |
| if stats["avg_time"] > 5.0: # More than 5 seconds average | |
| bottlenecks.append({ | |
| "type": "slow_component", | |
| "component": component, | |
| "avg_time": stats["avg_time"], | |
| "severity": "high" if stats["avg_time"] > 10.0 else "medium", | |
| "suggestion": f"Optimize {component} performance - consider caching or parallel processing" | |
| }) | |
| if stats["success_rate"] < 80.0: | |
| bottlenecks.append({ | |
| "type": "unreliable_component", | |
| "component": component, | |
| "success_rate": stats["success_rate"], | |
| "severity": "high" if stats["success_rate"] < 50.0 else "medium", | |
| "suggestion": f"Improve error handling in {component} - check for common failure modes" | |
| }) | |
| # Check for high variance operations | |
| for operation, stats in summary.get("operations", {}).items(): | |
| if stats["max_time"] > stats["avg_time"] * 3: # High variance | |
| bottlenecks.append({ | |
| "type": "high_variance_operation", | |
| "operation": operation, | |
| "variance_ratio": stats["max_time"] / stats["avg_time"], | |
| "severity": "medium", | |
| "suggestion": f"Investigate performance variance in {operation} - may have inconsistent workloads" | |
| }) | |
| return sorted(bottlenecks, key=lambda x: 0 if x["severity"] == "high" else 1) | |
| def generate_performance_report(self, output_dir: str = "./data/benchmark/reports") -> str: | |
| """Generate comprehensive performance report with visualizations""" | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Generate summary data | |
| summary = self.get_performance_summary(168) # 1 week | |
| bottlenecks = self.identify_bottlenecks(168) | |
| # Create visualizations | |
| self._create_performance_charts(output_dir) | |
| # Generate HTML report | |
| report_path = os.path.join(output_dir, f"performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html") | |
| html_content = self._generate_html_report(summary, bottlenecks) | |
| with open(report_path, 'w') as f: | |
| f.write(html_content) | |
| print(f"β Performance report generated: {report_path}") | |
| return report_path | |
| def _create_performance_charts(self, output_dir: str): | |
| """Create performance visualization charts""" | |
| try: | |
| # Convert to DataFrame for easier plotting | |
| df_data = [] | |
| for result in self.results: | |
| if result.success: | |
| df_data.append({ | |
| 'component': result.component, | |
| 'operation': result.operation, | |
| 'execution_time': result.execution_time, | |
| 'timestamp': result.timestamp | |
| }) | |
| if not df_data: | |
| return | |
| df = pd.DataFrame(df_data) | |
| # Component performance comparison | |
| plt.figure(figsize=(12, 8)) | |
| component_avg = df.groupby('component')['execution_time'].mean().sort_values(ascending=False) | |
| component_avg.plot(kind='bar', color='skyblue') | |
| plt.title('Average Execution Time by Component') | |
| plt.ylabel('Time (seconds)') | |
| plt.xticks(rotation=45) | |
| plt.tight_layout() | |
| plt.savefig(os.path.join(output_dir, 'component_performance.png'), dpi=300, bbox_inches='tight') | |
| plt.close() | |
| # Success rate by component | |
| plt.figure(figsize=(10, 6)) | |
| component_success = {} | |
| for component in df['component'].unique(): | |
| total = len([r for r in self.results if r.component == component]) | |
| success = len([r for r in self.results if r.component == component and r.success]) | |
| component_success[component] = (success / total) * 100 if total > 0 else 0 | |
| pd.Series(component_success).sort_values().plot(kind='barh', color='lightgreen') | |
| plt.title('Success Rate by Component') | |
| plt.xlabel('Success Rate (%)') | |
| plt.tight_layout() | |
| plt.savefig(os.path.join(output_dir, 'success_rates.png'), dpi=300, bbox_inches='tight') | |
| plt.close() | |
| # Performance over time | |
| plt.figure(figsize=(12, 6)) | |
| df['date'] = df['timestamp'].dt.date | |
| daily_avg = df.groupby('date')['execution_time'].mean() | |
| daily_avg.plot(kind='line', marker='o', color='orange') | |
| plt.title('Average Daily Performance Over Time') | |
| plt.ylabel('Time (seconds)') | |
| plt.xlabel('Date') | |
| plt.grid(True, alpha=0.3) | |
| plt.tight_layout() | |
| plt.savefig(os.path.join(output_dir, 'performance_trend.png'), dpi=300, bbox_inches='tight') | |
| plt.close() | |
| except Exception as e: | |
| print(f"β Error creating charts: {e}") | |
| def _generate_html_report(self, summary: Dict, bottlenecks: List[Dict]) -> str: | |
| """Generate HTML performance report""" | |
| html_template = """ | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <title>RAG System Performance Report</title> | |
| <style> | |
| body { font-family: Arial, sans-serif; margin: 40px; } | |
| .header { background: #2c3e50; color: white; padding: 20px; border-radius: 5px; } | |
| .summary { background: #ecf0f1; padding: 20px; margin: 20px 0; border-radius: 5px; } | |
| .bottleneck { background: #fff3cd; padding: 15px; margin: 10px 0; border-left: 4px solid #ffc107; } | |
| .bottleneck.high { background: #f8d7da; border-left-color: #dc3545; } | |
| .metric { display: inline-block; margin: 10px; padding: 10px; background: white; border-radius: 5px; } | |
| .chart { margin: 20px 0; text-align: center; } | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <h1>π€ RAG System Performance Report</h1> | |
| <p>Generated on: {timestamp}</p> | |
| </div> | |
| <div class="summary"> | |
| <h2>π Performance Summary</h2> | |
| <div class="metric"> | |
| <h3>Total Benchmarks</h3> | |
| <p style="font-size: 24px; font-weight: bold;">{total_benchmarks}</p> | |
| </div> | |
| <div class="metric"> | |
| <h3>Success Rate</h3> | |
| <p style="font-size: 24px; font-weight: bold; color: {success_color};">{success_rate}%</p> | |
| </div> | |
| </div> | |
| <h2>π Performance Bottlenecks</h2> | |
| {bottlenecks_html} | |
| <h2>π Component Performance</h2> | |
| <div class="chart"> | |
| <img src="component_performance.png" alt="Component Performance" style="max-width: 100%;"> | |
| </div> | |
| <div class="chart"> | |
| <img src="success_rates.png" alt="Success Rates" style="max-width: 100%;"> | |
| </div> | |
| <div class="chart"> | |
| <img src="performance_trend.png" alt="Performance Trend" style="max-width: 100%;"> | |
| </div> | |
| <h2>π Detailed Metrics</h2> | |
| <pre>{metrics_json}</pre> | |
| </body> | |
| </html> | |
| """ | |
| # Generate bottlenecks HTML | |
| bottlenecks_html = "" | |
| if bottlenecks: | |
| for bottleneck in bottlenecks: | |
| severity_class = "high" if bottleneck["severity"] == "high" else "" | |
| bottlenecks_html += f""" | |
| <div class="bottleneck {severity_class}"> | |
| <h3>π¨ {bottleneck['type'].replace('_', ' ').title()}</h3> | |
| <p><strong>Component:</strong> {bottleneck.get('component', bottleneck.get('operation', 'N/A'))}</p> | |
| <p><strong>Severity:</strong> {bottleneck['severity'].title()}</p> | |
| <p><strong>Suggestion:</strong> {bottleneck['suggestion']}</p> | |
| </div> | |
| """ | |
| else: | |
| bottlenecks_html = "<p>β No significant bottlenecks identified</p>" | |
| # Determine success rate color | |
| success_rate = summary.get("success_rate", 0) | |
| success_color = "#28a745" if success_rate > 90 else "#ffc107" if success_rate > 75 else "#dc3545" | |
| return html_template.format( | |
| timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
| total_benchmarks=summary.get("total_benchmarks", 0), | |
| success_rate=round(success_rate, 1), | |
| success_color=success_color, | |
| bottlenecks_html=bottlenecks_html, | |
| metrics_json=json.dumps(summary, indent=2) | |
| ) | |
| def clear_old_data(self, days_to_keep: int = 30): | |
| """Clear benchmark data older than specified days""" | |
| cutoff_time = datetime.now() - timedelta(days=days_to_keep) | |
| self.results = [r for r in self.results if r.timestamp > cutoff_time] | |
| self._save_results() | |
| print(f"β Cleared benchmark data older than {days_to_keep} days") | |
# Quick test
def test_benchmark_system():
    """Smoke-test the performance benchmark system end to end."""
    print("π§ͺ Testing Performance Benchmark System")
    print("=" * 50)
    benchmark = PerformanceBenchmark("./data/test_benchmark/results.json")

    # Test basic measurement.
    # BUG FIX: the decorator was never applied, so the "basic measurement"
    # recorded nothing and the summary below was always empty.
    @benchmark.measure_execution("test_component", "timed_call")
    def test_function():
        time.sleep(0.1)
        return {"result": "success"}

    test_function()

    # Generate summary
    summary = benchmark.get_performance_summary()
    print(f"π Summary: {summary}")
    # Identify bottlenecks
    bottlenecks = benchmark.identify_bottlenecks()
    print(f"π Bottlenecks: {len(bottlenecks)}")
    # Generate report
    report_path = benchmark.generate_performance_report("./data/test_benchmark/reports")
    print(f"π Report: {report_path}")


if __name__ == "__main__":
    test_benchmark_system()