""" Threaded frame reader for video processing. Provides a background thread for reading video frames using a producer-consumer pattern. This overlaps video I/O with processing for better performance. """ import logging import queue import threading import time from typing import Any, Optional, Tuple import cv2 import numpy as np logger = logging.getLogger(__name__) class ThreadedFrameReader: """ Background thread for reading video frames. Uses a producer-consumer pattern to overlap video I/O with processing. The reader thread reads frames ahead into a queue while the main thread processes frames from the queue. This provides significant speedup by hiding video decode latency. """ def __init__(self, cap: cv2.VideoCapture, start_frame: int, end_frame: int, frame_skip: int, queue_size: int = 32): """ Initialize the threaded frame reader. Args: cap: OpenCV VideoCapture object start_frame: First frame to read end_frame: Last frame to read frame_skip: Number of frames to skip between reads queue_size: Maximum frames to buffer (default 32) """ self.cap = cap self.start_frame = start_frame self.end_frame = end_frame self.frame_skip = frame_skip self.queue_size = queue_size # Frame queue: (frame_number, frame_data) or None as end-of-stream sentinel self.frame_queue: queue.Queue[Tuple[int, np.ndarray[Any, Any] | None] | None] = queue.Queue(maxsize=queue_size) # Control flags self.stop_flag = threading.Event() self.reader_thread: Optional[threading.Thread] = None # Timing stats self.io_time = 0.0 self.frames_read = 0 def start(self) -> None: """Start the background reader thread.""" self.stop_flag.clear() self.reader_thread = threading.Thread(target=self._reader_loop, daemon=True) self.reader_thread.start() logger.debug("Threaded frame reader started") def stop(self) -> None: """Stop the background reader thread.""" self.stop_flag.set() if self.reader_thread and self.reader_thread.is_alive(): # Drain the queue to unblock the reader thread try: while True: self.frame_queue.get_nowait() except queue.Empty: pass self.reader_thread.join(timeout=2.0) logger.debug("Threaded frame reader stopped (read %d frames, %.2fs I/O)", self.frames_read, self.io_time) def get_frame(self, timeout: float = 5.0) -> Optional[Tuple[int, Optional[np.ndarray[Any, Any]]]]: """ Get the next frame from the queue. Args: timeout: Maximum time to wait for a frame Returns: Tuple of (frame_number, frame_data) or None if queue is empty and reader is done """ try: return self.frame_queue.get(timeout=timeout) except queue.Empty: return None def _reader_loop(self) -> None: """Background thread that reads frames into the queue.""" # Seek to start position t_start = time.perf_counter() self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.start_frame) self.io_time += time.perf_counter() - t_start current_frame = self.start_frame while current_frame < self.end_frame and not self.stop_flag.is_set(): # Read frame t_start = time.perf_counter() ret, frame = self.cap.read() self.io_time += time.perf_counter() - t_start if ret: self.frames_read += 1 # Put frame in queue (blocks if queue is full) try: self.frame_queue.put((current_frame, frame), timeout=5.0) except queue.Full: if self.stop_flag.is_set(): break logger.warning("Frame queue full, dropping frame %d", current_frame) else: # Signal read failure try: self.frame_queue.put((current_frame, None), timeout=1.0) except queue.Full: pass # Skip frames t_start = time.perf_counter() for _ in range(self.frame_skip - 1): if self.stop_flag.is_set(): break self.cap.grab() self.io_time += time.perf_counter() - t_start current_frame += self.frame_skip # Signal end of stream try: self.frame_queue.put(None, timeout=1.0) except queue.Full: pass