Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from typing import List, Tuple | |
| import cv2 | |
| import numpy as np | |
def extract_frames(video_path: str) -> Tuple[List[np.ndarray], float, int, int]:
    """Decode every frame of *video_path* into memory.

    All frames are buffered in a list at once, so memory grows with clip
    length; the streaming readers in this module avoid that.

    Args:
        video_path: Path to a video file readable by ``cv2.VideoCapture``.

    Returns:
        ``(frames, fps, width, height)`` where ``frames`` are the arrays
        returned by ``cap.read()`` in order, ``fps`` is the container-reported
        frame rate (``0.0`` if it reports a falsy value), and ``width`` /
        ``height`` are the reported frame dimensions.

    Raises:
        ValueError: If the file cannot be opened or decodes to zero frames.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError("Unable to open video.")
        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frames: List[np.ndarray] = []
        while True:
            success, frame = cap.read()
            if not success:
                break
            frames.append(frame)
    finally:
        # Release the decoder even if reading/buffering raises part-way through.
        cap.release()
    if not frames:
        raise ValueError("Video decode produced zero frames.")
    return frames, fps, width, height
def _transcode_with_ffmpeg(src_path: str, dst_path: str) -> None:
    """Re-encode *src_path* into *dst_path* as H.264 (yuv420p, faststart MP4).

    ffmpeg is invoked with ``-y``, so *dst_path* is overwritten by ffmpeg.

    Raises:
        FileNotFoundError: Propagated from ``subprocess.run`` when the
            ``ffmpeg`` executable cannot be found.
        RuntimeError: When ffmpeg exits with a non-zero status; the message is
            ffmpeg's decoded stderr output.
    """
    video_opts = ["-c:v", "libx264", "-preset", "veryfast", "-pix_fmt", "yuv420p"]
    # Move the moov atom to the front so browsers can start playback early.
    container_opts = ["-movflags", "+faststart"]
    command = ["ffmpeg", "-y", "-i", src_path, *video_opts, *container_opts, dst_path]

    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if result.returncode == 0:
        logging.info("ffmpeg success")
        return

    stderr_text = result.stderr.decode("utf-8", errors="ignore")
    logging.error("ffmpeg failed with code %d: %s", result.returncode, stderr_text)
    raise RuntimeError(stderr_text)
def write_video(frames: List[np.ndarray], output_path: str, fps: float, width: int, height: int) -> None:
    """Encode *frames* into a video file at *output_path*.

    Frames are first written with OpenCV to a temporary ``mp4v`` MP4, which is
    then re-encoded to H.264 by ffmpeg for browser playback. If ffmpeg is not
    installed or fails, the raw ``mp4v`` file is moved to *output_path*
    instead.

    Args:
        frames: Frames to write, in order. Assumed to be arrays of size
            ``(width, height)`` in the pixel layout OpenCV's writer expects
            (presumably BGR uint8) — TODO confirm with callers.
        output_path: Destination path for the finished video.
        fps: Frame rate; a falsy value (e.g. ``0.0``) falls back to ``1.0``.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Raises:
        ValueError: If *frames* is empty or the OpenCV writer cannot be opened.
    """
    if not frames:
        raise ValueError("No frames available for writing.")

    temp_fd, temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
    # mkstemp hands back an open descriptor; OpenCV opens the file by path itself.
    os.close(temp_fd)

    writer = cv2.VideoWriter(temp_path, cv2.VideoWriter_fourcc(*"mp4v"), fps or 1.0, (width, height))
    if not writer.isOpened():
        os.remove(temp_path)
        raise ValueError("Failed to open VideoWriter.")

    completed = False
    try:
        for frame in frames:
            writer.write(frame)
        completed = True
    finally:
        # Always release so the writer's handle is freed even if a write raises.
        writer.release()
        if not completed:
            # Encoding was interrupted: don't leave the partial temp file behind.
            try:
                os.remove(temp_path)
            except OSError:
                pass

    try:
        _transcode_with_ffmpeg(temp_path, output_path)
    except FileNotFoundError:
        logging.warning("ffmpeg not found; serving fallback MP4V output.")
        shutil.move(temp_path, output_path)
    except RuntimeError as exc:
        logging.warning("ffmpeg transcode failed (%s); serving fallback MP4V output.", exc)
        shutil.move(temp_path, output_path)
    else:
        # Cleanup lives outside the try so its errors aren't mistaken for ffmpeg failures.
        logging.debug("Transcoded video to H.264 for browser compatibility.")
        os.remove(temp_path)
class VideoReader:
    """Synchronous, frame-by-frame iterator over a video file.

    Each ``next()`` decodes one frame with ``cv2.VideoCapture.read``. The
    capture is released as soon as a read fails (end of stream) or when
    ``close()`` / the context-manager exit runs; after that, iteration just
    raises ``StopIteration``.
    """

    def __init__(self, video_path: str):
        self.video_path = video_path
        capture = cv2.VideoCapture(video_path)
        self.cap = capture
        if not capture.isOpened():
            raise ValueError("Unable to open video.")
        # A falsy reported frame rate (e.g. 0.0) is treated as 30 fps.
        self.fps = capture.get(cv2.CAP_PROP_FPS) or 30.0
        self.width, self.height = (
            int(capture.get(prop))
            for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
        )
        # NOTE(review): container-reported count; it may be approximate for
        # some formats, so don't rely on it for exact bounds.
        self.total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        """Return the next decoded frame, releasing the capture at end of stream."""
        cap = self.cap
        if cap.isOpened():
            ok, frame = cap.read()
            if ok:
                return frame
            cap.release()
        raise StopIteration

    def close(self):
        """Release the capture; calling it again is a no-op."""
        if not self.cap.isOpened():
            return
        self.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class AsyncVideoReader:
    """
    Async video reader that decodes frames in a background thread.
    This prevents GPU starvation on multi-GPU systems by prefetching frames
    while the main thread is busy dispatching work to GPUs.

    After construction the decoder thread is the only thread that reads from
    or releases the ``cv2.VideoCapture``. Frames are handed over through a
    queue bounded by ``prefetch_size``, which provides backpressure. If
    decoding raises, frames decoded before the failure are still yielded and
    the exception is then re-raised from ``__next__``. Once the stream is
    exhausted (or ``close()`` was called), further ``next()`` calls raise
    ``StopIteration`` instead of blocking.
    """

    # Seconds between stop-flag checks while the decoder waits on a full queue.
    _POLL_INTERVAL = 0.1

    def __init__(self, video_path: str, prefetch_size: int = 32):
        """
        Initialize async video reader.
        Args:
            video_path: Path to video file
            prefetch_size: Number of frames to prefetch (default 32). A value
                <= 0 makes the queue unbounded (``queue.Queue`` semantics).
        Raises:
            ValueError: If the video cannot be opened.
        """
        self.video_path = video_path
        self.prefetch_size = prefetch_size
        # Open video to get metadata
        self._cap = cv2.VideoCapture(video_path)
        if not self._cap.isOpened():
            raise ValueError(f"Unable to open video: {video_path}")
        self.fps = self._cap.get(cv2.CAP_PROP_FPS) or 30.0
        self.width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self._start_decoder()

    def _start_decoder(self) -> None:
        """Create the prefetch queue and iteration state, then start the decoder thread."""
        from queue import Queue
        from threading import Event, Thread

        self._queue: Queue = Queue(maxsize=self.prefetch_size)
        self._error: "Exception | None" = None
        self._finished = False  # set by the decoder thread when it exits
        self._exhausted = False  # set on the consumer side at end of stream / close()
        self._stop = Event()  # requests the decoder thread to stop early
        self._thread = Thread(target=self._decode_loop, daemon=True)
        self._thread.start()

    def _put(self, item) -> bool:
        """Enqueue *item*, waiting for space; return False if close() was requested first."""
        from queue import Full

        while not self._stop.is_set():
            try:
                self._queue.put(item, timeout=self._POLL_INTERVAL)
                return True
            except Full:
                continue
        return False

    def _decode_loop(self):
        """Background thread that continuously decodes frames."""
        try:
            while not self._stop.is_set():
                success, frame = self._cap.read()
                if not success:
                    break
                # Waits while the queue is full (backpressure); gives up on close().
                if not self._put(frame):
                    break
        except Exception as e:
            # Stored before the sentinel is queued so the consumer sees it at end of stream.
            self._error = e
            logging.exception("AsyncVideoReader decode error: %s", e)
        finally:
            # Only this thread touches the capture, so releasing here cannot race a read().
            self._cap.release()
            self._put(None)  # Sentinel to signal end (skipped if close() was requested)
            self._finished = True

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        if self._exhausted:
            raise StopIteration
        frame = self._queue.get()
        if frame is None:
            # Remember the end so later calls don't block on a queue nobody fills.
            self._exhausted = True
            if self._error is not None:
                raise self._error
            raise StopIteration
        return frame

    def _drain(self) -> None:
        """Discard everything currently queued without blocking."""
        from queue import Empty

        while True:
            try:
                self._queue.get_nowait()
            except Empty:
                return

    def close(self, timeout: float = 5.0):
        """Stop the decoder thread and release resources.

        Args:
            timeout: Maximum seconds to wait for the decoder thread to exit. If
                it is still inside a read when this expires, it is left to
                finish on its own (it is a daemon and releases the capture
                itself).
        """
        self._stop.set()
        self._exhausted = True
        # Free buffered frames and wake a decoder that is waiting for queue space.
        self._drain()
        self._thread.join(timeout)
        # The decoder may have enqueued one last frame before noticing the stop flag.
        self._drain()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
class VideoWriter:
    """Incrementally write frames, then finalize them into a browser-friendly MP4.

    Frames are written with OpenCV to a temporary ``mp4v`` file. ``close()``
    re-encodes it to H.264 with ffmpeg at ``output_path``, or moves the raw
    ``mp4v`` file there if ffmpeg is missing or fails. Usable as a context
    manager; ``close()`` is idempotent, so an explicit ``close()`` inside a
    ``with`` block is safe.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        self.output_path = output_path
        self.fps = fps
        self.width = width
        self.height = height
        self._closed = False
        self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
        # mkstemp hands back an open descriptor; OpenCV opens the file by path itself.
        os.close(self.temp_fd)
        # Use mp4v for speed during writing, then transcode in close().
        # Mirror write_video's fallback for a falsy fps (e.g. 0.0 when the
        # source reports no frame rate).
        self.writer = cv2.VideoWriter(
            self.temp_path,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps or 1.0,
            (self.width, self.height),
        )
        if not self.writer.isOpened():
            os.remove(self.temp_path)
            raise ValueError("Failed to open VideoWriter.")

    def write(self, frame: np.ndarray):
        """Append one frame.

        Assumes the frame is ``(width, height)`` in the layout OpenCV's writer
        expects (presumably BGR uint8) — TODO confirm with callers.
        """
        self.writer.write(frame)

    def close(self):
        """Release the writer and publish the video at ``output_path``.

        Subsequent calls do nothing. Without this guard, a second call would
        re-run ffmpeg on a temp file that was already removed or moved.
        """
        if self._closed:
            return
        self._closed = True
        if self.writer.isOpened():
            self.writer.release()
        # Transcode phase
        try:
            _transcode_with_ffmpeg(self.temp_path, self.output_path)
        except FileNotFoundError:
            logging.warning("ffmpeg not found; serving fallback MP4V output.")
            shutil.move(self.temp_path, self.output_path)
        except RuntimeError as exc:
            logging.warning("ffmpeg transcode failed (%s); serving fallback MP4V output.", exc)
            shutil.move(self.temp_path, self.output_path)
        else:
            # Cleanup lives outside the try so its errors aren't mistaken for ffmpeg failures.
            logging.debug("Transcoded video to H.264 for browser compatibility.")
            os.remove(self.temp_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()