"""Frame extraction and cropping from input videos. All operations use on-demand cv2.VideoCapture seeking — no frames are pre-extracted or stored in memory. """ import logging from typing import List, Optional, Tuple import cv2 import numpy as np logger = logging.getLogger(__name__) def extract_frame(video_path: str, frame_idx: int) -> np.ndarray: """Extract a single frame from a video by index. Args: video_path: Path to the video file. frame_idx: Zero-based frame index. Returns: HxWx3 BGR uint8 numpy array. Raises: ValueError: If frame_idx is out of range or video cannot be opened. FileNotFoundError: If video_path does not exist. """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video file: {video_path}") try: total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_idx < 0 or frame_idx >= total: raise ValueError( f"Frame index {frame_idx} out of range [0, {total})" ) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) success, frame = cap.read() if not success or frame is None: raise ValueError(f"Failed to read frame {frame_idx}") return frame finally: cap.release() def get_video_info(video_path: str) -> dict: """Return video metadata (total_frames, fps, width, height).""" cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video file: {video_path}") try: return { "total_frames": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), "fps": cap.get(cv2.CAP_PROP_FPS) or 30.0, "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), } finally: cap.release() def crop_frame( frame: np.ndarray, bbox: List[int], padding: float = 0.15, ) -> np.ndarray: """Crop a frame to a bounding box with optional padding. Args: frame: HxWx3 BGR numpy array. bbox: [x1, y1, x2, y2] in pixel coordinates. padding: Fractional padding around the bbox (0.15 = 15% each side). Returns: Cropped HxWx3 BGR numpy array. """ x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]) if x2 <= x1 or y2 <= y1: raise ValueError( f"Invalid bbox: [{x1}, {y1}, {x2}, {y2}] — must have x2 > x1 and y2 > y1" ) h, w = frame.shape[:2] bw = x2 - x1 bh = y2 - y1 pad_x = int(bw * padding) pad_y = int(bh * padding) cx1 = max(0, x1 - pad_x) cy1 = max(0, y1 - pad_y) cx2 = min(w, x2 + pad_x) cy2 = min(h, y2 + pad_y) return frame[cy1:cy2, cx1:cx2].copy() def frame_to_jpeg(frame: np.ndarray, quality: int = 90) -> bytes: """Encode a BGR frame as JPEG bytes. Args: frame: HxWx3 BGR numpy array. quality: JPEG quality (1-100). Returns: JPEG bytes. """ if frame.dtype != np.uint8: frame = frame.astype(np.uint8) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] success, buffer = cv2.imencode(".jpg", frame, encode_param) if not success: raise RuntimeError("Failed to encode frame as JPEG") return buffer.tobytes()