Spaces:
Sleeping
Sleeping
# test_queue.py
# Simulate several geologists submitting crop GeoTIFF jobs concurrently.
| import os | |
| import threading | |
| import time | |
| import requests | |
# Base URL of the processing server under test (no trailing slash; endpoint
# paths such as "/process" are appended to it).
URL = "http://localhost:8000"

# Use a real georeferenced crop GeoTIFF here.
# For example, one generated by your QGIS plugin in the system temp folder:
#   %TEMP%\sam2_crop_XXXXXXXXXX.tif
# The relative name is resolved against the current working directory once,
# at import time.
CROP_PATH = os.path.abspath("sam2_crop_test.tif")
def geologist_client(
    geologist_id: int,
    *,
    request_timeout: float = 60.0,
    poll_interval: float = 0.5,
) -> None:
    """Act as one simulated geologist: upload the crop, poll, download.

    The function posts the file at ``CROP_PATH`` to ``{URL}/process``
    together with a fixed set of form fields, reads ``task_id`` from the
    JSON reply, and then polls ``{URL}/status/{task_id}`` until the reported
    status is ``"completed"`` or ``"failed"``. On completion, if the reply
    carries a truthy ``output_exists`` and ``download_url``, the result is
    saved to ``downloaded_result_{geologist_id}_{task_id}.gpkg`` in the
    current working directory.

    Progress and errors are reported with ``print``; nothing is returned and
    no exception is propagated to the caller, so the function is safe to use
    directly as a thread target.

    Args:
        geologist_id: Label used in log lines and in the output file name.
        request_timeout: Timeout in seconds passed to every HTTP request.
            Without it a stalled server would block the thread forever.
        poll_interval: Seconds to sleep between two status requests.
    """
    tag = f"[Geólogo {geologist_id}]"
    print(f"{tag} Enviando crop GeoTIFF al servidor...")

    if not os.path.isfile(CROP_PATH):
        print(f"{tag} No existe el crop: {CROP_PATH}")
        return

    # Form fields sent alongside the file; values are strings because they
    # travel as multipart form data.
    form_fields = {
        "device": "cpu",
        "use_clahe": "true",
        "clahe_clip": "4.0",
        "clahe_grid": "6",
        "sam2_points_per_side": "32",
        "sam2_points_per_batch": "32",
        "sam2_pred_iou_thresh": "0.35",
        "sam2_stability_score_thresh": "0.65",
        "filter_min_area_px": "1000",
        "filter_max_area_frac": "0.20",
        "filter_min_iou": "0.35",
        "filter_min_stability": "0.65",
        "filter_border_margin": "10",
        "max_crop_side": "4096",
    }

    # --- Submit -----------------------------------------------------------
    # Parsing the reply is kept inside the guarded section so that a
    # non-JSON body or a missing "task_id" is reported like any other
    # submission failure instead of raising inside the thread.
    try:
        with open(CROP_PATH, "rb") as crop_file:
            files = {
                "crop": (
                    os.path.basename(CROP_PATH),
                    crop_file,
                    "image/tiff",
                )
            }
            resp = requests.post(
                f"{URL}/process",
                files=files,
                data=form_fields,
                timeout=request_timeout,
            )
        resp.raise_for_status()
        task_id = resp.json()["task_id"]
    except Exception as e:  # thread boundary: report and stop this client
        print(f"{tag} Error enviando tarea: {e}")
        return

    print(f"{tag} Tarea aceptada. ID: {task_id}")

    # --- Poll -------------------------------------------------------------
    # last_status remembers what was printed last so that each state (and
    # each queue position) is logged only once.
    last_status = None
    while True:
        try:
            status_resp = requests.get(
                f"{URL}/status/{task_id}",
                timeout=request_timeout,
            )
            status_resp.raise_for_status()
            s_data = status_resp.json()
            status_value = s_data["status"]

            if status_value == "pending":
                pos = s_data.get("queue_position")
                current_status = f"pending_pos_{pos}"
                if current_status != last_status:
                    print(f"{tag} Estado: En cola | Posición: {pos}")
                    last_status = current_status

            elif status_value == "processing":
                if last_status != "processing":
                    print(f"{tag} Estado: Procesando inferencia SAM2...")
                    last_status = "processing"

            elif status_value == "completed":
                n_masks = s_data.get("n_masks")
                output_exists = s_data.get("output_exists")
                download_url = s_data.get("download_url")
                print(
                    f"{tag} COMPLETO | "
                    f"n_masks={n_masks} | output_exists={output_exists}"
                )
                if output_exists and download_url:
                    out_path = (
                        f"downloaded_result_{geologist_id}_{task_id}.gpkg"
                    )
                    download_resp = requests.get(
                        f"{URL}{download_url}",
                        timeout=request_timeout,
                    )
                    download_resp.raise_for_status()
                    with open(out_path, "wb") as out:
                        out.write(download_resp.content)
                    print(f"{tag} Resultado descargado: {out_path}")
                break

            elif status_value == "failed":
                print(f"{tag} FALLÓ: {s_data.get('error')}")
                break

            else:
                # Unrecognised status: keep polling as before, but say so
                # once instead of looping silently.
                current_status = f"unknown_{status_value}"
                if current_status != last_status:
                    print(f"{tag} Estado desconocido: {status_value}")
                    last_status = current_status

        except Exception as e:  # thread boundary: report and stop polling
            print(f"{tag} Error consultando estado: {e}")
            break

        time.sleep(poll_interval)
if __name__ == "__main__":
    # Number of simulated clients; used for both the banner and the thread
    # count so the two cannot drift apart.
    n_geologists = 5
    print(f"Iniciando prueba con {n_geologists} geólogos concurrentes...\n")

    # One thread per geologist, with ids starting at 1.
    threads = [
        threading.Thread(target=geologist_client, args=(geologist_id,))
        for geologist_id in range(1, n_geologists + 1)
    ]
    for t in threads:
        t.start()

    # Wait for every client to reach a terminal state (or give up on error).
    for t in threads:
        t.join()

    print("\nPrueba finalizada.")