Spaces:
Build error
Build error
| import asyncio | |
| import httpx | |
| import time | |
# Root of the auth API under test; endpoint paths such as "/register" and
# "/login" are appended to this prefix by the request helpers.
BASE_URL = "http://127.0.0.1:8000/api/v1/insightfy"
async def register_user(session, email, mobile, name):
    """Send one registration request and report how the server answered.

    Args:
        session: Client object exposing an awaitable ``post(url, json=...)``
            (an ``httpx.AsyncClient`` in this script).
        email: Value sent under the ``"email"`` key.
        mobile: Value sent under the ``"mobile"`` key.
        name: Value sent under the ``"name"`` key.

    Returns:
        A ``(status_code, text)`` tuple taken from the response.
    """
    endpoint = f"{BASE_URL}/register"
    payload = {"email": email, "mobile": mobile, "name": name}
    reply = await session.post(endpoint, json=payload)
    return reply.status_code, reply.text
async def login_user(session, email, otp):
    """Send one login request and report how the server answered.

    Args:
        session: Client object exposing an awaitable ``post(url, json=...)``
            (an ``httpx.AsyncClient`` in this script).
        email: Value sent under the ``"email"`` key.
        otp: Value sent under the ``"otp"`` key.

    Returns:
        A ``(status_code, text)`` tuple taken from the response.
    """
    endpoint = f"{BASE_URL}/login"
    credentials = {"email": email, "otp": otp}
    reply = await session.post(endpoint, json=credentials)
    return reply.status_code, reply.text
async def load_test_auth(num_requests):
    """Fire register/login request pairs concurrently and print a summary.

    One register call and one login call are scheduled per synthetic user,
    so ``num_requests // 2`` users are generated and
    ``2 * (num_requests // 2)`` requests are actually sent (an odd
    ``num_requests`` is rounded down to the nearest even number).

    A request counts as successful only when it completes with HTTP status
    200. A request that raises (timeout, connection error, ...) is counted
    as failed instead of aborting the whole run.

    Args:
        num_requests: Target number of requests to issue.

    Returns:
        None. The summary is written to stdout.
    """
    pair_count = num_requests // 2

    async with httpx.AsyncClient() as session:
        tasks = []
        for i in range(pair_count):
            email = f"user{i}@example.com"
            # NOTE(review): register and login for the same user are
            # scheduled concurrently; nothing here orders the login after
            # the registration. Confirm this is the intended load profile.
            tasks.append(
                register_user(session, email, f"+141555{i:04}", "Test User")
            )
            tasks.append(login_user(session, email, "123456"))

        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by wall-clock adjustments during the run.
        start_time = time.perf_counter()
        # return_exceptions=True keeps one failing request from discarding
        # every other result; exceptions come back as list entries.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.perf_counter() - start_time

    # Report on what was really sent, not on the requested target, so an
    # odd num_requests does not show up as a phantom failure.
    issued_requests = len(results)
    successful_requests = sum(
        1
        for result in results
        if not isinstance(result, BaseException) and result[0] == 200
    )
    failed_requests = issued_requests - successful_requests
    # Guard against a zero interval (no tasks, or a coarse clock).
    requests_per_second = issued_requests / total_time if total_time > 0 else 0.0

    print(f"Total Requests: {issued_requests}")
    print(f"Successful Requests: {successful_requests}")
    print(f"Failed Requests: {failed_requests}")
    print(f"Total Time: {total_time:.2f} seconds")
    print(f"Requests per Second: {requests_per_second:.2f}")
if __name__ == "__main__":
    # Target number of requests for the load test when run as a script.
    num_requests = 1000
    asyncio.run(load_test_auth(num_requests))