trafficflow-api / test_batch_inference.py
Ha Trong Nguyen
test: Optimize test_batch_inference to actually perform concurrent stress testing using asyncio.gather
c7c0033
import asyncio
import csv
import glob
import mimetypes
import os
import time

import httpx
# Configuration
# Prediction endpoint; every image is POSTed here as a multipart "file" field.
API_URL: str = "https://htrnguyen-trafficflow-api.hf.space/api/predict"
# Folder scanned for input images (created on first run if missing).
INPUT_FOLDER: str = "./test_images"
# Per-image results are written to this CSV report.
OUTPUT_CSV: str = "evaluation_results.csv"
# Upper bound on the number of requests in flight at the same time.
CONCURRENT_REQUESTS: int = 10
def _build_result(filename, status, latency, error="", prediction=None):
    """Build one result row (one CSV line) for an image.

    Args:
        filename: base name of the uploaded image.
        status: "success", "failed" (non-200 response) or "error" (exception).
        latency: seconds spent on the request; rounded to 2 decimals here.
        error: error description, empty on success.
        prediction: the ``prediction`` mapping from the API response, or None
            when no prediction is available (failed/errored request). In that
            case all counts are 0 and ``density_level`` is "error".

    Returns:
        A dict keyed by the CSV column names.
    """
    count_keys = ("total_count", "car_count", "motorbike_count", "person_count")
    if prediction is None:
        counts = dict.fromkeys(count_keys, 0)
        density_level = "error"
    else:
        counts = {key: prediction.get(key, 0) for key in count_keys}
        density_level = prediction.get("density_level", "unknown")
    return {
        "filename": filename,
        "status": status,
        **counts,
        "density_level": density_level,
        "latency_seconds": round(latency, 2),
        "error": error,
    }


async def predict_image(
    client: "httpx.AsyncClient", image_path: str, sem: asyncio.Semaphore
):
    """Upload one image to the prediction API and return its result row.

    The number of concurrent uploads is bounded by ``sem``. The timer starts
    right before the POST, so ``latency_seconds`` measures the HTTP round trip
    only, not the time spent waiting for the semaphore.

    Args:
        client: shared async HTTP client used for the POST.
        image_path: path of the local image file to upload.
        sem: semaphore limiting the number of in-flight requests.

    Returns:
        A dict with keys ``filename``, ``status`` ("success" / "failed" /
        "error"), ``total_count``, ``car_count``, ``motorbike_count``,
        ``person_count``, ``density_level``, ``latency_seconds`` and ``error``.
        Exceptions are caught and reported as an "error" row so that a single
        bad image or request does not abort the whole batch.
    """
    filename = os.path.basename(image_path)
    # Send the real MIME type (e.g. image/png for .png files) instead of
    # labelling every upload as JPEG.
    content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    async with sem:
        start_time = None
        try:
            with open(image_path, "rb") as f:
                files = {"file": (filename, f, content_type)}
                # perf_counter is monotonic, so it is safe for measuring durations.
                start_time = time.perf_counter()
                response = await client.post(API_URL, files=files)
                latency = time.perf_counter() - start_time

            if response.status_code != 200:
                print(f"[FAILED] {filename} -> HTTP {response.status_code}")
                return _build_result(
                    filename,
                    "failed",
                    latency,
                    error=f"HTTP {response.status_code}: {response.text}",
                )

            data = response.json()
            pred = data.get("prediction", {})
            print(
                f"[SUCCESS] {filename} -> {pred.get('total_count', 0)} xe ({round(latency, 2)}s)"
            )
            return _build_result(filename, "success", latency, prediction=pred)
        except Exception as e:
            # Report how long we waited if the request had already been sent
            # (e.g. a timeout), otherwise 0.
            elapsed = 0.0 if start_time is None else time.perf_counter() - start_time
            # str() of some exceptions (notably httpx timeouts) can be empty,
            # so always include the exception type.
            detail = str(e)
            message = f"{type(e).__name__}: {detail}" if detail else type(e).__name__
            print(f"[ERROR] {filename} -> {message}")
            return _build_result(filename, "error", elapsed, error=message)
def _collect_image_paths(folder):
    """Return the image files (.jpg, .jpeg, .png, any letter case) directly inside ``folder``.

    Each file appears exactly once and the list is sorted.
    - Matching on the lower-cased extension of a single ``*`` glob avoids
      duplicates. Separate ``*.jpg`` / ``*.JPG`` globs produce them wherever
      glob matching is case-insensitive (e.g. Windows).
    - It also picks up mixed-case extensions such as ``.Jpg``.
    """
    extensions = {".jpg", ".jpeg", ".png"}
    return sorted(
        path
        for path in glob.glob(os.path.join(folder, "*"))
        if os.path.isfile(path) and os.path.splitext(path)[1].lower() in extensions
    )


def _write_csv(results, path):
    """Write the per-image result rows to ``path`` as CSV (UTF-8 with BOM)."""
    fieldnames = [
        "filename",
        "status",
        "total_count",
        "car_count",
        "motorbike_count",
        "person_count",
        "density_level",
        "latency_seconds",
        "error",
    ]
    # utf-8-sig adds a BOM; presumably so spreadsheet apps detect the
    # encoding of the Vietnamese text — confirm if consumers change.
    with open(path, mode="w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)


def _print_report(results, wall_time):
    """Print throughput and latency statistics for the batch.

    The report is printed even when every request failed, so a fully broken
    run still shows its success rate and duration. The average latency only
    covers successful rows.
    """
    if not results:
        return
    successful_runs = [r for r in results if r["status"] == "success"]
    total = len(results)
    print("\n[REPORT] THỐNG KÊ HIỆU NĂNG BATCH INFERENCE:")
    print(f" - Tổng số ảnh đã xử lý: {total}")
    print(
        f" - Số lượng thành công: {len(successful_runs)}/{total} ({(len(successful_runs)/total)*100:.1f}%)"
    )
    print(f" - Tổng thời gian hoàn thành (Wall-clock time): {wall_time:.2f} giây")
    if wall_time > 0:
        print(f" - Thông lượng (Throughput): {total/wall_time:.2f} ảnh/giây")
    if successful_runs:
        avg_latency = sum(r["latency_seconds"] for r in successful_runs) / len(
            successful_runs
        )
        print(
            f" - Thời gian phản hồi trung bình (Per Image): {avg_latency:.2f} giây/ảnh"
        )
    else:
        print(" - Thời gian phản hồi trung bình (Per Image): N/A")


async def main():
    """Send every image in INPUT_FOLDER to the API concurrently and report the results.

    Steps:
    - If INPUT_FOLDER does not exist, create it and return without sending
      anything.
    - Otherwise upload all images with at most CONCURRENT_REQUESTS in flight.
    - Write one row per image to OUTPUT_CSV.
    - Print a throughput/latency summary.
    """
    if not os.path.exists(INPUT_FOLDER):
        print(f"[ERROR] Không tìm thấy thư mục: {INPUT_FOLDER}")
        print("[INFO] Vui lòng tạo thư mục này và chép ảnh (jpg, png) vào để bắt đầu.")
        os.makedirs(INPUT_FOLDER, exist_ok=True)
        return

    image_paths = _collect_image_paths(INPUT_FOLDER)
    if not image_paths:
        print(f"[WARNING] Không có ảnh nào trong thư mục {INPUT_FOLDER}.")
        return

    print(f"[INFO] Bắt đầu Stress Test {len(image_paths)} ảnh qua API: {API_URL}")
    print(
        f"[INFO] Đang giả lập {CONCURRENT_REQUESTS} requests đồng thời (Concurrent Processing)..."
    )

    # perf_counter is monotonic, unlike time.time(), so wall-clock duration is reliable.
    start_total_time = time.perf_counter()
    sem = asyncio.Semaphore(CONCURRENT_REQUESTS)
    # NOTE(review): per the httpx docs, transport-level retries cover connection
    # establishment only, not HTTP error responses — verify if relying on it.
    transport = httpx.AsyncHTTPTransport(retries=3)
    async with httpx.AsyncClient(transport=transport, timeout=120.0) as client:
        # All uploads are scheduled at once; the semaphore inside predict_image
        # caps how many are actually in flight.
        results = await asyncio.gather(
            *(predict_image(client, path, sem) for path in image_paths)
        )
    total_latency = time.perf_counter() - start_total_time

    _write_csv(results, OUTPUT_CSV)
    print(f"\n[INFO] Hoàn tất quá trình đánh giá. Báo cáo lưu tại: {OUTPUT_CSV}")
    _print_report(results, total_latency)
if __name__ == "__main__":
    # Script entry point: build the main() coroutine and drive it to
    # completion on a fresh asyncio event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)