#!/usr/bin/env python3
"""
Performance Testing Script
Tests the performance improvements from Phase 1 optimization
"""

import asyncio
import json
import statistics
import time
from typing import Any, Dict, List

import httpx


class PerformanceTester:
    """Comprehensive performance testing suite.

    Each ``test_*`` coroutine issues HTTP requests against ``base_url`` and
    returns a plain dictionary of measurements (or an ``{"error": ...}``
    dictionary).  ``run_full_performance_test`` runs them all, stores the
    results in ``self.results`` and returns the generated report.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """Store the base URL of the service under test."""
        self.base_url = base_url
        self.results: Dict[str, Any] = {}

    @staticmethod
    def _latency_stats(response_times: List[float]) -> Dict[str, Any]:
        """Return avg/median/p95/min/max (rounded to 4 places) for a non-empty sample."""
        ordered = sorted(response_times)
        # Clamp so the index can never run past the end of the sample.
        p95_index = min(int(len(ordered) * 0.95), len(ordered) - 1)
        return {
            "avg_response_time": round(statistics.mean(ordered), 4),
            "median_response_time": round(statistics.median(ordered), 4),
            "p95_response_time": round(ordered[p95_index], 4),
            "min_response_time": round(ordered[0], 4),
            "max_response_time": round(ordered[-1], 4),
        }

    async def run_full_performance_test(self) -> Dict[str, Any]:
        """Run every test in sequence and return the generated report."""
        # Recorded so generate_performance_report can compute the real
        # duration of the run (wall-clock, to match its time.time() usage).
        self.start_time = time.time()

        print("šŸš€ Starting Comprehensive Performance Test Suite")
        print("=" * 60)

        # Test 1: Health endpoint performance
        print("\nšŸ“Š Testing Health Endpoint Performance...")
        health_results = await self.test_health_endpoint()
        self.results["health_endpoint"] = health_results

        # Test 2: Database query performance
        print("\nšŸ’¾ Testing Database Query Performance...")
        db_results = await self.test_database_performance()
        self.results["database_queries"] = db_results

        # Test 3: API caching effectiveness
        print("\n⚔ Testing API Caching Performance...")
        cache_results = await self.test_api_caching()
        self.results["api_caching"] = cache_results

        # Test 4: Concurrent load testing
        print("\nšŸ”„ Testing Concurrent Load Handling...")
        load_results = await self.test_concurrent_load()
        self.results["concurrent_load"] = load_results

        # Test 5: Memory usage analysis
        print("\n🧠 Testing Memory Usage Patterns...")
        memory_results = await self.test_memory_usage()
        self.results["memory_usage"] = memory_results

        # Generate comprehensive report
        report = self.generate_performance_report()

        print("\n" + "=" * 60)
        print("āœ… Performance Testing Complete!")
        print("=" * 60)

        return report

    async def test_health_endpoint(self) -> Dict[str, Any]:
        """Issue 100 sequential GETs to ``/health`` and summarize the latencies.

        Only responses with status 200 contribute to the statistics; failures
        are printed and skipped.  Returns ``{"error": ...}`` when no request
        succeeded.
        """
        response_times: List[float] = []

        async with httpx.AsyncClient() as client:
            for _ in range(100):
                start_time = time.perf_counter()
                try:
                    response = await client.get(f"{self.base_url}/health")
                except Exception as e:
                    # Best effort: one failed request must not abort the run.
                    print(f" āŒ Health check error: {e}")
                    continue
                elapsed = time.perf_counter() - start_time

                if response.status_code == 200:
                    response_times.append(elapsed)
                else:
                    print(f" āš ļø Health check failed: {response.status_code}")

        if not response_times:
            return {"error": "No successful health checks"}

        total_time = sum(response_times)
        return {
            "sample_size": len(response_times),
            **self._latency_stats(response_times),
            # Sequential requests, so throughput is count / summed latency.
            "requests_per_second": round(len(response_times) / total_time, 2) if total_time > 0 else 0.0,
        }

    async def test_database_performance(self) -> Dict[str, Any]:
        """Time one GET to ``/health/detailed`` and extract its database check.

        Reads ``checks.database`` from the JSON body; fields missing from the
        response come back as ``None``.
        """
        # This would require authenticated requests
        # For now, we'll test what we can with public endpoints
        try:
            async with httpx.AsyncClient() as client:
                # Test health endpoint that includes database check
                start_time = time.perf_counter()
                response = await client.get(f"{self.base_url}/health/detailed")
                elapsed = time.perf_counter() - start_time

                if response.status_code != 200:
                    return {"error": f"Health check failed: {response.status_code}"}

                data = response.json()
                db_check = data.get("checks", {}).get("database", {})
                return {
                    "endpoint_response_time": round(elapsed, 4),
                    "database_status": db_check.get("status"),
                    "database_response_time": db_check.get("response_time_seconds"),
                    "query_time": db_check.get("query_time_seconds"),
                    "record_count": db_check.get("case_count"),
                }
        except Exception as e:
            return {"error": str(e)}

    async def test_api_caching(self) -> Dict[str, Any]:
        """Compare two back-to-back GETs to ``/health/detailed``.

        The percentage by which the second request is faster is reported as
        ``performance_improvement``; more than 20% is labelled
        "Likely cached".  Fields stay ``None`` when a request did not return
        200, and an ``"error"`` key is added if an exception occurred.
        """
        cache_test_results: Dict[str, Any] = {
            "first_request_time": None,
            "cached_request_time": None,
            "cache_hit_ratio": None,
            "performance_improvement": None,
        }

        try:
            async with httpx.AsyncClient() as client:
                # First request (should cache)
                start_time = time.perf_counter()
                response1 = await client.get(f"{self.base_url}/health/detailed")
                first_request_time = time.perf_counter() - start_time

                if response1.status_code == 200:
                    cache_test_results["first_request_time"] = round(first_request_time, 4)

                    # Immediate second request (should hit cache if implemented)
                    start_time = time.perf_counter()
                    response2 = await client.get(f"{self.base_url}/health/detailed")
                    second_request_time = time.perf_counter() - start_time

                    if response2.status_code == 200:
                        cache_test_results["cached_request_time"] = round(second_request_time, 4)

                        # Calculate improvement
                        if first_request_time > 0 and second_request_time > 0:
                            improvement = ((first_request_time - second_request_time) / first_request_time) * 100
                            cache_test_results["performance_improvement"] = round(improvement, 2)

                            # Rough cache hit detection (significant improvement indicates caching)
                            if improvement > 20:  # 20%+ improvement suggests caching
                                cache_test_results["cache_hit_ratio"] = "Likely cached"
                            else:
                                cache_test_results["cache_hit_ratio"] = "Possibly not cached"
        except Exception as e:
            cache_test_results["error"] = str(e)

        return cache_test_results

    async def test_concurrent_load(self) -> Dict[str, Any]:
        """Fire 50 concurrent GETs to ``/health`` and summarize the outcome.

        Returns request counts, the success rate and, when at least one
        request succeeded, latency statistics plus ``requests_per_second``
        (successful requests divided by the wall-clock time of the batch).
        """

        async def make_request(request_id: int) -> Dict[str, Any]:
            """Make a single request and return timing data"""
            async with httpx.AsyncClient() as client:
                start_time = time.perf_counter()
                try:
                    response = await client.get(f"{self.base_url}/health")
                except Exception as e:
                    return {
                        "request_id": request_id,
                        "response_time": time.perf_counter() - start_time,
                        "status_code": None,
                        "success": False,
                        "error": str(e),
                    }
                return {
                    "request_id": request_id,
                    "response_time": time.perf_counter() - start_time,
                    "status_code": response.status_code,
                    "success": response.status_code == 200,
                }

        # Run 50 concurrent requests
        print(" šŸ“ˆ Running 50 concurrent requests...")
        tasks = [make_request(i) for i in range(50)]
        batch_start = time.perf_counter()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        batch_elapsed = time.perf_counter() - batch_start

        # Process results
        successful_requests = []
        failed_requests = []
        for result in results:
            if isinstance(result, dict) and result.get("success"):
                successful_requests.append(result)
            else:
                # Includes exception objects returned by gather, so that
                # successful + failed always equals the total.
                failed_requests.append(result)

        response_times = [r["response_time"] for r in successful_requests]

        concurrent_results: Dict[str, Any] = {
            "total_requests": len(results),
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "success_rate": round(len(successful_requests) / len(results) * 100, 2) if results else 0,
        }

        if response_times:
            concurrent_results.update(self._latency_stats(response_times))

            # Throughput is measured against the elapsed time of the whole
            # batch rather than the slowest single request.
            concurrent_results["requests_per_second"] = (
                round(len(successful_requests) / batch_elapsed, 2) if batch_elapsed > 0 else 0.0
            )

        return concurrent_results

    async def test_memory_usage(self) -> Dict[str, Any]:
        """Measure this process's RSS before and after 200 GETs to ``/health``.

        Requests are sent in batches of 20; a warning is printed after any
        batch where RSS has grown by more than 50 MB.  A total growth above
        20 MB is flagged as a possible leak.  Returns ``{"error": ...}`` if
        ``psutil`` is not installed or any request raises.
        """
        total_requests = 200
        batch_size = 20

        try:
            import os

            import psutil

            process = psutil.Process(os.getpid())

            # Get baseline memory
            baseline_memory = process.memory_info().rss / 1024 / 1024  # MB

            # Generate some load
            print(" šŸ”„ Generating load for memory testing...")

            async with httpx.AsyncClient() as client:
                sent = 0
                while sent < total_requests:
                    current_batch = min(batch_size, total_requests - sent)
                    # Coroutines are created only when they are about to be
                    # awaited, so none is left un-awaited if a batch fails.
                    await asyncio.gather(
                        *(client.get(f"{self.base_url}/health") for _ in range(current_batch))
                    )
                    sent += current_batch

                    # Check memory after each batch
                    current_memory = process.memory_info().rss / 1024 / 1024
                    memory_increase = current_memory - baseline_memory

                    if memory_increase > 50:  # 50MB increase
                        print(f" āš ļø Significant memory increase detected: +{memory_increase:.1f}MB")

            # Final memory check
            final_memory = process.memory_info().rss / 1024 / 1024
            total_increase = final_memory - baseline_memory
            leak_suspected = total_increase > 20  # 20MB+ increase suggests leak

            return {
                "baseline_memory_mb": round(baseline_memory, 2),
                "final_memory_mb": round(final_memory, 2),
                "total_increase_mb": round(total_increase, 2),
                "memory_leak_detected": leak_suspected,
                "assessment": "Needs investigation" if leak_suspected else "Good",
            }

        except ImportError:
            return {"error": "psutil not available for memory testing"}
        except Exception as e:
            return {"error": str(e)}

    def generate_performance_report(self) -> Dict[str, Any]:
        """Build the report dictionary from ``self.results``.

        The report holds a timestamp, the run duration (0 when no run start
        was recorded), a summary of headline metrics, a list of
        recommendation strings and the raw results.
        """
        now = time.time()
        report: Dict[str, Any] = {
            "test_timestamp": now,
            "test_duration_seconds": now - getattr(self, "start_time", now),
            "summary": {},
            "recommendations": [],
            "raw_results": self.results,
        }

        # Calculate summary metrics
        health_results = self.results.get("health_endpoint", {})
        concurrent_results = self.results.get("concurrent_load", {})
        cache_results = self.results.get("api_caching", {})

        # The caching test pre-seeds this key with None, so a plain
        # .get(key, 0) can still yield None; coerce before comparing.
        cache_improvement = cache_results.get("performance_improvement") or 0

        report["summary"] = {
            "health_endpoint_avg_ms": health_results.get("avg_response_time", 0) * 1000,
            "health_endpoint_p95_ms": health_results.get("p95_response_time", 0) * 1000,
            "concurrent_success_rate": concurrent_results.get("success_rate", 0),
            "concurrent_rps": concurrent_results.get("requests_per_second", 0),
            "cache_performance_improvement": cache_improvement,
        }

        # Generate recommendations
        recommendations = []

        # Health endpoint performance
        if health_results.get("avg_response_time", 1) > 0.5:  # >500ms
            recommendations.append(
                "āš ļø Health endpoint response time is slow (>500ms). Consider optimizing database queries."
            )

        # Concurrent performance
        success_rate = concurrent_results.get("success_rate", 0)
        if success_rate < 95:
            recommendations.append(
                f"āš ļø Concurrent load success rate is low ({success_rate}%). Consider implementing rate limiting or load balancing."
            )

        if concurrent_results.get("requests_per_second", 0) < 100:
            recommendations.append(
                "āš ļø Low throughput under concurrent load. Consider optimizing database connections and caching."
            )

        # Caching effectiveness
        if cache_improvement < 10:
            recommendations.append(
                "āš ļø Limited caching effectiveness detected. Consider implementing Redis caching for frequently accessed data."
            )

        # Memory usage
        memory_results = self.results.get("memory_usage", {})
        if memory_results.get("memory_leak_detected"):
            recommendations.append(
                "🚨 Potential memory leak detected. Investigate memory usage patterns and implement proper cleanup."
            )

        report["recommendations"] = recommendations if recommendations else ["āœ… All performance metrics look good!"]

        return report


async def main():
    """Run the full suite against the default URL, print and save the report."""
    print("šŸŽÆ Zenith Backend Performance Testing Suite")
    print("=" * 60)

    # Initialize tester
    tester = PerformanceTester()

    try:
        # Run full test suite
        report = await tester.run_full_performance_test()

        # Display results
        print("\nšŸ“‹ PERFORMANCE TEST RESULTS")
        print("-" * 40)

        summary = report.get("summary", {})
        print(f"Health endpoint avg: {summary.get('health_endpoint_avg_ms', 0):.2f} ms")
        print(f"Health endpoint p95: {summary.get('health_endpoint_p95_ms', 0):.2f} ms")
        print(f"Concurrent success rate: {summary.get('concurrent_success_rate', 0):.1f}%")
        print(f"Concurrent throughput: {summary.get('concurrent_rps', 0):.1f} req/s")
        print(f"Cache performance improvement: {summary.get('cache_performance_improvement', 0):.1f}%")

        recommendations = report.get("recommendations", [])
        if recommendations:
            print(f"\nšŸ’” RECOMMENDATIONS ({len(recommendations)})")
            print("-" * 40)
            for rec in recommendations:
                print(f"• {rec}")

        print("\nšŸŽ‰ Performance testing complete! Check the results above for optimization opportunities.")

        # Save detailed report
        with open("performance_test_results.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)

        print("šŸ“„ Detailed results saved to: performance_test_results.json")

    except Exception as e:
        print(f"āŒ Performance testing failed: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())