"""Helpers for downloading media with yt-dlp and converting/probing it with ffmpeg."""

import os
import shutil
import tempfile
from pathlib import Path
from typing import List, Optional

import ffmpeg
import yt_dlp

# Global list to track temp files for cleanup
_temp_files: List[str] = []

# Temp directories created by this module. They are tracked separately so
# cleanup_temp_files() can remove the directories themselves, not only the
# files registered in _temp_files.
_temp_dirs: List[str] = []

# Output formats that are converted as audio-only (no video stream is kept).
_AUDIO_FORMATS = frozenset(
    ["mp3", "wav", "aac", "flac", "ogg", "m4a", "wma", "opus", "aiff"]
)


def _make_temp_dir() -> str:
    """Create a temporary directory and register it for later cleanup."""
    temp_dir = tempfile.mkdtemp()
    _temp_dirs.append(temp_dir)
    return temp_dir


def _parse_frame_rate(rate) -> float:
    """Parse an ffprobe-style frame rate such as '30000/1001' or '25'.

    Returns 0.0 when the value is malformed or has a zero denominator,
    instead of raising. This replaces an eval() call on probe output.
    """
    numerator_text, _, denominator_text = str(rate).partition('/')
    try:
        numerator = float(numerator_text)
        denominator = float(denominator_text) if denominator_text else 1.0
    except ValueError:
        return 0.0
    if denominator == 0:
        # ffprobe may report '0/0' when the rate is unknown.
        return 0.0
    return numerator / denominator


def cleanup_temp_files():
    """Clean up temporary files and directories created during processing.

    Removal is best-effort: filesystem errors (file already gone, permission
    problems) are ignored so that one failure does not block the rest.
    """
    for temp_file in _temp_files:
        try:
            os.unlink(temp_file)
        except OSError:
            # Already removed or not removable; nothing more we can do here.
            pass
    _temp_files.clear()

    for temp_dir in _temp_dirs:
        shutil.rmtree(temp_dir, ignore_errors=True)
    _temp_dirs.clear()


def download_from_url(url: str) -> Optional[str]:
    """
    Download media from URL using yt-dlp

    The file is written into a fresh temporary directory and registered for
    removal by cleanup_temp_files().

    Args:
        url: URL to download from

    Returns:
        Path to downloaded file or None if failed
    """
    try:
        # Create temp directory (tracked so cleanup can remove it later)
        temp_dir = _make_temp_dir()

        # Configure yt-dlp options
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': os.path.join(temp_dir, '%(id)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Extract info and download
            info = ydl.extract_info(url, download=True)
            if not info:
                return None

            # Rebuild the path from the same fields used in 'outtmpl'
            video_id = info.get('id', '')
            ext = info.get('ext', 'mp4')
            downloaded_file = os.path.join(temp_dir, f"{video_id}.{ext}")

            if os.path.exists(downloaded_file):
                _temp_files.append(downloaded_file)
                return downloaded_file

        return None

    except Exception as e:
        # Boundary handler: callers expect None on any failure.
        print(f"Download error: {e}")
        return None


def convert_media_file(input_path: str, output_format: str) -> Optional[str]:
    """
    Convert media file to target format using ffmpeg

    The result is written into a fresh temporary directory and registered for
    removal by cleanup_temp_files().

    Args:
        input_path: Path to input file
        output_format: Target format (e.g., 'mp4', 'mp3')

    Returns:
        Path to converted file or None if failed
    """
    try:
        # Create output path (directory is tracked so cleanup can remove it)
        temp_dir = _make_temp_dir()
        base_name = Path(input_path).stem
        output_path = os.path.join(temp_dir, f"{base_name}.{output_format}")

        source = ffmpeg.input(input_path)

        # Build ffmpeg command based on output format
        if output_format in _AUDIO_FORMATS:
            # Extract audio only
            # NOTE(review): 'map_a' does not look like a standard ffmpeg
            # output option - confirm it is accepted; kept unchanged here.
            stream = ffmpeg.output(
                source.audio, output_path, **{'q:a': 0, 'map_a': 0}
            )
        elif output_format == "gif":
            # Special handling for GIF
            stream = ffmpeg.output(
                source,
                output_path,
                vf="fps=10,scale=320:-1:flags=lanczos",
                **{'loop': 0}
            )
        else:
            # Standard video conversion
            stream = ffmpeg.output(
                source, output_path, **{'c:v': 'libx264', 'crf': 23}
            )

        # Run conversion
        ffmpeg.run(stream, overwrite_output=True, quiet=True)

        if os.path.exists(output_path):
            _temp_files.append(output_path)
            return output_path

        return None

    except ffmpeg.Error as e:
        print(f"FFmpeg error: {e}")
        return None
    except Exception as e:
        # Boundary handler: callers expect None on any failure.
        print(f"Conversion error: {e}")
        return None


def get_media_info(file_path: str) -> dict:
    """
    Get media file information using ffmpeg

    Args:
        file_path: Path to media file

    Returns:
        Dictionary with media info ('duration', 'size', 'bitrate',
        'has_video', 'has_audio', plus codec details of the first video and
        audio stream when present), or an empty dict if probing failed
    """
    try:
        probe = ffmpeg.probe(file_path)
        streams = probe['streams']
        container = probe['format']

        video_streams = [s for s in streams if s['codec_type'] == 'video']
        audio_streams = [s for s in streams if s['codec_type'] == 'audio']

        info = {
            'duration': float(container.get('duration', 0)),
            'size': int(container.get('size', 0)),
            'bitrate': int(container.get('bit_rate', 0)),
            'has_video': len(video_streams) > 0,
            'has_audio': len(audio_streams) > 0,
        }

        if video_streams:
            video = video_streams[0]
            info['video_codec'] = video.get('codec_name')
            info['width'] = video.get('width')
            info['height'] = video.get('height')
            # Parsed explicitly rather than eval()'d: the value comes from
            # the media file, and '0/0' must not discard the whole result.
            info['fps'] = _parse_frame_rate(video.get('r_frame_rate', '0'))

        if audio_streams:
            audio = audio_streams[0]
            info['audio_codec'] = audio.get('codec_name')
            info['sample_rate'] = audio.get('sample_rate')
            info['channels'] = audio.get('channels')

        return info

    except Exception as e:
        # Boundary handler: callers expect an empty dict on any failure.
        print(f"Error getting media info: {e}")
        return {}