notRaphael commited on
Commit
7335d00
·
verified ·
1 Parent(s): 947c505

Add frame extractor

Browse files
video_intelligence/frame_extractor.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Intelligence Platform — Frame Extraction
3
+ Extracts frames from video at configurable FPS using OpenCV.
4
+ """
5
+ import cv2
6
+ import numpy as np
7
+ from pathlib import Path
8
+ from PIL import Image
9
+ from typing import List, Tuple
10
+ from dataclasses import dataclass
11
+
12
+
13
+ @dataclass
14
+ class ExtractedFrame:
15
+ """A single extracted frame with its timestamp."""
16
+ image: Image.Image
17
+ timestamp_sec: float
18
+ frame_idx: int
19
+
20
+
21
+ def extract_frames(
22
+ video_path: str,
23
+ fps: float = 1.0,
24
+ max_frames: int = 3600,
25
+ ) -> List[ExtractedFrame]:
26
+ """
27
+ Extract frames from video at given FPS.
28
+
29
+ Args:
30
+ video_path: Path to video file
31
+ fps: Frames per second to extract (default 1.0)
32
+ max_frames: Maximum number of frames to extract
33
+
34
+ Returns:
35
+ List of ExtractedFrame objects with PIL images and timestamps
36
+ """
37
+ cap = cv2.VideoCapture(video_path)
38
+ if not cap.isOpened():
39
+ raise ValueError(f"Cannot open video: {video_path}")
40
+
41
+ video_fps = cap.get(cv2.CAP_PROP_FPS)
42
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
43
+ duration_sec = total_frames / video_fps if video_fps > 0 else 0
44
+
45
+ # Calculate which frames to extract
46
+ frame_interval = int(video_fps / fps) if fps < video_fps else 1
47
+
48
+ print(f"📹 Video: {video_path}")
49
+ print(f" Duration: {duration_sec:.1f}s | FPS: {video_fps:.1f} | Total frames: {total_frames}")
50
+ print(f" Extracting every {frame_interval}th frame ({fps} fps) ...")
51
+
52
+ frames: List[ExtractedFrame] = []
53
+ frame_count = 0
54
+
55
+ while True:
56
+ ret, frame = cap.read()
57
+ if not ret:
58
+ break
59
+
60
+ if frame_count % frame_interval == 0:
61
+ # Convert BGR (OpenCV) to RGB (PIL)
62
+ rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
63
+ pil_image = Image.fromarray(rgb_frame)
64
+ timestamp = frame_count / video_fps
65
+
66
+ frames.append(ExtractedFrame(
67
+ image=pil_image,
68
+ timestamp_sec=timestamp,
69
+ frame_idx=len(frames),
70
+ ))
71
+
72
+ if len(frames) >= max_frames:
73
+ print(f" ⚠️ Reached max_frames limit ({max_frames})")
74
+ break
75
+
76
+ frame_count += 1
77
+
78
+ cap.release()
79
+ print(f" ✅ Extracted {len(frames)} frames")
80
+ return frames
81
+
82
+
83
+ def get_video_info(video_path: str) -> dict:
84
+ """Get video metadata without extracting frames."""
85
+ cap = cv2.VideoCapture(video_path)
86
+ if not cap.isOpened():
87
+ raise ValueError(f"Cannot open video: {video_path}")
88
+
89
+ info = {
90
+ "path": video_path,
91
+ "fps": cap.get(cv2.CAP_PROP_FPS),
92
+ "total_frames": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
93
+ "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
94
+ "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
95
+ }
96
+ info["duration_sec"] = info["total_frames"] / info["fps"] if info["fps"] > 0 else 0
97
+ cap.release()
98
+ return info