import logging
import os
import shutil
import subprocess
import tempfile
from typing import List, Optional, Tuple

import cv2
import numpy as np


def extract_frames_to_jpeg_dir(
    video_path: str,
    output_dir: str,
    max_frames: Optional[int] = None,
) -> Tuple[List[str], float, int, int]:
    """Extract video frames as numbered JPEG files for SAM2 video predictor.

    Args:
        video_path: Path to input video.
        output_dir: Directory to write JPEG files into (created if missing).
        max_frames: Optional cap on number of frames to extract.

    Returns:
        (frame_names, fps, width, height) where *frame_names* is a sorted
        list of filenames like ``000000.jpg``, ``000001.jpg``, etc.

    Raises:
        ValueError: If the video cannot be opened or yields zero frames.
        OSError: If a decoded frame could not be written to *output_dir*.
    """
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Unable to open video: {video_path}")

    frame_names: List[str] = []
    try:
        # A falsy FPS (0.0) reported by the backend falls back to 30.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        idx = 0
        while max_frames is None or idx < max_frames:
            success, frame = cap.read()
            if not success:
                break
            fname = f"{idx:06d}.jpg"
            target = os.path.join(output_dir, fname)
            # imwrite reports failure through its return value; without this
            # check the returned list could name files that do not exist.
            if not cv2.imwrite(target, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                raise OSError(f"Failed to write frame to {target}")
            frame_names.append(fname)
            idx += 1
    finally:
        # Release the capture even if decoding or writing raised.
        cap.release()

    if not frame_names:
        raise ValueError("Video decode produced zero frames.")
    return frame_names, fps, width, height


def _transcode_with_ffmpeg(src_path: str, dst_path: str) -> None:
    """Re-encode *src_path* to H.264/yuv420p at *dst_path* using ffmpeg.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found
            (raised by ``subprocess.run``).
        RuntimeError: If ffmpeg exits with a non-zero status; the message is
            ffmpeg's decoded stderr.
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        src_path,
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        dst_path,
    ]
    process = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False
    )
    if process.returncode != 0:
        err_msg = process.stderr.decode("utf-8", errors="ignore")
        logging.error("ffmpeg failed with code %d: %s", process.returncode, err_msg)
        raise RuntimeError(err_msg)
    logging.info("ffmpeg success")


class VideoReader:
    """Iterator / context manager yielding frames from a video via OpenCV.

    Exposes ``fps``, ``width``, ``height`` and ``total_frames`` as reported
    by the capture backend (``fps`` falls back to 30.0 when reported falsy).
    """

    def __init__(self, video_path: str):
        """Open *video_path*; raise ``ValueError`` if it cannot be opened."""
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise ValueError("Unable to open video.")
        self.fps = self.cap.get(cv2.CAP_PROP_FPS) or 30.0
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        """Return the next frame; release the capture when reading fails."""
        if not self.cap.isOpened():
            raise StopIteration
        success, frame = self.cap.read()
        if not success:
            self.cap.release()
            raise StopIteration
        return frame

    def close(self):
        """Release the underlying capture if it is still open."""
        if self.cap.isOpened():
            self.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class VideoWriter:
    """Write frames with OpenCV to a temp MP4V file, then transcode to H.264.

    On ``close()`` the temp file is transcoded to ``output_path`` with
    ffmpeg. If ffmpeg is missing or fails, the raw MP4V file is moved to
    ``output_path`` instead.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Create the temp file and open the OpenCV writer.

        Raises:
            ValueError: If the OpenCV writer cannot be opened (the temp file
                is removed first).
        """
        self.output_path = output_path
        self.fps = fps
        self.width = width
        self.height = height
        self._closed = False
        self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
        # Only the path is needed; OpenCV opens the file itself.
        os.close(self.temp_fd)
        # Use mp4v for speed during writing, then transcode
        self.writer = cv2.VideoWriter(
            self.temp_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps,
            (self.width, self.height),
        )
        if not self.writer.isOpened():
            os.remove(self.temp_path)
            raise ValueError("Failed to open VideoWriter.")

    def write(self, frame: np.ndarray):
        """Append one frame to the temp video."""
        self.writer.write(frame)

    def close(self):
        """Finalize the temp video and produce ``output_path``.

        Safe to call more than once: only the first call does any work, so
        an explicit ``close()`` inside a ``with`` block does not re-run the
        transcode against an already-removed temp file.
        """
        if self._closed:
            return
        self._closed = True

        if self.writer.isOpened():
            self.writer.release()

        # Transcode phase
        try:
            _transcode_with_ffmpeg(self.temp_path, self.output_path)
        except FileNotFoundError:
            logging.warning("ffmpeg not found; serving fallback MP4V output.")
            shutil.move(self.temp_path, self.output_path)
        except RuntimeError as exc:
            logging.warning(
                "ffmpeg transcode failed (%s); serving fallback MP4V output.", exc
            )
            shutil.move(self.temp_path, self.output_path)
        else:
            logging.debug("Transcoded video to H.264 for browser compatibility.")
            os.remove(self.temp_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


def _ffmpeg_available() -> bool:
    """Check if ffmpeg is available on the system PATH."""
    return shutil.which("ffmpeg") is not None


class StreamingVideoWriter:
    """
    Pipes raw BGR frames directly to an ffmpeg subprocess for H.264 encoding.
    Eliminates the cv2.VideoWriter + post-transcode round-trip.
    Falls back to VideoWriter if ffmpeg is unavailable.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Start the ffmpeg encoder, or a ``VideoWriter`` fallback.

        The fallback is used when ffmpeg is not on PATH or the process
        cannot be started (``OSError``).
        """
        self.output_path = output_path
        self._fallback = None
        self._closed = False
        self._pipe_broken = False
        self._stderr_file = None
        self.proc = None

        if not _ffmpeg_available():
            logging.warning(
                "ffmpeg not found; StreamingVideoWriter falling back to VideoWriter."
            )
            self._fallback = VideoWriter(output_path, fps, width, height)
            return

        cmd = [
            "ffmpeg",
            "-y",
            "-f",
            "rawvideo",
            "-pix_fmt",
            "bgr24",
            "-s",
            f"{width}x{height}",
            "-r",
            str(fps),
            "-i",
            "pipe:",
            "-c:v",
            "libx264",
            "-preset",
            "veryfast",
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",
            output_path,
        ]

        # ffmpeg's output must never go to an unread PIPE: once the OS pipe
        # buffer fills, ffmpeg blocks on its log write, stops consuming
        # stdin, and write()/wait() on our side deadlock. stdout is
        # discarded; stderr goes to a temp file that is only read on failure.
        stderr_file = tempfile.TemporaryFile()
        try:
            self.proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=stderr_file,
            )
        except OSError as e:
            stderr_file.close()
            logging.warning(
                "Failed to start ffmpeg (%s); falling back to VideoWriter.", e
            )
            self._fallback = VideoWriter(output_path, fps, width, height)
            return
        self._stderr_file = stderr_file

    def write(self, frame: np.ndarray):
        """Send one frame's raw bytes to the encoder (or the fallback).

        A broken pipe is logged once; later frames are dropped silently
        instead of logging an error per frame.
        """
        if self._fallback is not None:
            self._fallback.write(frame)
            return
        if self._pipe_broken:
            return
        try:
            self.proc.stdin.write(frame.tobytes())
        except BrokenPipeError:
            self._pipe_broken = True
            logging.error("ffmpeg pipe broken; frames may be lost.")

    def close(self):
        """Finish encoding and wait for ffmpeg to exit.

        A non-zero ffmpeg exit status is logged together with its captured
        stderr; no exception is raised. Calling ``close()`` again is a no-op.
        """
        if self._fallback is not None:
            self._fallback.close()
            return
        if self._closed:
            return
        self._closed = True

        try:
            # Closing stdin signals end-of-stream to ffmpeg.
            self.proc.stdin.close()
        except OSError:
            # Flushing into an already-dead ffmpeg; the exit code below
            # reports the failure.
            pass
        self.proc.wait()

        try:
            if self.proc.returncode != 0:
                self._stderr_file.seek(0)
                stderr = self._stderr_file.read().decode("utf-8", errors="ignore")
                logging.error(
                    "StreamingVideoWriter ffmpeg exited with code %d: %s",
                    self.proc.returncode,
                    stderr,
                )
        finally:
            self._stderr_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()