# test_queue.py
# Simulate several geologists submitting crop GeoTIFF jobs concurrently.

import os
import threading
import time

import requests

URL = "http://localhost:8000"

# Use a real georeferenced crop GeoTIFF here.
# For example, one generated by your QGIS plugin:
# C:\Users\juan_\AppData\Local\Temp\sam2_crop_XXXXXXXXXX.tif
CROP_PATH = os.path.abspath("sam2_crop_test.tif")

# (connect, read) timeouts in seconds applied to every HTTP call. Requests
# never times out by default, so without this a stalled server would hang a
# worker thread (and therefore the final join) forever.
# NOTE(review): 60 s read timeout assumes /process only enqueues the job and
# answers quickly — raise it if the server does heavy work before replying.
REQUEST_TIMEOUT = (5, 60)

# Seconds to wait between two consecutive status polls.
POLL_INTERVAL_S = 0.5

# Form fields sent along with the crop on every /process request.
PROCESS_FORM_FIELDS = {
    "device": "cpu",
    "use_clahe": "true",
    "clahe_clip": "4.0",
    "clahe_grid": "6",
    "sam2_points_per_side": "32",
    "sam2_points_per_batch": "32",
    "sam2_pred_iou_thresh": "0.35",
    "sam2_stability_score_thresh": "0.65",
    "filter_min_area_px": "1000",
    "filter_max_area_frac": "0.20",
    "filter_min_iou": "0.35",
    "filter_min_stability": "0.65",
    "filter_border_margin": "10",
    "max_crop_side": "4096",
}


def _submit_crop(geologist_id):
    """Upload the crop to ``/process`` and return the task id, or None on error.

    Any failure (unreadable file, connection problem, HTTP error status,
    non-JSON body, missing ``task_id`` key) is reported on stdout and turned
    into a ``None`` return so the calling thread ends cleanly.
    """
    try:
        with open(CROP_PATH, "rb") as crop_file:
            files = {
                "crop": (
                    os.path.basename(CROP_PATH),
                    crop_file,
                    "image/tiff",
                )
            }
            resp = requests.post(
                f"{URL}/process",
                files=files,
                data=PROCESS_FORM_FIELDS,
                timeout=REQUEST_TIMEOUT,
            )
        resp.raise_for_status()
        # Parsed inside the try so a malformed response is reported like any
        # other submission error instead of crashing the thread.
        return resp.json()["task_id"]
    except Exception as e:  # thread boundary: report and stop this client
        print(f"[Geólogo {geologist_id}] Error enviando tarea: {e}")
        return None


def _download_result(geologist_id, task_id, download_url):
    """Fetch ``URL + download_url`` and save it as a local ``.gpkg`` file.

    Raises whatever ``requests`` or the file write raises; the caller is
    responsible for reporting it.
    """
    out_path = f"downloaded_result_{geologist_id}_{task_id}.gpkg"
    download_resp = requests.get(
        f"{URL}{download_url}",
        timeout=REQUEST_TIMEOUT,
    )
    download_resp.raise_for_status()
    with open(out_path, "wb") as out:
        out.write(download_resp.content)
    print(f"[Geólogo {geologist_id}] Resultado descargado: {out_path}")


def _poll_until_done(geologist_id, task_id):
    """Poll ``/status/<task_id>`` until the task completes, fails, or errors.

    A status line is printed only when the state (or the queue position while
    pending) changes, to avoid flooding stdout. On ``completed`` the result is
    downloaded when the server reports that an output exists. Any exception
    while polling or downloading is printed and ends the loop.
    """
    last_status = None
    while True:
        try:
            status_resp = requests.get(
                f"{URL}/status/{task_id}",
                timeout=REQUEST_TIMEOUT,
            )
            status_resp.raise_for_status()
            s_data = status_resp.json()
            status_value = s_data["status"]

            if status_value == "pending":
                pos = s_data.get("queue_position")
                # Fold the position into the key so each queue move is shown.
                current_status = f"pending_pos_{pos}"
                if current_status != last_status:
                    print(
                        f"[Geólogo {geologist_id}] Estado: En cola | Posición: {pos}"
                    )
                    last_status = current_status

            elif status_value == "processing":
                if last_status != "processing":
                    print(
                        f"[Geólogo {geologist_id}] Estado: Procesando inferencia SAM2..."
                    )
                    last_status = "processing"

            elif status_value == "completed":
                n_masks = s_data.get("n_masks")
                output_exists = s_data.get("output_exists")
                download_url = s_data.get("download_url")
                print(
                    f"[Geólogo {geologist_id}] COMPLETO | "
                    f"n_masks={n_masks} | output_exists={output_exists}"
                )
                if output_exists and download_url:
                    _download_result(geologist_id, task_id, download_url)
                return

            elif status_value == "failed":
                print(
                    f"[Geólogo {geologist_id}] FALLÓ: {s_data.get('error')}"
                )
                return

        except Exception as e:  # thread boundary: report and stop polling
            print(f"[Geólogo {geologist_id}] Error consultando estado: {e}")
            return

        time.sleep(POLL_INTERVAL_S)


def geologist_client(geologist_id: int):
    """Simulate one geologist: submit the crop, then follow the task to the end.

    Args:
        geologist_id: Number used to tag this client's log lines and the name
            of the downloaded result file.

    Returns:
        None. Progress and errors are reported on stdout; no exception is
        propagated to the calling thread.
    """
    print(f"[Geólogo {geologist_id}] Enviando crop GeoTIFF al servidor...")

    if not os.path.isfile(CROP_PATH):
        print(f"[Geólogo {geologist_id}] No existe el crop: {CROP_PATH}")
        return

    task_id = _submit_crop(geologist_id)
    if task_id is None:
        return

    print(f"[Geólogo {geologist_id}] Tarea aceptada. ID: {task_id}")
    _poll_until_done(geologist_id, task_id)


def main():
    """Run five geologist clients in parallel threads and wait for all of them."""
    print("Iniciando prueba con 5 geólogos concurrentes...\n")

    threads = []
    for i in range(1, 6):
        t = threading.Thread(
            target=geologist_client,
            args=(i,),
        )
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print("\nPrueba finalizada.")


if __name__ == "__main__":
    main()