File size: 43,703 Bytes
b30e7a3
48ece04
 
 
0e0cc26
2751463
 
 
 
 
 
 
0e0cc26
 
 
 
 
 
 
 
 
73d6c1c
 
 
 
 
 
0e0cc26
52536ca
b30e7a3
301e154
52536ca
 
 
b30e7a3
1c6c619
b30e7a3
52536ca
78d352c
b30e7a3
 
a52a96d
9803004
b30e7a3
 
3fde4e4
1c4206e
52536ca
 
dbfb5c9
52536ca
b8fe2b6
 
52536ca
 
 
 
 
 
d257dcc
0effcd5
d257dcc
 
 
6268ac2
624478a
b30e7a3
 
 
64bbe44
 
 
 
 
dbfb5c9
 
 
64bbe44
1a9b396
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d257dcc
 
1a9b396
 
 
 
d257dcc
 
 
 
517108e
 
 
 
d257dcc
 
1a9b396
 
d257dcc
 
1a9b396
 
 
d257dcc
 
 
 
517108e
 
 
 
d257dcc
 
1a9b396
d257dcc
1a9b396
 
d257dcc
1a9b396
 
d257dcc
1a9b396
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b30e7a3
 
 
 
 
 
 
 
51193fd
 
 
 
 
 
 
 
 
 
 
 
 
 
9803004
22ef6b4
 
 
 
 
9803004
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
 
 
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
 
 
 
b30e7a3
b30c328
 
22ef6b4
 
b30e7a3
 
 
 
 
 
 
 
f89fa0b
21c29ae
0eeb0d9
06e44d3
b30e7a3
 
 
 
 
 
 
 
f89fa0b
6268ac2
0eeb0d9
537aca9
b30e7a3
 
 
 
537aca9
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3fde4e4
b30e7a3
 
 
 
0c8527d
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
537aca9
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca6f9
 
 
 
 
 
98a4ff6
624478a
a2ca6f9
 
 
 
 
537aca9
 
b30e7a3
 
 
0eeb0d9
 
 
 
391cb94
b30e7a3
 
 
537aca9
0eeb0d9
be0e60f
ff50694
b30e7a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
f89fa0b
21c29ae
5e832fe
be0e60f
0eeb0d9
06e44d3
05bd36a
52536ca
301e154
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
 
 
b8fe2b6
 
52536ca
 
 
 
 
 
 
 
 
301e154
 
a2ca6f9
 
 
 
 
6268ac2
a2ca6f9
 
6268ac2
d73eff6
6268ac2
 
 
 
 
 
 
a2ca6f9
 
 
6268ac2
 
a2ca6f9
 
624478a
 
 
a2ca6f9
 
 
 
 
 
 
 
52536ca
a2ca6f9
 
 
 
 
52536ca
301e154
 
1c4206e
 
 
 
 
 
 
 
 
 
a2ca6f9
0eeb0d9
52536ca
 
301e154
dbfb5c9
52536ca
 
 
 
 
 
 
301e154
dbfb5c9
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fbd1770
45eb65b
b8fe2b6
 
ff50694
a2ca6f9
 
ecbbe4e
05bd36a
301e154
52536ca
 
 
 
d73eff6
d257dcc
 
 
 
 
 
 
 
1a9b396
a2ca6f9
a52a96d
 
 
 
 
 
 
52536ca
 
a2ca6f9
52536ca
 
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
1a9b396
52536ca
 
 
3a793fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a99834
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9803004
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9803004
 
52536ca
 
 
 
 
 
 
 
 
 
 
 
 
 
b8fe2b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a52a96d
 
dbfb5c9
 
 
a52a96d
16cab91
8d938e9
dbfb5c9
301e154
 
 
 
 
 
 
 
 
dbfb5c9
 
 
a52a96d
 
 
 
dbfb5c9
a52a96d
8d938e9
 
301e154
 
 
f09ca9c
8d938e9
 
 
301e154
 
8d938e9
dbfb5c9
 
 
 
 
 
 
8d938e9
dbfb5c9
 
 
 
8d938e9
dbfb5c9
 
8d938e9
dbfb5c9
8d938e9
dbfb5c9
 
8d938e9
16cab91
dbfb5c9
a52a96d
301e154
 
 
 
a52a96d
 
dbfb5c9
 
 
8d938e9
a52a96d
 
 
 
dbfb5c9
a52a96d
 
 
 
e16d22f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbfb5c9
 
1eea4fe
e16d22f
 
 
 
 
 
 
 
 
0effcd5
 
 
a2ca6f9
 
0effcd5
 
 
a2ca6f9
0effcd5
 
 
a2ca6f9
 
0effcd5
 
 
 
a2ca6f9
0effcd5
 
a2ca6f9
0effcd5
 
 
 
a2ca6f9
0effcd5
 
a2ca6f9
 
 
 
 
 
 
 
 
dbfb5c9
0effcd5
dbfb5c9
 
 
 
0effcd5
 
 
 
 
e16d22f
74ae00c
 
 
 
21c29ae
97b3a45
1c6c619
74ae00c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2f284f5
 
74ae00c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c6c619
74ae00c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c6c619
74ae00c
 
 
 
 
 
 
 
 
 
 
fc9835a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
078b447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f89fa0b
21c29ae
078b447
 
 
97b3a45
afbbf27
078b447
 
 
 
 
 
 
 
 
 
 
 
afbbf27
078b447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
afbbf27
078b447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f89fa0b
21c29ae
078b447
 
 
97b3a45
afbbf27
078b447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
afbbf27
078b447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b30e7a3
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
import os
from dotenv import load_dotenv
load_dotenv()

import logging

# Fix: Set Hugging Face cache to writable location
# In containerized environments, /.cache may not be writable
if "HF_HOME" not in os.environ:
    os.environ["HF_HOME"] = "/tmp/huggingface"
    print(f"Set HF_HOME to {os.environ['HF_HOME']}")

# Debug/Fix: Unset CUDA_VISIBLE_DEVICES to ensure all GPUs are visible
# Some environments (like HF Spaces) might set this to "0" by default.
if "CUDA_VISIBLE_DEVICES" in os.environ:
    # Use print because logging config might not be set yet
    print(f"Found CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']}. Unsetting it to enable all GPUs.")
    del os.environ["CUDA_VISIBLE_DEVICES"]
else:
    print("CUDA_VISIBLE_DEVICES not set. All GPUs should be visible.")

import torch
try:
    print(f"Startup Diagnostics: Torch version {torch.__version__}, CUDA available: {torch.cuda.is_available()}, Device count: {torch.cuda.device_count()}")
except Exception as e:
    print(f"Startup Diagnostics Error: {e}")

import asyncio
import shutil
import tempfile
import time
import uuid
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from typing import Optional

import cv2
import numpy as np
from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
import uvicorn

from inference import process_first_frame, run_inference, run_grounded_sam2_tracking
from models.depth_estimators.model_loader import list_depth_estimators
from jobs.background import process_video_async
from jobs.models import JobInfo, JobStatus
from jobs.streaming import get_stream, get_stream_event
from jobs.storage import (
    get_depth_output_path,
    get_first_frame_depth_path,
    get_first_frame_path,
    get_input_video_path,
    get_job_directory,
    get_job_storage,
    get_output_video_path,
)
from utils.gpt_reasoning import estimate_threat_gpt
from utils.threat_chat import chat_about_threats
from utils.relevance import evaluate_relevance
from utils.enrichment import run_enrichment
from utils.schemas import AssessmentStatus
from models.segmenters.model_loader import get_segmenter_detector
from utils.mission_parser import parse_mission_text, build_broad_queries, MissionParseError

logging.basicConfig(level=logging.INFO)

# Suppress noisy external libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
logging.getLogger("transformers").setLevel(logging.WARNING)

# GPT concurrency limiter — prevents thread exhaustion under load
_GPT_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("GPT_CONCURRENCY_LIMIT", "4")))


async def _enrich_first_frame_gpt(
    job_id: str,
    frame: np.ndarray,
    detections: list,
    enable_gpt: bool,
    mission_spec,
) -> None:
    """Fire-and-forget GPT enrichment for first-frame track cards.

    Runs concurrently with the video pipeline so the user gets instant
    first-frame preview (UNASSESSED), then track cards update once GPT
    finishes (typically 2-5s later).
    """
    if not enable_gpt or not detections:
        return
    try:
        # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
        if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
            for d in detections:
                decision = evaluate_relevance(d, mission_spec.relevance_criteria)
                d["mission_relevant"] = decision.relevant
                d["relevance_reason"] = decision.reason
            filtered = [d for d in detections if d.get("mission_relevant", True)]
            if not filtered:
                for det in detections:
                    det["assessment_status"] = AssessmentStatus.ASSESSED
                get_job_storage().update(
                    job_id,
                    first_frame_detections=detections,
                )
                logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
                return

        gpt_results = await asyncio.to_thread(
            run_enrichment, 0, frame, detections, mission_spec,
            job_id=job_id,
        )
        logging.info("Background GPT enrichment complete for job %s", job_id)

        if not gpt_results:
            # All detections filtered as not relevant
            for det in detections:
                det["assessment_status"] = AssessmentStatus.ASSESSED
            get_job_storage().update(
                job_id,
                first_frame_detections=detections,
            )
            logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
            return

        # Tag any remaining detections without an assessment status
        for det in detections:
            if "assessment_status" not in det:
                det["assessment_status"] = AssessmentStatus.UNASSESSED

        # Update stored job so frontend polls pick up GPT data
        get_job_storage().update(
            job_id,
            first_frame_detections=detections,
            first_frame_gpt_results=gpt_results,
        )
        logging.info("Updated first_frame_detections with GPT results for job %s", job_id)

    except Exception:
        logging.exception("Background GPT enrichment failed for job %s", job_id)


async def _periodic_cleanup() -> None:
    while True:
        await asyncio.sleep(600)
        get_job_storage().cleanup_expired(timedelta(hours=1))


@asynccontextmanager
async def lifespan(_: FastAPI):
    cleanup_task = asyncio.create_task(_periodic_cleanup())
    try:
        yield
    finally:
        cleanup_task.cancel()


app = FastAPI(title="Video Object Detection", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


from fastapi import Request

@app.middleware("http")
async def add_no_cache_header(request: Request, call_next):
    """Ensure frontend assets are not cached by the browser (important for HF Spaces updates)."""
    response = await call_next(request)
    # Apply to all static files and the root page
    if request.url.path.startswith("/laser") or request.url.path == "/":
        response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
        response.headers["Pragma"] = "no-cache"
        response.headers["Expires"] = "0"
    return response

# Optional: serve the LaserPerception frontend from this backend.
# The frontend files are now located in the 'frontend' directory.
_FRONTEND_DIR = Path(__file__).with_name("frontend")
if _FRONTEND_DIR.exists():
    # Mount the entire frontend directory at /laser (legacy path) or /frontend
    app.mount("/laser", StaticFiles(directory=_FRONTEND_DIR, html=True), name="laser")

# Valid detection modes
VALID_MODES = {"object_detection", "segmentation", "drone_detection"}


def _save_upload_to_tmp(upload: UploadFile) -> str:
    """Save uploaded file to temporary location."""
    suffix = Path(upload.filename or "upload.mp4").suffix or ".mp4"
    fd, path = tempfile.mkstemp(prefix="input_", suffix=suffix, dir="/tmp")
    os.close(fd)
    with open(path, "wb") as buffer:
        data = upload.file.read()
        buffer.write(data)
    return path


def _save_upload_to_path(upload: UploadFile, path: Path) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as buffer:
        data = upload.file.read()
        buffer.write(data)


def _safe_delete(path: str) -> None:
    """Safely delete a file, ignoring errors."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return
    except Exception:
        logging.exception("Failed to remove temporary file: %s", path)


def _schedule_cleanup(background_tasks: BackgroundTasks, path: str) -> None:
    """Schedule file cleanup after response is sent."""
    def _cleanup(target: str = path) -> None:
        _safe_delete(target)

    background_tasks.add_task(_cleanup)


def _default_queries_for_mode(mode: str) -> list[str]:
    if mode == "segmentation":
        return ["object"]
    if mode == "drone_detection":
        return ["drone"]
    return ["person", "car", "truck", "motorcycle", "bicycle", "bus", "train", "airplane"]


@app.get("/", response_class=HTMLResponse)
async def demo_page():
    """Redirect to LaserPerception app."""
    # The main entry point is now index.html in the mounted directory
    return RedirectResponse(url="/laser/index.html")


@app.post("/detect")
async def detect_endpoint(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    mode: str = Form(...),
    queries: str = Form(""),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    enable_depth: bool = Form(False),
    enable_gpt: bool = Form(True),
):
    """
    Main detection endpoint.

    Args:
        video: Video file to process
        mode: Detection mode (object_detection, segmentation, drone_detection)
        queries: Comma-separated object classes for object_detection mode
        detector: Model to use (yolo11, detr_resnet50, grounding_dino)
        segmenter: Segmentation model to use (GSAM2-S/B/L, YSAM2-S/B/L)
        enable_depth: Whether to run legacy depth estimation (default: False)
        drone_detection uses the dedicated drone_yolo model.

    Returns:
        - For object_detection: Processed video with bounding boxes
        - For segmentation: Processed video with masks rendered
        - For drone_detection: Processed video with bounding boxes
    """
    # Validate mode
    if mode not in VALID_MODES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid mode '{mode}'. Must be one of: {', '.join(VALID_MODES)}"
        )

    if mode == "segmentation":
        if video is None:
            raise HTTPException(status_code=400, detail="Video file is required.")

        try:
            input_path = _save_upload_to_tmp(video)
        except Exception:
            logging.exception("Failed to save uploaded file.")
            raise HTTPException(status_code=500, detail="Failed to save uploaded video.")
        finally:
            await video.close()

        fd, output_path = tempfile.mkstemp(prefix="output_", suffix=".mp4", dir="/tmp")
        os.close(fd)

        # Parse queries
        query_list = [q.strip() for q in queries.split(",") if q.strip()]
        if not query_list:
            query_list = ["object"]

        try:
            output_path = run_grounded_sam2_tracking(
                input_path,
                output_path,
                query_list,
                segmenter_name=segmenter,
                num_maskmem=7,
            )
        except ValueError as exc:
            logging.exception("Segmentation processing failed.")
            _safe_delete(input_path)
            _safe_delete(output_path)
            raise HTTPException(status_code=500, detail=str(exc))
        except Exception as exc:
            logging.exception("Segmentation inference failed.")
            _safe_delete(input_path)
            _safe_delete(output_path)
            return JSONResponse(status_code=500, content={"error": str(exc)})

        _schedule_cleanup(background_tasks, input_path)
        _schedule_cleanup(background_tasks, output_path)

        return FileResponse(
            path=output_path,
            media_type="video/mp4",
            filename="segmented.mp4",
        )

    # Handle object detection or drone detection mode
    if video is None:
        raise HTTPException(status_code=400, detail="Video file is required.")

    # Save uploaded video
    try:
        input_path = _save_upload_to_tmp(video)
    except Exception:
        logging.exception("Failed to save uploaded file.")
        raise HTTPException(status_code=500, detail="Failed to save uploaded video.")
    finally:
        await video.close()

    # Create output path
    fd, output_path = tempfile.mkstemp(prefix="output_", suffix=".mp4", dir="/tmp")
    os.close(fd)

    # Parse queries with mission awareness
    detector_name = "drone_yolo" if mode == "drone_detection" else detector
    mission_spec = None

    if queries.strip():
        try:
            mission_spec = parse_mission_text(queries.strip(), detector_name, video_path=input_path)
            query_list = build_broad_queries(detector_name, mission_spec)
        except MissionParseError as e:
            raise HTTPException(status_code=422, detail=str(e))
    else:
        query_list = _default_queries_for_mode(mode)

    if mode == "drone_detection" and not query_list:
        query_list = ["drone"]

    # Run inference
    try:
        
        # Determine depth estimator
        active_depth = "depth" if enable_depth else None

        output_path, _ = run_inference(
            input_path,
            output_path,
            query_list,
            detector_name=detector_name,
            depth_estimator_name=active_depth,
            depth_scale=25.0,
            enable_gpt=enable_gpt,
        )
    except ValueError as exc:
        logging.exception("Video processing failed.")
        _safe_delete(input_path)
        _safe_delete(output_path)
        raise HTTPException(status_code=500, detail=str(exc))
    except Exception as exc:
        logging.exception("Inference failed.")
        _safe_delete(input_path)
        _safe_delete(output_path)
        return JSONResponse(status_code=500, content={"error": str(exc)})

    # Schedule cleanup
    _schedule_cleanup(background_tasks, input_path)
    _schedule_cleanup(background_tasks, output_path)

    # Return processed video
    response = FileResponse(
        path=output_path,
        media_type="video/mp4",
        filename="processed.mp4",
    )
    return response


@app.post("/detect/async")
async def detect_async_endpoint(
    video: UploadFile = File(...),
    mode: str = Form(...),
    queries: str = Form(""),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    depth_estimator: str = Form("depth"),
    depth_scale: float = Form(25.0),
    enable_depth: bool = Form(False),
    enable_gpt: bool = Form(True),
    step: int = Form(7),
):
    _ttfs_t0 = time.perf_counter()

    if mode not in VALID_MODES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid mode '{mode}'. Must be one of: {', '.join(VALID_MODES)}",
        )

    if video is None:
        raise HTTPException(status_code=400, detail="Video file is required.")

    job_id = uuid.uuid4().hex
    job_dir = get_job_directory(job_id)
    input_path = get_input_video_path(job_id)
    output_path = get_output_video_path(job_id)
    first_frame_path = get_first_frame_path(job_id)
    depth_output_path = get_depth_output_path(job_id)
    first_frame_depth_path = get_first_frame_depth_path(job_id)

    try:
        _save_upload_to_path(video, input_path)
    except Exception:
        logging.exception("Failed to save uploaded file.")
        raise HTTPException(status_code=500, detail="Failed to save uploaded video.")
    finally:
        await video.close()

    logging.info("[TTFS:%s] +%.1fs upload_saved", job_id, time.perf_counter() - _ttfs_t0)

    # --- Mission-Driven Query Parsing ---
    mission_spec = None
    mission_mode = "LEGACY"

    detector_name = detector
    mission_detector = detector  # detector key used for mission query parsing
    if mode == "drone_detection":
        detector_name = "drone_yolo"
        mission_detector = "drone_yolo"
    elif mode == "segmentation":
        # Segmenter registry owns detector selection (GSAM2→GDINO, YSAM2→YOLO).
        # detector_name=None so the job doesn't forward it (avoids duplicate kwarg).
        try:
            mission_detector = get_segmenter_detector(segmenter)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc))
        detector_name = None

    if queries.strip():
        try:
            mission_spec = parse_mission_text(queries.strip(), mission_detector, video_path=str(input_path))
            query_list = build_broad_queries(mission_detector, mission_spec)
            mission_mode = "MISSION"
            logging.info(
                "Mission parsed: mode=%s classes=%s broad_queries=%s domain=%s(%s)",
                mission_mode, mission_spec.object_classes, query_list,
                mission_spec.domain, mission_spec.domain_source,
            )
        except MissionParseError as e:
            raise HTTPException(
                status_code=422,
                detail=str(e),
            )
    else:
        # LEGACY mode: no mission context, use defaults, disable GPT
        query_list = _default_queries_for_mode(mode)
        enable_gpt = False
        mission_mode = "LEGACY"
        logging.info(
            "LEGACY mode: no mission text, defaults=%s, GPT disabled", query_list
        )

    logging.info("[TTFS:%s] +%.1fs mission_parsed", job_id, time.perf_counter() - _ttfs_t0)

    available_depth_estimators = set(list_depth_estimators())
    if depth_estimator not in available_depth_estimators:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Invalid depth estimator '{depth_estimator}'. "
                f"Must be one of: {', '.join(sorted(available_depth_estimators))}"
            ),
        )

    # Determine active depth estimator (Legacy)
    active_depth = depth_estimator if enable_depth else None

    try:
        logging.info("[TTFS:%s] +%.1fs process_first_frame start", job_id, time.perf_counter() - _ttfs_t0)
        processed_frame, detections = process_first_frame(
            str(input_path),
            query_list,
            mode=mode,
            detector_name=detector_name,
            segmenter_name=segmenter,
        )
        cv2.imwrite(str(first_frame_path), processed_frame)
        logging.info("[TTFS:%s] +%.1fs process_first_frame done", job_id, time.perf_counter() - _ttfs_t0)
        # GPT and depth are now handled in the async pipeline (enrichment thread)
        first_frame_gpt_results = None
    except Exception:
        logging.exception("First-frame processing failed.")
        shutil.rmtree(job_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail="Failed to process first frame.")

    job = JobInfo(
        job_id=job_id,
        status=JobStatus.PROCESSING,
        mode=mode,
        queries=query_list,
        detector_name=detector_name,
        segmenter_name=segmenter,
        input_video_path=str(input_path),
        output_video_path=str(output_path),
        first_frame_path=str(first_frame_path),
        first_frame_detections=detections,
        depth_estimator_name=active_depth,
        depth_scale=float(depth_scale),
        depth_output_path=str(depth_output_path),
        first_frame_depth_path=str(first_frame_depth_path),
        enable_gpt=enable_gpt,
        mission_spec=mission_spec,
        mission_mode=mission_mode,
        first_frame_gpt_results=first_frame_gpt_results,
        step=step,
        ttfs_t0=_ttfs_t0,
    )
    get_job_storage().create(job)
    asyncio.create_task(process_video_async(job_id))

    # Fire-and-forget: enrich first-frame detections with GPT in background.
    # Runs for ALL modes including segmentation — first-frame detections from
    # process_first_frame() already have stable track IDs (T01, T02, ...) and
    # valid bboxes, so there's no reason to defer.  The GSAM2 writer's
    # enrichment thread will see the cached results via first_frame_gpt_results
    # in JobStorage and skip the duplicate call on frame 0.
    asyncio.create_task(_enrich_first_frame_gpt(
        job_id, processed_frame, detections, enable_gpt, mission_spec,
    ))

    response_data = {
        "job_id": job_id,
        "first_frame_url": f"/detect/first-frame/{job_id}",
        "first_frame_depth_url": f"/detect/first-frame-depth/{job_id}",
        "status_url": f"/detect/status/{job_id}",
        "video_url": f"/detect/video/{job_id}",
        "depth_video_url": f"/detect/depth-video/{job_id}",
        "stream_url": f"/detect/stream/{job_id}",
        "status": job.status.value,
        "first_frame_detections": detections,
        "mission_mode": mission_mode,
    }

    if mission_spec:
        response_data["mission_spec"] = {
            "object_classes": mission_spec.object_classes,
            "mission_intent": mission_spec.mission_intent,
            "domain": mission_spec.domain,
            "domain_source": mission_spec.domain_source,
            "parse_confidence": mission_spec.parse_confidence,
            "parse_warnings": mission_spec.parse_warnings,
            "context_phrases": mission_spec.context_phrases,
            "stripped_modifiers": mission_spec.stripped_modifiers,
        }

    return response_data


@app.get("/detect/status/{job_id}")
async def detect_status(job_id: str):
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")
    return {
        "job_id": job.job_id,
        "status": job.status.value,
        "created_at": job.created_at.isoformat(),
        "completed_at": job.completed_at.isoformat() if job.completed_at else None,
        "error": job.error,
        "first_frame_detections": job.first_frame_detections,
    }


@app.get("/detect/tracks/{job_id}/{frame_idx}")
async def get_frame_tracks(job_id: str, frame_idx: int):
    """Retrieve detections (with tracking info) for a specific frame."""
    # This requires us to store detections PER FRAME in JobStorage or similar.
    # Currently, inference.py returns 'sorted_detections' at the end.
    # But during streaming, where is it?
    # We can peek into the 'stream_queue' logic or we need a shared store.
    # Ideally, inference should write to a map/db that we can read.
    
    # Quick fix: If job is done, we might have it. If running, it's harder absent a DB.
    # BUT, 'stream_queue' sends frames.
    # Let's use a global cache in memory for active jobs?
    # See inference.py: 'all_detections_map' is local to that function.
    
    # BETTER APPROACH for this demo:
    # Use a simple shared dictionary in jobs/storage.py or app.py used by inference.
    # We will pass a callback or shared dict to run_inference.
    
    # For now, let's just return 404 if not implemented, but I need to implement it.
    # I'll add a cache in app.py for active job tracks?
    from jobs.storage import get_track_data
    data = get_track_data(job_id, frame_idx)
    return data or []


@app.post("/detect/analyze-frame")
async def analyze_frame(
    image: UploadFile = File(...),
    detections: str = Form(...),
    job_id: str = Form(None),
):
    """Run GPT threat assessment on a single video frame."""
    import json as json_module
    from utils.gpt_reasoning import encode_frame_to_b64

    dets = json_module.loads(detections)

    # Look up mission_spec from stored job (if available)
    mission_spec = None
    if job_id:
        job = get_job_storage().get(job_id)
        if job:
            mission_spec = job.mission_spec

    # Decode uploaded image
    image_bytes = await image.read()
    nparr = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if frame is None:
        raise HTTPException(status_code=400, detail="Invalid image")

    # Run GPT in thread pool (blocking OpenAI API call)
    frame_b64 = encode_frame_to_b64(frame)
    async with _GPT_SEMAPHORE:
        gpt_results = await asyncio.to_thread(
            estimate_threat_gpt,
            detections=dets,
            mission_spec=mission_spec,
            image_b64=frame_b64,
        )

    # Merge GPT results into detection records
    for d in dets:
        oid = d.get("track_id") or d.get("id")
        if oid and oid in gpt_results:
            payload = gpt_results[oid]
            d["gpt_raw"] = payload
            d["assessment_status"] = payload.get("assessment_status", "ASSESSED")
            d["threat_level_score"] = payload.get("threat_level_score", 0)
            d["threat_classification"] = payload.get("threat_classification", "Unknown")
            d["weapon_readiness"] = payload.get("weapon_readiness", "Unknown")
            d["gpt_description"] = payload.get("gpt_description")
            d["gpt_distance_m"] = payload.get("gpt_distance_m")
            d["gpt_direction"] = payload.get("gpt_direction")

    return dets


@app.delete("/detect/job/{job_id}")
async def cancel_job(job_id: str):
    """Cancel a running job."""
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")
    if job.status != JobStatus.PROCESSING:
        return {
            "message": f"Job already {job.status.value}",
            "status": job.status.value,
        }

    get_job_storage().update(job_id, status=JobStatus.CANCELLED)
    return {
        "message": "Job cancellation requested",
        "status": "cancelled",
    }


@app.get("/detect/first-frame/{job_id}")
async def detect_first_frame(job_id: str):
    job = get_job_storage().get(job_id)
    if not job or not Path(job.first_frame_path).exists():
        raise HTTPException(status_code=404, detail="First frame not found.")
    return FileResponse(
        path=job.first_frame_path,
        media_type="image/jpeg",
        filename="first_frame.jpg",
    )


@app.get("/detect/video/{job_id}")
async def detect_video(job_id: str):
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")
    if job.status == JobStatus.FAILED:
        raise HTTPException(status_code=500, detail=f"Job failed: {job.error}")
    if job.status == JobStatus.CANCELLED:
        raise HTTPException(status_code=410, detail="Job was cancelled")
    if job.status == JobStatus.PROCESSING:
        return JSONResponse(
            status_code=202,
            content={"detail": "Video still processing", "status": "processing"},
        )
    if not job.output_video_path or not Path(job.output_video_path).exists():
        raise HTTPException(status_code=404, detail="Video file not found.")
    return FileResponse(
        path=job.output_video_path,
        media_type="video/mp4",
        filename="processed.mp4",
    )


@app.get("/detect/depth-video/{job_id}")
async def detect_depth_video(job_id: str):
    """Return depth estimation video."""
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")
    if not job.depth_output_path:
        # Check if depth failed (partial success)
        if job.partial_success and job.depth_error:
            raise HTTPException(status_code=404, detail=f"Depth unavailable: {job.depth_error}")
        raise HTTPException(status_code=404, detail="No depth video for this job.")
    if job.status == JobStatus.FAILED:
        raise HTTPException(status_code=500, detail=f"Job failed: {job.error}")
    if job.status == JobStatus.CANCELLED:
        raise HTTPException(status_code=410, detail="Job was cancelled")
    if job.status == JobStatus.PROCESSING:
        return JSONResponse(
            status_code=202,
            content={"detail": "Video still processing", "status": "processing"},
        )
    if not Path(job.depth_output_path).exists():
        raise HTTPException(status_code=404, detail="Depth video file not found.")
    return FileResponse(
        path=job.depth_output_path,
        media_type="video/mp4",
        filename="depth.mp4",
    )


@app.get("/detect/first-frame-depth/{job_id}")
async def detect_first_frame_depth(job_id: str):
    """Return first frame depth visualization."""
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")
    if not job.first_frame_depth_path:
        # Return placeholder or error if depth not available
        if job.partial_success and job.depth_error:
            raise HTTPException(status_code=404, detail=f"Depth unavailable: {job.depth_error}")
        raise HTTPException(status_code=404, detail="First frame depth not found.")
    if not Path(job.first_frame_depth_path).exists():
        raise HTTPException(status_code=404, detail="First frame depth file not found.")
    return FileResponse(
        path=job.first_frame_depth_path,
        media_type="image/jpeg",
        filename="first_frame_depth.jpg",
    )


@app.get("/detect/stream/{job_id}")
async def stream_video(job_id: str):
    """MJPEG stream of the processing video (event-driven)."""
    import queue as queue_mod

    async def stream_generator():
        loop = asyncio.get_running_loop()
        buffered = False

        # TTFS instrumentation
        _first_yielded = False
        _buffer_wait_logged = False
        _job = get_job_storage().get(job_id)
        _stream_t0 = _job.ttfs_t0 if _job else None

        if _stream_t0:
            logging.info("[TTFS:%s] +%.1fs stream_subscribed", job_id, time.perf_counter() - _stream_t0)

        # Get or create the asyncio.Event for this stream (must be in async context)
        event = get_stream_event(job_id)

        while True:
            q = get_stream(job_id)
            if not q:
                break

            try:
                # Initial Buffer: Wait until we have enough frames or job is done
                if not buffered:
                    if not _buffer_wait_logged and _stream_t0:
                        logging.info("[TTFS:%s] +%.1fs stream_buffer_wait (qsize=%d)", job_id, time.perf_counter() - _stream_t0, q.qsize())
                        _buffer_wait_logged = True
                    if q.qsize() < 5:
                        await asyncio.sleep(0.1)
                        continue
                    buffered = True
                    if _stream_t0:
                        logging.info("[TTFS:%s] +%.1fs stream_buffer_ready", job_id, time.perf_counter() - _stream_t0)

                # Event-driven wait — replaces busy-wait polling
                if event is not None:
                    try:
                        await asyncio.wait_for(event.wait(), timeout=1.0)
                        event.clear()
                    except asyncio.TimeoutError:
                        if not get_stream(job_id):
                            return
                        continue
                else:
                    # Fallback if no event (shouldn't happen)
                    await asyncio.sleep(0.033)

                # Drain available frame (already pre-resized by publish_frame)
                try:
                    frame = q.get_nowait()
                except queue_mod.Empty:
                    continue

                # Encode in thread (frame already resized by publish_frame)
                encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
                success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)

                if success:
                    if not _first_yielded:
                        _first_yielded = True
                        if _stream_t0:
                            logging.info("[TTFS:%s] +%.1fs first_yield_to_client", job_id, time.perf_counter() - _stream_t0)
                    yield (b'--frame\r\n'
                           b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')

                # Simple pacer (~30fps)
                await asyncio.sleep(0.033)

            except Exception:
                await asyncio.sleep(0.1)

    return StreamingResponse(
        stream_generator(),
        media_type="multipart/x-mixed-replace; boundary=frame"
    )


@app.post("/reason/track")
async def reason_track(
    frame: UploadFile = File(...),
    tracks: str = Form(...)  # JSON string of tracks: [{"id": "T01", "bbox": [x,y,w,h], "label": "car"}, ...]
):
    """
    Reason about specific tracks in a frame using GPT.
    Returns distance and description for each object ID.
    """
    import json
    try:
        input_path = _save_upload_to_tmp(frame)
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to save uploaded frame")

    try:
        track_list = json.loads(tracks)
    except json.JSONDecodeError:
        _safe_delete(input_path)
        raise HTTPException(status_code=400, detail="Invalid tracks JSON")

    # Run GPT estimation
    # This is blocking, but that's expected for this endpoint structure. 
    # For high concurrency, might want to offload to threadpool or async wrapper.
    try:
        async with _GPT_SEMAPHORE:
            results = await asyncio.to_thread(estimate_threat_gpt, input_path, track_list)
        logging.info(f"GPT Output for Video Track Update:\n{results}")
    except Exception as e:
        logging.exception("GPT reasoning failed")
        _safe_delete(input_path)
        raise HTTPException(status_code=500, detail=str(e))
    
    _safe_delete(input_path)
    return results


@app.post("/chat/threat")
async def chat_threat_endpoint(
    question: str = Form(...),
    detections: str = Form(...),  # JSON string of current detections
    mission_context: str = Form(""),  # Optional JSON string of mission spec
):
    """
    Chat about detected threats using GPT.

    Args:
        question: User's question about the current threat situation.
        detections: JSON string of detection list with threat analysis data.
        mission_context: Optional JSON string of mission specification.

    Returns:
        GPT response about the threats.
    """
    import json as json_module

    if not question.strip():
        raise HTTPException(status_code=400, detail="Question cannot be empty.")

    try:
        detection_list = json_module.loads(detections)
    except json_module.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid detections JSON.")

    if not isinstance(detection_list, list):
        raise HTTPException(status_code=400, detail="Detections must be a list.")

    # Parse optional mission context
    mission_spec_dict = None
    if mission_context.strip():
        try:
            mission_spec_dict = json_module.loads(mission_context)
        except json_module.JSONDecodeError:
            pass  # Non-critical, proceed without mission context

    # Run chat in thread to avoid blocking (with concurrency limit)
    try:
        async with _GPT_SEMAPHORE:
            response = await asyncio.to_thread(
                chat_about_threats, question, detection_list, mission_spec_dict
            )
        return {"response": response}
    except Exception as e:
        logging.exception("Threat chat failed")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/benchmark")
async def benchmark_endpoint(
    video: UploadFile = File(...),
    queries: str = Form("person,car,truck"),
    segmenter: str = Form("GSAM2-L"),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Run instrumented GSAM2 pipeline and return latency breakdown JSON.

    This is a long-running synchronous request (may take minutes).
    Callers should set an appropriate HTTP timeout.
    """
    import threading

    # Save uploaded video to temp path
    input_path = tempfile.mktemp(suffix=".mp4", prefix="bench_in_")
    output_path = tempfile.mktemp(suffix=".mp4", prefix="bench_out_")
    try:
        with open(input_path, "wb") as f:
            shutil.copyfileobj(video.file, f)

        query_list = [q.strip() for q in queries.split(",") if q.strip()]

        metrics = {
            "end_to_end_ms": 0.0,
            "frame_extraction_ms": 0.0,
            "model_load_ms": 0.0,
            "init_state_ms": 0.0,
            "tracking_total_ms": 0.0,
            "gdino_total_ms": 0.0,
            "sam_image_total_ms": 0.0,
            "sam_video_total_ms": 0.0,
            "id_reconciliation_ms": 0.0,
            "render_total_ms": 0.0,
            "writer_total_ms": 0.0,
            "gpu_peak_mem_mb": 0.0,
        }
        lock = threading.Lock()

        await asyncio.to_thread(
            run_grounded_sam2_tracking,
            input_path,
            output_path,
            query_list,
            segmenter_name=segmenter,
            step=step,
            enable_gpt=False,
            _perf_metrics=metrics,
            _perf_lock=lock,
            num_maskmem=num_maskmem,
        )

        # Read frame count and fps from output video
        total_frames = 0
        fps = 0.0
        cap = cv2.VideoCapture(output_path)
        if cap.isOpened():
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
            cap.release()

        num_gpus = torch.cuda.device_count()

        return JSONResponse({
            "total_frames": total_frames,
            "fps": fps,
            "num_gpus": num_gpus,
            "num_maskmem": num_maskmem if num_maskmem is not None else 7,
            "metrics": metrics,
        })

    finally:
        for p in (input_path, output_path):
            try:
                os.remove(p)
            except OSError:
                pass


@app.get("/gpu-monitor")
async def gpu_monitor_endpoint(duration: int = 180, interval: int = 1):
    """Stream nvidia-smi dmon output for the given duration.

    Usage: curl 'http://.../gpu-monitor?duration=180&interval=1'
    Run this in one terminal while /benchmark runs in another.
    """
    import subprocess

    async def _stream():
        proc = subprocess.Popen(
            ["nvidia-smi", "dmon", "-s", "u", "-d", str(interval)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        try:
            elapsed = 0
            for line in proc.stdout:
                yield line
                if interval > 0:
                    elapsed += interval
                    if elapsed > duration:
                        break
        finally:
            proc.terminate()
            proc.wait()

    return StreamingResponse(_stream(), media_type="text/plain")


# ---------------------------------------------------------------------------
# Benchmark Profiler & Roofline Analysis Endpoints
# ---------------------------------------------------------------------------

@app.get("/benchmark/hardware")
async def benchmark_hardware():
    """Return hardware specs JSON (no video needed, cached)."""
    import dataclasses
    from utils.hardware_info import get_hardware_info

    hw = await asyncio.to_thread(get_hardware_info)
    return JSONResponse(dataclasses.asdict(hw))


@app.post("/benchmark/profile")
async def benchmark_profile(
    video: UploadFile = File(...),
    mode: str = Form("detection"),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    queries: str = Form("person,car,truck"),
    max_frames: int = Form(100),
    warmup_frames: int = Form(5),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Run profiled inference and return per-frame timing breakdown.

    Args:
        video: Video file to profile.
        mode: "detection" or "segmentation".
        detector: Detector key (for detection mode).
        segmenter: Segmenter key (for segmentation mode).
        queries: Comma-separated object classes.
        max_frames: Maximum frames to profile.
        warmup_frames: Warmup frames (detection only).
        step: Keyframe interval (segmentation only).
        num_maskmem: SAM2 memory frames (None = model default 7).
    """
    import dataclasses
    from utils.profiler import run_profiled_detection, run_profiled_segmentation

    if mode not in ("detection", "segmentation"):
        raise HTTPException(status_code=400, detail="mode must be 'detection' or 'segmentation'")

    input_path = _save_upload_to_tmp(video)
    await video.close()

    query_list = [q.strip() for q in queries.split(",") if q.strip()]

    try:
        if mode == "detection":
            result = await asyncio.to_thread(
                run_profiled_detection,
                input_path, detector, query_list,
                max_frames=max_frames, warmup_frames=warmup_frames,
            )
        else:
            result = await asyncio.to_thread(
                run_profiled_segmentation,
                input_path, segmenter, query_list,
                max_frames=max_frames, step=step,
                num_maskmem=num_maskmem,
            )
    except Exception as exc:
        _safe_delete(input_path)
        logging.exception("Profiling failed")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        _safe_delete(input_path)

    # Serialize dataclass, handling any non-serializable fields
    out = dataclasses.asdict(result)
    # Include GSAM2 metrics if present
    gsam2 = getattr(result, "_gsam2_metrics", None)
    if gsam2:
        out["gsam2_metrics"] = gsam2
    return JSONResponse(out)


@app.post("/benchmark/analysis")
async def benchmark_analysis(
    video: UploadFile = File(...),
    mode: str = Form("detection"),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    queries: str = Form("person,car,truck"),
    max_frames: int = Form(100),
    warmup_frames: int = Form(5),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Full roofline analysis: hardware + profiling + theoretical ceilings + bottleneck ID.

    Combines hardware extraction, profiled inference, and roofline model
    to identify bottlenecks and provide actionable recommendations.
    """
    import dataclasses
    from utils.hardware_info import get_hardware_info
    from utils.profiler import run_profiled_detection, run_profiled_segmentation
    from utils.roofline import compute_roofline

    if mode not in ("detection", "segmentation"):
        raise HTTPException(status_code=400, detail="mode must be 'detection' or 'segmentation'")

    input_path = _save_upload_to_tmp(video)
    await video.close()

    query_list = [q.strip() for q in queries.split(",") if q.strip()]

    try:
        # Get hardware info (cached, fast)
        hardware = await asyncio.to_thread(get_hardware_info)

        # Run profiling
        if mode == "detection":
            profiling = await asyncio.to_thread(
                run_profiled_detection,
                input_path, detector, query_list,
                max_frames=max_frames, warmup_frames=warmup_frames,
            )
        else:
            profiling = await asyncio.to_thread(
                run_profiled_segmentation,
                input_path, segmenter, query_list,
                max_frames=max_frames, step=step,
                num_maskmem=num_maskmem,
            )

        # Compute roofline
        roofline = compute_roofline(hardware, profiling)

    except Exception as exc:
        _safe_delete(input_path)
        logging.exception("Benchmark analysis failed")
        raise HTTPException(status_code=500, detail=str(exc))
    finally:
        _safe_delete(input_path)

    return JSONResponse({
        "hardware": dataclasses.asdict(hardware),
        "profiling": dataclasses.asdict(profiling),
        "roofline": dataclasses.asdict(roofline),
    })


if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)