import logging
import os
import queue
import shutil
import subprocess
import tempfile
import threading
from typing import List, Optional, Tuple

import cv2
import numpy as np


def extract_frames(video_path: str) -> Tuple[List[np.ndarray], float, int, int]:
    """Decode every frame of a video into memory.

    Args:
        video_path: Path of the video file to open with OpenCV.

    Returns:
        A tuple ``(frames, fps, width, height)``. ``fps`` is ``0.0`` when
        OpenCV reports no frame rate.

    Raises:
        ValueError: If the video cannot be opened or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError("Unable to open video.")

    frames: List[np.ndarray] = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        success, frame = cap.read()
        while success:
            frames.append(frame)
            success, frame = cap.read()
    finally:
        # Release the capture even if decoding raises part-way through.
        cap.release()

    if not frames:
        raise ValueError("Video decode produced zero frames.")
    return frames, fps, width, height


def _transcode_with_ffmpeg(src_path: str, dst_path: str) -> None:
    """Re-encode ``src_path`` to H.264 (yuv420p, faststart) at ``dst_path``.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found
            (raised by ``subprocess.run``).
        RuntimeError: If ffmpeg exits with a non-zero status; the message is
            ffmpeg's stderr output.
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        src_path,
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        dst_path,
    ]
    process = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False
    )
    if process.returncode != 0:
        err_msg = process.stderr.decode("utf-8", errors="ignore")
        logging.error("ffmpeg failed with code %d: %s", process.returncode, err_msg)
        raise RuntimeError(err_msg)
    logging.info("ffmpeg success")


def _remove_quietly(path: str) -> None:
    """Delete ``path``, ignoring the error if it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


def _finalize_video(temp_path: str, output_path: str) -> None:
    """Turn the raw MP4V file at ``temp_path`` into the final ``output_path``.

    Transcodes to H.264 and deletes the temp file on success. If ffmpeg is
    missing or fails, the raw MP4V file is moved to ``output_path`` instead so
    that some output is always produced.
    """
    try:
        _transcode_with_ffmpeg(temp_path, output_path)
    except FileNotFoundError:
        logging.warning("ffmpeg not found; serving fallback MP4V output.")
        shutil.move(temp_path, output_path)
    except RuntimeError as exc:
        logging.warning(
            "ffmpeg transcode failed (%s); serving fallback MP4V output.", exc
        )
        shutil.move(temp_path, output_path)
    else:
        logging.debug("Transcoded video to H.264 for browser compatibility.")
        os.remove(temp_path)


def write_video(
    frames: List[np.ndarray], output_path: str, fps: float, width: int, height: int
) -> None:
    """Write ``frames`` to ``output_path``.

    Frames are first encoded as MP4V into a temporary file, which is then
    transcoded to H.264 (or moved as-is when ffmpeg is unavailable or fails).

    Args:
        frames: Frames to write, in order.
        output_path: Destination file path.
        fps: Output frame rate; a falsy value falls back to ``1.0``.
        width: Frame width passed to the OpenCV writer.
        height: Frame height passed to the OpenCV writer.

    Raises:
        ValueError: If ``frames`` is empty or the OpenCV writer cannot be
            opened.
    """
    if not frames:
        raise ValueError("No frames available for writing.")

    temp_fd, temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
    os.close(temp_fd)

    writer = cv2.VideoWriter(
        temp_path, cv2.VideoWriter_fourcc(*"mp4v"), fps or 1.0, (width, height)
    )
    if not writer.isOpened():
        os.remove(temp_path)
        raise ValueError("Failed to open VideoWriter.")

    try:
        for frame in frames:
            writer.write(frame)
    except BaseException:
        # Do not leave the temp file behind when encoding is interrupted.
        writer.release()
        _remove_quietly(temp_path)
        raise
    writer.release()

    _finalize_video(temp_path, output_path)


class VideoReader:
    """Synchronous frame iterator over a video file.

    Usable as an iterator and as a context manager. Exposes ``fps`` (falls
    back to ``30.0`` when OpenCV reports none), ``width``, ``height`` and
    ``total_frames`` as reported by OpenCV.
    """

    def __init__(self, video_path: str):
        """Open ``video_path``.

        Raises:
            ValueError: If the video cannot be opened.
        """
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise ValueError("Unable to open video.")

        self.fps = self.cap.get(cv2.CAP_PROP_FPS) or 30.0
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        """Return the next frame; release the capture when the video ends."""
        if not self.cap.isOpened():
            raise StopIteration

        success, frame = self.cap.read()
        if not success:
            self.cap.release()
            raise StopIteration
        return frame

    def close(self):
        """Release the underlying capture if it is still open."""
        if self.cap.isOpened():
            self.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class AsyncVideoReader:
    """
    Async video reader that decodes frames in a background thread.

    This prevents GPU starvation on multi-GPU systems by prefetching
    frames while the main thread is busy dispatching work to GPUs.

    The reader is intended for a single consumer. The capture object is only
    touched by the decoder thread once it has started, so ``close()`` never
    releases it concurrently with a ``read()``.
    """

    # Seconds the decoder waits on a full queue before re-checking for a stop
    # request.
    _POLL_INTERVAL = 0.1
    # Seconds ``close()`` waits for the decoder thread to exit.
    _JOIN_TIMEOUT = 2.0

    def __init__(self, video_path: str, prefetch_size: int = 32):
        """
        Initialize async video reader.

        Args:
            video_path: Path to video file
            prefetch_size: Number of frames to prefetch (default 32). It is
                used as the queue's ``maxsize``; per ``queue.Queue``, a value
                <= 0 makes the queue unbounded.

        Raises:
            ValueError: If the video cannot be opened.
        """
        self.video_path = video_path
        self.prefetch_size = prefetch_size

        # Open video to get metadata
        self._cap = cv2.VideoCapture(video_path)
        if not self._cap.isOpened():
            raise ValueError(f"Unable to open video: {video_path}")

        self.fps = self._cap.get(cv2.CAP_PROP_FPS) or 30.0
        self.width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Prefetch queue; ``None`` is the end-of-stream sentinel.
        self._queue: queue.Queue = queue.Queue(maxsize=prefetch_size)
        self._error: Optional[Exception] = None
        self._finished = False
        # Set once the sentinel was consumed or close() was called, so that
        # further next() calls do not block on an empty queue.
        self._exhausted = False
        self._stop_event = threading.Event()

        # Start decoder thread
        self._thread = threading.Thread(target=self._decode_loop, daemon=True)
        self._thread.start()

    def _put(self, item) -> bool:
        """Enqueue ``item``, giving up once a stop has been requested.

        Returns:
            True if the item was enqueued, False if the reader was stopped
            first.
        """
        while not self._stop_event.is_set():
            try:
                self._queue.put(item, timeout=self._POLL_INTERVAL)
                return True
            except queue.Full:
                continue  # Backpressure: the consumer has not caught up yet.
        return False

    def _decode_loop(self):
        """Background thread that continuously decodes frames."""
        try:
            while not self._stop_event.is_set():
                success, frame = self._cap.read()
                if not success:
                    break
                if not self._put(frame):
                    break
        except Exception as e:
            self._error = e
            logging.error("AsyncVideoReader decode error: %s", e)
        finally:
            self._cap.release()
            # Sentinel to signal end. When stopped, still try a non-blocking
            # put so a consumer waiting in get() is woken up.
            if not self._put(None):
                try:
                    self._queue.put_nowait(None)
                except queue.Full:
                    pass
            self._finished = True

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        """Return the next decoded frame.

        Raises:
            StopIteration: When the video is exhausted or the reader was
                closed; raised again on every subsequent call.
            Exception: Whatever exception the decoder thread recorded.
        """
        if self._error is not None:
            raise self._error
        if self._exhausted:
            raise StopIteration

        frame = self._queue.get()
        if frame is None:
            self._exhausted = True
            # The decoder may have failed while we were waiting; surface that
            # instead of reporting a clean end of stream.
            if self._error is not None:
                raise self._error
            raise StopIteration
        return frame

    def close(self):
        """Stop the decoder thread and release resources.

        Safe to call more than once. The capture itself is released by the
        decoder thread when it exits.
        """
        self._stop_event.set()
        if self._thread.is_alive() and self._thread is not threading.current_thread():
            self._thread.join(timeout=self._JOIN_TIMEOUT)

        # Drop any prefetched frames so they can be garbage-collected.
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break
        self._exhausted = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class VideoWriter:
    """Incremental video writer with an H.264 transcode step on close.

    Frames are encoded as MP4V into a temporary file; ``close()`` transcodes
    that file to ``output_path`` with ffmpeg, or moves it there unchanged when
    ffmpeg is missing or fails. Usable as a context manager.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Create the temporary file and open the OpenCV writer.

        Args:
            output_path: Destination file path produced by ``close()``.
            fps: Output frame rate; a falsy value falls back to ``1.0`` for
                the encoder, matching ``write_video``.
            width: Frame width passed to the OpenCV writer.
            height: Frame height passed to the OpenCV writer.

        Raises:
            ValueError: If the OpenCV writer cannot be opened.
        """
        self.output_path = output_path
        self.fps = fps
        self.width = width
        self.height = height
        self._closed = False

        self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
        os.close(self.temp_fd)

        # Use mp4v for speed during writing, then transcode
        self.writer = cv2.VideoWriter(
            self.temp_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps or 1.0,
            (self.width, self.height),
        )
        if not self.writer.isOpened():
            os.remove(self.temp_path)
            raise ValueError("Failed to open VideoWriter.")

    def write(self, frame: np.ndarray):
        """Append one frame to the temporary MP4V file."""
        self.writer.write(frame)

    def close(self):
        """Finish writing and produce ``output_path``.

        Idempotent: only the first call releases the writer and runs the
        transcode phase, so an explicit ``close()`` inside a ``with`` block
        does not fail when ``__exit__`` calls it again.
        """
        if self._closed:
            return
        self._closed = True

        if self.writer.isOpened():
            self.writer.release()

        # Transcode phase
        _finalize_video(self.temp_path, self.output_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()