File size: 3,347 Bytes
157bd4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9aa33d8
157bd4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9aa33d8
157bd4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2f4d4e
9aa33d8
 
 
 
 
157bd4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9aa33d8
 
157bd4f
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Frame extraction and cropping from input videos.

All operations use on-demand cv2.VideoCapture seeking — no frames are
pre-extracted or stored in memory.
"""

import logging
from typing import List, Optional, Tuple

import cv2
import numpy as np

logger = logging.getLogger(__name__)


def extract_frame(video_path: str, frame_idx: int) -> np.ndarray:
    """Read one frame from a video by seeking straight to its index.

    The capture is opened for this call only and released before returning,
    so nothing is cached between calls.

    Args:
        video_path: Path to the video file.
        frame_idx: Zero-based index of the frame to return.

    Returns:
        HxWx3 BGR uint8 numpy array.

    Raises:
        ValueError: If the video cannot be opened (this includes a path that
            does not exist), if frame_idx falls outside
            ``[0, reported frame count)``, or if the read itself fails.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")

    try:
        # The container's reported frame count is the upper bound for seeking.
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_idx >= frame_count or frame_idx < 0:
            raise ValueError(
                f"Frame index {frame_idx} out of range [0, {frame_count})"
            )

        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, image = capture.read()
        if ok and image is not None:
            return image
        raise ValueError(f"Failed to read frame {frame_idx}")
    finally:
        # Always free the decoder handle, on success and on every error path.
        capture.release()


def get_video_info(video_path: str) -> dict:
    """Collect basic metadata for a video.

    Args:
        video_path: Path to the video file.

    Returns:
        Dict with the keys ``total_frames`` (int), ``fps`` (float),
        ``width`` (int) and ``height`` (int).

    Raises:
        ValueError: If the video cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise ValueError(f"Cannot open video file: {video_path}")
    try:
        frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        # A falsy FPS reading (0.0) is replaced with 30.0.
        frame_rate = capture.get(cv2.CAP_PROP_FPS) or 30.0
        frame_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        return {
            "total_frames": frame_count,
            "fps": frame_rate,
            "width": frame_width,
            "height": frame_height,
        }
    finally:
        capture.release()


def crop_frame(
    frame: np.ndarray,
    bbox: List[int],
    padding: float = 0.15,
) -> np.ndarray:
    """Crop a frame to a bounding box with optional padding.

    The padded box is clipped to the frame on all four sides. A box that lies
    entirely outside the frame (in any direction) yields an empty crop rather
    than an unrelated region of the image.

    Args:
        frame: HxWx3 BGR numpy array.
        bbox: [x1, y1, x2, y2] in pixel coordinates. Values are truncated to
            int.
        padding: Fractional padding around the bbox (0.15 = 15% of the box
            width/height added on each side).

    Returns:
        A copy of the cropped region; it does not share memory with ``frame``.

    Raises:
        ValueError: If the bbox does not satisfy x2 > x1 and y2 > y1.
    """
    x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
    if x2 <= x1 or y2 <= y1:
        raise ValueError(
            f"Invalid bbox: [{x1}, {y1}, {x2}, {y2}] — must have x2 > x1 and y2 > y1"
        )
    h, w = frame.shape[:2]

    pad_x = int((x2 - x1) * padding)
    pad_y = int((y2 - y1) * padding)

    # Clamp every edge into [0, w] / [0, h]. Clamping only one side of each
    # bound would let a box above/left of the frame produce a negative stop
    # index, which NumPy interprets as counting from the end of the axis.
    cx1 = min(w, max(0, x1 - pad_x))
    cy1 = min(h, max(0, y1 - pad_y))
    cx2 = min(w, max(0, x2 + pad_x))
    cy2 = min(h, max(0, y2 + pad_y))

    return frame[cy1:cy2, cx1:cx2].copy()


def frame_to_jpeg(frame: np.ndarray, quality: int = 90) -> bytes:
    """Encode a BGR frame as JPEG bytes.

    Args:
        frame: HxWx3 BGR numpy array. Non-uint8 input is saturated to
            [0, 255] and cast to uint8 before encoding; no rescaling is
            applied.
        quality: JPEG quality (1-100). Coerced to int before being passed
            to the encoder.

    Returns:
        JPEG bytes.

    Raises:
        RuntimeError: If the encoder reports failure.
    """
    if frame.dtype != np.uint8:
        # Saturate before casting: a bare astype(np.uint8) wraps out-of-range
        # values modulo 256 (e.g. 256 -> 0), corrupting bright pixels.
        frame = np.clip(frame, 0, 255).astype(np.uint8)
    # The encoder parameter list is a flat sequence of integer flag/value pairs.
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(quality)]
    success, buffer = cv2.imencode(".jpg", frame, encode_param)
    if not success:
        raise RuntimeError("Failed to encode frame as JPEG")
    return buffer.tobytes()