Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Generator, Tuple, List | |
| import decord | |
| from decord import VideoReader, cpu | |
| # Fix decord seed to avoid warnings | |
| decord.bridge.set_bridge('torch') | |
| def extract_frames_decord(video_path: Path, fps: float = 1.0) -> Generator[Tuple[float, np.ndarray], None, None]: | |
| """Efficiently extracts frames from a video using Decord.""" | |
| if not video_path.exists(): | |
| raise FileNotFoundError(f"Video not found: {video_path}") | |
| vr = VideoReader(str(video_path), ctx=cpu(0)) | |
| original_fps = vr.get_avg_fps() | |
| # Calculate indices | |
| step = int(original_fps / fps) | |
| if step < 1: step = 1 | |
| indices = list(range(0, len(vr), step)) | |
| # Batch extraction | |
| batch_size = 32 | |
| for i in range(0, len(indices), batch_size): | |
| batch_indices = indices[i : i + batch_size] | |
| frames = vr.get_batch(batch_indices).asnumpy() | |
| for j, frame in enumerate(frames): | |
| idx = batch_indices[j] | |
| timestamp = idx / original_fps | |
| yield timestamp, frame | |
| def calculate_ssim_simplified(img1: np.ndarray, img2: np.ndarray) -> float: | |
| """Calculates a simple structural similarity score (MSE based).""" | |
| if img1.shape != img2.shape: | |
| img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0])) | |
| g1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY) | |
| g2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY) | |
| mse = np.mean((g1 - g2) ** 2) | |
| if mse == 0: return 1.0 | |
| return 1.0 / (1.0 + (mse / 1000.0)) | |
| def extract_key_scenes(video_path: Path, threshold: float = 0.85) -> List[Tuple[float, np.ndarray]]: | |
| """ | |
| Extracts ONLY significant scene changes (Keyframes). | |
| Reduces 60 frames -> 5-10 keyframes. | |
| """ | |
| print("🎬 Detecting Scenes...") | |
| keyframes = [] | |
| last_frame = None | |
| # Scan at 1 FPS | |
| for ts, frame in extract_frames_decord(video_path, fps=1.0): | |
| if last_frame is None: | |
| keyframes.append((ts, frame)) | |
| last_frame = frame | |
| continue | |
| score = calculate_ssim_simplified(last_frame, frame) | |
| # If scene changed significantly (score < threshold) | |
| if score < threshold: | |
| keyframes.append((ts, frame)) | |
| last_frame = frame | |
| print(f"🎬 Found {len(keyframes)} unique scenes.") | |
| return keyframes |