File size: 2,403 Bytes
fca155a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import cv2
import numpy as np
from pathlib import Path
from typing import Generator, Tuple, List
import decord
from decord import VideoReader, cpu

# Select decord's 'torch' bridge when this module is imported (module-wide side effect).
# NOTE(review): this call configures decord's output bridge, not a random seed;
# after it, decord reads presumably come back as torch tensors rather than decord
# NDArrays — confirm every consumer of decord output in this module expects that.
decord.bridge.set_bridge('torch')

def extract_frames_decord(video_path: Path, fps: float = 1.0) -> Generator[Tuple[float, np.ndarray], None, None]:
    """Lazily sample frames from a video with Decord at roughly ``fps`` frames per second.

    Frame indices are decoded in batches of 32 and yielded one frame at a time
    as NumPy arrays, whichever Decord bridge is currently active.

    Args:
        video_path: Path to the video file.
        fps: Target sampling rate; must be greater than 0. The frame stride is
            ``int(avg_fps / fps)`` clamped to at least 1, so asking for more
            frames per second than the video has yields every frame.

    Yields:
        ``(timestamp, frame)`` tuples, where ``timestamp`` is
        ``frame_index / avg_fps`` and ``frame`` is a NumPy array.

    Raises:
        FileNotFoundError: If ``video_path`` does not exist.
        ValueError: If ``fps`` is not a positive number.

    Note:
        This is a generator function, so the checks above run when iteration
        starts, not when the function is called.
    """
    if not video_path.exists():
        raise FileNotFoundError(f"Video not found: {video_path}")
    # `not fps > 0` (rather than `fps <= 0`) also rejects NaN. Without this
    # guard fps=0 raised a bare ZeroDivisionError and a negative fps silently
    # degraded to "every frame".
    if not fps > 0:
        raise ValueError(f"fps must be a positive number, got {fps!r}")

    vr = VideoReader(str(video_path), ctx=cpu(0))
    original_fps = vr.get_avg_fps()

    # Stride between sampled frames; never less than one frame.
    step = max(1, int(original_fps / fps))
    indices = range(0, len(vr), step)

    # Decode in fixed-size batches so only one batch of frames is held at a time.
    batch_size = 32
    for start in range(0, len(indices), batch_size):
        batch_indices = list(indices[start:start + batch_size])
        batch = vr.get_batch(batch_indices)

        # The container type depends on the active decord bridge: the native
        # NDArray exposes `.asnumpy()`, a torch tensor exposes `.numpy()`;
        # anything else is handed to NumPy directly.
        if hasattr(batch, "asnumpy"):
            frames = batch.asnumpy()
        elif hasattr(batch, "numpy"):
            frames = batch.numpy()
        else:
            frames = np.asarray(batch)

        for idx, frame in zip(batch_indices, frames):
            yield idx / original_fps, frame

def calculate_ssim_simplified(img1: np.ndarray, img2: np.ndarray) -> float:
    """Return a similarity score in (0, 1] derived from grayscale mean squared error.

    Despite the name this is not a real SSIM: both images are converted to
    grayscale, their mean squared error (MSE) is computed, and the result is
    mapped to ``1 / (1 + MSE / 1000)``. Identical images score exactly 1.0 and
    the score falls towards 0 as the images diverge.

    Args:
        img1: Reference image, converted with ``cv2.COLOR_RGB2GRAY``.
        img2: Image to compare; resized to ``img1``'s height and width when
            the shapes differ.

    Returns:
        The similarity score as a Python float.
    """
    if img1.shape != img2.shape:
        # cv2.resize takes the target size as (width, height).
        img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))

    g1 = cv2.cvtColor(img1, cv2.COLOR_RGB2GRAY)
    g2 = cv2.cvtColor(img2, cv2.COLOR_RGB2GRAY)

    # Do the arithmetic in float64: subtracting and squaring uint8 arrays wraps
    # around modulo 256 and silently produces a wrong MSE.
    diff = np.asarray(g1, dtype=np.float64) - np.asarray(g2, dtype=np.float64)
    mse = float(np.mean(diff * diff))
    return 1.0 / (1.0 + (mse / 1000.0))

def extract_key_scenes(video_path: Path, threshold: float = 0.85) -> List[Tuple[float, np.ndarray]]:
    """
    Collect only the frames that mark a significant scene change (keyframes).

    The video is sampled at 1 FPS. The first sampled frame is always kept;
    every later frame is scored against the most recently kept keyframe and is
    kept only when that score falls below ``threshold``.

    Args:
        video_path: Path of the video to scan.
        threshold: Similarity cut-off; a lower score than this counts as a
            new scene.

    Returns:
        A list of ``(timestamp, frame)`` tuples, in the order they were sampled.
    """
    print("🎬 Detecting Scenes...")
    keyframes = []
    reference = None  # most recently kept keyframe; None until the first frame

    # Walk the video at one sample per second.
    for stamp, image in extract_frames_decord(video_path, fps=1.0):
        # Short-circuit: the very first frame is kept without being scored.
        scene_changed = (
            reference is None
            or calculate_ssim_simplified(reference, image) < threshold
        )
        if not scene_changed:
            continue

        keyframes.append((stamp, image))
        reference = image

    print(f"🎬 Found {len(keyframes)} unique scenes.")
    return keyframes