Zhen Ye Claude Opus 4.6 committed on
Commit
f09ca9c
·
1 Parent(s): b1974e2

fix: replace gated GSAM2 streaming with adaptive-rate publisher thread

Browse files

The old streaming approach (60-frame startup buffer + 20-frame safety
threshold + 3x frame duplication) prevented any frames from reaching
the frontend during processing. Replace with a dedicated publisher
thread that measures production rate during a 5s startup window, then
streams at 85% of measured rate with adaptive adjustments and heartbeat
keepalives. Reduce endpoint startup buffer from 30 to 5 frames.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (3) hide show
  1. app.py +1 -1
  2. inference.py +124 -54
  3. jobs/streaming.py +1 -9
app.py CHANGED
@@ -775,7 +775,7 @@ async def stream_video(job_id: str):
775
  try:
776
  # Initial Buffer: Wait until we have enough frames or job is done
777
  if not buffered:
778
- if q.qsize() < 30:
779
  await asyncio.sleep(0.1)
780
  continue
781
  buffered = True
 
775
  try:
776
  # Initial Buffer: Wait until we have enough frames or job is done
777
  if not buffered:
778
+ if q.qsize() < 5:
779
  await asyncio.sleep(0.1)
780
  continue
781
  buffered = True
inference.py CHANGED
@@ -5,6 +5,7 @@
5
  # del os.environ["CUDA_VISIBLE_DEVICES"]
6
  import os
7
 
 
8
  import logging
9
  import time
10
  from threading import Event, RLock, Thread
@@ -1820,16 +1821,16 @@ def run_grounded_sam2_tracking(
1820
  except Exception as e:
1821
  logging.error("GSAM2 enrichment thread failed for frame %d: %s", frame_idx, e)
1822
 
 
 
 
 
 
1823
  def _writer_loop():
1824
  nonlocal render_done
1825
  next_idx = 0
1826
  buf: Dict[int, Tuple] = {}
1827
 
1828
- # Streaming constants
1829
- STARTUP_BUFFER = 60
1830
- SAFETY_THRESHOLD = 20
1831
- FRAME_DUP = 3
1832
-
1833
  # Per-track bbox history (replaces ByteTracker for GSAM2)
1834
  track_history: Dict[int, List] = {}
1835
  speed_est = SpeedEstimator(fps=fps) if enable_gpt else None
@@ -1845,28 +1846,7 @@ def run_grounded_sam2_tracking(
1845
  with StreamingVideoWriter(
1846
  output_video_path, fps, width, height
1847
  ) as writer:
1848
- # --- Phase 1: Startup buffering ---
1849
- playback_started = False
1850
- while not playback_started:
1851
- try:
1852
- idx, frm, fobjs = render_out.get(timeout=1.0)
1853
- buf[idx] = (frm, fobjs)
1854
- except Empty:
1855
- if not any(t.is_alive() for t in r_workers) and render_out.empty():
1856
- playback_started = True
1857
- break
1858
- continue
1859
-
1860
- ahead = sum(1 for k in buf if k >= next_idx)
1861
- if ahead >= STARTUP_BUFFER or ahead >= total_frames:
1862
- playback_started = True
1863
-
1864
- logging.info(
1865
- "Startup buffer filled (%d frames), beginning playback",
1866
- len(buf),
1867
- )
1868
-
1869
- # --- Phase 2: Write + stream with safety gating ---
1870
  while next_idx < total_frames:
1871
  try:
1872
  while next_idx not in buf:
@@ -1985,34 +1965,10 @@ def run_grounded_sam2_tracking(
1985
  if _perf_metrics is not None:
1986
  _perf_metrics["writer_total_ms"] += (time.perf_counter() - _t_w) * 1000.0
1987
 
1988
- # --- Streaming with buffer gating + frame duplication ---
1989
  if stream_queue or job_id:
1990
- # Drain any immediately available frames for accurate buffer level
1991
- while True:
1992
- try:
1993
- idx2, frm2, fobjs2 = render_out.get_nowait()
1994
- buf[idx2] = (frm2, fobjs2)
1995
- except Empty:
1996
- break
1997
-
1998
- buffer_ahead = sum(1 for k in buf if k > next_idx)
1999
-
2000
- if buffer_ahead >= SAFETY_THRESHOLD or next_idx >= total_frames - 1:
2001
- from jobs.streaming import publish_frame as _pub
2002
- if job_id:
2003
- for _ in range(FRAME_DUP):
2004
- _pub(job_id, frm)
2005
- else:
2006
- for _ in range(FRAME_DUP):
2007
- try:
2008
- stream_queue.put(frm, timeout=0.01)
2009
- except Exception:
2010
- pass
2011
- else:
2012
- logging.debug(
2013
- "Stream paused: buffer=%d < threshold=%d at frame %d",
2014
- buffer_ahead, SAFETY_THRESHOLD, next_idx,
2015
- )
2016
 
2017
  next_idx += 1
2018
  if next_idx % 30 == 0:
@@ -2032,6 +1988,7 @@ def run_grounded_sam2_tracking(
2032
  continue
2033
  finally:
2034
  render_done = True
 
2035
  # Shut down enrichment thread
2036
  if enrich_thread:
2037
  try:
@@ -2040,9 +1997,120 @@ def run_grounded_sam2_tracking(
2040
  except Exception:
2041
  logging.warning("GSAM2 enrichment thread shutdown timed out")
2042
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2043
  writer_thread = Thread(target=_writer_loop, daemon=True)
2044
  writer_thread.start()
2045
 
 
 
 
 
 
2046
  # ==================================================================
2047
  # Phase 1-4: Tracking (single-GPU fallback vs multi-GPU pipeline)
2048
  # Segments are fed incrementally to render_in as they complete.
@@ -2397,6 +2465,8 @@ def run_grounded_sam2_tracking(
2397
  for t in r_workers:
2398
  t.join()
2399
  writer_thread.join()
 
 
2400
 
2401
  if _perf_metrics is not None:
2402
  _perf_metrics["end_to_end_ms"] = (time.perf_counter() - _t_e2e) * 1000.0
 
5
  # del os.environ["CUDA_VISIBLE_DEVICES"]
6
  import os
7
 
8
+ import collections
9
  import logging
10
  import time
11
  from threading import Event, RLock, Thread
 
1821
  except Exception as e:
1822
  logging.error("GSAM2 enrichment thread failed for frame %d: %s", frame_idx, e)
1823
 
1824
+ # Shared streaming state (publisher ↔ writer)
1825
+ _stream_deque: collections.deque = collections.deque(maxlen=200)
1826
+ _stream_lock = RLock()
1827
+ _stream_writer_done = Event()
1828
+
1829
  def _writer_loop():
1830
  nonlocal render_done
1831
  next_idx = 0
1832
  buf: Dict[int, Tuple] = {}
1833
 
 
 
 
 
 
1834
  # Per-track bbox history (replaces ByteTracker for GSAM2)
1835
  track_history: Dict[int, List] = {}
1836
  speed_est = SpeedEstimator(fps=fps) if enable_gpt else None
 
1846
  with StreamingVideoWriter(
1847
  output_video_path, fps, width, height
1848
  ) as writer:
1849
+ # --- Write + stream (publisher handles pacing) ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1850
  while next_idx < total_frames:
1851
  try:
1852
  while next_idx not in buf:
 
1965
  if _perf_metrics is not None:
1966
  _perf_metrics["writer_total_ms"] += (time.perf_counter() - _t_w) * 1000.0
1967
 
1968
+ # --- Deposit frame for stream publisher ---
1969
  if stream_queue or job_id:
1970
+ with _stream_lock:
1971
+ _stream_deque.append(frm)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1972
 
1973
  next_idx += 1
1974
  if next_idx % 30 == 0:
 
1988
  continue
1989
  finally:
1990
  render_done = True
1991
+ _stream_writer_done.set()
1992
  # Shut down enrichment thread
1993
  if enrich_thread:
1994
  try:
 
1997
  except Exception:
1998
  logging.warning("GSAM2 enrichment thread shutdown timed out")
1999
 
2000
+ def _stream_publisher_thread():
2001
+ """Adaptive-rate publisher: reads from _stream_deque, publishes at measured pace."""
2002
+ from jobs.streaming import publish_frame as _pub
2003
+
2004
+ STARTUP_WAIT = 5.0 # max seconds to accumulate before streaming
2005
+ MIN_FPS = 2.0
2006
+ MAX_FPS = 30.0
2007
+ HEARTBEAT_INTERVAL = 0.5 # re-publish last frame if deque empty
2008
+ LOW_WATER = 10
2009
+ HIGH_WATER = 50
2010
+ ADJUST_INTERVAL = 1.0
2011
+
2012
+ last_frame = None
2013
+ published = 0
2014
+
2015
+ # --- Phase 1: startup accumulation ---
2016
+ t_start = time.perf_counter()
2017
+ while True:
2018
+ elapsed = time.perf_counter() - t_start
2019
+ if elapsed >= STARTUP_WAIT:
2020
+ break
2021
+ if _stream_writer_done.is_set():
2022
+ break
2023
+ time.sleep(0.1)
2024
+
2025
+ with _stream_lock:
2026
+ accumulated = len(_stream_deque)
2027
+ elapsed = max(time.perf_counter() - t_start, 0.1)
2028
+ r_prod = accumulated / elapsed if accumulated > 0 else 10.0
2029
+ r_stream = max(MIN_FPS, min(MAX_FPS, 0.85 * r_prod))
2030
+
2031
+ logging.info(
2032
+ "Stream publisher started: R_prod=%.1f fps, R_stream=%.1f fps, "
2033
+ "accumulated=%d frames in %.1fs",
2034
+ r_prod, r_stream, accumulated, elapsed,
2035
+ )
2036
+
2037
+ # --- Phase 2: adaptive streaming ---
2038
+ last_adjust = time.perf_counter()
2039
+ last_publish_time = 0.0
2040
+
2041
+ while True:
2042
+ frame_interval = 1.0 / r_stream
2043
+
2044
+ # Try to pop a frame
2045
+ frame = None
2046
+ with _stream_lock:
2047
+ if _stream_deque:
2048
+ frame = _stream_deque.popleft()
2049
+
2050
+ if frame is not None:
2051
+ last_frame = frame
2052
+ if job_id:
2053
+ _pub(job_id, frame)
2054
+ elif stream_queue:
2055
+ try:
2056
+ stream_queue.put(frame, timeout=0.01)
2057
+ except Exception:
2058
+ pass
2059
+ published += 1
2060
+ last_publish_time = time.perf_counter()
2061
+ time.sleep(frame_interval)
2062
+ else:
2063
+ # Deque empty — check termination
2064
+ if _stream_writer_done.is_set():
2065
+ with _stream_lock:
2066
+ if not _stream_deque:
2067
+ break
2068
+ continue
2069
+
2070
+ # Heartbeat: re-publish last frame to keep MJPEG alive
2071
+ now = time.perf_counter()
2072
+ if last_frame is not None and (now - last_publish_time) >= HEARTBEAT_INTERVAL:
2073
+ if job_id:
2074
+ _pub(job_id, last_frame)
2075
+ elif stream_queue:
2076
+ try:
2077
+ stream_queue.put(last_frame, timeout=0.01)
2078
+ except Exception:
2079
+ pass
2080
+ last_publish_time = now
2081
+ time.sleep(0.05)
2082
+
2083
+ # Adaptive rate adjustment (every ~1s)
2084
+ now = time.perf_counter()
2085
+ if now - last_adjust >= ADJUST_INTERVAL:
2086
+ with _stream_lock:
2087
+ level = len(_stream_deque)
2088
+ if level < LOW_WATER:
2089
+ r_stream = max(MIN_FPS, r_stream * 0.9)
2090
+ elif level > HIGH_WATER:
2091
+ r_stream = min(MAX_FPS, r_stream * 1.05)
2092
+ last_adjust = now
2093
+
2094
+ # Publish final frame
2095
+ if last_frame is not None:
2096
+ if job_id:
2097
+ _pub(job_id, last_frame)
2098
+ elif stream_queue:
2099
+ try:
2100
+ stream_queue.put(last_frame, timeout=0.01)
2101
+ except Exception:
2102
+ pass
2103
+
2104
+ logging.info("Stream publisher finished: published %d frames", published)
2105
+
2106
  writer_thread = Thread(target=_writer_loop, daemon=True)
2107
  writer_thread.start()
2108
 
2109
+ _publisher_thread = None
2110
+ if stream_queue or job_id:
2111
+ _publisher_thread = Thread(target=_stream_publisher_thread, daemon=True)
2112
+ _publisher_thread.start()
2113
+
2114
  # ==================================================================
2115
  # Phase 1-4: Tracking (single-GPU fallback vs multi-GPU pipeline)
2116
  # Segments are fed incrementally to render_in as they complete.
 
2465
  for t in r_workers:
2466
  t.join()
2467
  writer_thread.join()
2468
+ if _publisher_thread is not None:
2469
+ _publisher_thread.join(timeout=15)
2470
 
2471
  if _perf_metrics is not None:
2472
  _perf_metrics["end_to_end_ms"] = (time.perf_counter() - _t_e2e) * 1000.0
jobs/streaming.py CHANGED
@@ -27,7 +27,7 @@ def _resize_for_stream(frame: np.ndarray) -> np.ndarray:
27
  def create_stream(job_id: str) -> queue.Queue:
28
  """Create a new stream queue for a job. Returns the queue for backward compat."""
29
  with _LOCK:
30
- q = queue.Queue(maxsize=60)
31
  # Create event — will be lazily bound to the correct event loop
32
  event = None # Lazily created in get_stream_event
33
  _STREAMS[job_id] = (q, event)
@@ -90,11 +90,3 @@ def publish_frame(job_id: str, frame: Any) -> None:
90
  # Wake the async consumer if waiting
91
  if event is not None:
92
  event.set()
93
-
94
-
95
- def publish_frame_to_queue(q: queue.Queue, frame: Any) -> None:
96
- """Publish to a specific queue object. Non-blocking drop."""
97
- try:
98
- q.put_nowait(frame)
99
- except queue.Full:
100
- pass
 
27
  def create_stream(job_id: str) -> queue.Queue:
28
  """Create a new stream queue for a job. Returns the queue for backward compat."""
29
  with _LOCK:
30
+ q = queue.Queue(maxsize=120)
31
  # Create event — will be lazily bound to the correct event loop
32
  event = None # Lazily created in get_stream_event
33
  _STREAMS[job_id] = (q, event)
 
90
  # Wake the async consumer if waiting
91
  if event is not None:
92
  event.set()