File size: 2,948 Bytes
7335d00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89c35c3
7335d00
 
 
 
89c35c3
7335d00
 
 
 
 
 
 
 
 
 
 
 
89c35c3
 
7335d00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89c35c3
7335d00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
Video Intelligence Platform — Frame Extraction
Extracts frames from video at configurable FPS using OpenCV.
"""
import cv2
import numpy as np
from pathlib import Path
from PIL import Image
from typing import List, Tuple
from dataclasses import dataclass


@dataclass
class ExtractedFrame:
    """A single extracted frame with its timestamp."""
    image: Image.Image
    timestamp_sec: float
    frame_idx: int


def extract_frames(
    video_path: str,
    fps: float = 1.0,
    max_frames: int = 3600,
) -> List[ExtractedFrame]:
    """
    Extract frames from video at given FPS.

    Args:
        video_path: Path to video file
        fps: Frames per second to extract (default 1.0)
        max_frames: Maximum number of frames to extract

    Returns:
        List of ExtractedFrame objects with PIL images and timestamps
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    video_fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration_sec = total_frames / video_fps if video_fps > 0 else 0

    # Calculate which frames to extract
    frame_interval = max(1, int(video_fps / fps)) if fps < video_fps else 1

    print(f"📹 Video: {video_path}")
    print(f"   Duration: {duration_sec:.1f}s | FPS: {video_fps:.1f} | Total frames: {total_frames}")
    print(f"   Extracting every {frame_interval}th frame ({fps} fps) ...")

    frames: List[ExtractedFrame] = []
    frame_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % frame_interval == 0:
            # Convert BGR (OpenCV) to RGB (PIL)
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(rgb_frame)
            timestamp = frame_count / video_fps if video_fps > 0 else 0

            frames.append(ExtractedFrame(
                image=pil_image,
                timestamp_sec=timestamp,
                frame_idx=len(frames),
            ))

            if len(frames) >= max_frames:
                print(f"   ⚠️ Reached max_frames limit ({max_frames})")
                break

        frame_count += 1

    cap.release()
    print(f"   ✅ Extracted {len(frames)} frames")
    return frames


def get_video_info(video_path: str) -> dict:
    """Get video metadata without extracting frames."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    info = {
        "path": video_path,
        "fps": cap.get(cv2.CAP_PROP_FPS),
        "total_frames": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
        "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    }
    info["duration_sec"] = info["total_frames"] / info["fps"] if info["fps"] > 0 else 0
    cap.release()
    return info