""" Video Intelligence Platform — Frame Extraction Extracts frames from video at configurable FPS using OpenCV. """ import cv2 import numpy as np from pathlib import Path from PIL import Image from typing import List, Tuple from dataclasses import dataclass @dataclass class ExtractedFrame: """A single extracted frame with its timestamp.""" image: Image.Image timestamp_sec: float frame_idx: int def extract_frames( video_path: str, fps: float = 1.0, max_frames: int = 3600, ) -> List[ExtractedFrame]: """ Extract frames from video at given FPS. Args: video_path: Path to video file fps: Frames per second to extract (default 1.0) max_frames: Maximum number of frames to extract Returns: List of ExtractedFrame objects with PIL images and timestamps """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") video_fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration_sec = total_frames / video_fps if video_fps > 0 else 0 # Calculate which frames to extract frame_interval = max(1, int(video_fps / fps)) if fps < video_fps else 1 print(f"📹 Video: {video_path}") print(f" Duration: {duration_sec:.1f}s | FPS: {video_fps:.1f} | Total frames: {total_frames}") print(f" Extracting every {frame_interval}th frame ({fps} fps) ...") frames: List[ExtractedFrame] = [] frame_count = 0 while True: ret, frame = cap.read() if not ret: break if frame_count % frame_interval == 0: # Convert BGR (OpenCV) to RGB (PIL) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) timestamp = frame_count / video_fps if video_fps > 0 else 0 frames.append(ExtractedFrame( image=pil_image, timestamp_sec=timestamp, frame_idx=len(frames), )) if len(frames) >= max_frames: print(f" ⚠️ Reached max_frames limit ({max_frames})") break frame_count += 1 cap.release() print(f" ✅ Extracted {len(frames)} frames") return frames def get_video_info(video_path: str) -> dict: """Get video metadata without extracting frames.""" cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") info = { "path": video_path, "fps": cap.get(cv2.CAP_PROP_FPS), "total_frames": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), } info["duration_sec"] = info["total_frames"] / info["fps"] if info["fps"] > 0 else 0 cap.release() return info