# Geoglyph_SAM2 / test_queue.py — JuanHernandez-uc, commit 7129113 ("add SAM2 segmentation")
# test_queue.py
# Simulate several geologists submitting crop GeoTIFF jobs concurrently.
import os
import threading
import time
import requests
# Base URL of the server; the client appends "/process", "/status/<task_id>"
# and the download path returned by the server to it.
URL = "http://localhost:8000"
# Use a real georeferenced crop GeoTIFF here.
# For example, one generated by your QGIS plugin:
# C:\Users\juan_\AppData\Local\Temp\sam2_crop_XXXXXXXXXX.tif
# The relative file name is resolved against the current working directory.
CROP_PATH = os.path.abspath("sam2_crop_test.tif")
def _submit_crop(timeout):
    """POST the crop file at CROP_PATH to ``/process`` and return its task id.

    Any failure (unreadable file, connection error or timeout, HTTP error
    status, reply that is not JSON containing ``task_id``) propagates as an
    exception so the caller can report it.
    """
    # Options sent as form fields next to the uploaded file; all values are
    # transmitted as strings.
    form_fields = {
        "device": "cpu",
        "use_clahe": "true",
        "clahe_clip": "4.0",
        "clahe_grid": "6",
        "sam2_points_per_side": "32",
        "sam2_points_per_batch": "32",
        "sam2_pred_iou_thresh": "0.35",
        "sam2_stability_score_thresh": "0.65",
        "filter_min_area_px": "1000",
        "filter_max_area_frac": "0.20",
        "filter_min_iou": "0.35",
        "filter_min_stability": "0.65",
        "filter_border_margin": "10",
        "max_crop_side": "4096",
    }
    with open(CROP_PATH, "rb") as crop_file:
        upload = {
            "crop": (
                os.path.basename(CROP_PATH),
                crop_file,
                "image/tiff",
            )
        }
        resp = requests.post(
            f"{URL}/process",
            files=upload,
            data=form_fields,
            timeout=timeout,
        )
    resp.raise_for_status()
    return resp.json()["task_id"]


def _download_result(download_url, out_path, timeout):
    """Fetch ``URL + download_url`` and write the response body to *out_path*.

    Raises on connection/HTTP errors or if the file cannot be written; the
    file is only opened after the download has succeeded.
    """
    download_resp = requests.get(f"{URL}{download_url}", timeout=timeout)
    download_resp.raise_for_status()
    with open(out_path, "wb") as out:
        out.write(download_resp.content)


def geologist_client(
    geologist_id: int,
    *,
    timeout: float = 60.0,
    poll_interval: float = 0.5,
):
    """Simulate one geologist: submit the crop, poll its task, fetch the result.

    The function uploads CROP_PATH to ``/process``, then polls
    ``/status/<task_id>`` until the task reports ``completed`` or ``failed``
    (or a request fails). Progress is printed, and each distinct state is
    reported only once. On completion with an available output, the result is
    saved as ``downloaded_result_<geologist_id>_<task_id>.gpkg`` in the
    current working directory.

    Args:
        geologist_id: Label used in the printed messages and in the name of
            the downloaded file.
        timeout: Passed as ``timeout`` to every ``requests`` call so a stalled
            connection cannot block this thread forever. ``None`` restores
            the unbounded wait.
        poll_interval: Seconds to sleep between two status requests.

    Returns:
        None. All outcomes, including errors, are reported via ``print``.
    """
    print(f"[Geólogo {geologist_id}] Enviando crop GeoTIFF al servidor...")
    if not os.path.isfile(CROP_PATH):
        print(f"[Geólogo {geologist_id}] No existe el crop: {CROP_PATH}")
        return

    # This function is the top level of a worker thread, so errors are caught
    # broadly and reported instead of ending the thread with a traceback.
    # The reply is parsed inside the same guard: a malformed answer is a
    # submission failure too.
    try:
        task_id = _submit_crop(timeout)
    except Exception as e:
        print(f"[Geólogo {geologist_id}] Error enviando tarea: {e}")
        return
    print(f"[Geólogo {geologist_id}] Tarea aceptada. ID: {task_id}")

    # Key of the last state that was printed; used to avoid repeating the
    # same line on every poll.
    last_status = None
    while True:
        try:
            status_resp = requests.get(
                f"{URL}/status/{task_id}",
                timeout=timeout,
            )
            status_resp.raise_for_status()
            s_data = status_resp.json()
            status_value = s_data["status"]

            if status_value == "pending":
                pos = s_data.get("queue_position")
                current_status = f"pending_pos_{pos}"
                if current_status != last_status:
                    print(
                        f"[Geólogo {geologist_id}] Estado: En cola | Posición: {pos}"
                    )
                    last_status = current_status
            elif status_value == "processing":
                if last_status != "processing":
                    print(
                        f"[Geólogo {geologist_id}] Estado: Procesando inferencia SAM2..."
                    )
                    last_status = "processing"
            elif status_value == "completed":
                n_masks = s_data.get("n_masks")
                output_exists = s_data.get("output_exists")
                download_url = s_data.get("download_url")
                print(
                    f"[Geólogo {geologist_id}] COMPLETO | "
                    f"n_masks={n_masks} | output_exists={output_exists}"
                )
                if output_exists and download_url:
                    out_path = f"downloaded_result_{geologist_id}_{task_id}.gpkg"
                    _download_result(download_url, out_path, timeout)
                    print(
                        f"[Geólogo {geologist_id}] Resultado descargado: {out_path}"
                    )
                return
            elif status_value == "failed":
                print(
                    f"[Geólogo {geologist_id}] FALLÓ: {s_data.get('error')}"
                )
                return
            else:
                # Keep polling as before, but make an unexpected state
                # visible instead of looping without any output.
                current_status = f"unknown_{status_value}"
                if current_status != last_status:
                    print(
                        f"[Geólogo {geologist_id}] Estado desconocido: {status_value!r}"
                    )
                    last_status = current_status
        except Exception as e:
            print(f"[Geólogo {geologist_id}] Error consultando estado: {e}")
            return
        time.sleep(poll_interval)
if __name__ == "__main__":
    # Script entry point: run five simulated clients (ids 1..5) at the same
    # time, one thread each, and wait until every one of them has returned.
    print("Iniciando prueba con 5 geólogos concurrentes...\n")

    workers = [
        threading.Thread(target=geologist_client, args=(client_id,))
        for client_id in range(1, 6)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print("\nPrueba finalizada.")