Zhen Ye Claude Opus 4.6 committed on
Commit
395185d
·
1 Parent(s): f06dd83

fix: steady 24fps MJPEG streaming with no frame drops

Browse files

Restructure stream consumer to drain queue first before blocking on
event, preventing stalls when producer batches frames faster than
consumer reads. Make all queues/deques unbounded so frames are never
silently dropped.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (3) hide show
  1. app.py +43 -23
  2. inference.py +9 -90
  3. jobs/streaming.py +3 -7
app.py CHANGED
@@ -551,8 +551,12 @@ async def stream_video(job_id: str):
551
  import queue as queue_mod
552
 
553
  async def stream_generator():
 
 
 
554
  loop = asyncio.get_running_loop()
555
  buffered = False
 
556
 
557
  # TTFS instrumentation
558
  _first_yielded = False
@@ -566,48 +570,67 @@ async def stream_video(job_id: str):
566
  # Get or create the asyncio.Event for this stream (must be in async context)
567
  event = get_stream_event(job_id)
568
 
569
- while True:
570
- q = get_stream(job_id)
571
- if not q:
572
- break
 
 
573
 
 
574
  try:
575
  # Initial Buffer: Wait until we have enough frames or job is done
576
  if not buffered:
577
  if not _buffer_wait_logged and _stream_t0:
578
  logging.info("[TTFS:%s] +%.1fs stream_buffer_wait (qsize=%d)", job_id, time.perf_counter() - _stream_t0, q.qsize())
579
  _buffer_wait_logged = True
580
- if q.qsize() < 5:
581
  await asyncio.sleep(0.1)
 
582
  continue
583
  buffered = True
584
  if _stream_t0:
585
  logging.info("[TTFS:%s] +%.1fs stream_buffer_ready", job_id, time.perf_counter() - _stream_t0)
586
 
587
- # Event-driven wait replaces busy-wait polling
588
- if event is not None:
589
- try:
590
- await asyncio.wait_for(event.wait(), timeout=1.0)
591
- event.clear()
592
- except asyncio.TimeoutError:
593
- if not get_stream(job_id):
594
- return
595
- continue
596
- else:
597
- # Fallback if no event (shouldn't happen)
598
- await asyncio.sleep(0.033)
599
-
600
- # Drain available frame (already pre-resized by publish_frame)
601
  try:
602
  frame = q.get_nowait()
603
  except queue_mod.Empty:
604
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
605
 
606
  # Encode in thread (frame already resized by publish_frame)
607
  encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
608
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
609
 
610
  if success:
 
611
  if not _first_yielded:
612
  _first_yielded = True
613
  if _stream_t0:
@@ -615,9 +638,6 @@ async def stream_video(job_id: str):
615
  yield (b'--frame\r\n'
616
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
617
 
618
- # Simple pacer (~30fps)
619
- await asyncio.sleep(0.033)
620
-
621
  except Exception:
622
  await asyncio.sleep(0.1)
623
 
 
551
  import queue as queue_mod
552
 
553
  async def stream_generator():
554
+ STREAM_FPS = 24
555
+ FRAME_INTERVAL = 1.0 / STREAM_FPS # ~41.7ms
556
+
557
  loop = asyncio.get_running_loop()
558
  buffered = False
559
+ last_yield_time = 0.0
560
 
561
  # TTFS instrumentation
562
  _first_yielded = False
 
570
  # Get or create the asyncio.Event for this stream (must be in async context)
571
  event = get_stream_event(job_id)
572
 
573
+ # Hold a local ref to the queue so we can drain it even after remove_stream()
574
+ q = get_stream(job_id)
575
+ if not q:
576
+ return
577
+
578
+ stream_removed = False
579
 
580
+ while True:
581
  try:
582
  # Initial Buffer: Wait until we have enough frames or job is done
583
  if not buffered:
584
  if not _buffer_wait_logged and _stream_t0:
585
  logging.info("[TTFS:%s] +%.1fs stream_buffer_wait (qsize=%d)", job_id, time.perf_counter() - _stream_t0, q.qsize())
586
  _buffer_wait_logged = True
587
+ if q.qsize() < 5 and not stream_removed:
588
  await asyncio.sleep(0.1)
589
+ stream_removed = get_stream(job_id) is None
590
  continue
591
  buffered = True
592
  if _stream_t0:
593
  logging.info("[TTFS:%s] +%.1fs stream_buffer_ready", job_id, time.perf_counter() - _stream_t0)
594
 
595
+ # Try to get a frame from the queue first (non-blocking)
596
+ frame = None
 
 
 
 
 
 
 
 
 
 
 
 
597
  try:
598
  frame = q.get_nowait()
599
  except queue_mod.Empty:
600
+ pass
601
+
602
+ # Only block on event when queue is actually empty
603
+ if frame is None:
604
+ if stream_removed:
605
+ break # stream ended and queue fully drained
606
+ if event is not None:
607
+ try:
608
+ await asyncio.wait_for(event.wait(), timeout=1.0)
609
+ event.clear()
610
+ except asyncio.TimeoutError:
611
+ stream_removed = get_stream(job_id) is None
612
+ continue
613
+ else:
614
+ await asyncio.sleep(FRAME_INTERVAL)
615
+ # After waking, try the queue again
616
+ try:
617
+ frame = q.get_nowait()
618
+ except queue_mod.Empty:
619
+ stream_removed = get_stream(job_id) is None
620
+ continue
621
+
622
+ # Pace output at fixed 24fps
623
+ now = time.perf_counter()
624
+ wait = FRAME_INTERVAL - (now - last_yield_time)
625
+ if wait > 0:
626
+ await asyncio.sleep(wait)
627
 
628
  # Encode in thread (frame already resized by publish_frame)
629
  encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
630
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
631
 
632
  if success:
633
+ last_yield_time = time.perf_counter()
634
  if not _first_yielded:
635
  _first_yielded = True
636
  if _stream_t0:
 
638
  yield (b'--frame\r\n'
639
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
640
 
 
 
 
641
  except Exception:
642
  await asyncio.sleep(0.1)
643
 
inference.py CHANGED
@@ -916,14 +916,11 @@ def run_inference(
916
  writer.write(p_frame)
917
 
918
  if stream_queue:
919
- try:
920
- from jobs.streaming import publish_frame as _publish
921
- if job_id:
922
- _publish(job_id, p_frame)
923
- else:
924
- stream_queue.put(p_frame, timeout=0.01)
925
- except:
926
- pass
927
 
928
  all_detections_map[next_idx] = dets
929
 
@@ -1244,7 +1241,7 @@ def run_grounded_sam2_tracking(
1244
  return dets
1245
 
1246
  # Shared streaming state (publisher ↔ writer)
1247
- _stream_deque: collections.deque = collections.deque(maxlen=200)
1248
  _stream_lock = RLock()
1249
  _stream_writer_done = Event()
1250
 
@@ -1337,70 +1334,25 @@ def run_grounded_sam2_tracking(
1337
  _stream_writer_done.set()
1338
 
1339
  def _stream_publisher_thread():
1340
- """Adaptive-rate publisher: reads from _stream_deque, publishes at measured pace."""
1341
  from jobs.streaming import publish_frame as _pub
1342
 
1343
- STARTUP_WAIT = 5.0 # max seconds to accumulate before streaming
1344
- MIN_FPS = 2.0
1345
- MAX_FPS = 30.0
1346
- HEARTBEAT_INTERVAL = 0.5 # re-publish last frame if deque empty
1347
- LOW_WATER = 10
1348
- HIGH_WATER = 50
1349
- ADJUST_INTERVAL = 1.0
1350
-
1351
- last_frame = None
1352
  published = 0
1353
 
1354
- # --- Phase 1: startup accumulation ---
1355
- t_start = time.perf_counter()
1356
  while True:
1357
- elapsed = time.perf_counter() - t_start
1358
- if elapsed >= STARTUP_WAIT:
1359
- break
1360
- if _stream_writer_done.is_set():
1361
- break
1362
- time.sleep(0.1)
1363
-
1364
- with _stream_lock:
1365
- accumulated = len(_stream_deque)
1366
- elapsed = max(time.perf_counter() - t_start, 0.1)
1367
- r_prod = accumulated / elapsed if accumulated > 0 else 10.0
1368
- r_stream = max(MIN_FPS, min(MAX_FPS, 0.85 * r_prod))
1369
-
1370
- logging.info(
1371
- "Stream publisher started: R_prod=%.1f fps, R_stream=%.1f fps, "
1372
- "accumulated=%d frames in %.1fs",
1373
- r_prod, r_stream, accumulated, elapsed,
1374
- )
1375
- _ttfs(f"publisher: startup_wait done ({accumulated} frames in {elapsed:.1f}s)")
1376
-
1377
- # --- Phase 2: adaptive streaming ---
1378
- last_adjust = time.perf_counter()
1379
- last_publish_time = 0.0
1380
-
1381
- while True:
1382
- frame_interval = 1.0 / r_stream
1383
-
1384
- # Try to pop a frame
1385
  frame = None
1386
  with _stream_lock:
1387
  if _stream_deque:
1388
  frame = _stream_deque.popleft()
1389
 
1390
  if frame is not None:
1391
- last_frame = frame
1392
  if job_id:
1393
  _pub(job_id, frame)
1394
  elif stream_queue:
1395
- try:
1396
- stream_queue.put(frame, timeout=0.01)
1397
- except Exception:
1398
- pass
1399
  if published == 0:
1400
  _ttfs("first_publish_frame")
1401
  published += 1
1402
- last_publish_time = time.perf_counter()
1403
- time.sleep(frame_interval)
1404
  else:
1405
  # Deque empty — check termination
1406
  if _stream_writer_done.is_set():
@@ -1408,40 +1360,7 @@ def run_grounded_sam2_tracking(
1408
  if not _stream_deque:
1409
  break
1410
  continue
1411
-
1412
- # Heartbeat: re-publish last frame to keep MJPEG alive
1413
- now = time.perf_counter()
1414
- if last_frame is not None and (now - last_publish_time) >= HEARTBEAT_INTERVAL:
1415
- if job_id:
1416
- _pub(job_id, last_frame)
1417
- elif stream_queue:
1418
- try:
1419
- stream_queue.put(last_frame, timeout=0.01)
1420
- except Exception:
1421
- pass
1422
- last_publish_time = now
1423
- time.sleep(0.05)
1424
-
1425
- # Adaptive rate adjustment (every ~1s)
1426
- now = time.perf_counter()
1427
- if now - last_adjust >= ADJUST_INTERVAL:
1428
- with _stream_lock:
1429
- level = len(_stream_deque)
1430
- if level < LOW_WATER:
1431
- r_stream = max(MIN_FPS, r_stream * 0.9)
1432
- elif level > HIGH_WATER:
1433
- r_stream = min(MAX_FPS, r_stream * 1.05)
1434
- last_adjust = now
1435
-
1436
- # Publish final frame
1437
- if last_frame is not None:
1438
- if job_id:
1439
- _pub(job_id, last_frame)
1440
- elif stream_queue:
1441
- try:
1442
- stream_queue.put(last_frame, timeout=0.01)
1443
- except Exception:
1444
- pass
1445
 
1446
  logging.info("Stream publisher finished: published %d frames", published)
1447
 
 
916
  writer.write(p_frame)
917
 
918
  if stream_queue:
919
+ from jobs.streaming import publish_frame as _publish
920
+ if job_id:
921
+ _publish(job_id, p_frame)
922
+ else:
923
+ stream_queue.put(p_frame)
 
 
 
924
 
925
  all_detections_map[next_idx] = dets
926
 
 
1241
  return dets
1242
 
1243
  # Shared streaming state (publisher ↔ writer)
1244
+ _stream_deque: collections.deque = collections.deque() # unbounded — publisher drains at its own pace
1245
  _stream_lock = RLock()
1246
  _stream_writer_done = Event()
1247
 
 
1334
  _stream_writer_done.set()
1335
 
1336
  def _stream_publisher_thread():
1337
+ """Forward every frame from deque to stream queue. Consumer handles pacing."""
1338
  from jobs.streaming import publish_frame as _pub
1339
 
 
 
 
 
 
 
 
 
 
1340
  published = 0
1341
 
 
 
1342
  while True:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1343
  frame = None
1344
  with _stream_lock:
1345
  if _stream_deque:
1346
  frame = _stream_deque.popleft()
1347
 
1348
  if frame is not None:
 
1349
  if job_id:
1350
  _pub(job_id, frame)
1351
  elif stream_queue:
1352
+ stream_queue.put(frame)
 
 
 
1353
  if published == 0:
1354
  _ttfs("first_publish_frame")
1355
  published += 1
 
 
1356
  else:
1357
  # Deque empty — check termination
1358
  if _stream_writer_done.is_set():
 
1360
  if not _stream_deque:
1361
  break
1362
  continue
1363
+ time.sleep(0.01) # brief wait for next frame
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1364
 
1365
  logging.info("Stream publisher finished: published %d frames", published)
1366
 
jobs/streaming.py CHANGED
@@ -27,7 +27,7 @@ def _resize_for_stream(frame: np.ndarray) -> np.ndarray:
27
  def create_stream(job_id: str) -> queue.Queue:
28
  """Create a new stream queue for a job. Returns the queue for backward compat."""
29
  with _LOCK:
30
- q = queue.Queue(maxsize=120)
31
  # Create event — will be lazily bound to the correct event loop
32
  event = None # Lazily created in get_stream_event
33
  _STREAMS[job_id] = (q, event)
@@ -67,7 +67,7 @@ def remove_stream(job_id: str) -> None:
67
 
68
 
69
  def publish_frame(job_id: str, frame: Any) -> None:
70
- """Publish a pre-resized frame to a job's stream. Non-blocking drop if full.
71
 
72
  Also sets the asyncio.Event to wake the stream consumer immediately.
73
  """
@@ -81,11 +81,7 @@ def publish_frame(job_id: str, frame: Any) -> None:
81
  # Pre-resize for streaming (avoids resize in async handler)
82
  resized = _resize_for_stream(frame)
83
 
84
- try:
85
- q.put_nowait(resized)
86
- except queue.Full:
87
- # Drop frame if consumer is too slow
88
- pass
89
 
90
  # Wake the async consumer if waiting
91
  if event is not None:
 
27
  def create_stream(job_id: str) -> queue.Queue:
28
  """Create a new stream queue for a job. Returns the queue for backward compat."""
29
  with _LOCK:
30
+ q = queue.Queue() # unbounded — no frame drops
31
  # Create event — will be lazily bound to the correct event loop
32
  event = None # Lazily created in get_stream_event
33
  _STREAMS[job_id] = (q, event)
 
67
 
68
 
69
  def publish_frame(job_id: str, frame: Any) -> None:
70
+ """Publish a pre-resized frame to a job's stream. Never drops frames.
71
 
72
  Also sets the asyncio.Event to wake the stream consumer immediately.
73
  """
 
81
  # Pre-resize for streaming (avoids resize in async handler)
82
  resized = _resize_for_stream(frame)
83
 
84
+ q.put(resized) # blocking put — no frame loss
 
 
 
 
85
 
86
  # Wake the async consumer if waiting
87
  if event is not None: