""" Load Testing Script for DanceDynamics Tests system performance under various loads """ import asyncio import aiohttp import time import statistics from pathlib import Path from typing import List, Dict import json class LoadTester: """Load testing utility for the API""" def __init__(self, base_url: str = "http://localhost:7860"): self.base_url = base_url self.results: List[Dict] = [] async def upload_video(self, session: aiohttp.ClientSession, video_path: Path) -> Dict: """Upload a video and measure response time""" start_time = time.time() try: with open(video_path, 'rb') as f: data = aiohttp.FormData() data.add_field('file', f, filename=video_path.name, content_type='video/mp4') async with session.post(f"{self.base_url}/api/upload", data=data) as response: response_time = time.time() - start_time result = await response.json() return { 'endpoint': 'upload', 'status': response.status, 'response_time': response_time, 'success': response.status == 200, 'session_id': result.get('session_id') if response.status == 200 else None } except Exception as e: return { 'endpoint': 'upload', 'status': 0, 'response_time': time.time() - start_time, 'success': False, 'error': str(e) } async def health_check(self, session: aiohttp.ClientSession) -> Dict: """Check API health""" start_time = time.time() try: async with session.get(f"{self.base_url}/health") as response: response_time = time.time() - start_time return { 'endpoint': 'health', 'status': response.status, 'response_time': response_time, 'success': response.status == 200 } except Exception as e: return { 'endpoint': 'health', 'status': 0, 'response_time': time.time() - start_time, 'success': False, 'error': str(e) } async def concurrent_uploads(self, video_path: Path, num_concurrent: int = 5): """Test concurrent uploads""" print(f"\n๐Ÿ”„ Testing {num_concurrent} concurrent uploads...") async with aiohttp.ClientSession() as session: tasks = [self.upload_video(session, video_path) for _ in range(num_concurrent)] results = await asyncio.gather(*tasks) self.results.extend(results) # Calculate statistics response_times = [r['response_time'] for r in results if r['success']] success_rate = sum(1 for r in results if r['success']) / len(results) * 100 print(f"\n๐Ÿ“Š Results:") print(f" Success Rate: {success_rate:.1f}%") if response_times: print(f" Avg Response Time: {statistics.mean(response_times):.2f}s") print(f" Min Response Time: {min(response_times):.2f}s") print(f" Max Response Time: {max(response_times):.2f}s") print(f" Median Response Time: {statistics.median(response_times):.2f}s") return results async def stress_test(self, video_path: Path, duration_seconds: int = 60): """Stress test by continuously uploading for a duration""" print(f"\nโšก Stress testing for {duration_seconds} seconds...") start_time = time.time() upload_count = 0 errors = 0 async with aiohttp.ClientSession() as session: while time.time() - start_time < duration_seconds: result = await self.upload_video(session, video_path) self.results.append(result) if result['success']: upload_count += 1 else: errors += 1 # Brief pause between requests await asyncio.sleep(0.1) total_time = time.time() - start_time requests_per_second = upload_count / total_time print(f"\n๐Ÿ“Š Stress Test Results:") print(f" Total Requests: {upload_count + errors}") print(f" Successful: {upload_count}") print(f" Failed: {errors}") print(f" Duration: {total_time:.2f}s") print(f" Requests/Second: {requests_per_second:.2f}") async def latency_test(self, num_requests: int = 100): """Test API latency with health checks""" print(f"\nโšก Testing latency with {num_requests} health checks...") async with aiohttp.ClientSession() as session: tasks = [self.health_check(session) for _ in range(num_requests)] results = await asyncio.gather(*tasks) response_times = [r['response_time'] for r in results if r['success']] if response_times: print(f"\n๐Ÿ“Š Latency Results:") print(f" Average: {statistics.mean(response_times)*1000:.2f}ms") print(f" Min: {min(response_times)*1000:.2f}ms") print(f" Max: {max(response_times)*1000:.2f}ms") print(f" Median: {statistics.median(response_times)*1000:.2f}ms") print(f" P95: {sorted(response_times)[int(len(response_times)*0.95)]*1000:.2f}ms") print(f" P99: {sorted(response_times)[int(len(response_times)*0.99)]*1000:.2f}ms") def generate_report(self, output_path: str = "load_test_report.json"): """Generate JSON report of test results""" # Calculate overall statistics upload_results = [r for r in self.results if r['endpoint'] == 'upload'] health_results = [r for r in self.results if r['endpoint'] == 'health'] report = { 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'total_requests': len(self.results), 'summary': { 'uploads': { 'total': len(upload_results), 'successful': sum(1 for r in upload_results if r['success']), 'failed': sum(1 for r in upload_results if not r['success']), 'avg_response_time': statistics.mean([r['response_time'] for r in upload_results if r['success']]) if upload_results else 0 }, 'health_checks': { 'total': len(health_results), 'successful': sum(1 for r in health_results if r['success']), 'failed': sum(1 for r in health_results if not r['success']), 'avg_response_time': statistics.mean([r['response_time'] for r in health_results if r['success']]) if health_results else 0 } }, 'detailed_results': self.results } with open(output_path, 'w') as f: json.dump(report, f, indent=2) print(f"\n๐Ÿ“„ Report saved to {output_path}") async def main(): """Run load tests""" print("=" * 60) print("๐Ÿงช DanceDynamics - Load Testing") print("=" * 60) # Initialize tester tester = LoadTester() # Check if test video exists test_video = Path("sample_videos/test_dance.mp4") if not test_video.exists(): print(f"\nโŒ Test video not found: {test_video}") print(" Please place a test video in sample_videos/test_dance.mp4") return print(f"\nโœ… Using test video: {test_video}") print(f" Size: {test_video.stat().st_size / 1024 / 1024:.2f} MB") # Test 1: Latency Test print("\n" + "="*60) print("TEST 1: API Latency") print("="*60) await tester.latency_test(num_requests=100) # Test 2: Concurrent Uploads (Light) print("\n" + "="*60) print("TEST 2: Concurrent Uploads (Light Load)") print("="*60) await tester.concurrent_uploads(test_video, num_concurrent=3) # Test 3: Concurrent Uploads (Medium) print("\n" + "="*60) print("TEST 3: Concurrent Uploads (Medium Load)") print("="*60) await tester.concurrent_uploads(test_video, num_concurrent=5) # Test 4: Concurrent Uploads (Heavy) print("\n" + "="*60) print("TEST 4: Concurrent Uploads (Heavy Load)") print("="*60) await tester.concurrent_uploads(test_video, num_concurrent=10) # Test 5: Stress Test (Optional - commented out by default) # print("\n" + "="*60) # print("TEST 5: Stress Test (60 seconds)") # print("="*60) # await tester.stress_test(test_video, duration_seconds=60) # Generate report print("\n" + "="*60) print("๐Ÿ“Š Generating Report") print("="*60) tester.generate_report() print("\nโœ… Load testing complete!") print("="*60) if __name__ == "__main__": asyncio.run(main())