# cfb40 / src /video /frame_reader.py
# andytaylor-smg's picture
# perfect mypy
# 719b8f7
"""
Threaded frame reader for video processing.
Provides a background thread for reading video frames using a producer-consumer pattern.
This overlaps video I/O with processing for better performance.
"""
import logging
import queue
import threading
import time
from typing import Any, Optional, Tuple
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class ThreadedFrameReader:
    """
    Background thread for reading video frames.

    Uses a producer-consumer pattern to overlap video I/O with processing:
    the reader thread reads frames ahead into a bounded queue while the
    calling thread takes frames from the queue and processes them.

    Frames are never dropped: when the queue is full the reader thread waits
    for the consumer, re-checking the stop flag at a short interval so that
    ``stop()`` is honoured promptly.
    """

    # How often a producer blocked on a full queue re-checks the stop flag.
    _PUT_POLL_SECONDS = 0.1
    # How long stop() waits for the reader thread to exit.
    _JOIN_TIMEOUT_SECONDS = 2.0

    def __init__(self, cap: "cv2.VideoCapture", start_frame: int, end_frame: int, frame_skip: int, queue_size: int = 32):
        """
        Initialize the threaded frame reader.

        Args:
            cap: OpenCV VideoCapture object
            start_frame: First frame to read
            end_frame: Frame number at which reading stops (exclusive)
            frame_skip: Stride between emitted frames; 1 emits every frame,
                2 every second frame, and so on
            queue_size: Maximum frames to buffer (default 32)

        Raises:
            ValueError: If frame_skip is less than 1. Such a stride would
                never advance the frame counter, so the reader loop could
                not terminate.
        """
        if frame_skip < 1:
            raise ValueError(f"frame_skip must be >= 1, got {frame_skip}")

        self.cap = cap
        self.start_frame = start_frame
        self.end_frame = end_frame
        self.frame_skip = frame_skip
        self.queue_size = queue_size

        # Frame queue: (frame_number, frame_data) or None as end-of-stream sentinel.
        # frame_data is None when the read of that frame failed.
        self.frame_queue: queue.Queue[Tuple[int, np.ndarray[Any, Any] | None] | None] = queue.Queue(maxsize=queue_size)

        # Control flags
        self.stop_flag = threading.Event()
        self.reader_thread: Optional[threading.Thread] = None

        # Timing stats (written by the reader thread; accumulated across runs)
        self.io_time = 0.0
        self.frames_read = 0

    def start(self) -> None:
        """
        Start the background reader thread.

        If a reader thread is already running the call is ignored, because
        two threads must not read from the same capture object. Items left
        in the queue by a previous run are discarded before the new thread
        starts, so a stale end-of-stream sentinel cannot end the new run.
        """
        if self.reader_thread is not None and self.reader_thread.is_alive():
            logger.warning("Threaded frame reader already running; start() ignored")
            return

        self._drain_queue()
        self.stop_flag.clear()
        self.reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.reader_thread.start()
        logger.debug("Threaded frame reader started")

    def stop(self) -> None:
        """
        Stop the background reader thread.

        Sets the stop flag, empties the queue and waits up to
        ``_JOIN_TIMEOUT_SECONDS`` for the thread to exit. Safe to call when
        the reader was never started or has already finished.
        """
        self.stop_flag.set()
        thread = self.reader_thread
        if thread is not None and thread.is_alive():
            # Drain the queue so buffered frames are released right away
            self._drain_queue()
            thread.join(timeout=self._JOIN_TIMEOUT_SECONDS)
            if thread.is_alive():
                logger.warning("Threaded frame reader did not stop within %.1fs", self._JOIN_TIMEOUT_SECONDS)
        logger.debug("Threaded frame reader stopped (read %d frames, %.2fs I/O)", self.frames_read, self.io_time)

    def get_frame(self, timeout: float = 5.0) -> Optional[Tuple[int, Optional[np.ndarray[Any, Any]]]]:
        """
        Get the next frame from the queue.

        Args:
            timeout: Maximum time to wait for a frame

        Returns:
            Tuple of (frame_number, frame_data), where frame_data is None if
            reading that frame failed. Returns None when the end-of-stream
            sentinel is received or when no item arrived within ``timeout``;
            the two cases are not distinguished.
        """
        try:
            return self.frame_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _drain_queue(self) -> None:
        """Discard every item currently in the frame queue."""
        try:
            while True:
                self.frame_queue.get_nowait()
        except queue.Empty:
            pass

    def _put_until_stopped(self, item: Optional[Tuple[int, Optional[np.ndarray[Any, Any]]]]) -> bool:
        """
        Put ``item`` on the queue, waiting for space until stop is requested.

        Returns:
            True if the item was queued, False if the stop flag was set
            before space became available.
        """
        while not self.stop_flag.is_set():
            try:
                self.frame_queue.put(item, timeout=self._PUT_POLL_SECONDS)
                return True
            except queue.Full:
                # Consumer is behind; wait again unless stop was requested.
                continue
        return False

    def _signal_end_of_stream(self) -> None:
        """Queue the end-of-stream sentinel (None) for the consumer."""
        if self._put_until_stopped(None):
            return
        # Stop was requested: best effort only, never block shutdown.
        try:
            self.frame_queue.put_nowait(None)
        except queue.Full:
            pass

    def _reader_loop(self) -> None:
        """
        Background thread that reads frames into the queue.

        Emits (frame_number, frame) for each successful read and
        (frame_number, None) for each failed read, then the end-of-stream
        sentinel. The sentinel is sent from a ``finally`` block so the
        consumer is also notified if reading raises.
        """
        try:
            # Seek to start position
            t_start = time.perf_counter()
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.start_frame)
            self.io_time += time.perf_counter() - t_start

            current_frame = self.start_frame
            while current_frame < self.end_frame and not self.stop_flag.is_set():
                # Read frame
                t_start = time.perf_counter()
                ret, frame = self.cap.read()
                self.io_time += time.perf_counter() - t_start

                if ret:
                    self.frames_read += 1

                # A failed read is reported as (frame_number, None).
                if not self._put_until_stopped((current_frame, frame if ret else None)):
                    break

                # Skip frames: grab() advances the capture without the
                # cost of retrieving the image.
                t_start = time.perf_counter()
                for _ in range(self.frame_skip - 1):
                    if self.stop_flag.is_set():
                        break
                    self.cap.grab()
                self.io_time += time.perf_counter() - t_start

                current_frame += self.frame_skip
        finally:
            # Signal end of stream
            self._signal_end_of_stream()