""" Load Testing Script for Medical AI Platform Monitoring Infrastructure Tests system performance, monitoring accuracy, and error handling under stress Requirements: - Tests monitoring middleware performance impact - Validates cache effectiveness under load - Verifies error rate tracking accuracy - Confirms alert system responsiveness - Measures latency tracking precision """ import asyncio import aiohttp import time import json from typing import List, Dict, Any from dataclasses import dataclass from datetime import datetime import statistics @dataclass class LoadTestResult: """Result from a single request""" success: bool latency_ms: float status_code: int endpoint: str timestamp: float error_message: str = None class MonitoringLoadTester: """Load tester for monitoring infrastructure""" def __init__(self, base_url: str = "http://localhost:7860"): self.base_url = base_url self.results: List[LoadTestResult] = [] async def make_request( self, session: aiohttp.ClientSession, endpoint: str, method: str = "GET", data: Dict = None ) -> LoadTestResult: """Make a single HTTP request and measure performance""" start_time = time.time() url = f"{self.base_url}{endpoint}" try: if method == "GET": async with session.get(url) as response: await response.text() latency_ms = (time.time() - start_time) * 1000 return LoadTestResult( success=response.status == 200, latency_ms=latency_ms, status_code=response.status, endpoint=endpoint, timestamp=time.time() ) elif method == "POST": async with session.post(url, json=data) as response: await response.text() latency_ms = (time.time() - start_time) * 1000 return LoadTestResult( success=response.status == 200, latency_ms=latency_ms, status_code=response.status, endpoint=endpoint, timestamp=time.time() ) except Exception as e: latency_ms = (time.time() - start_time) * 1000 return LoadTestResult( success=False, latency_ms=latency_ms, status_code=0, endpoint=endpoint, timestamp=time.time(), error_message=str(e) ) async def run_concurrent_requests( self, endpoint: str, num_requests: int, concurrent_workers: int = 10 ): """Run multiple concurrent requests to an endpoint""" print(f"\n{'='*60}") print(f"Testing: {endpoint}") print(f"Requests: {num_requests}, Concurrent Workers: {concurrent_workers}") print(f"{'='*60}") async with aiohttp.ClientSession() as session: tasks = [] for i in range(num_requests): task = self.make_request(session, endpoint) tasks.append(task) # Limit concurrency if len(tasks) >= concurrent_workers or i == num_requests - 1: results = await asyncio.gather(*tasks) self.results.extend(results) tasks = [] # Small delay to avoid overwhelming the server await asyncio.sleep(0.1) # Analyze results for this endpoint self.analyze_endpoint_results(endpoint) def analyze_endpoint_results(self, endpoint: str): """Analyze results for a specific endpoint""" endpoint_results = [r for r in self.results if r.endpoint == endpoint] if not endpoint_results: print(f"No results for {endpoint}") return successes = [r for r in endpoint_results if r.success] failures = [r for r in endpoint_results if not r.success] latencies = [r.latency_ms for r in successes] print(f"\nšŸ“Š Results for {endpoint}:") print(f" Total Requests: {len(endpoint_results)}") print(f" āœ“ Successful: {len(successes)} ({len(successes)/len(endpoint_results)*100:.1f}%)") print(f" āœ— Failed: {len(failures)} ({len(failures)/len(endpoint_results)*100:.1f}%)") if latencies: print(f"\nā± Latency Statistics:") print(f" Mean: {statistics.mean(latencies):.2f} ms") print(f" Median: {statistics.median(latencies):.2f} ms") print(f" Min: {min(latencies):.2f} ms") print(f" Max: {max(latencies):.2f} ms") print(f" Std Dev: {statistics.stdev(latencies) if len(latencies) > 1 else 0:.2f} ms") if len(latencies) >= 10: sorted_latencies = sorted(latencies) p95_index = int(len(sorted_latencies) * 0.95) p99_index = int(len(sorted_latencies) * 0.99) print(f" P95: {sorted_latencies[p95_index]:.2f} ms") print(f" P99: {sorted_latencies[p99_index]:.2f} ms") if failures: print(f"\n⚠ Sample Errors:") for failure in failures[:3]: print(f" Status: {failure.status_code}, Error: {failure.error_message}") async def test_health_endpoint(self, num_requests: int = 100): """Test health check endpoint""" await self.run_concurrent_requests("/health", num_requests, concurrent_workers=20) async def test_dashboard_endpoint(self, num_requests: int = 50): """Test dashboard endpoint (more intensive)""" await self.run_concurrent_requests("/health/dashboard", num_requests, concurrent_workers=10) async def test_admin_endpoints(self): """Test admin endpoints""" # Test cache statistics await self.run_concurrent_requests("/admin/cache/statistics", num_requests=30, concurrent_workers=5) # Test metrics await self.run_concurrent_requests("/admin/metrics", num_requests=30, concurrent_workers=5) async def verify_monitoring_accuracy(self): """Verify that monitoring system accurately tracks requests""" print(f"\n{'='*60}") print("VERIFYING MONITORING ACCURACY") print(f"{'='*60}") # Get initial dashboard state async with aiohttp.ClientSession() as session: async with session.get(f"{self.base_url}/health/dashboard") as response: initial_data = await response.json() initial_requests = initial_data['system']['total_requests'] print(f"Initial request count: {initial_requests}") # Make exactly 50 requests print(f"\nMaking 50 test requests...") await self.run_concurrent_requests("/health", num_requests=50, concurrent_workers=10) # Wait for monitoring to update await asyncio.sleep(2) # Check final dashboard state async with aiohttp.ClientSession() as session: async with session.get(f"{self.base_url}/health/dashboard") as response: final_data = await response.json() final_requests = final_data['system']['total_requests'] print(f"Final request count: {final_requests}") actual_increase = final_requests - initial_requests expected_increase = 50 print(f"\nšŸ“ˆ Monitoring Accuracy:") print(f" Expected increase: {expected_increase}") print(f" Actual increase: {actual_increase}") print(f" Accuracy: {(actual_increase/expected_increase*100):.1f}%") if actual_increase >= expected_increase * 0.95: print(f" āœ“ Monitoring is accurately tracking requests") else: print(f" ⚠ Monitoring may have tracking issues") async def test_cache_effectiveness(self): """Test cache effectiveness under repeated requests""" print(f"\n{'='*60}") print("TESTING CACHE EFFECTIVENESS") print(f"{'='*60}") # Get initial cache stats async with aiohttp.ClientSession() as session: async with session.get(f"{self.base_url}/health/dashboard") as response: initial_data = await response.json() initial_hits = initial_data['cache']['hits'] initial_misses = initial_data['cache']['misses'] initial_hit_rate = initial_data['cache']['hit_rate'] print(f"Initial cache state:") print(f" Hits: {initial_hits}") print(f" Misses: {initial_misses}") print(f" Hit Rate: {(initial_hit_rate * 100):.1f}%") # Make repeated requests to same endpoint (should benefit from caching) print(f"\nMaking 100 requests to test caching...") await self.run_concurrent_requests("/health/dashboard", num_requests=100, concurrent_workers=10) # Wait for cache to update await asyncio.sleep(2) # Check final cache stats async with aiohttp.ClientSession() as session: async with session.get(f"{self.base_url}/health/dashboard") as response: final_data = await response.json() final_hits = final_data['cache']['hits'] final_misses = final_data['cache']['misses'] final_hit_rate = final_data['cache']['hit_rate'] print(f"\nFinal cache state:") print(f" Hits: {final_hits}") print(f" Misses: {final_misses}") print(f" Hit Rate: {(final_hit_rate * 100):.1f}%") print(f"\nšŸ“Š Cache Performance:") print(f" Hit increase: {final_hits - initial_hits}") print(f" Miss increase: {final_misses - initial_misses}") print(f" Current hit rate: {(final_hit_rate * 100):.1f}%") async def stress_test(self, duration_seconds: int = 30): """Run sustained load test""" print(f"\n{'='*60}") print(f"STRESS TEST - {duration_seconds} seconds") print(f"{'='*60}") start_time = time.time() request_count = 0 async with aiohttp.ClientSession() as session: while time.time() - start_time < duration_seconds: tasks = [] for _ in range(10): # 10 concurrent requests per batch task = self.make_request(session, "/health") tasks.append(task) results = await asyncio.gather(*tasks) self.results.extend(results) request_count += len(tasks) await asyncio.sleep(0.5) # 0.5s between batches total_time = time.time() - start_time requests_per_second = request_count / total_time print(f"\n⚔ Stress Test Results:") print(f" Duration: {total_time:.2f} seconds") print(f" Total Requests: {request_count}") print(f" Requests/Second: {requests_per_second:.2f}") # Analyze stress test results recent_results = self.results[-request_count:] successes = [r for r in recent_results if r.success] print(f" Success Rate: {len(successes)/len(recent_results)*100:.1f}%") def generate_report(self): """Generate comprehensive test report""" print(f"\n{'='*60}") print("COMPREHENSIVE LOAD TEST REPORT") print(f"{'='*60}") print(f"Generated: {datetime.now().isoformat()}") if not self.results: print("No test results available") return total_requests = len(self.results) successes = [r for r in self.results if r.success] failures = [r for r in self.results if not r.success] print(f"\nšŸ“Š Overall Statistics:") print(f" Total Requests: {total_requests}") print(f" āœ“ Successful: {len(successes)} ({len(successes)/total_requests*100:.1f}%)") print(f" āœ— Failed: {len(failures)} ({len(failures)/total_requests*100:.1f}%)") all_latencies = [r.latency_ms for r in successes] if all_latencies: print(f"\nā± Global Latency Statistics:") print(f" Mean: {statistics.mean(all_latencies):.2f} ms") print(f" Median: {statistics.median(all_latencies):.2f} ms") print(f" Min: {min(all_latencies):.2f} ms") print(f" Max: {max(all_latencies):.2f} ms") # Breakdown by endpoint endpoints = set(r.endpoint for r in self.results) print(f"\nšŸ“ Breakdown by Endpoint:") for endpoint in sorted(endpoints): endpoint_results = [r for r in self.results if r.endpoint == endpoint] endpoint_successes = [r for r in endpoint_results if r.success] print(f" {endpoint}:") print(f" Requests: {len(endpoint_results)}") print(f" Success Rate: {len(endpoint_successes)/len(endpoint_results)*100:.1f}%") if endpoint_successes: latencies = [r.latency_ms for r in endpoint_successes] print(f" Avg Latency: {statistics.mean(latencies):.2f} ms") print(f"\nāœ… Load testing complete!") async def run_comprehensive_load_test(base_url: str = "http://localhost:7860"): """Run comprehensive load testing suite""" tester = MonitoringLoadTester(base_url) print(f"{'='*60}") print("MEDICAL AI PLATFORM - MONITORING LOAD TEST") print(f"{'='*60}") print(f"Target: {base_url}") print(f"Started: {datetime.now().isoformat()}") try: # Test 1: Health endpoint load await tester.test_health_endpoint(num_requests=100) # Test 2: Dashboard endpoint load await tester.test_dashboard_endpoint(num_requests=50) # Test 3: Admin endpoints # await tester.test_admin_endpoints() # Comment out if admin auth is required # Test 4: Monitoring accuracy await tester.verify_monitoring_accuracy() # Test 5: Cache effectiveness await tester.test_cache_effectiveness() # Test 6: Stress test await tester.stress_test(duration_seconds=30) # Generate final report tester.generate_report() print(f"\n{'='*60}") print("ALL TESTS COMPLETED SUCCESSFULLY") print(f"{'='*60}") except Exception as e: print(f"\nāŒ Test failed with error: {str(e)}") raise if __name__ == "__main__": import sys # Get base URL from command line or use default base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7860" print(f"Starting load tests against: {base_url}") print(f"Ensure the server is running before continuing...\n") # Run the tests asyncio.run(run_comprehensive_load_test(base_url))