| """ | |
| Load Testing Script for Medical AI Platform Monitoring Infrastructure | |
| Tests system performance, monitoring accuracy, and error handling under stress | |
| Requirements: | |
| - Tests monitoring middleware performance impact | |
| - Validates cache effectiveness under load | |
| - Verifies error rate tracking accuracy | |
| - Confirms alert system responsiveness | |
| - Measures latency tracking precision | |
| """ | |

import asyncio
import statistics
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import aiohttp


@dataclass
class LoadTestResult:
    """Result from a single request"""
    success: bool
    latency_ms: float
    status_code: int
    endpoint: str
    timestamp: float
    error_message: Optional[str] = None
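
# For illustration, a successful probe of /health might be recorded as:
#   LoadTestResult(success=True, latency_ms=12.4, status_code=200,
#                  endpoint="/health", timestamp=time.time())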


class MonitoringLoadTester:
    """Load tester for monitoring infrastructure"""

    def __init__(self, base_url: str = "http://localhost:7860"):
        self.base_url = base_url
        self.results: List[LoadTestResult] = []

    async def make_request(
        self,
        session: aiohttp.ClientSession,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None,
    ) -> LoadTestResult:
        """Make a single HTTP request and measure its latency"""
        if method not in ("GET", "POST"):
            raise ValueError(f"Unsupported HTTP method: {method}")

        # perf_counter is monotonic and higher-resolution than time.time(),
        # so latency is measured with it; wall-clock time is kept for the
        # result timestamp.
        start_time = time.perf_counter()
        url = f"{self.base_url}{endpoint}"
        try:
            if method == "GET":
                async with session.get(url) as response:
                    await response.text()
                    latency_ms = (time.perf_counter() - start_time) * 1000
                    return LoadTestResult(
                        success=response.status == 200,
                        latency_ms=latency_ms,
                        status_code=response.status,
                        endpoint=endpoint,
                        timestamp=time.time(),
                    )
            else:  # POST
                async with session.post(url, json=data) as response:
                    await response.text()
                    latency_ms = (time.perf_counter() - start_time) * 1000
                    return LoadTestResult(
                        success=response.status == 200,
                        latency_ms=latency_ms,
                        status_code=response.status,
                        endpoint=endpoint,
                        timestamp=time.time(),
                    )
        except Exception as e:
            # Connection errors, timeouts, etc. are recorded as failed
            # results rather than aborting the test run.
            latency_ms = (time.perf_counter() - start_time) * 1000
            return LoadTestResult(
                success=False,
                latency_ms=latency_ms,
                status_code=0,
                endpoint=endpoint,
                timestamp=time.time(),
                error_message=str(e),
            )

    async def run_concurrent_requests(
        self,
        endpoint: str,
        num_requests: int,
        concurrent_workers: int = 10,
    ):
        """Run multiple concurrent requests to an endpoint"""
        print(f"\n{'='*60}")
        print(f"Testing: {endpoint}")
        print(f"Requests: {num_requests}, Concurrent Workers: {concurrent_workers}")
        print(f"{'='*60}")

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(num_requests):
                tasks.append(self.make_request(session, endpoint))

                # Limit concurrency: run a full batch (or the final partial
                # batch) and wait for it to complete
                if len(tasks) >= concurrent_workers or i == num_requests - 1:
                    results = await asyncio.gather(*tasks)
                    self.results.extend(results)
                    tasks = []

                    # Small delay to avoid overwhelming the server
                    await asyncio.sleep(0.1)

        # Analyze results for this endpoint
        self.analyze_endpoint_results(endpoint)
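
    # The batch loop above waits for each full batch of `concurrent_workers`
    # requests to finish before starting the next. A sliding-window variant
    # keeps exactly that many requests in flight at all times. A minimal
    # sketch (illustrative only; the test suite below does not call it):
    async def run_concurrent_requests_windowed(
        self,
        endpoint: str,
        num_requests: int,
        concurrent_workers: int = 10,
    ):
        """Sliding-window variant of run_concurrent_requests (semaphore-based)"""
        sem = asyncio.Semaphore(concurrent_workers)

        async def bounded(session: aiohttp.ClientSession) -> LoadTestResult:
            # At most `concurrent_workers` coroutines pass this gate at once
            async with sem:
                return await self.make_request(session, endpoint)

        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(bounded(session) for _ in range(num_requests))
            )
        self.results.extend(results)
        self.analyze_endpoint_results(endpoint)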

    def analyze_endpoint_results(self, endpoint: str):
        """Analyze results for a specific endpoint"""
        endpoint_results = [r for r in self.results if r.endpoint == endpoint]
        if not endpoint_results:
            print(f"No results for {endpoint}")
            return

        successes = [r for r in endpoint_results if r.success]
        failures = [r for r in endpoint_results if not r.success]
        latencies = [r.latency_ms for r in successes]

        print(f"\n📊 Results for {endpoint}:")
        print(f"  Total Requests: {len(endpoint_results)}")
        print(f"  ✓ Successful: {len(successes)} ({len(successes)/len(endpoint_results)*100:.1f}%)")
        print(f"  ✗ Failed: {len(failures)} ({len(failures)/len(endpoint_results)*100:.1f}%)")

        if latencies:
            print("\n⏱ Latency Statistics:")
            print(f"  Mean: {statistics.mean(latencies):.2f} ms")
            print(f"  Median: {statistics.median(latencies):.2f} ms")
            print(f"  Min: {min(latencies):.2f} ms")
            print(f"  Max: {max(latencies):.2f} ms")
            std_dev = statistics.stdev(latencies) if len(latencies) > 1 else 0.0
            print(f"  Std Dev: {std_dev:.2f} ms")

            if len(latencies) >= 10:
                # Nearest-rank percentiles on the sorted sample
                sorted_latencies = sorted(latencies)
                p95_index = int(len(sorted_latencies) * 0.95)
                p99_index = int(len(sorted_latencies) * 0.99)
                print(f"  P95: {sorted_latencies[p95_index]:.2f} ms")
                print(f"  P99: {sorted_latencies[p99_index]:.2f} ms")

        if failures:
            print("\n⚠ Sample Errors:")
            for failure in failures[:3]:
                print(f"  Status: {failure.status_code}, Error: {failure.error_message}")

    async def test_health_endpoint(self, num_requests: int = 100):
        """Test the health check endpoint"""
        await self.run_concurrent_requests("/health", num_requests, concurrent_workers=20)

    async def test_dashboard_endpoint(self, num_requests: int = 50):
        """Test the dashboard endpoint (more intensive)"""
        await self.run_concurrent_requests("/health/dashboard", num_requests, concurrent_workers=10)

    async def test_admin_endpoints(self):
        """Test admin endpoints"""
        # Test cache statistics
        await self.run_concurrent_requests("/admin/cache/statistics", num_requests=30, concurrent_workers=5)
        # Test metrics
        await self.run_concurrent_requests("/admin/metrics", num_requests=30, concurrent_workers=5)

    async def verify_monitoring_accuracy(self):
        """Verify that the monitoring system accurately tracks requests"""
        print(f"\n{'='*60}")
        print("VERIFYING MONITORING ACCURACY")
        print(f"{'='*60}")

        # Get the initial dashboard state
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                initial_data = await response.json()
                initial_requests = initial_data['system']['total_requests']

        print(f"Initial request count: {initial_requests}")

        # Make exactly 50 requests
        print("\nMaking 50 test requests...")
        await self.run_concurrent_requests("/health", num_requests=50, concurrent_workers=10)

        # Wait for monitoring to update
        await asyncio.sleep(2)

        # Check the final dashboard state
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                final_data = await response.json()
                final_requests = final_data['system']['total_requests']

        print(f"Final request count: {final_requests}")

        actual_increase = final_requests - initial_requests
        expected_increase = 50

        print("\n📈 Monitoring Accuracy:")
        print(f"  Expected increase: {expected_increase}")
        print(f"  Actual increase: {actual_increase}")
        print(f"  Accuracy: {(actual_increase/expected_increase*100):.1f}%")

        if actual_increase >= expected_increase * 0.95:
            print("  ✓ Monitoring is accurately tracking requests")
        else:
            print("  ⚠ Monitoring may have tracking issues")

    async def test_cache_effectiveness(self):
        """Test cache effectiveness under repeated requests"""
        print(f"\n{'='*60}")
        print("TESTING CACHE EFFECTIVENESS")
        print(f"{'='*60}")

        # Get the initial cache stats
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                initial_data = await response.json()
                initial_hits = initial_data['cache']['hits']
                initial_misses = initial_data['cache']['misses']
                initial_hit_rate = initial_data['cache']['hit_rate']

        print("Initial cache state:")
        print(f"  Hits: {initial_hits}")
        print(f"  Misses: {initial_misses}")
        print(f"  Hit Rate: {(initial_hit_rate * 100):.1f}%")

        # Make repeated requests to the same endpoint (should benefit from caching)
        print("\nMaking 100 requests to test caching...")
        await self.run_concurrent_requests("/health/dashboard", num_requests=100, concurrent_workers=10)

        # Wait for the cache stats to update
        await asyncio.sleep(2)

        # Check the final cache stats
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health/dashboard") as response:
                final_data = await response.json()
                final_hits = final_data['cache']['hits']
                final_misses = final_data['cache']['misses']
                final_hit_rate = final_data['cache']['hit_rate']

        print("\nFinal cache state:")
        print(f"  Hits: {final_hits}")
        print(f"  Misses: {final_misses}")
        print(f"  Hit Rate: {(final_hit_rate * 100):.1f}%")

        print("\n📊 Cache Performance:")
        print(f"  Hit increase: {final_hits - initial_hits}")
        print(f"  Miss increase: {final_misses - initial_misses}")
        print(f"  Current hit rate: {(final_hit_rate * 100):.1f}%")

    async def stress_test(self, duration_seconds: int = 30):
        """Run a sustained load test"""
        print(f"\n{'='*60}")
        print(f"STRESS TEST - {duration_seconds} seconds")
        print(f"{'='*60}")

        start_time = time.time()
        request_count = 0

        async with aiohttp.ClientSession() as session:
            while time.time() - start_time < duration_seconds:
                tasks = []
                for _ in range(10):  # 10 concurrent requests per batch
                    tasks.append(self.make_request(session, "/health"))
                results = await asyncio.gather(*tasks)
                self.results.extend(results)
                request_count += len(tasks)
                await asyncio.sleep(0.5)  # 0.5s between batches

        total_time = time.time() - start_time
        requests_per_second = request_count / total_time

        print("\n⚡ Stress Test Results:")
        print(f"  Duration: {total_time:.2f} seconds")
        print(f"  Total Requests: {request_count}")
        print(f"  Requests/Second: {requests_per_second:.2f}")

        # Analyze the stress test results
        recent_results = self.results[-request_count:]
        successes = [r for r in recent_results if r.success]
        print(f"  Success Rate: {len(successes)/len(recent_results)*100:.1f}%")

    def generate_report(self):
        """Generate a comprehensive test report"""
        print(f"\n{'='*60}")
        print("COMPREHENSIVE LOAD TEST REPORT")
        print(f"{'='*60}")
        print(f"Generated: {datetime.now().isoformat()}")

        if not self.results:
            print("No test results available")
            return

        total_requests = len(self.results)
        successes = [r for r in self.results if r.success]
        failures = [r for r in self.results if not r.success]

        print("\n📊 Overall Statistics:")
        print(f"  Total Requests: {total_requests}")
        print(f"  ✓ Successful: {len(successes)} ({len(successes)/total_requests*100:.1f}%)")
        print(f"  ✗ Failed: {len(failures)} ({len(failures)/total_requests*100:.1f}%)")

        all_latencies = [r.latency_ms for r in successes]
        if all_latencies:
            print("\n⏱ Global Latency Statistics:")
            print(f"  Mean: {statistics.mean(all_latencies):.2f} ms")
            print(f"  Median: {statistics.median(all_latencies):.2f} ms")
            print(f"  Min: {min(all_latencies):.2f} ms")
            print(f"  Max: {max(all_latencies):.2f} ms")

        # Breakdown by endpoint
        endpoints = set(r.endpoint for r in self.results)
        print("\n📍 Breakdown by Endpoint:")
        for endpoint in sorted(endpoints):
            endpoint_results = [r for r in self.results if r.endpoint == endpoint]
            endpoint_successes = [r for r in endpoint_results if r.success]
            print(f"  {endpoint}:")
            print(f"    Requests: {len(endpoint_results)}")
            print(f"    Success Rate: {len(endpoint_successes)/len(endpoint_results)*100:.1f}%")
            if endpoint_successes:
                latencies = [r.latency_ms for r in endpoint_successes]
                print(f"    Avg Latency: {statistics.mean(latencies):.2f} ms")

        print("\n✅ Load testing complete!")


async def run_comprehensive_load_test(base_url: str = "http://localhost:7860"):
    """Run the comprehensive load testing suite"""
    tester = MonitoringLoadTester(base_url)

    print(f"{'='*60}")
    print("MEDICAL AI PLATFORM - MONITORING LOAD TEST")
    print(f"{'='*60}")
    print(f"Target: {base_url}")
    print(f"Started: {datetime.now().isoformat()}")

    try:
        # Test 1: Health endpoint load
        await tester.test_health_endpoint(num_requests=100)

        # Test 2: Dashboard endpoint load
        await tester.test_dashboard_endpoint(num_requests=50)

        # Test 3: Admin endpoints
        # await tester.test_admin_endpoints()  # Enable only if admin auth is not required

        # Test 4: Monitoring accuracy
        await tester.verify_monitoring_accuracy()

        # Test 5: Cache effectiveness
        await tester.test_cache_effectiveness()

        # Test 6: Stress test
        await tester.stress_test(duration_seconds=30)

        # Generate the final report
        tester.generate_report()

        print(f"\n{'='*60}")
        print("ALL TESTS COMPLETED SUCCESSFULLY")
        print(f"{'='*60}")
    except Exception as e:
        print(f"\n❌ Test failed with error: {str(e)}")
        raise


if __name__ == "__main__":
    # Get the base URL from the command line or use the default
    base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7860"

    print(f"Starting load tests against: {base_url}")
    print("Ensure the server is running before continuing...\n")

    # Run the tests
    asyncio.run(run_comprehensive_load_test(base_url))