Spaces:
No application file
No application file
File size: 5,255 Bytes
d605fc8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
import os
import tempfile
import ffmpeg
import yt_dlp
from pathlib import Path
from typing import Optional, List
# Global list to track temp files for cleanup
_temp_files: List[str] = []
def cleanup_temp_files():
"""Clean up temporary files created during processing"""
global _temp_files
for temp_file in _temp_files:
try:
if os.path.exists(temp_file):
os.unlink(temp_file)
except Exception:
pass
_temp_files.clear()
def download_from_url(url: str) -> Optional[str]:
"""
Download media from URL using yt-dlp
Args:
url: URL to download from
Returns:
Path to downloaded file or None if failed
"""
try:
# Create temp directory
temp_dir = tempfile.mkdtemp()
# Configure yt-dlp options
ydl_opts = {
'format': 'best[ext=mp4]/best',
'outtmpl': os.path.join(temp_dir, '%(id)s.%(ext)s'),
'quiet': True,
'no_warnings': True,
'extract_flat': False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# Extract info and download
info = ydl.extract_info(url, download=True)
if not info:
return None
# Find downloaded file
video_id = info.get('id', '')
ext = info.get('ext', 'mp4')
downloaded_file = os.path.join(temp_dir, f"{video_id}.{ext}")
if os.path.exists(downloaded_file):
_temp_files.append(downloaded_file)
return downloaded_file
return None
except Exception as e:
print(f"Download error: {e}")
return None
def convert_media_file(input_path: str, output_format: str) -> Optional[str]:
"""
Convert media file to target format using ffmpeg
Args:
input_path: Path to input file
output_format: Target format (e.g., 'mp4', 'mp3')
Returns:
Path to converted file or None if failed
"""
try:
# Create output path
temp_dir = tempfile.mkdtemp()
base_name = Path(input_path).stem
output_path = os.path.join(temp_dir, f"{base_name}.{output_format}")
# Determine if we need audio-only conversion
audio_formats = ["mp3", "wav", "aac", "flac", "ogg", "m4a", "wma", "opus", "aiff"]
is_audio_only = output_format in audio_formats
# Build ffmpeg command based on output format
if is_audio_only:
# Extract audio only
stream = ffmpeg.input(input_path)
audio = stream.audio
stream = ffmpeg.output(audio, output_path, **{'q:a': 0, 'map_a': 0})
else:
# Full video conversion
if output_format == "gif":
# Special handling for GIF
stream = ffmpeg.input(input_path)
stream = ffmpeg.output(
stream,
output_path,
vf="fps=10,scale=320:-1:flags=lanczos",
**{'loop': 0}
)
else:
# Standard video conversion
stream = ffmpeg.input(input_path)
stream = ffmpeg.output(stream, output_path, **{'c:v': 'libx264', 'crf': 23})
# Run conversion
ffmpeg.run(stream, overwrite_output=True, quiet=True)
if os.path.exists(output_path):
_temp_files.append(output_path)
return output_path
return None
except ffmpeg.Error as e:
print(f"FFmpeg error: {e}")
return None
except Exception as e:
print(f"Conversion error: {e}")
return None
def get_media_info(file_path: str) -> dict:
"""
Get media file information using ffmpeg
Args:
file_path: Path to media file
Returns:
Dictionary with media info
"""
try:
probe = ffmpeg.probe(file_path)
video_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'video']
audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio']
info = {
'duration': float(probe['format'].get('duration', 0)),
'size': int(probe['format'].get('size', 0)),
'bitrate': int(probe['format'].get('bit_rate', 0)),
'has_video': len(video_streams) > 0,
'has_audio': len(audio_streams) > 0,
}
if video_streams:
info['video_codec'] = video_streams[0].get('codec_name')
info['width'] = video_streams[0].get('width')
info['height'] = video_streams[0].get('height')
info['fps'] = eval(video_streams[0].get('r_frame_rate', '0'))
if audio_streams:
info['audio_codec'] = audio_streams[0].get('codec_name')
info['sample_rate'] = audio_streams[0].get('sample_rate')
info['channels'] = audio_streams[0].get('channels')
return info
except Exception as e:
print(f"Error getting media info: {e}")
return {} |