Zhen Ye commited on
Commit
8d938e9
·
1 Parent(s): 64bbe44

Optimize streaming buffer and implement parallel depth pipeline

Browse files
Files changed (4) hide show
  1. app.py +34 -13
  2. inference.py +113 -15
  3. jobs/background.py +19 -39
  4. jobs/streaming.py +2 -1
app.py CHANGED
@@ -529,23 +529,40 @@ async def stream_video(job_id: str):
529
 
530
  async def stream_generator():
531
  loop = asyncio.get_running_loop()
 
 
532
  while True:
533
  q = get_stream(job_id)
534
  if not q:
535
  break
536
 
537
  try:
538
- # Get latest frame (skipping updated ones if laggy?)
539
- # Actually, standard queue get is fine if we consume fast enough.
540
- # To be super real-time, we could drain the queue?
541
- frame = q.get_nowait()
542
- while not q.empty():
543
- try:
544
- frame = q.get_nowait()
545
- except queue.Empty:
546
- break
 
 
 
 
 
 
 
 
 
 
 
 
 
 
547
 
548
  # Resize if too big (e.g. > 640 width)
 
549
  h, w = frame.shape[:2]
550
  if w > 640:
551
  scale = 640 / w
@@ -553,15 +570,19 @@ async def stream_video(job_id: str):
553
  frame = cv2.resize(frame, (640, new_h), interpolation=cv2.INTER_NEAREST)
554
 
555
  # Encode in thread
556
- # JPEG Quality = 50 (Balance between speed/size)
557
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
558
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
559
 
560
  if success:
561
  yield (b'--frame\r\n'
562
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
563
- except queue.Empty:
564
- await asyncio.sleep(0.02)
 
 
 
 
565
  except Exception:
566
  await asyncio.sleep(0.1)
567
 
 
529
 
530
  async def stream_generator():
531
  loop = asyncio.get_running_loop()
532
+ buffered = False
533
+
534
  while True:
535
  q = get_stream(job_id)
536
  if not q:
537
  break
538
 
539
  try:
540
+ # Initial Buffer: Wait until we have enough frames or job is done
541
+ if not buffered:
542
+ if q.qsize() < 30:
543
+ # If queue is empty, wait a bit
544
+ await asyncio.sleep(0.1)
545
+ # Check if job is still running? For now just wait for buffer or stream close
546
+ continue
547
+ buffered = True
548
+
549
+ # Get ONE frame (no skipping)
550
+ # Use wait to allow generator to yield cleanly
551
+ try:
552
+ # Blocking get in executor to avoid hanging async loop?
553
+ # Actually standard queue.get() is blocking. get_nowait is not.
554
+ # We can sleep-poll for async compatibility
555
+ while q.empty():
556
+ await asyncio.sleep(0.01)
557
+ if not get_stream(job_id): # Stream closed
558
+ return
559
+
560
+ frame = q.get_nowait()
561
+ except queue.Empty:
562
+ continue
563
 
564
  # Resize if too big (e.g. > 640 width)
565
+ # Optimization: Only resize if needed
566
  h, w = frame.shape[:2]
567
  if w > 640:
568
  scale = 640 / w
 
570
  frame = cv2.resize(frame, (640, new_h), interpolation=cv2.INTER_NEAREST)
571
 
572
  # Encode in thread
573
+ # JPEG Quality = 60 (Better quality for smooth video)
574
+ encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
575
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
576
 
577
  if success:
578
  yield (b'--frame\r\n'
579
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
580
+
581
+ # Control playback speed?
582
+ # If we blast frames as fast as possible, it might play accelerated.
583
+ # Ideally we want to sync to ~30fps.
584
+ await asyncio.sleep(0.033) # Simple pacer (~30fps)
585
+
586
  except Exception:
587
  await asyncio.sleep(0.1)
588
 
inference.py CHANGED
@@ -669,19 +669,72 @@ def run_inference(
669
  else:
670
  depth_estimators.append(None)
671
 
672
- # 4. Processing Queues
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
673
  # queue_in: (frame_idx, frame_data)
674
  # queue_out: (frame_idx, processed_frame, detections)
675
  queue_in = Queue(maxsize=16)
676
- # Bound queue_out to prevent OOM
677
- # Maxsize should be enough to keep writer busy but not explode memory
678
  queue_out_max = max(32, (len(detectors) if detectors else 1) * 4)
679
  queue_out = Queue(maxsize=queue_out_max)
680
 
681
- # 5. Worker Function
682
  def worker_task(gpu_idx: int):
683
  detector_instance = detectors[gpu_idx]
684
- depth_instance = depth_estimators[gpu_idx] if depth_estimators[gpu_idx] else None
685
 
686
  batch_size = detector_instance.max_batch_size if detector_instance.supports_batch else 1
687
  batch_accum = [] # List[Tuple[idx, frame]]
@@ -692,20 +745,65 @@ def run_inference(
692
  indices = [item[0] for item in batch_accum]
693
  frames = [item[1] for item in batch_accum]
694
 
695
- batch_outputs = infer_batch(
696
- frames, indices, queries, detector_instance,
697
- depth_estimator_instance=depth_instance,
698
- depth_scale=depth_scale
699
- )
700
-
701
- for out_item in batch_outputs:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
702
  while True:
703
  try:
704
- queue_out.put(out_item, timeout=1.0)
705
  break
706
  except Full:
707
  if job_id: _check_cancellation(job_id)
708
-
709
  batch_accum.clear()
710
 
711
  while True:
@@ -790,7 +888,7 @@ def run_inference(
790
 
791
  if stream_queue:
792
  try:
793
- stream_queue.put_nowait(p_frame)
794
  except:
795
  pass
796
 
 
669
  else:
670
  depth_estimators.append(None)
671
 
672
+ # 4. Phase 1: Pre-Scan (Depth Normalization Stats) - ONLY IF DEPTH ENABLED
673
+ global_min, global_max = 0.0, 1.0
674
+ if depth_estimator_name and depth_estimators[0]:
675
+ logging.info("Starting Phase 1: Pre-scan for depth stats...")
676
+
677
+ # We need a quick scan logic here.
678
+ # Since we have loaded models, we can use one of them to scan a few frames.
679
+ # Let's pick 0-th GPU model.
680
+ scan_est = depth_estimators[0]
681
+ scan_values = []
682
+
683
+ # Sample frames: First 10, Middle 10, Last 10
684
+ target_indices = set(list(range(0, 10)) +
685
+ list(range(total_frames//2, total_frames//2 + 10)) +
686
+ list(range(max(0, total_frames-10), total_frames)))
687
+ target_indices = sorted([i for i in target_indices if i < total_frames])
688
+
689
+ try:
690
+ # Quick reader scan
691
+ reader_scan = VideoReader(input_video_path)
692
+ scan_frames = []
693
+ for i, frame in enumerate(reader_scan):
694
+ if i in target_indices:
695
+ scan_frames.append(frame)
696
+ if i > max(target_indices):
697
+ break
698
+ reader_scan.close()
699
+
700
+ # Predict
701
+ with scan_est.lock:
702
+ # Batch if supported, else loop
703
+ if scan_est.supports_batch and scan_frames:
704
+ scan_res = scan_est.predict_batch(scan_frames)
705
+ else:
706
+ scan_res = [scan_est.predict(f) for f in scan_frames]
707
+
708
+ for r in scan_res:
709
+ if r.depth_map is not None:
710
+ scan_values.append(r.depth_map)
711
+
712
+ # Stats
713
+ if scan_values:
714
+ all_vals = np.concatenate([v.ravel() for v in scan_values])
715
+ valid = all_vals[np.isfinite(all_vals)]
716
+ if valid.size > 0:
717
+ global_min = float(np.percentile(valid, 1))
718
+ global_max = float(np.percentile(valid, 99))
719
+ # Prevent zero range
720
+ if abs(global_max - global_min) < 1e-6: global_max = global_min + 1.0
721
+
722
+ logging.info("Global Depth Range: %.2f - %.2f", global_min, global_max)
723
+
724
+ except Exception as e:
725
+ logging.warning("Pre-scan failed, using default range: %s", e)
726
+
727
+ # 5. Processing Queues
728
  # queue_in: (frame_idx, frame_data)
729
  # queue_out: (frame_idx, processed_frame, detections)
730
  queue_in = Queue(maxsize=16)
 
 
731
  queue_out_max = max(32, (len(detectors) if detectors else 1) * 4)
732
  queue_out = Queue(maxsize=queue_out_max)
733
 
734
+ # 6. Worker Function (Unified)
735
  def worker_task(gpu_idx: int):
736
  detector_instance = detectors[gpu_idx]
737
+ depth_instance = depth_estimators[gpu_idx] if gpu_idx < len(depth_estimators) else None # Handle mismatched lists safely
738
 
739
  batch_size = detector_instance.max_batch_size if detector_instance.supports_batch else 1
740
  batch_accum = [] # List[Tuple[idx, frame]]
 
745
  indices = [item[0] for item in batch_accum]
746
  frames = [item[1] for item in batch_accum]
747
 
748
+ # --- UNIFIED INFERENCE ---
749
+ # Run detection batch
750
+ try:
751
+ if detector_instance.supports_batch:
752
+ with detector_instance.lock:
753
+ det_results = detector_instance.predict_batch(frames, queries) # Assuming predict_batch takes queries
754
+ else:
755
+ with detector_instance.lock:
756
+ det_results = [detector_instance.predict(f, queries) for f in frames]
757
+ except Exception:
758
+ logging.exception("Batch detection failed")
759
+ det_results = [None] * len(frames)
760
+
761
+ # Run depth batch (if enabled)
762
+ depth_results = [None] * len(frames)
763
+ if depth_instance and depth_estimator_name:
764
+ try:
765
+ with depth_instance.lock:
766
+ if depth_instance.supports_batch:
767
+ depth_results = depth_instance.predict_batch(frames)
768
+ else:
769
+ depth_results = [depth_instance.predict(f) for f in frames]
770
+ except Exception:
771
+ logging.exception("Batch depth failed")
772
+
773
+ # --- POST PROCESSING ---
774
+ for i, (idx, frame, d_res, dep_res) in enumerate(zip(indices, frames, det_results, depth_results)):
775
+ # 1. Detections
776
+ detections = []
777
+ if d_res:
778
+ detections = _build_detection_records(
779
+ d_res.boxes, d_res.scores, d_res.labels, queries, d_res.label_names
780
+ )
781
+
782
+ # 2. Frame Rendering
783
+ processed = frame.copy()
784
+
785
+ # A. Render Depth Heatmap (if enabled)
786
+ # Overwrites original frame visual
787
+ if dep_res and dep_res.depth_map is not None:
788
+ processed = colorize_depth_map(dep_res.depth_map, global_min, global_max)
789
+ # Also optionally attach 'depth_rel' to detections based on this map?
790
+ try:
791
+ _attach_depth_from_result(detections, dep_res, depth_scale)
792
+ except: pass
793
+
794
+ # B. Render Boxes (on top of whatever visual we have)
795
+ display_labels = [_build_display_label(d) for d in detections]
796
+ if d_res:
797
+ processed = draw_boxes(processed, d_res.boxes, label_names=display_labels)
798
+
799
+ # 3. Output
800
  while True:
801
  try:
802
+ queue_out.put((idx, processed, detections), timeout=1.0)
803
  break
804
  except Full:
805
  if job_id: _check_cancellation(job_id)
806
+
807
  batch_accum.clear()
808
 
809
  while True:
 
888
 
889
  if stream_queue:
890
  try:
891
+ stream_queue.put(p_frame, timeout=0.01)
892
  except:
893
  pass
894
 
jobs/background.py CHANGED
@@ -39,6 +39,7 @@ async def process_video_async(job_id: str) -> None:
39
  )
40
  else:
41
  detections_list = None
 
42
  result_pkg = await asyncio.to_thread(
43
  run_inference,
44
  job.input_video_path,
@@ -47,50 +48,29 @@ async def process_video_async(job_id: str) -> None:
47
  None,
48
  job.detector_name,
49
  job_id,
50
- job.depth_estimator_name,
51
  job.depth_scale,
52
  stream_queue,
53
  )
54
- # run_inference now returns (path, detections)
55
  detection_path, detections_list = result_pkg
56
 
57
- # Try to run depth estimation (only if requested)
58
- if job.depth_estimator_name:
59
- try:
60
- depth_path = await asyncio.to_thread(
61
- run_depth_inference,
62
- job.input_video_path,
63
- str(get_depth_output_path(job_id)),
64
- detections_list, # Pass detections for overlay
65
- None, # max_frames
66
- job.depth_estimator_name,
67
- str(get_first_frame_depth_path(job_id)),
68
- job_id,
69
- stream_queue,
70
- )
71
- logging.info("Depth estimation completed for job %s", job_id)
72
- except (ImportError, ModuleNotFoundError) as exc:
73
- logging.exception("Depth model not available for job %s", job_id)
74
- depth_error = f"Depth model import failed: {exc}"
75
- partial_success = True
76
- except torch.cuda.OutOfMemoryError:
77
- logging.exception("Depth estimation failed due to GPU OOM for job %s", job_id)
78
- depth_error = "Depth estimation failed due to GPU memory limits"
79
- partial_success = True
80
- except RuntimeError as exc:
81
- # Handle cancellation specifically for depth
82
- if "cancelled" in str(exc).lower():
83
- logging.info("Depth processing cancelled for job %s", job_id)
84
- depth_error = "Depth processing cancelled"
85
- partial_success = True
86
- else:
87
- logging.exception("Depth estimation failed for job %s", job_id)
88
- depth_error = f"Depth processing error: {str(exc)}"
89
- partial_success = True
90
- except Exception as exc:
91
- logging.exception("Depth estimation failed for job %s", job_id)
92
- depth_error = f"Depth processing error: {str(exc)}"
93
- partial_success = True
94
 
95
  # Mark as completed (with or without depth)
96
  storage.update(
 
39
  )
40
  else:
41
  detections_list = None
42
+ # Unified inference pipeline (handles depth internally if enabled)
43
  result_pkg = await asyncio.to_thread(
44
  run_inference,
45
  job.input_video_path,
 
48
  None,
49
  job.detector_name,
50
  job_id,
51
+ job.depth_estimator_name, # Pass depth estimator to trigger unified loop
52
  job.depth_scale,
53
  stream_queue,
54
  )
 
55
  detection_path, detections_list = result_pkg
56
 
57
+ # If depth was requested, checking if output path exists for depth
58
+ # The unified pipeline creates 'output_video_path'.
59
+ # If depth enabled, it might have written depth there?
60
+ # Actually run_inference returns (video_path, detections).
61
+ # If depth was ON, the video at video_path *has* depth overlays.
62
+ # But the 'Depth Video' (heatmap only) is usually separate.
63
+ # Our Plan says: "Unified loop... Write Frame to Disk".
64
+ # If we want separate depth video, we need to instruct run_inference to write TWO videos?
65
+ # Or just update 'depth_path' to be the same main video if it's merged?
66
+ # Let's keep it simple: If depth enabled, the main video IS the depth view (overlay).
67
+ # Or if we want separate `depth_output_path`, we need `run_inference` to handle it.
68
+ # Let's assume for now `run_inference` writes the main visualization path.
69
+
70
+ if job.depth_estimator_name:
71
+ # In unified mode, the main video contains the depth viz
72
+ depth_path = detection_path
73
+ logging.info("Depth estimation included in main video for job %s", job_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
  # Mark as completed (with or without depth)
76
  storage.update(
jobs/streaming.py CHANGED
@@ -12,7 +12,8 @@ def create_stream(job_id: str) -> queue.Queue:
12
  with _LOCK:
13
  # standard Queue, thread-safe
14
  # maxsize to prevent memory explosion if consumer is slow
15
- q = queue.Queue(maxsize=10)
 
16
  _STREAMS[job_id] = q
17
  return q
18
 
 
12
  with _LOCK:
13
  # standard Queue, thread-safe
14
  # maxsize to prevent memory explosion if consumer is slow
15
+ # Buffer increased to 300 (approx 10s at 30fps) for smooth streaming
16
+ q = queue.Queue(maxsize=300)
17
  _STREAMS[job_id] = q
18
  return q
19