# zenith-backend/scripts/performance_test.py
# Uploaded via huggingface_hub (commit 4ae946d, verified)
#!/usr/bin/env python3
"""
Performance Testing Script
Tests the performance improvements from Phase 1 optimization
"""
import asyncio
import json
import statistics
import time
from typing import Any, Dict
import httpx
class PerformanceTester:
    """Comprehensive performance testing suite.

    Probes a running backend over HTTP (health latency, database timing,
    cache effectiveness, concurrent load, and memory growth) and distills
    the raw measurements into a report with actionable recommendations.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """
        Args:
            base_url: Root URL of the service under test (no trailing slash).
        """
        self.base_url = base_url
        # Raw per-test results, keyed by test name.
        self.results: Dict[str, Any] = {}
        # BUG FIX: start_time was never set anywhere, so the report's
        # test_duration_seconds was always ~0. Initialize it here and
        # refresh it when the suite actually starts.
        self.start_time = time.time()

    async def run_full_performance_test(self) -> Dict[str, Any]:
        """Run every performance test in sequence and return the report."""
        # Record the true suite start so generate_performance_report()
        # can compute a meaningful duration.
        self.start_time = time.time()

        print("๐Ÿš€ Starting Comprehensive Performance Test Suite")
        print("=" * 60)

        # Test 1: Health endpoint performance
        print("\n๐Ÿ“Š Testing Health Endpoint Performance...")
        self.results["health_endpoint"] = await self.test_health_endpoint()

        # Test 2: Database query performance
        print("\n๐Ÿ’พ Testing Database Query Performance...")
        self.results["database_queries"] = await self.test_database_performance()

        # Test 3: API caching effectiveness
        print("\nโšก Testing API Caching Performance...")
        self.results["api_caching"] = await self.test_api_caching()

        # Test 4: Concurrent load testing
        print("\n๐Ÿ”ฅ Testing Concurrent Load Handling...")
        self.results["concurrent_load"] = await self.test_concurrent_load()

        # Test 5: Memory usage analysis
        print("\n๐Ÿง  Testing Memory Usage Patterns...")
        self.results["memory_usage"] = await self.test_memory_usage()

        # Generate comprehensive report
        report = self.generate_performance_report()
        print("\n" + "=" * 60)
        print("โœ… Performance Testing Complete!")
        print("=" * 60)
        return report

    @staticmethod
    def _timing_stats(response_times) -> Dict[str, Any]:
        """Return avg/median/p95/min/max latency stats (seconds, rounded
        to 4 places) for a non-empty list of response times.

        Extracted because test_health_endpoint and test_concurrent_load
        previously duplicated this computation verbatim.
        """
        ordered = sorted(response_times)
        # int(n * 0.95) < n for every n >= 1, so the index is always valid.
        p95 = ordered[int(len(ordered) * 0.95)]
        return {
            "avg_response_time": round(statistics.mean(response_times), 4),
            "median_response_time": round(statistics.median(response_times), 4),
            "p95_response_time": round(p95, 4),
            "min_response_time": round(min(response_times), 4),
            "max_response_time": round(max(response_times), 4),
        }

    async def test_health_endpoint(self) -> Dict[str, Any]:
        """Hit /health 100 times sequentially and summarize the latency."""
        response_times = []
        async with httpx.AsyncClient() as client:
            for _ in range(100):
                start_time = time.time()
                try:
                    response = await client.get(f"{self.base_url}/health")
                    end_time = time.time()
                    if response.status_code == 200:
                        response_times.append(end_time - start_time)
                    else:
                        print(f" โš ๏ธ Health check failed: {response.status_code}")
                except Exception as e:
                    # Best-effort: a failed probe is reported but does not
                    # abort the remaining samples.
                    print(f" โŒ Health check error: {e}")

        if not response_times:
            return {"error": "No successful health checks"}

        stats: Dict[str, Any] = {"sample_size": len(response_times)}
        stats.update(self._timing_stats(response_times))
        # Requests are sequential, so throughput = count / total busy time.
        stats["requests_per_second"] = round(len(response_times) / sum(response_times), 2)
        return stats

    async def test_database_performance(self) -> Dict[str, Any]:
        """Infer database query performance from the detailed health check.

        This would require authenticated requests for a direct measurement;
        for now we rely on the /health/detailed endpoint, which embeds
        database timing in its payload.
        """
        try:
            async with httpx.AsyncClient() as client:
                start_time = time.time()
                response = await client.get(f"{self.base_url}/health/detailed")
                end_time = time.time()

                if response.status_code != 200:
                    return {"error": f"Health check failed: {response.status_code}"}

                data = response.json()
                # NOTE(review): assumes the payload shape
                # {"checks": {"database": {...}}} — verify against the
                # backend's health handler.
                db_check = data.get("checks", {}).get("database", {})
                return {
                    "endpoint_response_time": round(end_time - start_time, 4),
                    "database_status": db_check.get("status"),
                    "database_response_time": db_check.get("response_time_seconds"),
                    "query_time": db_check.get("query_time_seconds"),
                    "record_count": db_check.get("case_count"),
                }
        except Exception as e:
            return {"error": str(e)}

    async def test_api_caching(self) -> Dict[str, Any]:
        """Issue the same request twice and compare timings to detect caching."""
        cache_test_results = {
            "first_request_time": None,
            "cached_request_time": None,
            "cache_hit_ratio": None,
            "performance_improvement": None,
        }
        try:
            async with httpx.AsyncClient() as client:
                # First request (should populate any server-side cache).
                start_time = time.time()
                response1 = await client.get(f"{self.base_url}/health/detailed")
                first_request_time = time.time() - start_time

                if response1.status_code == 200:
                    cache_test_results["first_request_time"] = round(first_request_time, 4)

                    # Immediate second request (should hit cache if implemented).
                    start_time = time.time()
                    response2 = await client.get(f"{self.base_url}/health/detailed")
                    second_request_time = time.time() - start_time

                    if response2.status_code == 200:
                        cache_test_results["cached_request_time"] = round(second_request_time, 4)

                        if first_request_time > 0 and second_request_time > 0:
                            improvement = (
                                (first_request_time - second_request_time) / first_request_time
                            ) * 100
                            cache_test_results["performance_improvement"] = round(improvement, 2)

                            # Rough heuristic: a large speedup on the second
                            # request suggests the response was cached.
                            if improvement > 20:
                                cache_test_results["cache_hit_ratio"] = "Likely cached"
                            else:
                                cache_test_results["cache_hit_ratio"] = "Possibly not cached"
        except Exception as e:
            cache_test_results["error"] = str(e)
        return cache_test_results

    async def test_concurrent_load(self) -> Dict[str, Any]:
        """Fire 50 concurrent /health requests and summarize the outcome."""

        async def make_request(request_id: int) -> Dict[str, Any]:
            """Make a single request and return timing data."""
            async with httpx.AsyncClient() as client:
                start_time = time.time()
                try:
                    response = await client.get(f"{self.base_url}/health")
                    end_time = time.time()
                    return {
                        "request_id": request_id,
                        "response_time": end_time - start_time,
                        "status_code": response.status_code,
                        "success": response.status_code == 200,
                    }
                except Exception as e:
                    end_time = time.time()
                    return {
                        "request_id": request_id,
                        "response_time": end_time - start_time,
                        "status_code": None,
                        "success": False,
                        "error": str(e),
                    }

        print(" ๐Ÿ“ˆ Running 50 concurrent requests...")
        tasks = [make_request(i) for i in range(50)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # BUG FIX: raw exceptions returned by gather() were previously
        # counted in total_requests but excluded from failed_requests,
        # so successful + failed did not add up to the total. Count
        # anything that is not a successful result dict as a failure.
        successful_requests = []
        failed_requests = []
        for result in results:
            if isinstance(result, dict) and result.get("success"):
                successful_requests.append(result)
            else:
                failed_requests.append(result)

        concurrent_results = {
            "total_requests": len(results),
            "successful_requests": len(successful_requests),
            "failed_requests": len(failed_requests),
            "success_rate": round(len(successful_requests) / len(results) * 100, 2) if results else 0,
        }

        response_times = [r["response_time"] for r in successful_requests]
        if response_times:
            concurrent_results.update(self._timing_stats(response_times))
            # Approximate throughput: all requests started together, so the
            # slowest response time bounds the wall-clock window.
            total_time = max(response_times)
            concurrent_results["requests_per_second"] = round(len(successful_requests) / total_time, 2)

        return concurrent_results

    async def test_memory_usage(self) -> Dict[str, Any]:
        """Generate request load and watch this process's RSS for growth.

        NOTE(review): this measures the test process, not the server —
        useful for spotting client-side leaks only; confirm that is the
        intent.
        """
        try:
            import os

            import psutil

            process = psutil.Process(os.getpid())
            baseline_memory = process.memory_info().rss / 1024 / 1024  # MB

            print(" ๐Ÿ”„ Generating load for memory testing...")
            async with httpx.AsyncClient() as client:
                # Issue 200 requests in batches of 20 to avoid overwhelming
                # the server or exhausting sockets.
                tasks = []
                for _ in range(200):
                    tasks.append(client.get(f"{self.base_url}/health"))
                    if len(tasks) >= 20:
                        await asyncio.gather(*tasks[:20])
                        tasks = tasks[20:]

                        # Check memory midway through the load.
                        current_memory = process.memory_info().rss / 1024 / 1024
                        memory_increase = current_memory - baseline_memory
                        if memory_increase > 50:  # 50MB increase
                            print(f" โš ๏ธ Significant memory increase detected: +{memory_increase:.1f}MB")

                # Complete any remaining tasks.
                if tasks:
                    await asyncio.gather(*tasks)

            final_memory = process.memory_info().rss / 1024 / 1024
            total_increase = final_memory - baseline_memory

            return {
                "baseline_memory_mb": round(baseline_memory, 2),
                "final_memory_mb": round(final_memory, 2),
                "total_increase_mb": round(total_increase, 2),
                "memory_leak_detected": total_increase > 20,  # 20MB+ increase suggests leak
                "assessment": "Good" if total_increase < 20 else "Needs investigation",
            }
        except ImportError:
            # psutil is an optional dependency; degrade gracefully.
            return {"error": "psutil not available for memory testing"}
        except Exception as e:
            return {"error": str(e)}

    def generate_performance_report(self) -> Dict[str, Any]:
        """Fold self.results into a summary + recommendations report."""
        report = {
            "test_timestamp": time.time(),
            "test_duration_seconds": time.time() - self.start_time,
            "summary": {},
            "recommendations": [],
            "raw_results": self.results,
        }

        health_results = self.results.get("health_endpoint", {})
        concurrent_results = self.results.get("concurrent_load", {})
        cache_results = self.results.get("api_caching", {})

        report["summary"] = {
            "health_endpoint_avg_ms": health_results.get("avg_response_time", 0) * 1000,
            "health_endpoint_p95_ms": health_results.get("p95_response_time", 0) * 1000,
            "concurrent_success_rate": concurrent_results.get("success_rate", 0),
            "concurrent_rps": concurrent_results.get("requests_per_second", 0),
            "cache_performance_improvement": cache_results.get("performance_improvement", 0),
        }

        recommendations = []

        # Health endpoint performance: flag slow average latency.
        if health_results.get("avg_response_time", 1) > 0.5:  # >500ms
            recommendations.append(
                "โš ๏ธ Health endpoint response time is slow (>500ms). Consider optimizing database queries."
            )

        # Concurrent performance.
        success_rate = concurrent_results.get("success_rate", 0)
        if success_rate < 95:
            recommendations.append(
                f"โš ๏ธ Concurrent load success rate is low ({success_rate}%). Consider implementing rate limiting or load balancing."
            )
        if concurrent_results.get("requests_per_second", 0) < 100:
            recommendations.append(
                "โš ๏ธ Low throughput under concurrent load. Consider optimizing database connections and caching."
            )

        # Caching effectiveness.
        cache_improvement = cache_results.get("performance_improvement", 0)
        if cache_improvement < 10:
            recommendations.append(
                "โš ๏ธ Limited caching effectiveness detected. Consider implementing Redis caching for frequently accessed data."
            )

        # Memory usage.
        memory_results = self.results.get("memory_usage", {})
        if memory_results.get("memory_leak_detected"):
            recommendations.append(
                "๐Ÿšจ Potential memory leak detected. Investigate memory usage patterns and implement proper cleanup."
            )

        report["recommendations"] = recommendations if recommendations else ["โœ… All performance metrics look good!"]
        return report
async def main():
    """Run the full suite, print a human-readable summary, and persist the
    detailed report to performance_test_results.json."""
    print("๐ŸŽฏ Zenith Backend Performance Testing Suite")
    print("=" * 60)

    tester = PerformanceTester()
    try:
        report = await tester.run_full_performance_test()

        print("\n๐Ÿ“‹ PERFORMANCE TEST RESULTS")
        print("-" * 40)
        # BUG FIX: the original printed bare format specifiers (".2f",
        # ".1f") and discarded the summary dict; print the real metrics.
        # `or 0` guards against None values from failed sub-tests.
        summary = report.get("summary", {})
        print(f"Health endpoint avg: {(summary.get('health_endpoint_avg_ms') or 0):.2f} ms")
        print(f"Health endpoint p95: {(summary.get('health_endpoint_p95_ms') or 0):.2f} ms")
        print(f"Concurrent success rate: {(summary.get('concurrent_success_rate') or 0):.1f}%")
        print(f"Concurrent throughput: {(summary.get('concurrent_rps') or 0):.1f} req/s")
        print(f"Cache improvement: {(summary.get('cache_performance_improvement') or 0):.1f}%")

        recommendations = report.get("recommendations", [])
        if recommendations:
            print(f"\n๐Ÿ’ก RECOMMENDATIONS ({len(recommendations)})")
            print("-" * 40)
            for rec in recommendations:
                print(f"โ€ข {rec}")

        print("\n๐ŸŽ‰ Performance testing complete! Check the results above for optimization opportunities.")

        # default=str lets any non-JSON-serializable values fall back to
        # their string form instead of raising.
        with open("performance_test_results.json", "w") as f:
            json.dump(report, f, indent=2, default=str)
        print("๐Ÿ“„ Detailed results saved to: performance_test_results.json")

    except Exception as e:
        # Broad catch is deliberate: this is the script's top-level boundary.
        print(f"โŒ Performance testing failed: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())