Spaces:
Sleeping
Sleeping
| """ | |
| Threaded frame reader for video processing. | |
| Provides a background thread for reading video frames using a producer-consumer pattern. | |
| This overlaps video I/O with processing for better performance. | |
| """ | |
| import logging | |
| import queue | |
| import threading | |
| import time | |
| from typing import Any, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| class ThreadedFrameReader: | |
| """ | |
| Background thread for reading video frames. | |
| Uses a producer-consumer pattern to overlap video I/O with processing. | |
| The reader thread reads frames ahead into a queue while the main thread | |
| processes frames from the queue. | |
| This provides significant speedup by hiding video decode latency. | |
| """ | |
| def __init__(self, cap: cv2.VideoCapture, start_frame: int, end_frame: int, frame_skip: int, queue_size: int = 32): | |
| """ | |
| Initialize the threaded frame reader. | |
| Args: | |
| cap: OpenCV VideoCapture object | |
| start_frame: First frame to read | |
| end_frame: Last frame to read | |
| frame_skip: Number of frames to skip between reads | |
| queue_size: Maximum frames to buffer (default 32) | |
| """ | |
| self.cap = cap | |
| self.start_frame = start_frame | |
| self.end_frame = end_frame | |
| self.frame_skip = frame_skip | |
| self.queue_size = queue_size | |
| # Frame queue: (frame_number, frame_data) or None as end-of-stream sentinel | |
| self.frame_queue: queue.Queue[Tuple[int, np.ndarray[Any, Any] | None] | None] = queue.Queue(maxsize=queue_size) | |
| # Control flags | |
| self.stop_flag = threading.Event() | |
| self.reader_thread: Optional[threading.Thread] = None | |
| # Timing stats | |
| self.io_time = 0.0 | |
| self.frames_read = 0 | |
| def start(self) -> None: | |
| """Start the background reader thread.""" | |
| self.stop_flag.clear() | |
| self.reader_thread = threading.Thread(target=self._reader_loop, daemon=True) | |
| self.reader_thread.start() | |
| logger.debug("Threaded frame reader started") | |
| def stop(self) -> None: | |
| """Stop the background reader thread.""" | |
| self.stop_flag.set() | |
| if self.reader_thread and self.reader_thread.is_alive(): | |
| # Drain the queue to unblock the reader thread | |
| try: | |
| while True: | |
| self.frame_queue.get_nowait() | |
| except queue.Empty: | |
| pass | |
| self.reader_thread.join(timeout=2.0) | |
| logger.debug("Threaded frame reader stopped (read %d frames, %.2fs I/O)", self.frames_read, self.io_time) | |
| def get_frame(self, timeout: float = 5.0) -> Optional[Tuple[int, Optional[np.ndarray[Any, Any]]]]: | |
| """ | |
| Get the next frame from the queue. | |
| Args: | |
| timeout: Maximum time to wait for a frame | |
| Returns: | |
| Tuple of (frame_number, frame_data) or None if queue is empty and reader is done | |
| """ | |
| try: | |
| return self.frame_queue.get(timeout=timeout) | |
| except queue.Empty: | |
| return None | |
| def _reader_loop(self) -> None: | |
| """Background thread that reads frames into the queue.""" | |
| # Seek to start position | |
| t_start = time.perf_counter() | |
| self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.start_frame) | |
| self.io_time += time.perf_counter() - t_start | |
| current_frame = self.start_frame | |
| while current_frame < self.end_frame and not self.stop_flag.is_set(): | |
| # Read frame | |
| t_start = time.perf_counter() | |
| ret, frame = self.cap.read() | |
| self.io_time += time.perf_counter() - t_start | |
| if ret: | |
| self.frames_read += 1 | |
| # Put frame in queue (blocks if queue is full) | |
| try: | |
| self.frame_queue.put((current_frame, frame), timeout=5.0) | |
| except queue.Full: | |
| if self.stop_flag.is_set(): | |
| break | |
| logger.warning("Frame queue full, dropping frame %d", current_frame) | |
| else: | |
| # Signal read failure | |
| try: | |
| self.frame_queue.put((current_frame, None), timeout=1.0) | |
| except queue.Full: | |
| pass | |
| # Skip frames | |
| t_start = time.perf_counter() | |
| for _ in range(self.frame_skip - 1): | |
| if self.stop_flag.is_set(): | |
| break | |
| self.cap.grab() | |
| self.io_time += time.perf_counter() - t_start | |
| current_frame += self.frame_skip | |
| # Signal end of stream | |
| try: | |
| self.frame_queue.put(None, timeout=1.0) | |
| except queue.Full: | |
| pass | |