File size: 5,255 Bytes
d605fc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import tempfile
import ffmpeg
import yt_dlp
from pathlib import Path
from typing import Optional, List

# Global list to track temp files for cleanup
_temp_files: List[str] = []

def cleanup_temp_files():
    """Clean up temporary files created during processing"""
    global _temp_files
    for temp_file in _temp_files:
        try:
            if os.path.exists(temp_file):
                os.unlink(temp_file)
        except Exception:
            pass
    _temp_files.clear()

def download_from_url(url: str) -> Optional[str]:
    """
    Download media from URL using yt-dlp
    
    Args:
        url: URL to download from
    
    Returns:
        Path to downloaded file or None if failed
    """
    try:
        # Create temp directory
        temp_dir = tempfile.mkdtemp()
        
        # Configure yt-dlp options
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': os.path.join(temp_dir, '%(id)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Extract info and download
            info = ydl.extract_info(url, download=True)
            if not info:
                return None
            
            # Find downloaded file
            video_id = info.get('id', '')
            ext = info.get('ext', 'mp4')
            downloaded_file = os.path.join(temp_dir, f"{video_id}.{ext}")
            
            if os.path.exists(downloaded_file):
                _temp_files.append(downloaded_file)
                return downloaded_file
            
        return None
        
    except Exception as e:
        print(f"Download error: {e}")
        return None

def convert_media_file(input_path: str, output_format: str) -> Optional[str]:
    """
    Convert media file to target format using ffmpeg
    
    Args:
        input_path: Path to input file
        output_format: Target format (e.g., 'mp4', 'mp3')
    
    Returns:
        Path to converted file or None if failed
    """
    try:
        # Create output path
        temp_dir = tempfile.mkdtemp()
        base_name = Path(input_path).stem
        output_path = os.path.join(temp_dir, f"{base_name}.{output_format}")
        
        # Determine if we need audio-only conversion
        audio_formats = ["mp3", "wav", "aac", "flac", "ogg", "m4a", "wma", "opus", "aiff"]
        is_audio_only = output_format in audio_formats
        
        # Build ffmpeg command based on output format
        if is_audio_only:
            # Extract audio only
            stream = ffmpeg.input(input_path)
            audio = stream.audio
            stream = ffmpeg.output(audio, output_path, **{'q:a': 0, 'map_a': 0})
        else:
            # Full video conversion
            if output_format == "gif":
                # Special handling for GIF
                stream = ffmpeg.input(input_path)
                stream = ffmpeg.output(
                    stream, 
                    output_path, 
                    vf="fps=10,scale=320:-1:flags=lanczos", 
                    **{'loop': 0}
                )
            else:
                # Standard video conversion
                stream = ffmpeg.input(input_path)
                stream = ffmpeg.output(stream, output_path, **{'c:v': 'libx264', 'crf': 23})
        
        # Run conversion
        ffmpeg.run(stream, overwrite_output=True, quiet=True)
        
        if os.path.exists(output_path):
            _temp_files.append(output_path)
            return output_path
        
        return None
        
    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e}")
        return None
    except Exception as e:
        print(f"Conversion error: {e}")
        return None

def get_media_info(file_path: str) -> dict:
    """
    Get media file information using ffmpeg
    
    Args:
        file_path: Path to media file
    
    Returns:
        Dictionary with media info
    """
    try:
        probe = ffmpeg.probe(file_path)
        video_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'video']
        audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio']
        
        info = {
            'duration': float(probe['format'].get('duration', 0)),
            'size': int(probe['format'].get('size', 0)),
            'bitrate': int(probe['format'].get('bit_rate', 0)),
            'has_video': len(video_streams) > 0,
            'has_audio': len(audio_streams) > 0,
        }
        
        if video_streams:
            info['video_codec'] = video_streams[0].get('codec_name')
            info['width'] = video_streams[0].get('width')
            info['height'] = video_streams[0].get('height')
            info['fps'] = eval(video_streams[0].get('r_frame_rate', '0'))
        
        if audio_streams:
            info['audio_codec'] = audio_streams[0].get('codec_name')
            info['sample_rate'] = audio_streams[0].get('sample_rate')
            info['channels'] = audio_streams[0].get('channels')
        
        return info
        
    except Exception as e:
        print(f"Error getting media info: {e}")
        return {}