Zhen Ye committed on
Commit
301e154
·
1 Parent(s): a4e3c2b

feat(ttfs): instrument segmentation time-to-first-stream

Browse files
app.py CHANGED
@@ -28,6 +28,7 @@ except Exception as e:
28
  import asyncio
29
  import shutil
30
  import tempfile
 
31
  import uuid
32
  from contextlib import asynccontextmanager
33
  from datetime import timedelta
@@ -409,6 +410,8 @@ async def detect_async_endpoint(
409
  enable_depth: bool = Form(False),
410
  enable_gpt: bool = Form(True),
411
  ):
 
 
412
  if mode not in VALID_MODES:
413
  raise HTTPException(
414
  status_code=400,
@@ -434,6 +437,8 @@ async def detect_async_endpoint(
434
  finally:
435
  await video.close()
436
 
 
 
437
  # --- Mission-Driven Query Parsing ---
438
  mission_spec = None
439
  mission_mode = "LEGACY"
@@ -476,6 +481,8 @@ async def detect_async_endpoint(
476
  "LEGACY mode: no mission text, defaults=%s, GPT disabled", query_list
477
  )
478
 
 
 
479
  available_depth_estimators = set(list_depth_estimators())
480
  if depth_estimator not in available_depth_estimators:
481
  raise HTTPException(
@@ -490,6 +497,7 @@ async def detect_async_endpoint(
490
  active_depth = depth_estimator if enable_depth else None
491
 
492
  try:
 
493
  processed_frame, detections = process_first_frame(
494
  str(input_path),
495
  query_list,
@@ -498,6 +506,7 @@ async def detect_async_endpoint(
498
  segmenter_name=segmenter,
499
  )
500
  cv2.imwrite(str(first_frame_path), processed_frame)
 
501
  # GPT and depth are now handled in the async pipeline (enrichment thread)
502
  first_frame_gpt_results = None
503
  except Exception:
@@ -524,6 +533,7 @@ async def detect_async_endpoint(
524
  mission_spec=mission_spec,
525
  mission_mode=mission_mode,
526
  first_frame_gpt_results=first_frame_gpt_results,
 
527
  )
528
  get_job_storage().create(job)
529
  asyncio.create_task(process_video_async(job_id))
@@ -771,6 +781,15 @@ async def stream_video(job_id: str):
771
  loop = asyncio.get_running_loop()
772
  buffered = False
773
 
 
 
 
 
 
 
 
 
 
774
  # Get or create the asyncio.Event for this stream (must be in async context)
775
  event = get_stream_event(job_id)
776
 
@@ -782,10 +801,15 @@ async def stream_video(job_id: str):
782
  try:
783
  # Initial Buffer: Wait until we have enough frames or job is done
784
  if not buffered:
 
 
 
785
  if q.qsize() < 5:
786
  await asyncio.sleep(0.1)
787
  continue
788
  buffered = True
 
 
789
 
790
  # Event-driven wait — replaces busy-wait polling
791
  if event is not None:
@@ -811,6 +835,10 @@ async def stream_video(job_id: str):
811
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
812
 
813
  if success:
 
 
 
 
814
  yield (b'--frame\r\n'
815
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
816
 
 
28
  import asyncio
29
  import shutil
30
  import tempfile
31
+ import time
32
  import uuid
33
  from contextlib import asynccontextmanager
34
  from datetime import timedelta
 
410
  enable_depth: bool = Form(False),
411
  enable_gpt: bool = Form(True),
412
  ):
413
+ _ttfs_t0 = time.perf_counter()
414
+
415
  if mode not in VALID_MODES:
416
  raise HTTPException(
417
  status_code=400,
 
437
  finally:
438
  await video.close()
439
 
440
+ logging.info("[TTFS:%s] +%.1fs upload_saved", job_id, time.perf_counter() - _ttfs_t0)
441
+
442
  # --- Mission-Driven Query Parsing ---
443
  mission_spec = None
444
  mission_mode = "LEGACY"
 
481
  "LEGACY mode: no mission text, defaults=%s, GPT disabled", query_list
482
  )
483
 
484
+ logging.info("[TTFS:%s] +%.1fs mission_parsed", job_id, time.perf_counter() - _ttfs_t0)
485
+
486
  available_depth_estimators = set(list_depth_estimators())
487
  if depth_estimator not in available_depth_estimators:
488
  raise HTTPException(
 
497
  active_depth = depth_estimator if enable_depth else None
498
 
499
  try:
500
+ logging.info("[TTFS:%s] +%.1fs process_first_frame start", job_id, time.perf_counter() - _ttfs_t0)
501
  processed_frame, detections = process_first_frame(
502
  str(input_path),
503
  query_list,
 
506
  segmenter_name=segmenter,
507
  )
508
  cv2.imwrite(str(first_frame_path), processed_frame)
509
+ logging.info("[TTFS:%s] +%.1fs process_first_frame done", job_id, time.perf_counter() - _ttfs_t0)
510
  # GPT and depth are now handled in the async pipeline (enrichment thread)
511
  first_frame_gpt_results = None
512
  except Exception:
 
533
  mission_spec=mission_spec,
534
  mission_mode=mission_mode,
535
  first_frame_gpt_results=first_frame_gpt_results,
536
+ ttfs_t0=_ttfs_t0,
537
  )
538
  get_job_storage().create(job)
539
  asyncio.create_task(process_video_async(job_id))
 
781
  loop = asyncio.get_running_loop()
782
  buffered = False
783
 
784
+ # TTFS instrumentation
785
+ _first_yielded = False
786
+ _buffer_wait_logged = False
787
+ _job = get_job_storage().get(job_id)
788
+ _stream_t0 = _job.ttfs_t0 if _job else None
789
+
790
+ if _stream_t0:
791
+ logging.info("[TTFS:%s] +%.1fs stream_subscribed", job_id, time.perf_counter() - _stream_t0)
792
+
793
  # Get or create the asyncio.Event for this stream (must be in async context)
794
  event = get_stream_event(job_id)
795
 
 
801
  try:
802
  # Initial Buffer: Wait until we have enough frames or job is done
803
  if not buffered:
804
+ if not _buffer_wait_logged and _stream_t0:
805
+ logging.info("[TTFS:%s] +%.1fs stream_buffer_wait (qsize=%d)", job_id, time.perf_counter() - _stream_t0, q.qsize())
806
+ _buffer_wait_logged = True
807
  if q.qsize() < 5:
808
  await asyncio.sleep(0.1)
809
  continue
810
  buffered = True
811
+ if _stream_t0:
812
+ logging.info("[TTFS:%s] +%.1fs stream_buffer_ready", job_id, time.perf_counter() - _stream_t0)
813
 
814
  # Event-driven wait — replaces busy-wait polling
815
  if event is not None:
 
835
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
836
 
837
  if success:
838
+ if not _first_yielded:
839
+ _first_yielded = True
840
+ if _stream_t0:
841
+ logging.info("[TTFS:%s] +%.1fs first_yield_to_client", job_id, time.perf_counter() - _stream_t0)
842
  yield (b'--frame\r\n'
843
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
844
 
inference.py CHANGED
@@ -1277,6 +1277,7 @@ def run_grounded_sam2_tracking(
1277
  _perf_lock=None,
1278
  num_maskmem: Optional[int] = None,
1279
  detector_name: Optional[str] = None,
 
1280
  ) -> str:
1281
  """Run Grounded-SAM-2 video tracking pipeline.
1282
 
@@ -1292,6 +1293,13 @@ def run_grounded_sam2_tracking(
1292
  from models.segmenters.grounded_sam2 import MaskDictionary, ObjectInfo, LazyFrameObjects
1293
 
1294
  active_segmenter = segmenter_name or "GSAM2-L"
 
 
 
 
 
 
 
1295
  logging.info(
1296
  "Grounded-SAM-2 tracking: segmenter=%s, queries=%s, step=%d",
1297
  active_segmenter, queries, step,
@@ -1315,6 +1323,7 @@ def run_grounded_sam2_tracking(
1315
  if _perf_metrics is not None:
1316
  _perf_metrics["frame_extraction_ms"] = (time.perf_counter() - _t_ext) * 1000.0
1317
  total_frames = len(frame_names)
 
1318
  logging.info("Extracted %d frames to %s", total_frames, frame_dir)
1319
 
1320
  num_gpus = torch.cuda.device_count()
@@ -1474,6 +1483,7 @@ def run_grounded_sam2_tracking(
1474
 
1475
  def _writer_loop():
1476
  nonlocal render_done
 
1477
  next_idx = 0
1478
  buf: Dict[int, Tuple] = {}
1479
 
@@ -1615,6 +1625,9 @@ def run_grounded_sam2_tracking(
1615
  if stream_queue or job_id:
1616
  with _stream_lock:
1617
  _stream_deque.append(frm)
 
 
 
1618
 
1619
  next_idx += 1
1620
  if next_idx % 30 == 0:
@@ -1679,6 +1692,7 @@ def run_grounded_sam2_tracking(
1679
  "accumulated=%d frames in %.1fs",
1680
  r_prod, r_stream, accumulated, elapsed,
1681
  )
 
1682
 
1683
  # --- Phase 2: adaptive streaming ---
1684
  last_adjust = time.perf_counter()
@@ -1702,6 +1716,8 @@ def run_grounded_sam2_tracking(
1702
  stream_queue.put(frame, timeout=0.01)
1703
  except Exception:
1704
  pass
 
 
1705
  published += 1
1706
  last_publish_time = time.perf_counter()
1707
  time.sleep(frame_interval)
@@ -1757,6 +1773,8 @@ def run_grounded_sam2_tracking(
1757
  _publisher_thread = Thread(target=_stream_publisher_thread, daemon=True)
1758
  _publisher_thread.start()
1759
 
 
 
1760
  # ==================================================================
1761
  # Phase 1-4: Tracking (single-GPU fallback vs multi-GPU pipeline)
1762
  # Segments are fed incrementally to render_in as they complete.
@@ -1780,6 +1798,8 @@ def run_grounded_sam2_tracking(
1780
  segmenter._perf_metrics = _perf_metrics
1781
  segmenter._perf_lock = None
1782
 
 
 
1783
  if _perf_metrics is not None:
1784
  _t_track = time.perf_counter()
1785
 
@@ -1797,10 +1817,13 @@ def run_grounded_sam2_tracking(
1797
  seen.add(fi)
1798
  render_in.put((fi, LazyFrameObjects(segment_output, fi)))
1799
 
 
1800
  tracking_results = segmenter.process_video(
1801
  frame_dir, frame_names, queries,
1802
  on_segment=_feed_segment,
1803
  on_segment_output=_feed_segment_gpu,
 
 
1804
  )
1805
 
1806
  if _perf_metrics is not None:
@@ -1846,6 +1869,8 @@ def run_grounded_sam2_tracking(
1846
  seg._perf_metrics = _perf_metrics
1847
  seg._perf_lock = _actual_lock
1848
 
 
 
1849
  # Phase 2: Init SAM2 models/state per GPU (parallel)
1850
  if _perf_metrics is not None:
1851
  _t_init = time.perf_counter()
@@ -1866,6 +1891,8 @@ def run_grounded_sam2_tracking(
1866
  _perf_metrics["init_state_ms"] = (time.perf_counter() - _t_init) * 1000.0
1867
  _t_track = time.perf_counter()
1868
 
 
 
1869
  # Phase 3: Parallel segment processing (queue-based workers)
1870
  segments = list(range(0, total_frames, step))
1871
  num_total_segments = len(segments)
@@ -2028,6 +2055,8 @@ def run_grounded_sam2_tracking(
2028
  else {}
2029
  )
2030
  render_in.put((fi, tracking_results.get(fi, {})))
 
 
2031
  next_seg_idx += 1
2032
  continue
2033
 
@@ -2055,6 +2084,8 @@ def run_grounded_sam2_tracking(
2055
  ):
2056
  tracking_results[fi] = {}
2057
  render_in.put((fi, {}))
 
 
2058
  next_seg_idx += 1
2059
  continue
2060
 
@@ -2091,6 +2122,8 @@ def run_grounded_sam2_tracking(
2091
  LazyFrameObjects(segment_output, fi, remapping),
2092
  ))
2093
 
 
 
2094
  next_seg_idx += 1
2095
 
2096
  for t in seg_workers:
 
1277
  _perf_lock=None,
1278
  num_maskmem: Optional[int] = None,
1279
  detector_name: Optional[str] = None,
1280
+ _ttfs_t0: Optional[float] = None,
1281
  ) -> str:
1282
  """Run Grounded-SAM-2 video tracking pipeline.
1283
 
 
1293
  from models.segmenters.grounded_sam2 import MaskDictionary, ObjectInfo, LazyFrameObjects
1294
 
1295
  active_segmenter = segmenter_name or "GSAM2-L"
1296
+
1297
+ def _ttfs(msg):
1298
+ if _ttfs_t0 is not None:
1299
+ logging.info("[TTFS:%s] +%.1fs %s", job_id, time.perf_counter() - _ttfs_t0, msg)
1300
+
1301
+ _ttfs("enter run_grounded_sam2_tracking")
1302
+
1303
  logging.info(
1304
  "Grounded-SAM-2 tracking: segmenter=%s, queries=%s, step=%d",
1305
  active_segmenter, queries, step,
 
1323
  if _perf_metrics is not None:
1324
  _perf_metrics["frame_extraction_ms"] = (time.perf_counter() - _t_ext) * 1000.0
1325
  total_frames = len(frame_names)
1326
+ _ttfs(f"frame_extraction done ({total_frames} frames)")
1327
  logging.info("Extracted %d frames to %s", total_frames, frame_dir)
1328
 
1329
  num_gpus = torch.cuda.device_count()
 
1483
 
1484
  def _writer_loop():
1485
  nonlocal render_done
1486
+ _first_deposit = False
1487
  next_idx = 0
1488
  buf: Dict[int, Tuple] = {}
1489
 
 
1625
  if stream_queue or job_id:
1626
  with _stream_lock:
1627
  _stream_deque.append(frm)
1628
+ if not _first_deposit:
1629
+ _first_deposit = True
1630
+ _ttfs("first_frame_deposited_to_deque")
1631
 
1632
  next_idx += 1
1633
  if next_idx % 30 == 0:
 
1692
  "accumulated=%d frames in %.1fs",
1693
  r_prod, r_stream, accumulated, elapsed,
1694
  )
1695
+ _ttfs(f"publisher: startup_wait done ({accumulated} frames in {elapsed:.1f}s)")
1696
 
1697
  # --- Phase 2: adaptive streaming ---
1698
  last_adjust = time.perf_counter()
 
1716
  stream_queue.put(frame, timeout=0.01)
1717
  except Exception:
1718
  pass
1719
+ if published == 0:
1720
+ _ttfs("first_publish_frame")
1721
  published += 1
1722
  last_publish_time = time.perf_counter()
1723
  time.sleep(frame_interval)
 
1773
  _publisher_thread = Thread(target=_stream_publisher_thread, daemon=True)
1774
  _publisher_thread.start()
1775
 
1776
+ _ttfs("writer+publisher threads started")
1777
+
1778
  # ==================================================================
1779
  # Phase 1-4: Tracking (single-GPU fallback vs multi-GPU pipeline)
1780
  # Segments are fed incrementally to render_in as they complete.
 
1798
  segmenter._perf_metrics = _perf_metrics
1799
  segmenter._perf_lock = None
1800
 
1801
+ _ttfs(f"model loaded ({active_segmenter})")
1802
+
1803
  if _perf_metrics is not None:
1804
  _t_track = time.perf_counter()
1805
 
 
1817
  seen.add(fi)
1818
  render_in.put((fi, LazyFrameObjects(segment_output, fi)))
1819
 
1820
+ _ttfs("process_video started")
1821
  tracking_results = segmenter.process_video(
1822
  frame_dir, frame_names, queries,
1823
  on_segment=_feed_segment,
1824
  on_segment_output=_feed_segment_gpu,
1825
+ _ttfs_t0=_ttfs_t0,
1826
+ _ttfs_job_id=job_id,
1827
  )
1828
 
1829
  if _perf_metrics is not None:
 
1869
  seg._perf_metrics = _perf_metrics
1870
  seg._perf_lock = _actual_lock
1871
 
1872
+ _ttfs(f"model loaded ({active_segmenter}, {num_gpus} GPUs)")
1873
+
1874
  # Phase 2: Init SAM2 models/state per GPU (parallel)
1875
  if _perf_metrics is not None:
1876
  _t_init = time.perf_counter()
 
1891
  _perf_metrics["init_state_ms"] = (time.perf_counter() - _t_init) * 1000.0
1892
  _t_track = time.perf_counter()
1893
 
1894
+ _ttfs("multi-GPU tracking started")
1895
+
1896
  # Phase 3: Parallel segment processing (queue-based workers)
1897
  segments = list(range(0, total_frames, step))
1898
  num_total_segments = len(segments)
 
2055
  else {}
2056
  )
2057
  render_in.put((fi, tracking_results.get(fi, {})))
2058
+ if next_seg_idx == 0:
2059
+ _ttfs("first_segment_reconciled (multi-GPU, no detections)")
2060
  next_seg_idx += 1
2061
  continue
2062
 
 
2084
  ):
2085
  tracking_results[fi] = {}
2086
  render_in.put((fi, {}))
2087
+ if next_seg_idx == 0:
2088
+ _ttfs("first_segment_reconciled (multi-GPU, empty masks)")
2089
  next_seg_idx += 1
2090
  continue
2091
 
 
2122
  LazyFrameObjects(segment_output, fi, remapping),
2123
  ))
2124
 
2125
+ if next_seg_idx == 0:
2126
+ _ttfs("first_segment_reconciled (multi-GPU)")
2127
  next_seg_idx += 1
2128
 
2129
  for t in seg_workers:
jobs/background.py CHANGED
@@ -40,6 +40,7 @@ async def process_video_async(job_id: str) -> None:
40
  first_frame_gpt_results=job.first_frame_gpt_results,
41
  num_maskmem=7,
42
  detector_name=job.detector_name,
 
43
  )
44
  else:
45
  detections_list = None
 
40
  first_frame_gpt_results=job.first_frame_gpt_results,
41
  num_maskmem=7,
42
  detector_name=job.detector_name,
43
+ _ttfs_t0=job.ttfs_t0,
44
  )
45
  else:
46
  detections_list = None
jobs/models.py CHANGED
@@ -38,3 +38,4 @@ class JobInfo:
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
 
 
38
  mission_spec: Optional[Any] = None # utils.schemas.MissionSpecification
39
  mission_mode: str = "LEGACY" # "MISSION" or "LEGACY"
40
  first_frame_gpt_results: Optional[Dict[str, Any]] = None # Cached GPT results from process_first_frame
41
+ ttfs_t0: Optional[float] = None # TTFS anchor: time.perf_counter() at endpoint entry
models/segmenters/grounded_sam2.py CHANGED
@@ -715,6 +715,8 @@ class GroundedSAM2Segmenter(Segmenter):
715
  text_prompts: List[str],
716
  on_segment: Optional[Callable[[Dict[int, Dict[int, "ObjectInfo"]]], None]] = None,
717
  on_segment_output: Optional[Callable[["SegmentOutput"], None]] = None,
 
 
718
  ) -> Dict[int, Dict[int, ObjectInfo]]:
719
  """Run full Grounded-SAM-2 tracking pipeline on extracted JPEG frames.
720
 
@@ -819,6 +821,9 @@ class GroundedSAM2Segmenter(Segmenter):
819
  seg_results[fi] = all_results[fi]
820
  if on_segment and seg_results:
821
  on_segment(seg_results)
 
 
 
822
  continue
823
 
824
  # -- SAM2 image predictor on keyframe --
@@ -867,6 +872,9 @@ class GroundedSAM2Segmenter(Segmenter):
867
  seg_results_empty[fi] = {}
868
  if on_segment:
869
  on_segment(seg_results_empty)
 
 
 
870
  continue
871
 
872
  # -- SAM2 video predictor: propagate masks --
@@ -892,6 +900,9 @@ class GroundedSAM2Segmenter(Segmenter):
892
  sam2_masks.mask_width = first_info.mask.shape[-1] if first_info.mask.ndim >= 2 else 0
893
  if on_segment_output is not None:
894
  on_segment_output(segment_output)
 
 
 
895
 
896
  logging.info(
897
  "Grounded-SAM-2 tracking complete: %d frames, %d tracked objects",
 
715
  text_prompts: List[str],
716
  on_segment: Optional[Callable[[Dict[int, Dict[int, "ObjectInfo"]]], None]] = None,
717
  on_segment_output: Optional[Callable[["SegmentOutput"], None]] = None,
718
+ _ttfs_t0: Optional[float] = None,
719
+ _ttfs_job_id: Optional[str] = None,
720
  ) -> Dict[int, Dict[int, ObjectInfo]]:
721
  """Run full Grounded-SAM-2 tracking pipeline on extracted JPEG frames.
722
 
 
821
  seg_results[fi] = all_results[fi]
822
  if on_segment and seg_results:
823
  on_segment(seg_results)
824
+ if start_idx == 0 and _ttfs_t0 is not None:
825
+ logging.info("[TTFS:%s] +%.1fs first_segment_complete (no detections, step=%d)",
826
+ _ttfs_job_id, time.perf_counter() - _ttfs_t0, step)
827
  continue
828
 
829
  # -- SAM2 image predictor on keyframe --
 
872
  seg_results_empty[fi] = {}
873
  if on_segment:
874
  on_segment(seg_results_empty)
875
+ if start_idx == 0 and _ttfs_t0 is not None:
876
+ logging.info("[TTFS:%s] +%.1fs first_segment_complete (empty masks, step=%d)",
877
+ _ttfs_job_id, time.perf_counter() - _ttfs_t0, step)
878
  continue
879
 
880
  # -- SAM2 video predictor: propagate masks --
 
900
  sam2_masks.mask_width = first_info.mask.shape[-1] if first_info.mask.ndim >= 2 else 0
901
  if on_segment_output is not None:
902
  on_segment_output(segment_output)
903
+ if start_idx == 0 and _ttfs_t0 is not None:
904
+ logging.info("[TTFS:%s] +%.1fs first_segment_complete (step=%d)",
905
+ _ttfs_job_id, time.perf_counter() - _ttfs_t0, step)
906
 
907
  logging.info(
908
  "Grounded-SAM-2 tracking complete: %d frames, %d tracked objects",