Zhen Ye committed on
Commit
74ae00c
·
1 Parent(s): 517108e

feat: add detailed timing metrics to GroundedSAM2 segmenter

Browse files
app.py CHANGED
@@ -28,7 +28,6 @@ except Exception as e:
28
  import asyncio
29
  import shutil
30
  import tempfile
31
- import time
32
  import uuid
33
  from contextlib import asynccontextmanager
34
  from datetime import timedelta
@@ -89,7 +88,6 @@ async def _enrich_first_frame_gpt(
89
  """
90
  if not enable_gpt or not detections:
91
  return
92
- t_gpt_start = time.monotonic()
93
  try:
94
  # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
95
  if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
@@ -101,12 +99,9 @@ async def _enrich_first_frame_gpt(
101
  if not filtered:
102
  for det in detections:
103
  det["assessment_status"] = AssessmentStatus.ASSESSED
104
- gpt_elapsed = time.monotonic() - t_gpt_start
105
- logging.info("TIMING gpt_first_frame=%.3fs for job %s (all non-relevant)", gpt_elapsed, job_id)
106
  get_job_storage().update(
107
  job_id,
108
  first_frame_detections=detections,
109
- timing_gpt_first_frame_s=gpt_elapsed,
110
  )
111
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
112
  return
@@ -115,8 +110,6 @@ async def _enrich_first_frame_gpt(
115
  run_enrichment, 0, frame, detections, mission_spec,
116
  job_id=job_id,
117
  )
118
- gpt_elapsed = time.monotonic() - t_gpt_start
119
- logging.info("TIMING gpt_first_frame=%.3fs for job %s", gpt_elapsed, job_id)
120
  logging.info("Background GPT enrichment complete for job %s", job_id)
121
 
122
  if not gpt_results:
@@ -126,7 +119,6 @@ async def _enrich_first_frame_gpt(
126
  get_job_storage().update(
127
  job_id,
128
  first_frame_detections=detections,
129
- timing_gpt_first_frame_s=gpt_elapsed,
130
  )
131
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
132
  return
@@ -141,7 +133,6 @@ async def _enrich_first_frame_gpt(
141
  job_id,
142
  first_frame_detections=detections,
143
  first_frame_gpt_results=gpt_results,
144
- timing_gpt_first_frame_s=gpt_elapsed,
145
  )
146
  logging.info("Updated first_frame_detections with GPT results for job %s", job_id)
147
 
@@ -425,7 +416,6 @@ async def detect_async_endpoint(
425
  raise HTTPException(status_code=400, detail="Video file is required.")
426
 
427
  job_id = uuid.uuid4().hex
428
- t_job_start = time.monotonic()
429
  job_dir = get_job_directory(job_id)
430
  input_path = get_input_video_path(job_id)
431
  output_path = get_output_video_path(job_id)
@@ -498,8 +488,6 @@ async def detect_async_endpoint(
498
  segmenter_name=segmenter,
499
  )
500
  cv2.imwrite(str(first_frame_path), processed_frame)
501
- first_frame_elapsed = time.monotonic() - t_job_start
502
- logging.info("TIMING first_frame=%.3fs for job %s", first_frame_elapsed, job_id)
503
  # GPT and depth are now handled in the async pipeline (enrichment thread)
504
  depth_map = None
505
  first_frame_gpt_results = None
@@ -527,7 +515,6 @@ async def detect_async_endpoint(
527
  mission_spec=mission_spec,
528
  mission_mode=mission_mode,
529
  first_frame_gpt_results=first_frame_gpt_results,
530
- timing_first_frame_s=first_frame_elapsed,
531
  )
532
  get_job_storage().create(job)
533
  asyncio.create_task(process_video_async(job_id))
@@ -582,13 +569,6 @@ async def detect_status(job_id: str):
582
  "completed_at": job.completed_at.isoformat() if job.completed_at else None,
583
  "error": job.error,
584
  "first_frame_detections": job.first_frame_detections,
585
- "timing": {
586
- "first_frame_s": job.timing_first_frame_s,
587
- "video_processing_s": job.timing_video_processing_s,
588
- "gpt_first_frame_s": job.timing_gpt_first_frame_s,
589
- "gpt_enrichment_s": job.timing_gpt_enrichment_s,
590
- "total_s": job.timing_total_s,
591
- },
592
  }
593
 
594
 
@@ -871,5 +851,80 @@ async def chat_threat_endpoint(
871
  raise HTTPException(status_code=500, detail=str(e))
872
 
873
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
874
  if __name__ == "__main__":
875
  uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)
 
28
  import asyncio
29
  import shutil
30
  import tempfile
 
31
  import uuid
32
  from contextlib import asynccontextmanager
33
  from datetime import timedelta
 
88
  """
89
  if not enable_gpt or not detections:
90
  return
 
91
  try:
92
  # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
93
  if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
 
99
  if not filtered:
100
  for det in detections:
101
  det["assessment_status"] = AssessmentStatus.ASSESSED
 
 
102
  get_job_storage().update(
103
  job_id,
104
  first_frame_detections=detections,
 
105
  )
106
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
107
  return
 
110
  run_enrichment, 0, frame, detections, mission_spec,
111
  job_id=job_id,
112
  )
 
 
113
  logging.info("Background GPT enrichment complete for job %s", job_id)
114
 
115
  if not gpt_results:
 
119
  get_job_storage().update(
120
  job_id,
121
  first_frame_detections=detections,
 
122
  )
123
  logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)
124
  return
 
133
  job_id,
134
  first_frame_detections=detections,
135
  first_frame_gpt_results=gpt_results,
 
136
  )
137
  logging.info("Updated first_frame_detections with GPT results for job %s", job_id)
138
 
 
416
  raise HTTPException(status_code=400, detail="Video file is required.")
417
 
418
  job_id = uuid.uuid4().hex
 
419
  job_dir = get_job_directory(job_id)
420
  input_path = get_input_video_path(job_id)
421
  output_path = get_output_video_path(job_id)
 
488
  segmenter_name=segmenter,
489
  )
490
  cv2.imwrite(str(first_frame_path), processed_frame)
 
 
491
  # GPT and depth are now handled in the async pipeline (enrichment thread)
492
  depth_map = None
493
  first_frame_gpt_results = None
 
515
  mission_spec=mission_spec,
516
  mission_mode=mission_mode,
517
  first_frame_gpt_results=first_frame_gpt_results,
 
518
  )
519
  get_job_storage().create(job)
520
  asyncio.create_task(process_video_async(job_id))
 
569
  "completed_at": job.completed_at.isoformat() if job.completed_at else None,
570
  "error": job.error,
571
  "first_frame_detections": job.first_frame_detections,
 
 
 
 
 
 
 
572
  }
573
 
574
 
 
851
  raise HTTPException(status_code=500, detail=str(e))
852
 
853
 
854
+ @app.post("/benchmark")
855
+ async def benchmark_endpoint(
856
+ video: UploadFile = File(...),
857
+ queries: str = Form("person,car,truck"),
858
+ segmenter: str = Form("gsam2_large"),
859
+ step: int = Form(20),
860
+ ):
861
+ """Run instrumented GSAM2 pipeline and return latency breakdown JSON.
862
+
863
+ This is a long-running synchronous request (may take minutes).
864
+ Callers should set an appropriate HTTP timeout.
865
+ """
866
+ import threading
867
+
868
+ # Save uploaded video to temp path
869
+ input_path = tempfile.mktemp(suffix=".mp4", prefix="bench_in_")
870
+ output_path = tempfile.mktemp(suffix=".mp4", prefix="bench_out_")
871
+ try:
872
+ with open(input_path, "wb") as f:
873
+ shutil.copyfileobj(video.file, f)
874
+
875
+ query_list = [q.strip() for q in queries.split(",") if q.strip()]
876
+
877
+ metrics = {
878
+ "end_to_end_ms": 0.0,
879
+ "frame_extraction_ms": 0.0,
880
+ "tracking_total_ms": 0.0,
881
+ "gdino_total_ms": 0.0,
882
+ "sam_image_total_ms": 0.0,
883
+ "sam_video_total_ms": 0.0,
884
+ "id_reconciliation_ms": 0.0,
885
+ "render_total_ms": 0.0,
886
+ "writer_total_ms": 0.0,
887
+ "gpu_peak_mem_mb": 0.0,
888
+ }
889
+ lock = threading.Lock()
890
+
891
+ await asyncio.to_thread(
892
+ run_grounded_sam2_tracking,
893
+ input_path,
894
+ output_path,
895
+ query_list,
896
+ segmenter_name=segmenter,
897
+ step=step,
898
+ enable_gpt=False,
899
+ _perf_metrics=metrics,
900
+ _perf_lock=lock,
901
+ )
902
+
903
+ # Read frame count and fps from output video
904
+ total_frames = 0
905
+ fps = 0.0
906
+ cap = cv2.VideoCapture(output_path)
907
+ if cap.isOpened():
908
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
909
+ fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
910
+ cap.release()
911
+
912
+ num_gpus = torch.cuda.device_count()
913
+
914
+ return JSONResponse({
915
+ "total_frames": total_frames,
916
+ "fps": fps,
917
+ "num_gpus": num_gpus,
918
+ "metrics": metrics,
919
+ })
920
+
921
+ finally:
922
+ for p in (input_path, output_path):
923
+ try:
924
+ os.remove(p)
925
+ except OSError:
926
+ pass
927
+
928
+
929
  if __name__ == "__main__":
930
  uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=False)
frontend/js/api/client.js CHANGED
@@ -192,18 +192,6 @@ APP.api.client.pollAsyncJob = async function () {
192
  syncGpt(status.first_frame_detections, "final sync");
193
  }
194
 
195
- // Display timing summary
196
- if (status.timing) {
197
- const t = status.timing;
198
- const parts = [];
199
- if (t.first_frame_s != null) parts.push(`1st frame: ${t.first_frame_s.toFixed(2)}s`);
200
- if (t.video_processing_s != null) parts.push(`video: ${t.video_processing_s.toFixed(2)}s`);
201
- if (t.gpt_first_frame_s != null) parts.push(`GPT: ${t.gpt_first_frame_s.toFixed(2)}s`);
202
- if (t.gpt_enrichment_s != null) parts.push(`GPT enrich: ${t.gpt_enrichment_s.toFixed(2)}s`);
203
- if (t.total_s != null) parts.push(`total: ${t.total_s.toFixed(2)}s`);
204
- if (parts.length) log(`Timing: ${parts.join(" | ")}`, "t");
205
- }
206
-
207
  try {
208
  await fetchProcessedVideo();
209
  await fetchDepthVideo();
 
192
  syncGpt(status.first_frame_detections, "final sync");
193
  }
194
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  try {
196
  await fetchProcessedVideo();
197
  await fetchDepthVideo();
inference.py CHANGED
@@ -1629,6 +1629,8 @@ def run_grounded_sam2_tracking(
1629
  enable_gpt: bool = False,
1630
  mission_spec=None, # Optional[MissionSpecification]
1631
  first_frame_gpt_results: Optional[Dict[str, Any]] = None,
 
 
1632
  ) -> str:
1633
  """Run Grounded-SAM-2 video tracking pipeline.
1634
 
@@ -1652,9 +1654,20 @@ def run_grounded_sam2_tracking(
1652
  # 1. Extract frames to JPEG directory
1653
  frame_dir = tempfile.mkdtemp(prefix="gsam2_frames_")
1654
  try:
 
 
 
 
 
 
 
 
1655
  frame_names, fps, width, height = extract_frames_to_jpeg_dir(
1656
  input_video_path, frame_dir, max_frames=max_frames,
1657
  )
 
 
 
1658
  total_frames = len(frame_names)
1659
  logging.info("Extracted %d frames to %s", total_frames, frame_dir)
1660
 
@@ -1668,9 +1681,21 @@ def run_grounded_sam2_tracking(
1668
  device_str = "cuda:0" if torch.cuda.is_available() else "cpu"
1669
  segmenter = load_segmenter_on_device(active_segmenter, device_str)
1670
  _check_cancellation(job_id)
 
 
 
 
 
 
 
 
1671
  tracking_results = segmenter.process_video(
1672
  frame_dir, frame_names, queries,
1673
  )
 
 
 
 
1674
  logging.info(
1675
  "Single-GPU tracking complete: %d frames",
1676
  len(tracking_results),
@@ -1696,6 +1721,13 @@ def run_grounded_sam2_tracking(
1696
  segmenters = [f.result() for f in futs]
1697
  logging.info("Loaded %d segmenters", len(segmenters))
1698
 
 
 
 
 
 
 
 
1699
  # Phase 2: Init SAM2 models/state per GPU (parallel)
1700
  def _init_seg_state(seg):
1701
  seg._ensure_models_loaded()
@@ -1709,6 +1741,9 @@ def run_grounded_sam2_tracking(
1709
  futs = [pool.submit(_init_seg_state, seg) for seg in segmenters]
1710
  inference_states = [f.result() for f in futs]
1711
 
 
 
 
1712
  # Phase 3: Parallel segment processing (queue-based workers)
1713
  segments = list(range(0, total_frames, step))
1714
  seg_queue_in: Queue = Queue()
@@ -1822,6 +1857,9 @@ def run_grounded_sam2_tracking(
1822
  segment_data[seg_idx] = (start_idx, mask_dict, results)
1823
 
1824
  # Phase 4: Sequential ID reconciliation
 
 
 
1825
  global_id_counter = 0
1826
  sam2_masks = MaskDictionary()
1827
  tracking_results: Dict[int, Dict[int, ObjectInfo]] = {}
@@ -1909,6 +1947,10 @@ def run_grounded_sam2_tracking(
1909
  m.shape[-1] if m.ndim >= 2 else 0
1910
  )
1911
 
 
 
 
 
1912
  logging.info(
1913
  "Multi-GPU reconciliation complete: %d frames, %d objects",
1914
  len(tracking_results), global_id_counter,
@@ -1931,11 +1973,22 @@ def run_grounded_sam2_tracking(
1931
  break
1932
  fidx, fobjs = item
1933
  try:
 
 
 
1934
  frm = _gsam2_render_frame(
1935
  frame_dir, frame_names, fidx, fobjs,
1936
  height, width,
1937
  masks_only=enable_gpt,
1938
  )
 
 
 
 
 
 
 
 
1939
  payload = (fidx, frm, fobjs) if enable_gpt else (fidx, frm, {})
1940
  while True:
1941
  try:
@@ -1985,15 +2038,12 @@ def run_grounded_sam2_tracking(
1985
  break
1986
  frame_idx, frame_data, gpt_dets, ms = item
1987
  try:
1988
- t_enrich_start = time.monotonic()
1989
  gpt_res = run_enrichment(
1990
  frame_idx, frame_data, gpt_dets, ms,
1991
  first_frame_gpt_results=first_frame_gpt_results,
1992
  job_id=job_id,
1993
  relevance_refined_event=_relevance_refined,
1994
  )
1995
- gpt_enrichment_elapsed = time.monotonic() - t_enrich_start
1996
- logging.info("TIMING gpt_enrichment=%.3fs for job %s", gpt_enrichment_elapsed, job_id)
1997
 
1998
  # GSAM2-specific: store results in per-track dict and persist to job storage
1999
  if gpt_res:
@@ -2035,7 +2085,6 @@ def run_grounded_sam2_tracking(
2035
  job_id,
2036
  first_frame_detections=_st.first_frame_detections,
2037
  first_frame_gpt_results=gpt_res,
2038
- timing_gpt_enrichment_s=gpt_enrichment_elapsed,
2039
  )
2040
  logging.info(
2041
  "GSAM2 enrichment: updated first_frame_detections in job storage for %s",
@@ -2180,8 +2229,14 @@ def run_grounded_sam2_tracking(
2180
  if job_id:
2181
  set_track_data(job_id, next_idx, [])
2182
 
 
 
 
2183
  writer.write(frm)
2184
 
 
 
 
2185
  if stream_queue:
2186
  try:
2187
  from jobs.streaming import (
@@ -2237,6 +2292,11 @@ def run_grounded_sam2_tracking(
2237
  t.join()
2238
  writer_thread.join()
2239
 
 
 
 
 
 
2240
  logging.info("Grounded-SAM-2 output written to: %s", output_video_path)
2241
  return output_video_path
2242
 
 
1629
  enable_gpt: bool = False,
1630
  mission_spec=None, # Optional[MissionSpecification]
1631
  first_frame_gpt_results: Optional[Dict[str, Any]] = None,
1632
+ _perf_metrics: Optional[Dict[str, float]] = None,
1633
+ _perf_lock=None,
1634
  ) -> str:
1635
  """Run Grounded-SAM-2 video tracking pipeline.
1636
 
 
1654
  # 1. Extract frames to JPEG directory
1655
  frame_dir = tempfile.mkdtemp(prefix="gsam2_frames_")
1656
  try:
1657
+ if _perf_metrics is not None:
1658
+ _t_e2e = time.perf_counter()
1659
+ if torch.cuda.is_available():
1660
+ torch.cuda.reset_peak_memory_stats()
1661
+
1662
+ if _perf_metrics is not None:
1663
+ _t_ext = time.perf_counter()
1664
+
1665
  frame_names, fps, width, height = extract_frames_to_jpeg_dir(
1666
  input_video_path, frame_dir, max_frames=max_frames,
1667
  )
1668
+
1669
+ if _perf_metrics is not None:
1670
+ _perf_metrics["frame_extraction_ms"] = (time.perf_counter() - _t_ext) * 1000.0
1671
  total_frames = len(frame_names)
1672
  logging.info("Extracted %d frames to %s", total_frames, frame_dir)
1673
 
 
1681
  device_str = "cuda:0" if torch.cuda.is_available() else "cpu"
1682
  segmenter = load_segmenter_on_device(active_segmenter, device_str)
1683
  _check_cancellation(job_id)
1684
+
1685
+ if _perf_metrics is not None:
1686
+ segmenter._perf_metrics = _perf_metrics
1687
+ segmenter._perf_lock = None
1688
+
1689
+ if _perf_metrics is not None:
1690
+ _t_track = time.perf_counter()
1691
+
1692
  tracking_results = segmenter.process_video(
1693
  frame_dir, frame_names, queries,
1694
  )
1695
+
1696
+ if _perf_metrics is not None:
1697
+ _perf_metrics["tracking_total_ms"] = (time.perf_counter() - _t_track) * 1000.0
1698
+
1699
  logging.info(
1700
  "Single-GPU tracking complete: %d frames",
1701
  len(tracking_results),
 
1721
  segmenters = [f.result() for f in futs]
1722
  logging.info("Loaded %d segmenters", len(segmenters))
1723
 
1724
+ if _perf_metrics is not None:
1725
+ import threading as _th
1726
+ _actual_lock = _perf_lock or _th.Lock()
1727
+ for seg in segmenters:
1728
+ seg._perf_metrics = _perf_metrics
1729
+ seg._perf_lock = _actual_lock
1730
+
1731
  # Phase 2: Init SAM2 models/state per GPU (parallel)
1732
  def _init_seg_state(seg):
1733
  seg._ensure_models_loaded()
 
1741
  futs = [pool.submit(_init_seg_state, seg) for seg in segmenters]
1742
  inference_states = [f.result() for f in futs]
1743
 
1744
+ if _perf_metrics is not None:
1745
+ _t_track = time.perf_counter()
1746
+
1747
  # Phase 3: Parallel segment processing (queue-based workers)
1748
  segments = list(range(0, total_frames, step))
1749
  seg_queue_in: Queue = Queue()
 
1857
  segment_data[seg_idx] = (start_idx, mask_dict, results)
1858
 
1859
  # Phase 4: Sequential ID reconciliation
1860
+ if _perf_metrics is not None:
1861
+ _t_recon = time.perf_counter()
1862
+
1863
  global_id_counter = 0
1864
  sam2_masks = MaskDictionary()
1865
  tracking_results: Dict[int, Dict[int, ObjectInfo]] = {}
 
1947
  m.shape[-1] if m.ndim >= 2 else 0
1948
  )
1949
 
1950
+ if _perf_metrics is not None:
1951
+ _perf_metrics["id_reconciliation_ms"] = (time.perf_counter() - _t_recon) * 1000.0
1952
+ _perf_metrics["tracking_total_ms"] = (time.perf_counter() - _t_track) * 1000.0
1953
+
1954
  logging.info(
1955
  "Multi-GPU reconciliation complete: %d frames, %d objects",
1956
  len(tracking_results), global_id_counter,
 
1973
  break
1974
  fidx, fobjs = item
1975
  try:
1976
+ if _perf_metrics is not None:
1977
+ _t_r = time.perf_counter()
1978
+
1979
  frm = _gsam2_render_frame(
1980
  frame_dir, frame_names, fidx, fobjs,
1981
  height, width,
1982
  masks_only=enable_gpt,
1983
  )
1984
+
1985
+ if _perf_metrics is not None:
1986
+ _r_ms = (time.perf_counter() - _t_r) * 1000.0
1987
+ if _perf_lock:
1988
+ with _perf_lock: _perf_metrics["render_total_ms"] += _r_ms
1989
+ else:
1990
+ _perf_metrics["render_total_ms"] += _r_ms
1991
+
1992
  payload = (fidx, frm, fobjs) if enable_gpt else (fidx, frm, {})
1993
  while True:
1994
  try:
 
2038
  break
2039
  frame_idx, frame_data, gpt_dets, ms = item
2040
  try:
 
2041
  gpt_res = run_enrichment(
2042
  frame_idx, frame_data, gpt_dets, ms,
2043
  first_frame_gpt_results=first_frame_gpt_results,
2044
  job_id=job_id,
2045
  relevance_refined_event=_relevance_refined,
2046
  )
 
 
2047
 
2048
  # GSAM2-specific: store results in per-track dict and persist to job storage
2049
  if gpt_res:
 
2085
  job_id,
2086
  first_frame_detections=_st.first_frame_detections,
2087
  first_frame_gpt_results=gpt_res,
 
2088
  )
2089
  logging.info(
2090
  "GSAM2 enrichment: updated first_frame_detections in job storage for %s",
 
2229
  if job_id:
2230
  set_track_data(job_id, next_idx, [])
2231
 
2232
+ if _perf_metrics is not None:
2233
+ _t_w = time.perf_counter()
2234
+
2235
  writer.write(frm)
2236
 
2237
+ if _perf_metrics is not None:
2238
+ _perf_metrics["writer_total_ms"] += (time.perf_counter() - _t_w) * 1000.0
2239
+
2240
  if stream_queue:
2241
  try:
2242
  from jobs.streaming import (
 
2292
  t.join()
2293
  writer_thread.join()
2294
 
2295
+ if _perf_metrics is not None:
2296
+ _perf_metrics["end_to_end_ms"] = (time.perf_counter() - _t_e2e) * 1000.0
2297
+ if torch.cuda.is_available():
2298
+ _perf_metrics["gpu_peak_mem_mb"] = torch.cuda.max_memory_allocated() / (1024 * 1024)
2299
+
2300
  logging.info("Grounded-SAM-2 output written to: %s", output_video_path)
2301
  return output_video_path
2302
 
jobs/background.py CHANGED
@@ -1,6 +1,5 @@
1
  import asyncio
2
  import logging
3
- import time
4
  from datetime import datetime
5
 
6
  import torch
@@ -25,8 +24,6 @@ async def process_video_async(job_id: str) -> None:
25
  # Create stream for live view
26
  stream_queue = create_stream(job_id)
27
 
28
- t_video_start = time.monotonic()
29
-
30
  try:
31
  # Run detection or segmentation first
32
  if job.mode == "segmentation":
@@ -72,7 +69,7 @@ async def process_video_async(job_id: str) -> None:
72
  # If depth was ON, the video at video_path *has* depth overlays.
73
  # But the 'Depth Video' (heatmap only) is usually separate.
74
  # Our Plan says: "Unified loop... Write Frame to Disk".
75
- # If we want separate depth video, we need to instruct run_inference to write TWO videos?
76
  # Or just update 'depth_path' to be the same main video if it's merged?
77
  # Let's keep it simple: If depth enabled, the main video IS the depth view (overlay).
78
  # Or if we want separate `depth_output_path`, we need `run_inference` to handle it.
@@ -83,62 +80,42 @@ async def process_video_async(job_id: str) -> None:
83
  depth_path = detection_path
84
  logging.info("Depth estimation included in main video for job %s", job_id)
85
 
86
- video_elapsed = time.monotonic() - t_video_start
87
- completed_at = datetime.utcnow()
88
- total_elapsed = (completed_at - job.created_at).total_seconds()
89
- logging.info("TIMING video_processing=%.3fs for job %s", video_elapsed, job_id)
90
- logging.info("TIMING total=%.3fs for job %s", total_elapsed, job_id)
91
-
92
  # Mark as completed (with or without depth)
93
  storage.update(
94
  job_id,
95
  status=JobStatus.COMPLETED,
96
- completed_at=completed_at,
97
  output_video_path=detection_path,
98
  depth_output_path=depth_path,
99
  partial_success=partial_success,
100
  depth_error=depth_error,
101
- timing_video_processing_s=video_elapsed,
102
- timing_total_s=total_elapsed,
103
  )
104
 
105
  except RuntimeError as exc:
106
- video_elapsed = time.monotonic() - t_video_start
107
- completed_at = datetime.utcnow()
108
- total_elapsed = (completed_at - job.created_at).total_seconds()
109
  # Handle cancellation specifically
110
  if "cancelled" in str(exc).lower():
111
  logging.info("Job %s was cancelled", job_id)
112
  storage.update(
113
  job_id,
114
  status=JobStatus.CANCELLED,
115
- completed_at=completed_at,
116
  error="Cancelled by user",
117
- timing_video_processing_s=video_elapsed,
118
- timing_total_s=total_elapsed,
119
  )
120
  else:
121
  logging.exception("Background processing failed for job %s", job_id)
122
  storage.update(
123
  job_id,
124
  status=JobStatus.FAILED,
125
- completed_at=completed_at,
126
  error=str(exc),
127
- timing_video_processing_s=video_elapsed,
128
- timing_total_s=total_elapsed,
129
  )
130
  except Exception as exc:
131
- video_elapsed = time.monotonic() - t_video_start
132
- completed_at = datetime.utcnow()
133
- total_elapsed = (completed_at - job.created_at).total_seconds()
134
  logging.exception("Background processing failed for job %s", job_id)
135
  storage.update(
136
  job_id,
137
  status=JobStatus.FAILED,
138
- completed_at=completed_at,
139
  error=str(exc),
140
- timing_video_processing_s=video_elapsed,
141
- timing_total_s=total_elapsed,
142
  )
143
  finally:
144
  remove_stream(job_id)
 
1
  import asyncio
2
  import logging
 
3
  from datetime import datetime
4
 
5
  import torch
 
24
  # Create stream for live view
25
  stream_queue = create_stream(job_id)
26
 
 
 
27
  try:
28
  # Run detection or segmentation first
29
  if job.mode == "segmentation":
 
69
  # If depth was ON, the video at video_path *has* depth overlays.
70
  # But the 'Depth Video' (heatmap only) is usually separate.
71
  # Our Plan says: "Unified loop... Write Frame to Disk".
72
+ # If we want separate depth video, we need `run_inference` to handle it.
73
  # Or just update 'depth_path' to be the same main video if it's merged?
74
  # Let's keep it simple: If depth enabled, the main video IS the depth view (overlay).
75
  # Or if we want separate `depth_output_path`, we need `run_inference` to handle it.
 
80
  depth_path = detection_path
81
  logging.info("Depth estimation included in main video for job %s", job_id)
82
 
 
 
 
 
 
 
83
  # Mark as completed (with or without depth)
84
  storage.update(
85
  job_id,
86
  status=JobStatus.COMPLETED,
87
+ completed_at=datetime.utcnow(),
88
  output_video_path=detection_path,
89
  depth_output_path=depth_path,
90
  partial_success=partial_success,
91
  depth_error=depth_error,
 
 
92
  )
93
 
94
  except RuntimeError as exc:
 
 
 
95
  # Handle cancellation specifically
96
  if "cancelled" in str(exc).lower():
97
  logging.info("Job %s was cancelled", job_id)
98
  storage.update(
99
  job_id,
100
  status=JobStatus.CANCELLED,
101
+ completed_at=datetime.utcnow(),
102
  error="Cancelled by user",
 
 
103
  )
104
  else:
105
  logging.exception("Background processing failed for job %s", job_id)
106
  storage.update(
107
  job_id,
108
  status=JobStatus.FAILED,
109
+ completed_at=datetime.utcnow(),
110
  error=str(exc),
 
 
111
  )
112
  except Exception as exc:
 
 
 
113
  logging.exception("Background processing failed for job %s", job_id)
114
  storage.update(
115
  job_id,
116
  status=JobStatus.FAILED,
117
+ completed_at=datetime.utcnow(),
118
  error=str(exc),
 
 
119
  )
120
  finally:
121
  remove_stream(job_id)
jobs/models.py CHANGED
@@ -38,9 +38,3 @@ class JobInfo:
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
41
- # Latency measurements (seconds)
42
- timing_first_frame_s: Optional[float] = None
43
- timing_video_processing_s: Optional[float] = None
44
- timing_gpt_first_frame_s: Optional[float] = None
45
- timing_gpt_enrichment_s: Optional[float] = None
46
- timing_total_s: Optional[float] = None
 
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
 
 
 
 
 
 
models/segmenters/grounded_sam2.py CHANGED
@@ -10,6 +10,7 @@ Reference implementation:
10
 
11
  import copy
12
  import logging
 
13
  from contextlib import nullcontext
14
  from dataclasses import dataclass, field
15
  from typing import Any, Dict, List, Optional, Sequence, Tuple
@@ -357,11 +358,15 @@ class GroundedSAM2Segmenter(Segmenter):
357
  when no objects are detected.
358
  """
359
  self._ensure_models_loaded()
 
360
 
361
  prompt = self._gdino_detector._build_prompt(text_prompts)
362
  gdino_processor = self._gdino_detector.processor
363
  gdino_model = self._gdino_detector.model
364
 
 
 
 
365
  inputs = gdino_processor(
366
  images=image, text=prompt, return_tensors="pt"
367
  )
@@ -376,6 +381,14 @@ class GroundedSAM2Segmenter(Segmenter):
376
  target_sizes=[image.size[::-1]],
377
  )
378
 
 
 
 
 
 
 
 
 
379
  input_boxes = results[0]["boxes"]
380
  det_labels = results[0].get("text_labels") or results[0].get("labels", [])
381
  if torch.is_tensor(det_labels):
@@ -386,6 +399,9 @@ class GroundedSAM2Segmenter(Segmenter):
386
  return None, None, []
387
 
388
  # SAM2 image predictor
 
 
 
389
  self._image_predictor.set_image(np.array(image))
390
  masks, scores, logits = self._image_predictor.predict(
391
  point_coords=None,
@@ -394,6 +410,14 @@ class GroundedSAM2Segmenter(Segmenter):
394
  multimask_output=False,
395
  )
396
 
 
 
 
 
 
 
 
 
397
  if masks.ndim == 2:
398
  masks = masks[None]
399
  elif masks.ndim == 4:
@@ -423,6 +447,10 @@ class GroundedSAM2Segmenter(Segmenter):
423
  Dict mapping ``frame_idx`` → ``{obj_id: ObjectInfo}`` using the
424
  IDs from *mask_dict* (local, not yet reconciled).
425
  """
 
 
 
 
426
  self._video_predictor.reset_state(inference_state)
427
 
428
  for obj_id, obj_info in mask_dict.labels.items():
@@ -451,6 +479,14 @@ class GroundedSAM2Segmenter(Segmenter):
451
  frame_objects[out_obj_id] = info
452
  segment_results[out_frame_idx] = frame_objects
453
 
 
 
 
 
 
 
 
 
454
  return segment_results
455
 
456
  # -- Video-level tracking interface -------------------------------------
@@ -494,6 +530,7 @@ class GroundedSAM2Segmenter(Segmenter):
494
  device_type = device.split(":")[0]
495
  autocast_ctx = torch.autocast(device_type=device_type, dtype=torch.bfloat16) if device_type == "cuda" else nullcontext()
496
 
 
497
  sam2_masks = MaskDictionary()
498
  objects_count = 0
499
  all_results: Dict[int, Dict[int, ObjectInfo]] = {}
@@ -515,6 +552,9 @@ class GroundedSAM2Segmenter(Segmenter):
515
  mask_dict = MaskDictionary()
516
 
517
  # -- Grounding DINO detection on keyframe --
 
 
 
518
  inputs = gdino_processor(
519
  images=image, text=prompt, return_tensors="pt"
520
  )
@@ -530,6 +570,14 @@ class GroundedSAM2Segmenter(Segmenter):
530
  target_sizes=[image.size[::-1]],
531
  )
532
 
 
 
 
 
 
 
 
 
533
  input_boxes = results[0]["boxes"]
534
  det_labels = results[0].get("text_labels") or results[0].get("labels", [])
535
  if torch.is_tensor(det_labels):
@@ -554,6 +602,9 @@ class GroundedSAM2Segmenter(Segmenter):
554
  continue
555
 
556
  # -- SAM2 image predictor on keyframe --
 
 
 
557
  self._image_predictor.set_image(np.array(image))
558
  masks, scores, logits = self._image_predictor.predict(
559
  point_coords=None,
@@ -562,6 +613,14 @@ class GroundedSAM2Segmenter(Segmenter):
562
  multimask_output=False,
563
  )
564
 
 
 
 
 
 
 
 
 
565
  # Normalize mask dims
566
  if masks.ndim == 2:
567
  masks = masks[None]
@@ -577,18 +636,32 @@ class GroundedSAM2Segmenter(Segmenter):
577
  )
578
 
579
  # -- IoU matching to maintain persistent IDs --
 
 
 
580
  objects_count = mask_dict.update_masks(
581
  tracking_dict=sam2_masks,
582
  iou_threshold=self.iou_threshold,
583
  objects_count=objects_count,
584
  )
585
 
 
 
 
 
 
 
 
 
586
  if len(mask_dict.labels) == 0:
587
  for fi in range(start_idx, min(start_idx + step, total_frames)):
588
  all_results[fi] = {}
589
  continue
590
 
591
  # -- SAM2 video predictor: propagate masks --
 
 
 
592
  self._video_predictor.reset_state(inference_state)
593
 
594
  for obj_id, obj_info in mask_dict.labels.items():
@@ -625,6 +698,14 @@ class GroundedSAM2Segmenter(Segmenter):
625
  sam2_masks.mask_height = first_info.mask.shape[-2] if first_info.mask.ndim >= 2 else 0
626
  sam2_masks.mask_width = first_info.mask.shape[-1] if first_info.mask.ndim >= 2 else 0
627
 
 
 
 
 
 
 
 
 
628
  logging.info(
629
  "Grounded-SAM-2 tracking complete: %d frames, %d tracked objects",
630
  len(all_results), objects_count,
 
10
 
11
  import copy
12
  import logging
13
+ import time
14
  from contextlib import nullcontext
15
  from dataclasses import dataclass, field
16
  from typing import Any, Dict, List, Optional, Sequence, Tuple
 
358
  when no objects are detected.
359
  """
360
  self._ensure_models_loaded()
361
+ _pm = getattr(self, '_perf_metrics', None)
362
 
363
  prompt = self._gdino_detector._build_prompt(text_prompts)
364
  gdino_processor = self._gdino_detector.processor
365
  gdino_model = self._gdino_detector.model
366
 
367
+ if _pm is not None:
368
+ _t0 = time.perf_counter()
369
+
370
  inputs = gdino_processor(
371
  images=image, text=prompt, return_tensors="pt"
372
  )
 
381
  target_sizes=[image.size[::-1]],
382
  )
383
 
384
+ if _pm is not None:
385
+ _pl = getattr(self, '_perf_lock', None)
386
+ _d = (time.perf_counter() - _t0) * 1000.0
387
+ if _pl:
388
+ with _pl: _pm["gdino_total_ms"] += _d
389
+ else:
390
+ _pm["gdino_total_ms"] += _d
391
+
392
  input_boxes = results[0]["boxes"]
393
  det_labels = results[0].get("text_labels") or results[0].get("labels", [])
394
  if torch.is_tensor(det_labels):
 
399
  return None, None, []
400
 
401
  # SAM2 image predictor
402
+ if _pm is not None:
403
+ _t1 = time.perf_counter()
404
+
405
  self._image_predictor.set_image(np.array(image))
406
  masks, scores, logits = self._image_predictor.predict(
407
  point_coords=None,
 
410
  multimask_output=False,
411
  )
412
 
413
+ if _pm is not None:
414
+ _pl = getattr(self, '_perf_lock', None)
415
+ _d = (time.perf_counter() - _t1) * 1000.0
416
+ if _pl:
417
+ with _pl: _pm["sam_image_total_ms"] += _d
418
+ else:
419
+ _pm["sam_image_total_ms"] += _d
420
+
421
  if masks.ndim == 2:
422
  masks = masks[None]
423
  elif masks.ndim == 4:
 
447
  Dict mapping ``frame_idx`` → ``{obj_id: ObjectInfo}`` using the
448
  IDs from *mask_dict* (local, not yet reconciled).
449
  """
450
+ _pm = getattr(self, '_perf_metrics', None)
451
+ if _pm is not None:
452
+ _t0 = time.perf_counter()
453
+
454
  self._video_predictor.reset_state(inference_state)
455
 
456
  for obj_id, obj_info in mask_dict.labels.items():
 
479
  frame_objects[out_obj_id] = info
480
  segment_results[out_frame_idx] = frame_objects
481
 
482
+ if _pm is not None:
483
+ _pl = getattr(self, '_perf_lock', None)
484
+ _d = (time.perf_counter() - _t0) * 1000.0
485
+ if _pl:
486
+ with _pl: _pm["sam_video_total_ms"] += _d
487
+ else:
488
+ _pm["sam_video_total_ms"] += _d
489
+
490
  return segment_results
491
 
492
  # -- Video-level tracking interface -------------------------------------
 
530
  device_type = device.split(":")[0]
531
  autocast_ctx = torch.autocast(device_type=device_type, dtype=torch.bfloat16) if device_type == "cuda" else nullcontext()
532
 
533
+ _pm = getattr(self, '_perf_metrics', None)
534
  sam2_masks = MaskDictionary()
535
  objects_count = 0
536
  all_results: Dict[int, Dict[int, ObjectInfo]] = {}
 
552
  mask_dict = MaskDictionary()
553
 
554
  # -- Grounding DINO detection on keyframe --
555
+ if _pm is not None:
556
+ _t_gd = time.perf_counter()
557
+
558
  inputs = gdino_processor(
559
  images=image, text=prompt, return_tensors="pt"
560
  )
 
570
  target_sizes=[image.size[::-1]],
571
  )
572
 
573
+ if _pm is not None:
574
+ _pl = getattr(self, '_perf_lock', None)
575
+ _d = (time.perf_counter() - _t_gd) * 1000.0
576
+ if _pl:
577
+ with _pl: _pm["gdino_total_ms"] += _d
578
+ else:
579
+ _pm["gdino_total_ms"] += _d
580
+
581
  input_boxes = results[0]["boxes"]
582
  det_labels = results[0].get("text_labels") or results[0].get("labels", [])
583
  if torch.is_tensor(det_labels):
 
602
  continue
603
 
604
  # -- SAM2 image predictor on keyframe --
605
+ if _pm is not None:
606
+ _t_si = time.perf_counter()
607
+
608
  self._image_predictor.set_image(np.array(image))
609
  masks, scores, logits = self._image_predictor.predict(
610
  point_coords=None,
 
613
  multimask_output=False,
614
  )
615
 
616
+ if _pm is not None:
617
+ _pl = getattr(self, '_perf_lock', None)
618
+ _d = (time.perf_counter() - _t_si) * 1000.0
619
+ if _pl:
620
+ with _pl: _pm["sam_image_total_ms"] += _d
621
+ else:
622
+ _pm["sam_image_total_ms"] += _d
623
+
624
  # Normalize mask dims
625
  if masks.ndim == 2:
626
  masks = masks[None]
 
636
  )
637
 
638
  # -- IoU matching to maintain persistent IDs --
639
+ if _pm is not None:
640
+ _t_id = time.perf_counter()
641
+
642
  objects_count = mask_dict.update_masks(
643
  tracking_dict=sam2_masks,
644
  iou_threshold=self.iou_threshold,
645
  objects_count=objects_count,
646
  )
647
 
648
+ if _pm is not None:
649
+ _pl = getattr(self, '_perf_lock', None)
650
+ _d = (time.perf_counter() - _t_id) * 1000.0
651
+ if _pl:
652
+ with _pl: _pm["id_reconciliation_ms"] += _d
653
+ else:
654
+ _pm["id_reconciliation_ms"] += _d
655
+
656
  if len(mask_dict.labels) == 0:
657
  for fi in range(start_idx, min(start_idx + step, total_frames)):
658
  all_results[fi] = {}
659
  continue
660
 
661
  # -- SAM2 video predictor: propagate masks --
662
+ if _pm is not None:
663
+ _t_sv = time.perf_counter()
664
+
665
  self._video_predictor.reset_state(inference_state)
666
 
667
  for obj_id, obj_info in mask_dict.labels.items():
 
698
  sam2_masks.mask_height = first_info.mask.shape[-2] if first_info.mask.ndim >= 2 else 0
699
  sam2_masks.mask_width = first_info.mask.shape[-1] if first_info.mask.ndim >= 2 else 0
700
 
701
+ if _pm is not None:
702
+ _pl = getattr(self, '_perf_lock', None)
703
+ _d = (time.perf_counter() - _t_sv) * 1000.0
704
+ if _pl:
705
+ with _pl: _pm["sam_video_total_ms"] += _d
706
+ else:
707
+ _pm["sam_video_total_ms"] += _d
708
+
709
  logging.info(
710
  "Grounded-SAM-2 tracking complete: %d frames, %d tracked objects",
711
  len(all_results), objects_count,