Spaces:
Sleeping
Sleeping
| """ | |
| FFmpeg-based frame reader for accurate VFR (Variable Frame Rate) video handling. | |
| This module provides frame extraction using FFmpeg's accurate timestamp seeking, | |
| which correctly handles VFR videos where OpenCV's seeking fails. | |
| Key advantages over OpenCV seeking: | |
| - Accurate timestamp handling for VFR videos | |
| - ~36x faster than OpenCV's CAP_PROP_POS_FRAMES seeking | |
| - Frames are returned in correct chronological order | |
| """ | |
| import logging | |
| import subprocess | |
| from typing import Any, Callable, Generator, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
def get_video_dimensions(video_path: str) -> Tuple[int, int]:
    """
    Get video dimensions (width, height) using OpenCV.

    Args:
        video_path: Path to video file.

    Returns:
        Tuple of (width, height), taken from OpenCV's CAP_PROP_FRAME_WIDTH and
        CAP_PROP_FRAME_HEIGHT properties and truncated to int.

    Raises:
        ValueError: If video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        # Release the capture handle on every path, including a failed open.
        cap.release()
    return width, height
def extract_frames_ffmpeg_pipe(
    video_path: str,
    start_time: float,
    end_time: float,
    frame_interval: float,
    callback: Callable[[float, np.ndarray[Any, Any]], bool],
) -> Tuple[int, float]:
    """
    Extract frames using FFmpeg pipe for accurate VFR handling.

    FFmpeg seeks to the start position and outputs frames at the specified
    interval. Frames are piped directly to Python as raw BGR data, avoiding
    temp files for the frame data itself.

    The timestamp handed to the callback is computed locally as
    ``start_time + k * frame_interval`` (accumulated by repeated addition);
    it is not read back from the video stream.

    Args:
        video_path: Path to video file.
        start_time: Start time in seconds.
        end_time: End time in seconds.
        frame_interval: Interval between frames in seconds (e.g., 0.5 for 2 fps).
            Must be greater than zero.
        callback: Function called for each frame.
            Signature: callback(timestamp: float, frame: np.ndarray) -> bool
            Return False to stop processing early.

    Returns:
        Tuple of (frames_processed, io_time). io_time is the wall-clock time
        from just before the ffmpeg command is built until the process has
        been cleaned up, so it includes time spent inside the callback.

    Raises:
        ValueError: If frame_interval is not a positive number, if the video
            cannot be opened, or if it reports a zero-sized frame.
    """
    import tempfile
    import time

    # `not x > 0` (rather than `x <= 0`) also rejects NaN.
    if not frame_interval > 0:
        raise ValueError(f"frame_interval must be positive, got {frame_interval}")

    # Get video dimensions
    width, height = get_video_dimensions(video_path)
    frame_size = width * height * 3  # BGR format
    if frame_size <= 0:
        # read(0) returns b"" whose length equals frame_size, so the read loop
        # below would never terminate on a zero-sized frame.
        raise ValueError(f"Invalid video dimensions {width}x{height}: {video_path}")

    # Calculate output fps from interval
    output_fps = 1.0 / frame_interval
    duration = end_time - start_time

    t_io_start = time.perf_counter()

    # Build ffmpeg command
    # -ss before -i seeks on the input side; -t limits the output duration.
    cmd = [
        "ffmpeg",
        "-ss",
        str(start_time),
        "-i",
        str(video_path),
        "-t",
        str(duration),
        "-vf",
        f"fps={output_fps}",  # Output at specified fps
        "-f",
        "rawvideo",
        "-pix_fmt",
        "bgr24",  # OpenCV uses BGR format
        "-loglevel",
        "error",
        "-",  # Output to stdout
    ]

    frames_processed = 0
    current_time = start_time
    stopped_early = False

    # stderr goes to a temp file instead of an undrained pipe: a pipe that is
    # never read can fill up and block ffmpeg while we block reading stdout.
    with tempfile.TemporaryFile() as stderr_file:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr_file)
        try:
            while True:
                # Read raw frame data from stdout
                raw_frame = process.stdout.read(frame_size)

                # A short read means end of stream
                if len(raw_frame) != frame_size:
                    break

                # Convert to numpy array (BGR format, same as OpenCV)
                frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))

                # frombuffer yields a read-only view of the bytes object;
                # hand the callback an independent, writable copy.
                continue_processing = callback(current_time, frame.copy())
                frames_processed += 1

                if not continue_processing:
                    stopped_early = True
                    break

                current_time += frame_interval
        finally:
            # Clean up process
            process.stdout.close()
            process.terminate()
            process.wait()

        # Only report diagnostics when the stream ended on its own; after an
        # early stop ffmpeg may complain merely because we closed its pipe.
        if not stopped_early:
            stderr_file.seek(0)
            ffmpeg_errors = stderr_file.read().decode("utf-8", errors="replace").strip()
            if ffmpeg_errors:
                logger.warning("ffmpeg reported errors for %s: %s", video_path, ffmpeg_errors)

    io_time = time.perf_counter() - t_io_start
    return frames_processed, io_time
def iter_frames_ffmpeg(
    video_path: str,
    start_time: float,
    end_time: float,
    frame_interval: float,
) -> Generator[Tuple[float, np.ndarray[Any, Any]], None, None]:
    """
    Generator that yields frames using FFmpeg pipe.

    This is an alternative interface for iterating over frames without a callback.
    Because this is a generator, nothing (including argument validation and
    starting ffmpeg) happens until the first item is requested. The ffmpeg
    process is cleaned up when the generator is exhausted or closed.

    Args:
        video_path: Path to video file.
        start_time: Start time in seconds.
        end_time: End time in seconds.
        frame_interval: Interval between frames in seconds. Must be greater
            than zero.

    Yields:
        Tuple of (timestamp, frame) for each frame. The timestamp is computed
        locally by repeatedly adding frame_interval to start_time; the frame
        is a writable (height, width, 3) uint8 array in BGR order.

    Raises:
        ValueError: If frame_interval is not a positive number, if the video
            cannot be opened, or if it reports a zero-sized frame.
    """
    import tempfile

    # `not x > 0` (rather than `x <= 0`) also rejects NaN.
    if not frame_interval > 0:
        raise ValueError(f"frame_interval must be positive, got {frame_interval}")

    # Get video dimensions
    width, height = get_video_dimensions(video_path)
    frame_size = width * height * 3
    if frame_size <= 0:
        # read(0) returns b"" whose length equals frame_size, so the read loop
        # below would never terminate on a zero-sized frame.
        raise ValueError(f"Invalid video dimensions {width}x{height}: {video_path}")

    # Calculate output fps from interval
    output_fps = 1.0 / frame_interval
    duration = end_time - start_time

    # Build ffmpeg command
    cmd = [
        "ffmpeg",
        "-ss",
        str(start_time),
        "-i",
        str(video_path),
        "-t",
        str(duration),
        "-vf",
        f"fps={output_fps}",
        "-f",
        "rawvideo",
        "-pix_fmt",
        "bgr24",
        "-loglevel",
        "error",
        "-",
    ]

    current_time = start_time

    # stderr goes to a temp file instead of an undrained pipe: a pipe that is
    # never read can fill up and block ffmpeg while we block reading stdout.
    with tempfile.TemporaryFile() as stderr_file:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr_file)
        try:
            while True:
                raw_frame = process.stdout.read(frame_size)
                # A short read means end of stream
                if len(raw_frame) != frame_size:
                    break
                frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
                # frombuffer yields a read-only view; yield an independent copy.
                yield current_time, frame.copy()
                current_time += frame_interval
        finally:
            process.stdout.close()
            process.terminate()
            process.wait()

        # Reached only when the stream ended on its own (closing the generator
        # early unwinds through the finally block and skips this).
        stderr_file.seek(0)
        ffmpeg_errors = stderr_file.read().decode("utf-8", errors="replace").strip()
        if ffmpeg_errors:
            logger.warning("ffmpeg reported errors for %s: %s", video_path, ffmpeg_errors)
class FFmpegFrameReader:
    """
    Context manager for reading frames from video using FFmpeg pipe.

    This class provides a cleaner interface for reading frames in a processing loop,
    handling resource cleanup automatically. The object is its own iterator;
    iterating outside of a ``with`` block (before entering or after exiting)
    yields nothing.

    Example:
        with FFmpegFrameReader(video_path, start, end, interval) as reader:
            for timestamp, frame in reader:
                process_frame(timestamp, frame)
    """

    def __init__(self, video_path: str, start_time: float, end_time: float, frame_interval: float):
        """
        Initialize the FFmpeg frame reader.

        No process is started here; ffmpeg is launched by ``__enter__``.

        Args:
            video_path: Path to video file.
            start_time: Start time in seconds.
            end_time: End time in seconds.
            frame_interval: Interval between frames in seconds.
        """
        self.video_path = video_path
        self.start_time = start_time
        self.end_time = end_time
        self.frame_interval = frame_interval
        self.process: Optional[subprocess.Popen[bytes]] = None
        self.width = 0
        self.height = 0
        self.frame_size = 0
        self.current_time = start_time
        self.frames_read = 0
        self.io_time = 0.0
        # Temp file receiving ffmpeg's stderr while the process is running.
        self._stderr_file: Optional[Any] = None
        # True once a short read signalled the natural end of the stream.
        self._reached_eof = False

    def __enter__(self) -> "FFmpegFrameReader":
        """
        Start the FFmpeg process.

        Raises:
            ValueError: If frame_interval is not a positive number, if the
                video cannot be opened, or if it reports a zero-sized frame.
        """
        import tempfile
        import time

        # `not x > 0` (rather than `x <= 0`) also rejects NaN.
        if not self.frame_interval > 0:
            raise ValueError(f"frame_interval must be positive, got {self.frame_interval}")

        # Get video dimensions
        self.width, self.height = get_video_dimensions(self.video_path)
        self.frame_size = self.width * self.height * 3
        if self.frame_size <= 0:
            # read(0) returns b"" whose length equals frame_size, so iteration
            # would never stop on a zero-sized frame.
            raise ValueError(
                f"Invalid video dimensions {self.width}x{self.height}: {self.video_path}"
            )

        # Calculate parameters
        output_fps = 1.0 / self.frame_interval
        duration = self.end_time - self.start_time

        # Build and start ffmpeg command
        cmd = [
            "ffmpeg",
            "-ss",
            str(self.start_time),
            "-i",
            str(self.video_path),
            "-t",
            str(duration),
            "-vf",
            f"fps={output_fps}",
            "-f",
            "rawvideo",
            "-pix_fmt",
            "bgr24",
            "-loglevel",
            "error",
            "-",
        ]

        # stderr goes to a temp file instead of an undrained pipe: a pipe that
        # is never read can fill up and block ffmpeg while we block on stdout.
        stderr_file = tempfile.TemporaryFile()
        t_start = time.perf_counter()
        try:
            self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr_file)
        except BaseException:
            stderr_file.close()
            raise
        self.io_time = time.perf_counter() - t_start
        self._stderr_file = stderr_file
        self._reached_eof = False
        self.current_time = self.start_time
        self.frames_read = 0
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Clean up the FFmpeg process and log any errors it reported."""
        # Detach first so later __next__ calls see no process and stop cleanly.
        process, self.process = self.process, None
        stderr_file, self._stderr_file = self._stderr_file, None
        try:
            if process is not None:
                for stream in (process.stdout, process.stderr):
                    if stream is not None:
                        stream.close()
                process.terminate()
                process.wait()
            # Only report diagnostics when the stream ended on its own; after
            # an early exit ffmpeg may complain merely because its pipe closed.
            if stderr_file is not None and exc_type is None and self._reached_eof:
                stderr_file.seek(0)
                ffmpeg_errors = stderr_file.read().decode("utf-8", errors="replace").strip()
                if ffmpeg_errors:
                    logger.warning(
                        "ffmpeg reported errors for %s: %s", self.video_path, ffmpeg_errors
                    )
        finally:
            if stderr_file is not None:
                stderr_file.close()

    def __iter__(self) -> "FFmpegFrameReader":
        """Return self as iterator."""
        return self

    def __next__(self) -> Tuple[float, np.ndarray[Any, Any]]:
        """
        Read and return the next frame.

        Returns:
            Tuple of (timestamp, frame). The timestamp is computed locally by
            repeatedly adding frame_interval to start_time; the frame is a
            writable (height, width, 3) uint8 array.

        Raises:
            StopIteration: If no process is running or the stream has ended.
        """
        import time

        if self.process is None or self.process.stdout is None:
            raise StopIteration

        t_start = time.perf_counter()
        raw_frame = self.process.stdout.read(self.frame_size)
        self.io_time += time.perf_counter() - t_start

        # A short read means end of stream
        if len(raw_frame) != self.frame_size:
            self._reached_eof = True
            raise StopIteration

        frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((self.height, self.width, 3))
        timestamp = self.current_time
        self.current_time += self.frame_interval
        self.frames_read += 1
        # frombuffer yields a read-only view; return an independent copy.
        return timestamp, frame.copy()

    def get_stats(self) -> Tuple[int, float]:
        """
        Get reading statistics.

        Returns:
            Tuple of (frames_read, io_time). io_time covers process start-up
            plus the time spent blocked reading frames from the pipe.
        """
        return self.frames_read, self.io_time