Zhen Ye commited on
Commit
356dce8
·
1 Parent(s): 2392266

feat: Implement AsyncVideoReader for parallel video decoding

Browse files

Addresses CPU bottleneck during inference by moving video decoding to a
background thread with a prefetch buffer. Integrated into inference pipeline.

Files changed (2) hide show
  1. inference.py +8 -8
  2. utils/video.py +88 -0
inference.py CHANGED
@@ -23,7 +23,7 @@ from models.model_loader import load_detector, load_detector_on_device
23
  from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device
24
  from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device
25
  from models.depth_estimators.base import DepthEstimator
26
- from utils.video import extract_frames, write_video, VideoReader, VideoWriter
27
  from utils.gpt_distance import estimate_distance_gpt
28
  import tempfile
29
 
@@ -591,7 +591,7 @@ def run_inference(
591
 
592
  # 1. Setup Video Reader
593
  try:
594
- reader = VideoReader(input_video_path)
595
  except ValueError:
596
  logging.exception("Failed to open video at %s", input_video_path)
597
  raise
@@ -688,7 +688,7 @@ def run_inference(
688
 
689
  try:
690
  # Quick reader scan
691
- reader_scan = VideoReader(input_video_path)
692
  scan_frames = []
693
  for i, frame in enumerate(reader_scan):
694
  if i in target_indices:
@@ -965,7 +965,7 @@ def run_segmentation(
965
  ) -> str:
966
  # 1. Setup Reader
967
  try:
968
- reader = VideoReader(input_video_path)
969
  except ValueError:
970
  logging.exception("Failed to open video at %s", input_video_path)
971
  raise
@@ -1128,7 +1128,7 @@ def run_segmentation(
1128
 
1129
  # Feeder
1130
  try:
1131
- reader = VideoReader(input_video_path)
1132
  for i, frame in enumerate(reader):
1133
  _check_cancellation(job_id)
1134
  if max_frames is not None and i >= max_frames:
@@ -1161,7 +1161,7 @@ def run_depth_inference(
1161
  ) -> str:
1162
  # 1. Setup Reader
1163
  try:
1164
- reader = VideoReader(input_video_path)
1165
  except ValueError:
1166
  logging.exception("Failed to open video at %s", input_video_path)
1167
  raise
@@ -1264,7 +1264,7 @@ def run_depth_inference(
1264
  # We will iterate and pick.
1265
 
1266
  cnt = 0
1267
- reader_scan = VideoReader(input_video_path)
1268
  for i, frame in enumerate(reader_scan):
1269
  if i in target_indices:
1270
  scan_frames_data.append(frame)
@@ -1440,7 +1440,7 @@ def run_depth_inference(
1440
 
1441
  # Feeder
1442
  try:
1443
- reader = VideoReader(input_video_path)
1444
  for i, frame in enumerate(reader):
1445
  _check_cancellation(job_id)
1446
  if max_frames is not None and i >= max_frames:
 
23
  from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device
24
  from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device
25
  from models.depth_estimators.base import DepthEstimator
26
+ from utils.video import extract_frames, write_video, VideoReader, VideoWriter, AsyncVideoReader
27
  from utils.gpt_distance import estimate_distance_gpt
28
  import tempfile
29
 
 
591
 
592
  # 1. Setup Video Reader
593
  try:
594
+ reader = AsyncVideoReader(input_video_path)
595
  except ValueError:
596
  logging.exception("Failed to open video at %s", input_video_path)
597
  raise
 
688
 
689
  try:
690
  # Quick reader scan
691
+ reader_scan = AsyncVideoReader(input_video_path)
692
  scan_frames = []
693
  for i, frame in enumerate(reader_scan):
694
  if i in target_indices:
 
965
  ) -> str:
966
  # 1. Setup Reader
967
  try:
968
+ reader = AsyncVideoReader(input_video_path)
969
  except ValueError:
970
  logging.exception("Failed to open video at %s", input_video_path)
971
  raise
 
1128
 
1129
  # Feeder
1130
  try:
1131
+ # reader = VideoReader(input_video_path) # Reusing existing reader
1132
  for i, frame in enumerate(reader):
1133
  _check_cancellation(job_id)
1134
  if max_frames is not None and i >= max_frames:
 
1161
  ) -> str:
1162
  # 1. Setup Reader
1163
  try:
1164
+ reader = AsyncVideoReader(input_video_path)
1165
  except ValueError:
1166
  logging.exception("Failed to open video at %s", input_video_path)
1167
  raise
 
1264
  # We will iterate and pick.
1265
 
1266
  cnt = 0
1267
+ reader_scan = AsyncVideoReader(input_video_path)
1268
  for i, frame in enumerate(reader_scan):
1269
  if i in target_indices:
1270
  scan_frames_data.append(frame)
 
1440
 
1441
  # Feeder
1442
  try:
1443
+ # reader = VideoReader(input_video_path) # Reusing existing reader
1444
  for i, frame in enumerate(reader):
1445
  _check_cancellation(job_id)
1446
  if max_frames is not None and i >= max_frames:
utils/video.py CHANGED
@@ -114,6 +114,94 @@ class VideoReader:
114
  self.close()
115
 
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  class VideoWriter:
118
  def __init__(self, output_path: str, fps: float, width: int, height: int):
119
  self.output_path = output_path
 
114
  self.close()
115
 
116
 
117
+ class AsyncVideoReader:
118
+ """
119
+ Async video reader that decodes frames in a background thread.
120
+
121
+ This prevents GPU starvation on multi-GPU systems by prefetching frames
122
+ while the main thread is busy dispatching work to GPUs.
123
+ """
124
+
125
+ def __init__(self, video_path: str, prefetch_size: int = 32):
126
+ """
127
+ Initialize async video reader.
128
+
129
+ Args:
130
+ video_path: Path to video file
131
+ prefetch_size: Number of frames to prefetch (default 32)
132
+ """
133
+ from queue import Queue
134
+ from threading import Thread
135
+
136
+ self.video_path = video_path
137
+ self.prefetch_size = prefetch_size
138
+
139
+ # Open video to get metadata
140
+ self._cap = cv2.VideoCapture(video_path)
141
+ if not self._cap.isOpened():
142
+ raise ValueError(f"Unable to open video: {video_path}")
143
+
144
+ self.fps = self._cap.get(cv2.CAP_PROP_FPS) or 30.0
145
+ self.width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
146
+ self.height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
147
+ self.total_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
148
+
149
+ # Prefetch queue
150
+ self._queue: Queue = Queue(maxsize=prefetch_size)
151
+ self._error: Exception = None
152
+ self._finished = False
153
+
154
+ # Start decoder thread
155
+ self._thread = Thread(target=self._decode_loop, daemon=True)
156
+ self._thread.start()
157
+
158
+ def _decode_loop(self):
159
+ """Background thread that continuously decodes frames."""
160
+ try:
161
+ while True:
162
+ success, frame = self._cap.read()
163
+ if not success:
164
+ break
165
+ self._queue.put(frame) # Blocks when queue is full (backpressure)
166
+ except Exception as e:
167
+ self._error = e
168
+ logging.error(f"AsyncVideoReader decode error: {e}")
169
+ finally:
170
+ self._cap.release()
171
+ self._queue.put(None) # Sentinel to signal end
172
+ self._finished = True
173
+
174
+ def __iter__(self):
175
+ return self
176
+
177
+ def __next__(self) -> np.ndarray:
178
+ if self._error:
179
+ raise self._error
180
+
181
+ frame = self._queue.get()
182
+ if frame is None:
183
+ raise StopIteration
184
+ return frame
185
+
186
+ def close(self):
187
+ """Stop the decoder thread and release resources."""
188
+ # Signal thread to stop by releasing cap (if not already done)
189
+ if self._cap.isOpened():
190
+ self._cap.release()
191
+ # Drain queue to unblock thread if it's waiting on put()
192
+ while not self._queue.empty():
193
+ try:
194
+ self._queue.get_nowait()
195
+ except:
196
+ break
197
+
198
+ def __enter__(self):
199
+ return self
200
+
201
+ def __exit__(self, exc_type, exc_val, exc_tb):
202
+ self.close()
203
+
204
+
205
  class VideoWriter:
206
  def __init__(self, output_path: str, fps: float, width: int, height: int):
207
  self.output_path = output_path