Spaces:
Sleeping
Sleeping
Ha Trong Nguyen
test: Optimize test_batch_inference to actually perform concurrent stress testing using asyncio.gather
c7c0033 | import os | |
| import glob | |
| import time | |
| import csv | |
| import httpx | |
| import asyncio | |
| # Cấu hình | |
| API_URL = "https://htrnguyen-trafficflow-api.hf.space/api/predict" | |
| INPUT_FOLDER = "./test_images" | |
| OUTPUT_CSV = "evaluation_results.csv" | |
| CONCURRENT_REQUESTS = 10 | |
| async def predict_image( | |
| client: httpx.AsyncClient, image_path: str, sem: asyncio.Semaphore | |
| ): | |
| filename = os.path.basename(image_path) | |
| async with sem: | |
| try: | |
| with open(image_path, "rb") as f: | |
| files = {"file": (filename, f, "image/jpeg")} | |
| start_time = time.time() | |
| response = await client.post(API_URL, files=files) | |
| latency = time.time() - start_time | |
| if response.status_code == 200: | |
| data = response.json() | |
| pred = data.get("prediction", {}) | |
| print( | |
| f"[SUCCESS] {filename} -> {pred.get('total_count', 0)} xe ({round(latency, 2)}s)" | |
| ) | |
| return { | |
| "filename": filename, | |
| "status": "success", | |
| "total_count": pred.get("total_count", 0), | |
| "car_count": pred.get("car_count", 0), | |
| "motorbike_count": pred.get("motorbike_count", 0), | |
| "person_count": pred.get("person_count", 0), | |
| "density_level": pred.get("density_level", "unknown"), | |
| "latency_seconds": round(latency, 2), | |
| "error": "", | |
| } | |
| else: | |
| print(f"[FAILED] {filename} -> HTTP {response.status_code}") | |
| return { | |
| "filename": filename, | |
| "status": "failed", | |
| "total_count": 0, | |
| "car_count": 0, | |
| "motorbike_count": 0, | |
| "person_count": 0, | |
| "density_level": "error", | |
| "latency_seconds": round(latency, 2), | |
| "error": f"HTTP {response.status_code}: {response.text}", | |
| } | |
| except Exception as e: | |
| print(f"[ERROR] {filename} -> {str(e)}") | |
| return { | |
| "filename": filename, | |
| "status": "error", | |
| "total_count": 0, | |
| "car_count": 0, | |
| "motorbike_count": 0, | |
| "person_count": 0, | |
| "density_level": "error", | |
| "latency_seconds": 0, | |
| "error": str(e), | |
| } | |
| async def main(): | |
| if not os.path.exists(INPUT_FOLDER): | |
| print(f"[ERROR] Không tìm thấy thư mục: {INPUT_FOLDER}") | |
| print("[INFO] Vui lòng tạo thư mục này và chép ảnh (jpg, png) vào để bắt đầu.") | |
| os.makedirs(INPUT_FOLDER, exist_ok=True) | |
| return | |
| image_extensions = ["*.jpg", "*.jpeg", "*.png"] | |
| image_paths = [] | |
| for ext in image_extensions: | |
| image_paths.extend(glob.glob(os.path.join(INPUT_FOLDER, ext))) | |
| image_paths.extend(glob.glob(os.path.join(INPUT_FOLDER, ext.upper()))) | |
| if not image_paths: | |
| print(f"[WARNING] Không có ảnh nào trong thư mục {INPUT_FOLDER}.") | |
| return | |
| print(f"[INFO] Bắt đầu Stress Test {len(image_paths)} ảnh qua API: {API_URL}") | |
| print( | |
| f"[INFO] Đang giả lập {CONCURRENT_REQUESTS} requests đồng thời (Concurrent Processing)..." | |
| ) | |
| start_total_time = time.time() | |
| sem = asyncio.Semaphore(CONCURRENT_REQUESTS) | |
| transport = httpx.AsyncHTTPTransport(retries=3) | |
| async with httpx.AsyncClient(transport=transport, timeout=120.0) as client: | |
| tasks = [predict_image(client, path, sem) for path in image_paths] | |
| results = await asyncio.gather(*tasks) | |
| total_latency = time.time() - start_total_time | |
| fieldnames = [ | |
| "filename", | |
| "status", | |
| "total_count", | |
| "car_count", | |
| "motorbike_count", | |
| "person_count", | |
| "density_level", | |
| "latency_seconds", | |
| "error", | |
| ] | |
| with open(OUTPUT_CSV, mode="w", newline="", encoding="utf-8-sig") as f: | |
| writer = csv.DictWriter(f, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(results) | |
| print(f"\n[INFO] Hoàn tất quá trình đánh giá. Báo cáo lưu tại: {OUTPUT_CSV}") | |
| successful_runs = [r for r in results if r["status"] == "success"] | |
| if successful_runs: | |
| avg_latency = sum(r["latency_seconds"] for r in successful_runs) / len( | |
| successful_runs | |
| ) | |
| print(f"\n[REPORT] THỐNG KÊ HIỆU NĂNG BATCH INFERENCE:") | |
| print(f" - Tổng số ảnh đã xử lý: {len(results)}") | |
| print( | |
| f" - Số lượng thành công: {len(successful_runs)}/{len(results)} ({(len(successful_runs)/len(results))*100:.1f}%)" | |
| ) | |
| print( | |
| f" - Tổng thời gian hoàn thành (Wall-clock time): {total_latency:.2f} giây" | |
| ) | |
| print( | |
| f" - Thông lượng (Throughput): {len(results)/total_latency:.2f} ảnh/giây" | |
| ) | |
| print( | |
| f" - Thời gian phản hồi trung bình (Per Image): {avg_latency:.2f} giây/ảnh" | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |