Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Performance Testing Script | |
| Tests the performance improvements from Phase 1 optimization | |
| """ | |
| import asyncio | |
| import json | |
| import statistics | |
| import time | |
| from typing import Any, Dict | |
| import httpx | |
class PerformanceTester:
    """Performance test suite that exercises a running backend over HTTP.

    Each ``test_*`` coroutine sends requests to ``base_url`` and returns a
    plain dict of metrics (or a dict with an ``"error"`` key).
    ``run_full_performance_test`` runs them in sequence, stores each result in
    ``self.results`` and returns the report built by
    ``generate_performance_report``.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """Store the target base URL and start with empty results.

        Args:
            base_url: Scheme/host/port prefix that endpoint paths such as
                ``/health`` are appended to.
        """
        self.base_url = base_url
        self.results = {}
        # Wall-clock time (time.time()) at which the full suite was started;
        # None until run_full_performance_test() has been called.
        self.start_time = None

    @staticmethod
    def _summarize_latencies(samples) -> Dict[str, Any]:
        """Return avg/median/p95/min/max (rounded to 4 places) for *samples*.

        *samples* must be a non-empty list of durations in seconds. The p95
        value is the element at index ``int(len(samples) * 0.95)`` of the
        sorted samples, which is always a valid index for a non-empty list.
        """
        ordered = sorted(samples)
        return {
            "avg_response_time": round(statistics.mean(ordered), 4),
            "median_response_time": round(statistics.median(ordered), 4),
            "p95_response_time": round(ordered[int(len(ordered) * 0.95)], 4),
            "min_response_time": round(ordered[0], 4),
            "max_response_time": round(ordered[-1], 4),
        }

    async def run_full_performance_test(self) -> Dict[str, Any]:
        """Run every test in order and return the generated report.

        Results are stored in ``self.results`` under the keys
        ``health_endpoint``, ``database_queries``, ``api_caching``,
        ``concurrent_load`` and ``memory_usage``.
        """
        # Record the start so generate_performance_report() can compute the
        # real suite duration (it reads self.start_time).
        self.start_time = time.time()

        print("๐ Starting Comprehensive Performance Test Suite")
        print("=" * 60)

        # Test 1: Health endpoint performance
        print("\n๐ Testing Health Endpoint Performance...")
        self.results["health_endpoint"] = await self.test_health_endpoint()

        # Test 2: Database query performance
        print("\n๐พ Testing Database Query Performance...")
        self.results["database_queries"] = await self.test_database_performance()

        # Test 3: API caching effectiveness
        print("\nโก Testing API Caching Performance...")
        self.results["api_caching"] = await self.test_api_caching()

        # Test 4: Concurrent load testing
        print("\n๐ฅ Testing Concurrent Load Handling...")
        self.results["concurrent_load"] = await self.test_concurrent_load()

        # Test 5: Memory usage analysis
        print("\n๐ง Testing Memory Usage Patterns...")
        self.results["memory_usage"] = await self.test_memory_usage()

        # Generate comprehensive report
        report = self.generate_performance_report()

        print("\n" + "=" * 60)
        print("โ Performance Testing Complete!")
        print("=" * 60)
        return report

    async def test_health_endpoint(self) -> Dict[str, Any]:
        """Time 100 sequential GET requests to ``/health``.

        Only responses with status 200 contribute a sample. Returns latency
        statistics plus ``sample_size`` and ``requests_per_second``, or
        ``{"error": ...}`` when no request succeeded.
        """
        response_times = []
        async with httpx.AsyncClient() as client:
            for _ in range(100):
                # perf_counter is monotonic and high resolution; time.time()
                # can report 0.0 for fast local requests on coarse clocks.
                started = time.perf_counter()
                try:
                    response = await client.get(f"{self.base_url}/health")
                except Exception as e:
                    print(f" โ Health check error: {e}")
                    continue
                elapsed = time.perf_counter() - started

                if response.status_code == 200:
                    response_times.append(elapsed)
                else:
                    print(f" โ ๏ธ Health check failed: {response.status_code}")

        if not response_times:
            return {"error": "No successful health checks"}

        stats = {"sample_size": len(response_times)}
        stats.update(self._summarize_latencies(response_times))
        total_time = sum(response_times)
        # Guard against a zero total so a degenerate clock cannot crash the run.
        stats["requests_per_second"] = (
            round(len(response_times) / total_time, 2) if total_time > 0 else 0.0
        )
        return stats

    async def test_database_performance(self) -> Dict[str, Any]:
        """Time one GET to ``/health/detailed`` and extract its database check.

        The database figures are read from ``checks.database`` in the JSON
        response body; keys missing from that object come back as ``None``.
        Any failure is reported as ``{"error": ...}`` instead of raising.
        """
        # This would require authenticated requests.
        # For now, we test what we can with public endpoints.
        try:
            async with httpx.AsyncClient() as client:
                started = time.perf_counter()
                response = await client.get(f"{self.base_url}/health/detailed")
                elapsed = time.perf_counter() - started

                if response.status_code != 200:
                    return {"error": f"Health check failed: {response.status_code}"}

                data = response.json()
                db_check = data.get("checks", {}).get("database", {})
                return {
                    "endpoint_response_time": round(elapsed, 4),
                    "database_status": db_check.get("status"),
                    "database_response_time": db_check.get("response_time_seconds"),
                    "query_time": db_check.get("query_time_seconds"),
                    "record_count": db_check.get("case_count"),
                }
        except Exception as e:
            return {"error": str(e)}

    async def test_api_caching(self) -> Dict[str, Any]:
        """Compare the latency of two back-to-back ``/health/detailed`` requests.

        A second request more than 20% faster than the first is reported as
        "Likely cached". This is a rough heuristic based on two samples only.
        Fields that could not be measured stay ``None``; an ``"error"`` key is
        added when an exception occurred.
        """
        cache_test_results = {
            "first_request_time": None,
            "cached_request_time": None,
            "cache_hit_ratio": None,
            "performance_improvement": None,
        }
        try:
            async with httpx.AsyncClient() as client:
                # First request (should populate a cache, if one exists)
                started = time.perf_counter()
                response1 = await client.get(f"{self.base_url}/health/detailed")
                first_request_time = time.perf_counter() - started

                if response1.status_code == 200:
                    cache_test_results["first_request_time"] = round(first_request_time, 4)

                    # Immediate second request (should hit cache if implemented)
                    started = time.perf_counter()
                    response2 = await client.get(f"{self.base_url}/health/detailed")
                    second_request_time = time.perf_counter() - started

                    if response2.status_code == 200:
                        cache_test_results["cached_request_time"] = round(second_request_time, 4)

                        # Calculate improvement
                        if first_request_time > 0 and second_request_time > 0:
                            improvement = (
                                (first_request_time - second_request_time) / first_request_time
                            ) * 100
                            cache_test_results["performance_improvement"] = round(improvement, 2)

                            # Rough cache hit detection: 20%+ improvement suggests caching
                            if improvement > 20:
                                cache_test_results["cache_hit_ratio"] = "Likely cached"
                            else:
                                cache_test_results["cache_hit_ratio"] = "Possibly not cached"
        except Exception as e:
            cache_test_results["error"] = str(e)
        return cache_test_results

    async def test_concurrent_load(self) -> Dict[str, Any]:
        """Fire 50 concurrent GET requests at ``/health`` and summarize them.

        Every request uses its own ``httpx.AsyncClient``. Anything that is not
        a successful (status 200) result counts as failed, including exception
        objects returned by ``asyncio.gather``, so successful + failed always
        equals the total. ``requests_per_second`` is successful requests
        divided by the wall-clock duration of the whole batch.
        """

        async def make_request(request_id: int) -> Dict[str, Any]:
            """Make a single request and return timing data."""
            async with httpx.AsyncClient() as client:
                started = time.perf_counter()
                try:
                    response = await client.get(f"{self.base_url}/health")
                except Exception as e:
                    return {
                        "request_id": request_id,
                        "response_time": time.perf_counter() - started,
                        "status_code": None,
                        "success": False,
                        "error": str(e),
                    }
                return {
                    "request_id": request_id,
                    "response_time": time.perf_counter() - started,
                    "status_code": response.status_code,
                    "success": response.status_code == 200,
                }

        # Run 50 concurrent requests
        print(" ๐ Running 50 concurrent requests...")
        batch_started = time.perf_counter()
        results = await asyncio.gather(
            *(make_request(i) for i in range(50)), return_exceptions=True
        )
        wall_time = time.perf_counter() - batch_started

        successful_requests = [
            r for r in results if isinstance(r, dict) and r.get("success")
        ]
        # Everything else failed, whether it is a failure dict or an exception
        # object handed back by gather(return_exceptions=True).
        failed_count = len(results) - len(successful_requests)
        response_times = [r["response_time"] for r in successful_requests]

        concurrent_results = {
            "total_requests": len(results),
            "successful_requests": len(successful_requests),
            "failed_requests": failed_count,
            "success_rate": round(len(successful_requests) / len(results) * 100, 2) if results else 0,
        }

        if response_times:
            concurrent_results.update(self._summarize_latencies(response_times))
            # Throughput over the real elapsed time of the batch.
            concurrent_results["requests_per_second"] = (
                round(len(successful_requests) / wall_time, 2) if wall_time > 0 else 0.0
            )
        return concurrent_results

    async def test_memory_usage(self) -> Dict[str, Any]:
        """Send 200 ``/health`` requests in batches of 20 while sampling RSS.

        The memory figures are the resident set size of *this* process
        (``psutil.Process(os.getpid())``), i.e. the test client, not the
        server under test. Returns ``{"error": ...}`` when ``psutil`` is not
        installed or any request fails.
        """
        try:
            import os

            import psutil

            process = psutil.Process(os.getpid())

            # Get baseline memory
            baseline_memory = process.memory_info().rss / 1024 / 1024  # MB

            # Generate some load
            print(" ๐ Generating load for memory testing...")
            async with httpx.AsyncClient() as client:
                # 200 requests in 10 batches of 20. Coroutines are created
                # just before they are awaited so none is left un-awaited if
                # a batch raises.
                for _ in range(200 // 20):
                    await asyncio.gather(
                        *(client.get(f"{self.base_url}/health") for _ in range(20))
                    )

                    # Check memory after each batch
                    current_memory = process.memory_info().rss / 1024 / 1024
                    memory_increase = current_memory - baseline_memory
                    if memory_increase > 50:  # 50MB increase
                        print(f" โ ๏ธ Significant memory increase detected: +{memory_increase:.1f}MB")

            # Final memory check
            final_memory = process.memory_info().rss / 1024 / 1024
            total_increase = final_memory - baseline_memory
            leak_detected = total_increase > 20  # 20MB+ increase suggests leak
            return {
                "baseline_memory_mb": round(baseline_memory, 2),
                "final_memory_mb": round(final_memory, 2),
                "total_increase_mb": round(total_increase, 2),
                "memory_leak_detected": leak_detected,
                # Derived from the same flag so the two fields cannot disagree.
                "assessment": "Needs investigation" if leak_detected else "Good",
            }
        except ImportError:
            return {"error": "psutil not available for memory testing"}
        except Exception as e:
            return {"error": str(e)}

    def generate_performance_report(self) -> Dict[str, Any]:
        """Build the summary and recommendations from ``self.results``.

        Missing results (or results that only carry an ``"error"``) fall back
        to defaults, so this never raises because a test failed.
        ``test_duration_seconds`` is 0.0 when the suite start was not recorded.

        Returns:
            Dict with ``test_timestamp``, ``test_duration_seconds``,
            ``summary``, ``recommendations`` and ``raw_results``.
        """
        now = time.time()
        started = getattr(self, "start_time", None)

        health_results = self.results.get("health_endpoint", {})
        concurrent_results = self.results.get("concurrent_load", {})
        cache_results = self.results.get("api_caching", {})
        memory_results = self.results.get("memory_usage", {})

        # test_api_caching() pre-seeds this key with None, so a plain
        # .get(key, 0) would still return None after a failed cache test.
        cache_improvement = cache_results.get("performance_improvement") or 0
        success_rate = concurrent_results.get("success_rate", 0)

        recommendations = []

        # Health endpoint performance (missing data is treated as slow)
        if health_results.get("avg_response_time", 1) > 0.5:  # >500ms
            recommendations.append(
                "โ ๏ธ Health endpoint response time is slow (>500ms). Consider optimizing database queries."
            )

        # Concurrent performance
        if success_rate < 95:
            recommendations.append(
                f"โ ๏ธ Concurrent load success rate is low ({success_rate}%). Consider implementing rate limiting or load balancing."
            )
        if concurrent_results.get("requests_per_second", 0) < 100:
            recommendations.append(
                "โ ๏ธ Low throughput under concurrent load. Consider optimizing database connections and caching."
            )

        # Caching effectiveness
        if cache_improvement < 10:
            recommendations.append(
                "โ ๏ธ Limited caching effectiveness detected. Consider implementing Redis caching for frequently accessed data."
            )

        # Memory usage
        if memory_results.get("memory_leak_detected"):
            recommendations.append(
                "๐จ Potential memory leak detected. Investigate memory usage patterns and implement proper cleanup."
            )

        return {
            "test_timestamp": now,
            "test_duration_seconds": now - started if started is not None else 0.0,
            "summary": {
                "health_endpoint_avg_ms": health_results.get("avg_response_time", 0) * 1000,
                "health_endpoint_p95_ms": health_results.get("p95_response_time", 0) * 1000,
                "concurrent_success_rate": success_rate,
                "concurrent_rps": concurrent_results.get("requests_per_second", 0),
                "cache_performance_improvement": cache_improvement,
            },
            "recommendations": recommendations
            if recommendations
            else ["โ All performance metrics look good!"],
            "raw_results": self.results,
        }
async def main() -> None:
    """Run the full suite against the default base URL and report the results.

    Prints the summary metrics and recommendations, then writes the complete
    report to ``performance_test_results.json`` in the current directory. Any
    exception raised while testing or saving is printed with its traceback
    instead of propagating.
    """
    print("๐ฏ Zenith Backend Performance Testing Suite")
    print("=" * 60)

    # Initialize tester
    tester = PerformanceTester()
    try:
        # Run full test suite
        report = await tester.run_full_performance_test()

        # Display results
        print("\n๐ PERFORMANCE TEST RESULTS")
        print("-" * 40)
        summary = report.get("summary", {})
        # `or 0` keeps the numeric format specs safe if a metric is missing
        # or was recorded as None.
        print(f"Health endpoint avg response: {summary.get('health_endpoint_avg_ms') or 0:.2f} ms")
        print(f"Health endpoint p95 response: {summary.get('health_endpoint_p95_ms') or 0:.2f} ms")
        print(f"Concurrent success rate: {summary.get('concurrent_success_rate') or 0:.1f}%")
        print(f"Concurrent throughput: {summary.get('concurrent_rps') or 0:.1f} req/s")
        print(f"Cache performance improvement: {summary.get('cache_performance_improvement') or 0:.1f}%")

        recommendations = report.get("recommendations", [])
        if recommendations:
            print(f"\n๐ก RECOMMENDATIONS ({len(recommendations)})")
            print("-" * 40)
            for rec in recommendations:
                print(f"โข {rec}")

        print("\n๐ Performance testing complete! Check the results above for optimization opportunities.")

        # Save detailed report; default=str stringifies any value json cannot
        # serialize natively.
        with open("performance_test_results.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)
        print("๐ Detailed results saved to: performance_test_results.json")
    except Exception as e:
        print(f"โ Performance testing failed: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())