| """ |
| Video Intelligence Platform — Frame Extraction |
| Extracts frames from video at configurable FPS using OpenCV. |
| """ |
| import cv2 |
| import numpy as np |
| from pathlib import Path |
| from PIL import Image |
| from typing import List, Tuple |
| from dataclasses import dataclass |
|
|
|
|
@dataclass
class ExtractedFrame:
    """One frame taken from a video, together with its timing metadata.

    Attributes:
        image: The frame content as a PIL image.
        timestamp_sec: Position of the frame in the video, in seconds.
        frame_idx: Integer index of the frame. As populated by
            ``extract_frames`` in this module, this is the position of the
            frame within the returned list, not the source frame number.
    """

    image: Image.Image
    timestamp_sec: float
    frame_idx: int
|
|
|
|
def extract_frames(
    video_path: str,
    fps: float = 1.0,
    max_frames: int = 3600,
) -> List[ExtractedFrame]:
    """
    Extract frames from a video at approximately the given rate.

    The sampling step is a whole number of source frames
    (``int(video_fps / fps)``, at least 1), so the effective rate can be
    slightly higher than ``fps`` when the two do not divide evenly. If ``fps``
    is not lower than the video's own frame rate, or the frame rate is
    unknown, every frame is extracted.

    Args:
        video_path: Path to video file
        fps: Frames per second to extract (default 1.0). Must be > 0.
        max_frames: Maximum number of frames to extract. A value <= 0
            yields an empty list.

    Returns:
        List of ExtractedFrame objects with PIL images and timestamps.
        ``timestamp_sec`` is the source frame number divided by the video
        frame rate (0 when the frame rate is unknown); ``frame_idx`` is the
        position of the frame in the returned list.

    Raises:
        ValueError: If ``fps`` is not a positive number, or the video cannot
            be opened.
    """
    # Reject zero/negative/NaN rates up front: zero would otherwise raise
    # ZeroDivisionError below, and negatives would silently mean "every frame".
    if not fps > 0:
        raise ValueError(f"fps must be positive, got {fps}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        has_fps = video_fps > 0
        duration_sec = total_frames / video_fps if has_fps else 0

        # Number of source frames to advance between two extracted frames.
        frame_interval = max(1, int(video_fps / fps)) if fps < video_fps else 1

        print(f"📹 Video: {video_path}")
        print(f" Duration: {duration_sec:.1f}s | FPS: {video_fps:.1f} | Total frames: {total_frames}")
        print(f" Extracting every {frame_interval}th frame ({fps} fps) ...")

        frames: List[ExtractedFrame] = []
        frame_count = 0

        # Checking the limit in the loop condition makes max_frames <= 0
        # return an empty list instead of one frame.
        while len(frames) < max_frames:
            # grab() only advances to the next frame; retrieve() is called
            # just for the frames we keep, so skipped frames are not
            # converted into arrays.
            if not cap.grab():
                break

            if frame_count % frame_interval == 0:
                ret, frame = cap.retrieve()
                if not ret:
                    break

                # OpenCV delivers BGR; PIL expects RGB.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(rgb_frame)
                timestamp = frame_count / video_fps if has_fps else 0

                frames.append(ExtractedFrame(
                    image=pil_image,
                    timestamp_sec=timestamp,
                    frame_idx=len(frames),
                ))

                if len(frames) >= max_frames:
                    print(f" ⚠️ Reached max_frames limit ({max_frames})")
                    break

            frame_count += 1
    finally:
        # Always free the capture handle, including when a frame conversion
        # or allocation fails part-way through.
        cap.release()

    print(f" ✅ Extracted {len(frames)} frames")
    return frames
|
|
|
|
def get_video_info(video_path: str) -> dict:
    """Return basic metadata for a video without reading any frames.

    Args:
        video_path: Path to video file

    Returns:
        Dict with the keys ``path``, ``fps``, ``total_frames``, ``width``,
        ``height`` and ``duration_sec``. ``duration_sec`` is
        ``total_frames / fps``, or 0 when the reported fps is not positive.

    Raises:
        ValueError: If the video cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    # Query every property first, then let go of the capture handle.
    native_fps = capture.get(cv2.CAP_PROP_FPS)
    frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    capture.release()

    # Guard against a zero or negative reported frame rate.
    if native_fps > 0:
        duration = frame_total / native_fps
    else:
        duration = 0

    return {
        "path": video_path,
        "fps": native_fps,
        "total_frames": frame_total,
        "width": frame_width,
        "height": frame_height,
        "duration_sec": duration,
    }
|
|