File size: 4,462 Bytes
7129113
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# test_queue.py
# Simulate several geologists submitting crop GeoTIFF jobs concurrently.

import os
import threading
import time

import requests

URL = "http://localhost:8000"

# Use a real georeferenced crop GeoTIFF here.
# For example, one generated by your QGIS plugin:
# C:\Users\juan_\AppData\Local\Temp\sam2_crop_XXXXXXXXXX.tif
CROP_PATH = os.path.abspath("sam2_crop_test.tif")


def geologist_client(geologist_id: int):
    print(f"[Geólogo {geologist_id}] Enviando crop GeoTIFF al servidor...")

    if not os.path.isfile(CROP_PATH):
        print(f"[Geólogo {geologist_id}] No existe el crop: {CROP_PATH}")
        return

    try:
        with open(CROP_PATH, "rb") as f:
            files = {
                "crop": (
                    os.path.basename(CROP_PATH),
                    f,
                    "image/tiff",
                )
            }

            data = {
                "device": "cpu",
                "use_clahe": "true",
                "clahe_clip": "4.0",
                "clahe_grid": "6",
                "sam2_points_per_side": "32",
                "sam2_points_per_batch": "32",
                "sam2_pred_iou_thresh": "0.35",
                "sam2_stability_score_thresh": "0.65",
                "filter_min_area_px": "1000",
                "filter_max_area_frac": "0.20",
                "filter_min_iou": "0.35",
                "filter_min_stability": "0.65",
                "filter_border_margin": "10",
                "max_crop_side": "4096",
            }

            resp = requests.post(
                f"{URL}/process",
                files=files,
                data=data,
            )

            resp.raise_for_status()

    except Exception as e:
        print(f"[Geólogo {geologist_id}] Error enviando tarea: {e}")
        return

    data = resp.json()
    task_id = data["task_id"]

    print(f"[Geólogo {geologist_id}] Tarea aceptada. ID: {task_id}")

    last_status = None

    while True:
        try:
            status_resp = requests.get(f"{URL}/status/{task_id}")
            status_resp.raise_for_status()
            s_data = status_resp.json()

            status_value = s_data["status"]

            if status_value == "pending":
                pos = s_data.get("queue_position")
                current_status = f"pending_pos_{pos}"

                if current_status != last_status:
                    print(
                        f"[Geólogo {geologist_id}] Estado: En cola | Posición: {pos}"
                    )
                    last_status = current_status

            elif status_value == "processing":
                if last_status != "processing":
                    print(
                        f"[Geólogo {geologist_id}] Estado: Procesando inferencia SAM2..."
                    )
                    last_status = "processing"

            elif status_value == "completed":
                n_masks = s_data.get("n_masks")
                output_exists = s_data.get("output_exists")
                download_url = s_data.get("download_url")

                print(
                    f"[Geólogo {geologist_id}] COMPLETO | "
                    f"n_masks={n_masks} | output_exists={output_exists}"
                )

                if output_exists and download_url:
                    out_path = f"downloaded_result_{geologist_id}_{task_id}.gpkg"
                    download_resp = requests.get(f"{URL}{download_url}")
                    download_resp.raise_for_status()

                    with open(out_path, "wb") as out:
                        out.write(download_resp.content)

                    print(
                        f"[Geólogo {geologist_id}] Resultado descargado: {out_path}"
                    )

                break

            elif status_value == "failed":
                print(
                    f"[Geólogo {geologist_id}] FALLÓ: {s_data.get('error')}"
                )
                break

        except Exception as e:
            print(f"[Geólogo {geologist_id}] Error consultando estado: {e}")
            break

        time.sleep(0.5)


if __name__ == "__main__":
    print("Iniciando prueba con 5 geólogos concurrentes...\n")

    threads = []

    for i in range(1, 6):
        t = threading.Thread(
            target=geologist_client,
            args=(i,),
        )

        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print("\nPrueba finalizada.")