Spaces:
Paused
Paused
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from typing import List, Tuple | |
| import cv2 | |
| import numpy as np | |
def extract_frames_to_jpeg_dir(
    video_path: str,
    output_dir: str,
    max_frames: int = None,
) -> Tuple[List[str], float, int, int]:
    """Extract video frames as numbered JPEG files for SAM2 video predictor.

    Frames are decoded sequentially from the start of the video and written
    as ``000000.jpg``, ``000001.jpg``, ... at JPEG quality 100.

    Args:
        video_path: Path to input video.
        output_dir: Directory to write JPEG files into (created if missing).
        max_frames: Optional cap on number of frames to extract. ``None``
            means "decode until the video ends".

    Returns:
        (frame_names, fps, width, height) where *frame_names* is the list of
        filenames in decode order (``000000.jpg``, ``000001.jpg``, etc.).
        *fps* falls back to 30.0 when the container reports a falsy value.

    Raises:
        ValueError: If the video cannot be opened or yields zero frames.
        OSError: If a decoded frame could not be written to *output_dir*.
    """
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Unable to open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        frame_names: List[str] = []
        while max_frames is None or len(frame_names) < max_frames:
            success, frame = cap.read()
            if not success:
                break
            fname = f"{len(frame_names):06d}.jpg"
            target = os.path.join(output_dir, fname)
            # imwrite signals failure through its return value; without this
            # check a failed write would still be reported as an extracted
            # frame and only surface later when the file is read back.
            if not cv2.imwrite(target, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                raise OSError(f"Failed to write frame to {target}")
            frame_names.append(fname)
    finally:
        # Release the decoder on every exit path, including errors.
        cap.release()

    if not frame_names:
        raise ValueError("Video decode produced zero frames.")
    return frame_names, fps, width, height
def _transcode_with_ffmpeg(src_path: str, dst_path: str) -> None:
    """Re-encode *src_path* to H.264 at *dst_path* by invoking ``ffmpeg``.

    The output is overwritten if it exists (``-y``) and is encoded with
    libx264 / veryfast / yuv420p with the ``+faststart`` mov flag.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message is
            ffmpeg's decoded stderr output.
    """
    encoder_options = [
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
    ]
    argv = ["ffmpeg", "-y", "-i", src_path, *encoder_options, dst_path]

    completed = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    # Success path first; everything after this guard is failure handling.
    if completed.returncode == 0:
        logging.info("ffmpeg success")
        return

    stderr_text = completed.stderr.decode("utf-8", errors="ignore")
    logging.error("ffmpeg failed with code %d: %s", completed.returncode, stderr_text)
    raise RuntimeError(stderr_text)
class VideoReader:
    """Iterator / context manager over the frames of a video file.

    Wraps ``cv2.VideoCapture``. Iterating yields frames until a read fails,
    at which point the capture is released and iteration stops.
    """

    def __init__(self, video_path: str):
        """Open *video_path* and cache its fps, frame size and frame count.

        Raises:
            ValueError: If the capture cannot be opened.
        """
        self.video_path = video_path
        capture = cv2.VideoCapture(video_path)
        self.cap = capture
        if not capture.isOpened():
            raise ValueError("Unable to open video.")

        # A falsy fps reading is replaced by a 30.0 default.
        reported_fps = capture.get(cv2.CAP_PROP_FPS)
        self.fps = reported_fps if reported_fps else 30.0
        self.width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

    def __iter__(self):
        """Return the reader itself; it is its own iterator."""
        return self

    def __next__(self) -> np.ndarray:
        """Return the next frame, releasing the capture once a read fails."""
        if self.cap.isOpened():
            ok, frame = self.cap.read()
            if ok:
                return frame
            self.cap.release()
        raise StopIteration

    def close(self):
        """Release the capture if it is still open."""
        if not self.cap.isOpened():
            return
        self.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class VideoWriter:
    """Write frames to a temporary MP4V file, then transcode to H.264 on close.

    Frames are written with ``cv2.VideoWriter`` into a temp file. ``close()``
    transcodes that file to *output_path* with ffmpeg; if ffmpeg is missing
    or fails, the raw MP4V file is moved to *output_path* instead.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Create the temp file and open the underlying OpenCV writer.

        Raises:
            ValueError: If the OpenCV writer cannot be opened (the temp file
                is removed first).
        """
        self.output_path = output_path
        self.fps = fps
        self.width = width
        self.height = height
        # Guards close() so that an explicit close() followed by the context
        # manager's __exit__ does not finalize the file twice.
        self._closed = False
        self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
        os.close(self.temp_fd)
        # Use mp4v for speed during writing, then transcode
        self.writer = cv2.VideoWriter(
            self.temp_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps,
            (self.width, self.height),
        )
        if not self.writer.isOpened():
            os.remove(self.temp_path)
            raise ValueError("Failed to open VideoWriter.")

    def write(self, frame: np.ndarray):
        """Append one frame to the temp file."""
        self.writer.write(frame)

    def close(self):
        """Finalize the video: release the writer and produce *output_path*.

        Safe to call more than once; only the first call does any work.
        """
        if getattr(self, "_closed", False):
            return
        self._closed = True

        if self.writer.isOpened():
            self.writer.release()

        # Transcode phase. Only the ffmpeg call sits inside the try so that a
        # failure while cleaning up the temp file is not mistaken for
        # "ffmpeg not found".
        try:
            _transcode_with_ffmpeg(self.temp_path, self.output_path)
        except FileNotFoundError:
            logging.warning("ffmpeg not found; serving fallback MP4V output.")
            shutil.move(self.temp_path, self.output_path)
        except RuntimeError as exc:
            logging.warning("ffmpeg transcode failed (%s); serving fallback MP4V output.", exc)
            shutil.move(self.temp_path, self.output_path)
        else:
            logging.debug("Transcoded video to H.264 for browser compatibility.")
            os.remove(self.temp_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
def _ffmpeg_available() -> bool:
    """Report whether an ``ffmpeg`` executable can be located on PATH."""
    located = shutil.which("ffmpeg")
    return located is not None
class StreamingVideoWriter:
    """
    Pipes raw BGR frames directly to an ffmpeg subprocess for H.264 encoding.
    Eliminates the cv2.VideoWriter + post-transcode round-trip.
    Falls back to VideoWriter if ffmpeg is unavailable.

    ffmpeg's stdout is discarded and its stderr is spooled to a temporary
    file rather than a pipe: nothing reads those streams while frames are
    being written, so an undrained pipe could fill up and stall ffmpeg (and
    with it every subsequent ``write``).
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Start the ffmpeg encoder, or set up the VideoWriter fallback.

        Args:
            output_path: Destination video file (overwritten if present).
            fps: Frame rate passed to ffmpeg via ``-r``.
            width: Frame width in pixels, passed via ``-s``.
            height: Frame height in pixels, passed via ``-s``.
        """
        self.output_path = output_path
        self._fallback = None
        self._closed = False
        self._pipe_broken = False
        self._stderr_file = None
        if not _ffmpeg_available():
            logging.warning("ffmpeg not found; StreamingVideoWriter falling back to VideoWriter.")
            self._fallback = VideoWriter(output_path, fps, width, height)
            return
        cmd = [
            "ffmpeg", "-y",
            # Keep stderr limited to real errors so the failure log is useful.
            "-hide_banner", "-loglevel", "error",
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", f"{width}x{height}",
            "-r", str(fps),
            "-i", "pipe:",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-pix_fmt", "yuv420p",
            "-movflags", "+faststart",
            output_path,
        ]
        stderr_file = tempfile.TemporaryFile()
        try:
            self.proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=stderr_file,
            )
        except OSError as e:
            stderr_file.close()
            logging.warning("Failed to start ffmpeg (%s); falling back to VideoWriter.", e)
            self._fallback = VideoWriter(output_path, fps, width, height)
            return
        self._stderr_file = stderr_file

    def write(self, frame: np.ndarray):
        """Send one frame to the encoder (or to the fallback writer).

        Once the pipe to ffmpeg has broken, the error is logged a single
        time and further frames are dropped without touching the pipe.
        """
        if self._fallback is not None:
            self._fallback.write(frame)
            return
        if self._pipe_broken:
            return
        try:
            self.proc.stdin.write(frame.tobytes())
        except BrokenPipeError:
            self._pipe_broken = True
            logging.error("ffmpeg pipe broken; frames may be lost.")

    def close(self):
        """Finish encoding and wait for ffmpeg to exit.

        A non-zero ffmpeg exit status is logged together with its captured
        stderr. Safe to call more than once; only the first call does work.
        """
        if getattr(self, "_closed", False):
            return
        self._closed = True
        if self._fallback is not None:
            self._fallback.close()
            return
        try:
            self.proc.stdin.close()
        except OSError:
            # ffmpeg already went away; its exit status is reported below.
            pass
        try:
            self.proc.wait()
            if self.proc.returncode != 0:
                self._stderr_file.seek(0)
                stderr = self._stderr_file.read().decode("utf-8", errors="ignore")
                logging.error("StreamingVideoWriter ffmpeg exited with code %d: %s",
                              self.proc.returncode, stderr)
        finally:
            self._stderr_file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()