File size: 2,124 Bytes
6bdfadc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import tempfile
import os
import subprocess
import json
import shutil
from typing import Optional

import yt_dlp


class VideoLoader:
    """Fetch videos into a private temporary directory.

    Each instance owns one directory created with ``tempfile.mkdtemp()``.
    Downloaded or uploaded files are written there, and ``cleanup()``
    deletes the directory. The loader can also be used as a context
    manager so the directory is removed automatically::

        with VideoLoader() as loader:
            path = loader.load_youtube(url)
    """

    def __init__(self):
        # Directory that holds every file written by this loader.
        self.temp_dir = tempfile.mkdtemp()

    def __enter__(self):
        """Return the loader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove the temp directory on exit; exceptions are not suppressed."""
        self.cleanup()
        return False

    def load_youtube(self, url: str) -> Optional[str]:
        """
        Download YouTube video.

        The video is always written to ``video.mp4`` inside the temp
        directory, so a later call replaces the result of an earlier one.

        Args:
            url: YouTube URL

        Returns:
            Path to downloaded video file, or None if failed
        """
        output_path = os.path.join(self.temp_dir, "video.mp4")

        # yt-dlp by default does not overwrite an existing video file, so a
        # leftover from a previous call would be silently returned as the
        # result for this URL. Drop it first.
        try:
            os.remove(output_path)
        except FileNotFoundError:
            pass

        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best',
            'outtmpl': output_path,
            'merge_output_format': 'mp4',
            'quiet': True,
            'no_warnings': True,
            'postprocessors': [{
                'key': 'FFmpegVideoConvertor',
                # 'preferedformat' (sic) is the spelling yt-dlp expects.
                'preferedformat': 'mp4',
            }],
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            # Boundary handler: callers rely on None rather than an exception.
            print(f"Error downloading YouTube video: {e}")
            return None

        # Do not hand back a path that was never created.
        if not os.path.isfile(output_path):
            print(
                "Error downloading YouTube video: "
                f"no output file was produced at {output_path}"
            )
            return None

        return output_path

    def load_local(self, uploaded_file) -> Optional[str]:
        """
        Save uploaded file to temp directory.

        Only the final path component of ``uploaded_file.name`` is used, so a
        client-supplied name such as ``../x.mp4`` or ``/etc/x.mp4`` cannot
        place the file outside the temp directory.

        Args:
            uploaded_file: Streamlit UploadedFile object (anything exposing
                ``name`` and ``read()`` returning bytes)

        Returns:
            Path to saved file

        Raises:
            ValueError: If the upload's name has no usable file-name part.
        """
        # The name comes from the client; strip any directory components.
        safe_name = os.path.basename(uploaded_file.name)
        if safe_name in ("", ".", ".."):
            raise ValueError(
                f"Uploaded file has no usable file name: {uploaded_file.name!r}"
            )

        output_path = os.path.join(self.temp_dir, safe_name)

        with open(output_path, "wb") as f:
            f.write(uploaded_file.read())

        return output_path

    def get_video_duration(self, video_path: str) -> float:
        """Get video duration in seconds using ffprobe.

        Args:
            video_path: Path of the video file to inspect.

        Returns:
            The ``format.duration`` value reported by ffprobe, as a float.

        Raises:
            FileNotFoundError: If the ``ffprobe`` executable cannot be found.
            json.JSONDecodeError: If ffprobe's stdout is not valid JSON.
            KeyError: If the JSON has no ``format``/``duration`` entry
                (for example when ffprobe could not read the file).
            ValueError: If the reported duration is not a number.
        """
        cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', video_path
        ]

        # The exit status is not checked; a failed probe surfaces below as a
        # JSON or key error.
        result = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(result.stdout)

        return float(data['format']['duration'])

    def cleanup(self):
        """Remove temp files.

        Safe to call more than once: a directory that is already gone is
        ignored. Other deletion errors still propagate.
        """
        try:
            shutil.rmtree(self.temp_dir)
        except FileNotFoundError:
            pass