File size: 1,577 Bytes
6bdfadc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import os
import subprocess
import tempfile
from typing import Dict, List, Optional


class FrameExtractor:
    """Extract still frames from a video at fixed intervals using FFmpeg.

    Frames are written as JPEG files named ``frame_<number>.jpg`` into
    ``output_dir``.
    """

    # Pieces of the file name produced by the "frame_%03d.jpg" output pattern.
    _FRAME_PREFIX = "frame_"
    _FRAME_SUFFIX = ".jpg"

    def __init__(self, output_dir: Optional[str] = None):
        """
        Create the extractor and make sure its output directory exists.

        Args:
            output_dir: Directory that receives the extracted frames. When
                omitted (or empty), a fresh temporary directory is created.
        """
        self.output_dir = output_dir or tempfile.mkdtemp()
        os.makedirs(self.output_dir, exist_ok=True)

    def extract_frames(self, video_path: str, interval_seconds: float = 2.0) -> List[Dict]:
        """
        Extract frames at regular intervals using FFmpeg.

        Frame files left in the output directory by a previous call are
        removed first, so the result only describes frames of ``video_path``.

        Args:
            video_path: Path to video file
            interval_seconds: Extract one frame every N seconds (must be > 0)

        Returns:
            List of dicts with timestamp and frame path, in frame order:
            [
                {"timestamp": 0.0, "path": "/tmp/frame_001.jpg"},
                {"timestamp": 2.0, "path": "/tmp/frame_002.jpg"},
                ...
            ]

        Raises:
            ValueError: If ``interval_seconds`` is not a positive number.
            subprocess.CalledProcessError: If ffmpeg exits with an error.
        """
        # "not (x > 0)" also rejects NaN, which a plain "x <= 0" would let through.
        if not interval_seconds > 0:
            raise ValueError(
                f"interval_seconds must be a positive number, got {interval_seconds!r}"
            )

        # Drop frames from earlier runs; otherwise a shorter video would leave
        # stale high-numbered frames that get reported as part of this result.
        for stale_name in self._list_frame_files():
            os.remove(os.path.join(self.output_dir, stale_name))

        fps = 1 / interval_seconds
        output_pattern = os.path.join(self.output_dir, "frame_%03d.jpg")

        cmd = [
            'ffmpeg',
            # Global options must precede the files they apply to; placed
            # after the output they are ignored by ffmpeg.
            '-y',        # Overwrite existing output files without asking
            '-nostdin',  # Never wait for interactive input
            '-i', video_path,
            '-vf', f'fps={fps}',
            '-q:v', '2',  # High quality
            output_pattern,
        ]

        subprocess.run(cmd, capture_output=True, check=True)

        # Build result list with timestamps
        return [
            {
                "timestamp": i * interval_seconds,
                "path": os.path.join(self.output_dir, frame_file),
            }
            for i, frame_file in enumerate(self._list_frame_files())
        ]

    def _list_frame_files(self) -> List[str]:
        """Return the ``frame_<number>.jpg`` names in output_dir, ordered by number."""
        prefix_len = len(self._FRAME_PREFIX)
        suffix_len = len(self._FRAME_SUFFIX)

        numbered = []
        for name in os.listdir(self.output_dir):
            if not (name.startswith(self._FRAME_PREFIX) and name.endswith(self._FRAME_SUFFIX)):
                continue
            digits = name[prefix_len:len(name) - suffix_len]
            # Only ASCII digits: ignores unrelated files such as "frame_notes.jpg".
            if digits.isascii() and digits.isdigit():
                numbered.append((int(digits), name))

        # Numeric sort: "%03d" grows past three digits after frame 999, where a
        # plain string sort would put frame_1000 before frame_101.
        numbered.sort()
        return [name for _, name in numbered]