Spaces:
Build error
Build error
| from __future__ import unicode_literals | |
| import yt_dlp | |
| import os | |
| import time | |
| import shutil | |
| import logging | |
| import re | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Optional, Callable, Dict, Any, Union | |
# ---------------------------------------------------------------------------
# Module configuration: size limit, retry policy and audio defaults.
# ---------------------------------------------------------------------------

# Largest audio file accepted, in bytes (40 MB).
MAX_FILE_SIZE = 40 * 1024 ** 2

# Message reported when the estimated size exceeds MAX_FILE_SIZE.
FILE_TOO_LARGE_MESSAGE = (
    "The audio file exceeds the 40MB size limit. "
    "Please try a shorter video clip or select a lower quality option."
)

# Retry policy: number of attempts and base pause (seconds) between them.
MAX_RETRIES = 3
RETRY_DELAY = 2

# Audio extraction defaults; quality is a bitrate in kbps, kept as a string.
DEFAULT_AUDIO_FORMAT = "mp3"
DEFAULT_AUDIO_QUALITY = "192"

# Codecs accepted as an output format.
SUPPORTED_FORMATS = [
    "mp3",
    "m4a",
    "wav",
    "aac",
    "flac",
    "opus",
]
# Configure the root logger at INFO level with a timestamped record layout,
# then create the module-wide logger used by every function in this file.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

logger = logging.getLogger("youtube_downloader")
class DownloadLogger:
    """Logger object handed to yt-dlp that also forwards progress lines.

    Messages are relayed to the module-level ``logger``; debug messages that
    look like download progress are additionally passed to a callback.
    """

    def __init__(self, progress_callback: Optional[Callable[[str], None]] = None):
        # A missing (or falsy) callback is replaced by a no-op so that
        # ``debug`` can always call it unconditionally.
        self.progress_callback = (
            progress_callback if progress_callback else (lambda x: None)
        )

    def debug(self, msg: str) -> None:
        """Log *msg* at debug level, forwarding progress lines to the callback."""
        # Only '[download]' lines that carry a percentage count as progress.
        is_progress_line = msg.startswith('[download]') and '%' in msg
        if is_progress_line:
            self.progress_callback(msg)
        logger.debug(msg)

    def warning(self, msg: str) -> None:
        """Relay *msg* to the module logger at warning level."""
        logger.warning(msg)

    def error(self, msg: str) -> None:
        """Relay *msg* to the module logger at error level."""
        logger.error(msg)
class DownloadError(Exception):
    """Raised by this module when a download or metadata lookup fails."""
# Domains of the platforms accepted by validate_url(); sub-domains of these
# (www., m., music., player., ...) are accepted as well.
_SUPPORTED_DOMAINS = (
    "youtube.com",
    "youtu.be",
    "vimeo.com",
    "dailymotion.com",
    "twitch.tv",
    "soundcloud.com",
    "instagram.com",
)


def validate_url(url: str) -> bool:
    """
    Validate if the URL is a supported video platform URL
    The decision is made on the URL's host name only, so a platform name that
    merely appears in the path, query string or user-info part (for example
    ``https://evil.example/?next=youtube.com``) is rejected.
    Args:
        url: URL to check; a missing scheme (``youtube.com/watch?v=...``) is tolerated
    Returns:
        True if the host is one of the supported platforms or a sub-domain of
        one, False otherwise (including for empty or non-string input)
    """
    from urllib.parse import urlsplit

    if not isinstance(url, str):
        return False
    candidate = url.strip()
    if not candidate:
        return False

    # Without a leading "scheme://", urlsplit() would treat the host as part
    # of the path; prefixing "//" makes it parse the host as the netloc.
    if not re.match(r"[a-z][a-z0-9+.\-]*://", candidate, re.IGNORECASE):
        candidate = "//" + candidate

    try:
        parts = urlsplit(candidate)
        host = parts.hostname
    except ValueError:
        # Raised by urlsplit for malformed netlocs (e.g. unbalanced brackets).
        return False

    # When a scheme is given it must be a web scheme.
    if parts.scheme and parts.scheme.lower() not in ("http", "https"):
        return False
    if not host:
        return False

    # Tolerate a fully-qualified host name with a trailing dot.
    host = host.lower().rstrip(".")
    return any(
        host == domain or host.endswith("." + domain)
        for domain in _SUPPORTED_DOMAINS
    )
def ensure_download_directory(directory: str) -> str:
    """
    Create the download directory when missing and report where it is
    Args:
        directory: Directory path, relative or absolute
    Returns:
        Absolute path of the directory as a string
    """
    target = Path(directory)
    # Missing parents are created too; an already existing directory is fine.
    target.mkdir(exist_ok=True, parents=True)
    absolute_target = target.absolute()
    return str(absolute_target)
def get_download_options(
    output_dir: str = "./downloads/audio",
    audio_format: str = DEFAULT_AUDIO_FORMAT,
    audio_quality: str = DEFAULT_AUDIO_QUALITY,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Dict[str, Any]:
    """
    Get yt-dlp download options with specified parameters
    Args:
        output_dir: Directory to save downloaded files (created if missing)
        audio_format: Audio format (mp3, m4a, wav, etc.); matched
            case-insensitively, unsupported values fall back to
            DEFAULT_AUDIO_FORMAT with a warning
        audio_quality: Audio quality in kbps; converted to a string
        progress_callback: Function to call with progress updates
    Returns:
        Dictionary of yt-dlp options
    """
    # Accept "MP3" / " mp3 " as well as "mp3" before checking support.
    normalized_format = str(audio_format).strip().lower()
    if normalized_format not in SUPPORTED_FORMATS:
        logger.warning(f"Unsupported format '{audio_format}', falling back to {DEFAULT_AUDIO_FORMAT}")
        normalized_format = DEFAULT_AUDIO_FORMAT

    # Ensure download directory exists
    output_dir = ensure_download_directory(output_dir)

    # '%' starts a field in a yt-dlp output template, so a literal '%' in the
    # directory name has to be written as '%%' to survive template expansion.
    template_dir = output_dir.replace("%", "%%")

    return {
        "format": "bestaudio/best",
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": normalized_format,
            # Callers sometimes pass the bitrate as an int; keep it a string.
            "preferredquality": str(audio_quality),
        }],
        "logger": DownloadLogger(progress_callback),
        "outtmpl": f"{template_dir}/%(title)s.%(ext)s",
        "noplaylist": True,
        "quiet": False,
        "no_warnings": False,
        "progress_hooks": [lambda d: download_progress_hook(d, progress_callback)],
        "overwrites": True,
    }
def download_progress_hook(d: Dict[str, Any], callback: Optional[Callable[[str], None]] = None) -> None:
    """
    Translate a progress dictionary into a human-readable status message
    Args:
        d: Download information dictionary; must contain 'status', and
            'filename' when the status is 'finished'
        callback: Function to call with progress updates (optional)
    """
    # Fall back to a no-op so the branches below never need a None check.
    notify = callback if callback is not None else (lambda x: None)
    status = d['status']

    if status == 'finished':
        finished_name = os.path.basename(d['filename'])
        notify(f"Download complete: {finished_name}")
        logger.info(f"Download finished: {d['filename']}")
        return

    if status == 'downloading':
        # Each field has a placeholder for when it is absent from the dict.
        percent = d.get('_percent_str', 'unknown progress')
        rate = d.get('_speed_str', 'unknown speed')
        remaining = d.get('_eta_str', 'unknown ETA')
        notify(f"Downloading: {percent} at {rate}, ETA: {remaining}")
def estimate_file_size(info: Dict[str, Any]) -> int:
    """
    Better estimate file size from video info
    The first available source wins: the exact ``filesize``, then
    ``filesize_approx``, then an estimate from bitrate and duration.
    Args:
        info: Video information dictionary
    Returns:
        Estimated file size in bytes (always an int). MAX_FILE_SIZE is
        returned when nothing in *info* allows an estimate.
    """
    # Try different fields that might contain size information. The value is
    # coerced to int because approximate sizes may be reported as floats.
    for size_key in ("filesize", "filesize_approx"):
        reported_size = info.get(size_key)
        if reported_size is not None:
            return int(reported_size)

    # If we have duration and a bitrate, we can estimate
    duration = info.get("duration")
    bitrate = info.get("abr") or info.get("tbr")
    if duration and bitrate:
        # yt-dlp documents abr/tbr in kbps where 1 kbps = 1000 bits/s, so
        # bytes = kbps * 1000 / 8 * seconds = kbps * seconds * 125.
        return int(bitrate * duration * 125)

    # Default to a reasonable upper limit if we can't determine
    return MAX_FILE_SIZE
def _locate_audio_file(base_filename: str, audio_format: str) -> Optional[str]:
    """Return the first existing ``<base_filename>.<ext>``, trying *audio_format* first."""
    extensions = [audio_format] + [ext for ext in SUPPORTED_FORMATS if ext != audio_format]
    for ext in extensions:
        candidate = f"{base_filename}.{ext}"
        if os.path.exists(candidate):
            return candidate
    return None


def download_video_audio(
    url: str,
    output_dir: str = "./downloads/audio",
    audio_format: str = DEFAULT_AUDIO_FORMAT,
    audio_quality: str = DEFAULT_AUDIO_QUALITY,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Optional[str]:
    """
    Download audio from a video URL
    Args:
        url: URL of the video
        output_dir: Directory to save downloaded files
        audio_format: Audio format (mp3, m4a, wav, etc.)
        audio_quality: Audio quality in kbps
        progress_callback: Function to call with progress updates
    Returns:
        Path to the downloaded audio file. None is only returned when
        MAX_RETRIES is configured so that no attempt is made at all.
    Raises:
        DownloadError: If the URL is not supported, if the estimated size
            exceeds MAX_FILE_SIZE (raised immediately, never retried), or if
            the download still fails after MAX_RETRIES attempts
    """
    if not validate_url(url):
        error_msg = f"Invalid or unsupported URL: {url}"
        logger.error(error_msg)
        raise DownloadError(error_msg)

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if progress_callback:
                progress_callback(f"Starting download (attempt {attempt}/{MAX_RETRIES})...")
            ydl_opts = get_download_options(output_dir, audio_format, audio_quality, progress_callback)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                logger.info(f"Downloading audio from: {url}")
                # Extract info first without downloading
                info = ydl.extract_info(url, download=False)

                # Reject oversized files before spending time on the download
                estimated_size = estimate_file_size(info)
                if estimated_size > MAX_FILE_SIZE:
                    error_msg = f"{FILE_TOO_LARGE_MESSAGE} (Estimated: {estimated_size / 1024 / 1024:.1f}MB)"
                    logger.error(error_msg)
                    raise DownloadError(error_msg)

                # Now download
                ydl.download([url])

                # prepare_filename() yields the pre-extraction name; the audio
                # post-processor swaps the extension, so only the stem is kept.
                filename = ydl.prepare_filename(info)

            base_filename = os.path.splitext(filename)[0]
            located = _locate_audio_file(base_filename, audio_format)
            if located is None:
                # If we get here, something went wrong
                raise FileNotFoundError(f"Could not locate downloaded file for {url}")
            return located
        except DownloadError:
            # Errors raised deliberately above (e.g. file too large) are
            # permanent: retrying cannot help, so propagate them unchanged.
            raise
        except Exception as e:
            from_ytdlp = isinstance(e, yt_dlp.utils.DownloadError)
            label = "Download error" if from_ytdlp else "Unexpected error"
            error_msg = f"{label} (Attempt {attempt}/{MAX_RETRIES}): {str(e)}"
            logger.error(error_msg)
            if progress_callback:
                progress_callback(error_msg)

            # Check for exhaustion first so that the last attempt always
            # raises, including when the failure was an HTTP 429.
            if attempt >= MAX_RETRIES:
                raise DownloadError(f"Failed to download after {MAX_RETRIES} attempts: {str(e)}") from e

            # Rate limiting - wait longer
            rate_limited = from_ytdlp and "HTTP Error 429" in str(e)
            time.sleep(RETRY_DELAY * 5 if rate_limited else RETRY_DELAY)
    return None
def delete_download(path: str) -> bool:
    """
    Remove a downloaded file, or a whole directory tree
    Failures are logged and reported through the return value; no exception
    is propagated to the caller.
    Args:
        path: Path to file or directory to delete
    Returns:
        True if deletion was successful, False otherwise
    """
    try:
        # Empty/None paths and paths that are already gone are not errors,
        # but nothing was deleted either.
        if not (path and os.path.exists(path)):
            logger.warning(f"Path does not exist: {path}")
            return False

        if os.path.isfile(path):
            os.remove(path)
            logger.info(f"File deleted: {path}")
            return True

        if os.path.isdir(path):
            shutil.rmtree(path)
            logger.info(f"Directory deleted: {path}")
            return True

        # Exists, but is neither a regular file nor a directory.
        logger.warning(f"Path is neither a file nor a directory: {path}")
        return False
    except PermissionError:
        logger.error(f"Permission denied: Unable to delete {path}")
    except FileNotFoundError:
        logger.error(f"File or directory not found: {path}")
    except Exception as e:
        logger.error(f"Error deleting {path}: {str(e)}")
    # Reached only after one of the handlers above logged a failure.
    return False
def trim_audio_file(input_file: str, max_duration_seconds: int = 600) -> str:
    """
    Cut an audio file down to a maximum duration to reduce its size
    The trimmed copy is written next to the input as
    ``<name>_trimmed<ext>``; the audio stream is copied, not re-encoded.
    Args:
        input_file: Path to input audio file
        max_duration_seconds: Maximum duration in seconds
    Returns:
        Path to trimmed file, or *input_file* unchanged if anything fails
        (including the ``ffmpeg`` module not being importable)
    """
    try:
        # Imported lazily so a missing module is handled like any other failure.
        import ffmpeg

        # Build "<dir>/<stem>_trimmed<ext>" alongside the original file
        parent_dir, base_name = os.path.split(input_file)
        stem, extension = os.path.splitext(base_name)
        output_file = os.path.join(parent_dir, f"{stem}_trimmed{extension}")

        # Trim using ffmpeg
        stream = ffmpeg.input(input_file)
        stream = stream.output(output_file, t=str(max_duration_seconds), acodec='copy')
        stream.run(quiet=True, overwrite_output=True)

        logger.info(f"Trimmed {input_file} to {max_duration_seconds} seconds")
        return output_file
    except Exception as e:
        logger.error(f"Error trimming audio: {str(e)}")
        return input_file  # Return original if trimming fails
def get_video_info(url: str) -> Dict[str, Any]:
    """
    Fetch a video's metadata without downloading any media
    Args:
        url: URL of the video
    Returns:
        Dictionary of video information
    Raises:
        DownloadError: If the information cannot be retrieved for any reason
    """
    quiet_options = {"quiet": True}
    try:
        with yt_dlp.YoutubeDL(quiet_options) as ydl:
            return ydl.extract_info(url, download=False)
    except Exception as e:
        reason = str(e)
        logger.error(f"Error getting video info: {reason}")
        raise DownloadError(f"Could not retrieve video information: {reason}")