Spaces:
No application file
No application file
| import os | |
| import tempfile | |
| import ffmpeg | |
| import yt_dlp | |
| from pathlib import Path | |
| from typing import Optional, List | |
| # Global list to track temp files for cleanup | |
| _temp_files: List[str] = [] | |
| def cleanup_temp_files(): | |
| """Clean up temporary files created during processing""" | |
| global _temp_files | |
| for temp_file in _temp_files: | |
| try: | |
| if os.path.exists(temp_file): | |
| os.unlink(temp_file) | |
| except Exception: | |
| pass | |
| _temp_files.clear() | |
| def download_from_url(url: str) -> Optional[str]: | |
| """ | |
| Download media from URL using yt-dlp | |
| Args: | |
| url: URL to download from | |
| Returns: | |
| Path to downloaded file or None if failed | |
| """ | |
| try: | |
| # Create temp directory | |
| temp_dir = tempfile.mkdtemp() | |
| # Configure yt-dlp options | |
| ydl_opts = { | |
| 'format': 'best[ext=mp4]/best', | |
| 'outtmpl': os.path.join(temp_dir, '%(id)s.%(ext)s'), | |
| 'quiet': True, | |
| 'no_warnings': True, | |
| 'extract_flat': False, | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| # Extract info and download | |
| info = ydl.extract_info(url, download=True) | |
| if not info: | |
| return None | |
| # Find downloaded file | |
| video_id = info.get('id', '') | |
| ext = info.get('ext', 'mp4') | |
| downloaded_file = os.path.join(temp_dir, f"{video_id}.{ext}") | |
| if os.path.exists(downloaded_file): | |
| _temp_files.append(downloaded_file) | |
| return downloaded_file | |
| return None | |
| except Exception as e: | |
| print(f"Download error: {e}") | |
| return None | |
| def convert_media_file(input_path: str, output_format: str) -> Optional[str]: | |
| """ | |
| Convert media file to target format using ffmpeg | |
| Args: | |
| input_path: Path to input file | |
| output_format: Target format (e.g., 'mp4', 'mp3') | |
| Returns: | |
| Path to converted file or None if failed | |
| """ | |
| try: | |
| # Create output path | |
| temp_dir = tempfile.mkdtemp() | |
| base_name = Path(input_path).stem | |
| output_path = os.path.join(temp_dir, f"{base_name}.{output_format}") | |
| # Determine if we need audio-only conversion | |
| audio_formats = ["mp3", "wav", "aac", "flac", "ogg", "m4a", "wma", "opus", "aiff"] | |
| is_audio_only = output_format in audio_formats | |
| # Build ffmpeg command based on output format | |
| if is_audio_only: | |
| # Extract audio only | |
| stream = ffmpeg.input(input_path) | |
| audio = stream.audio | |
| stream = ffmpeg.output(audio, output_path, **{'q:a': 0, 'map_a': 0}) | |
| else: | |
| # Full video conversion | |
| if output_format == "gif": | |
| # Special handling for GIF | |
| stream = ffmpeg.input(input_path) | |
| stream = ffmpeg.output( | |
| stream, | |
| output_path, | |
| vf="fps=10,scale=320:-1:flags=lanczos", | |
| **{'loop': 0} | |
| ) | |
| else: | |
| # Standard video conversion | |
| stream = ffmpeg.input(input_path) | |
| stream = ffmpeg.output(stream, output_path, **{'c:v': 'libx264', 'crf': 23}) | |
| # Run conversion | |
| ffmpeg.run(stream, overwrite_output=True, quiet=True) | |
| if os.path.exists(output_path): | |
| _temp_files.append(output_path) | |
| return output_path | |
| return None | |
| except ffmpeg.Error as e: | |
| print(f"FFmpeg error: {e}") | |
| return None | |
| except Exception as e: | |
| print(f"Conversion error: {e}") | |
| return None | |
| def get_media_info(file_path: str) -> dict: | |
| """ | |
| Get media file information using ffmpeg | |
| Args: | |
| file_path: Path to media file | |
| Returns: | |
| Dictionary with media info | |
| """ | |
| try: | |
| probe = ffmpeg.probe(file_path) | |
| video_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'video'] | |
| audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio'] | |
| info = { | |
| 'duration': float(probe['format'].get('duration', 0)), | |
| 'size': int(probe['format'].get('size', 0)), | |
| 'bitrate': int(probe['format'].get('bit_rate', 0)), | |
| 'has_video': len(video_streams) > 0, | |
| 'has_audio': len(audio_streams) > 0, | |
| } | |
| if video_streams: | |
| info['video_codec'] = video_streams[0].get('codec_name') | |
| info['width'] = video_streams[0].get('width') | |
| info['height'] = video_streams[0].get('height') | |
| info['fps'] = eval(video_streams[0].get('r_frame_rate', '0')) | |
| if audio_streams: | |
| info['audio_codec'] = audio_streams[0].get('codec_name') | |
| info['sample_rate'] = audio_streams[0].get('sample_rate') | |
| info['channels'] = audio_streams[0].get('channels') | |
| return info | |
| except Exception as e: | |
| print(f"Error getting media info: {e}") | |
| return {} |