Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Universal Media Downloader Backend API | |
| Built with Flask and yt-dlp for platform-agnostic media downloading | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import threading | |
| import subprocess | |
| from datetime import datetime | |
| from flask import Flask, request, jsonify, send_file, send_from_directory | |
| from flask_cors import CORS | |
| from werkzeug.exceptions import BadRequest, InternalServerError | |
| import logging | |
# Configure logging: every record is written to app.log and echoed to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# Initialize Flask app (CORS is enabled with the extension's defaults)
app = Flask(__name__)
CORS(app)

# Global variables
# Version string reported by `yt_dlp --version` after the last successful update.
yt_dlp_version = None
# datetime of the last successful update, or None if none has succeeded yet.
last_update_check = None
# Pending downloads; only its length is read in this file.
download_queue = []
# Completed downloads, oldest first.
download_history = []
# download_id -> latest progress/status dict written by the download thread.
active_downloads = {}
class YTDLPManager:
    """Manages yt-dlp operations with automatic updates.

    Creating an instance checks that ``yt_dlp`` can be imported (installing it
    with pip when it cannot) and then runs a pip upgrade, so construction may
    spawn subprocesses. Download progress and history are stored in the
    module-level ``active_downloads`` and ``download_history`` containers.
    """

    # Domain -> display name. Matched against the URL's hostname (exact match
    # or sub-domain), never against the whole URL text.
    _PLATFORMS = {
        'youtube.com': 'YouTube',
        'youtu.be': 'YouTube',
        'vimeo.com': 'Vimeo',
        'dailymotion.com': 'Dailymotion',
        'twitch.tv': 'Twitch',
        'tiktok.com': 'TikTok',
        'instagram.com': 'Instagram',
        'twitter.com': 'Twitter',
        'x.com': 'Twitter',
        'facebook.com': 'Facebook',
        'reddit.com': 'Reddit',
        'soundcloud.com': 'SoundCloud',
        'spotify.com': 'Spotify',
        'bandcamp.com': 'Bandcamp'
    }

    # Domain -> hint returned when no download link could be extracted.
    _FAILURE_HINTS = {
        'youtube.com': 'YouTube: Content may be private, restricted, or geo-blocked',
        'youtu.be': 'YouTube: Try the full YouTube URL or check content availability',
        'vimeo.com': 'Vimeo: Content may require download permissions or login',
        'tiktok.com': 'TikTok: Content may be private or restricted',
        'instagram.com': 'Instagram: Content may be private or have download restrictions',
        'twitter.com': 'Twitter/X: Content may be private or deleted',
        'x.com': 'Twitter/X: Content may be private or deleted',
        'facebook.com': 'Facebook: Content may require login or permissions',
    }
    _DEFAULT_FAILURE_HINT = (
        'Content may be private, restricted, deleted, or the platform may not support downloads'
    )

    def __init__(self):
        self.ensure_ytdlp_installed()
        self.update_yt_dlp()

    @staticmethod
    def format_file_size(bytes_size):
        """Format file size in human readable format.

        Declared as a static method so that both ``YTDLPManager.format_file_size(n)``
        and ``self.format_file_size(n)`` work.

        Args:
            bytes_size: Size in bytes; ``None`` and ``0`` mean "not known".

        Returns:
            A string such as ``"1.5 KB"``, or ``"Unknown"`` for a falsy size.
        """
        if not bytes_size:
            return "Unknown"
        size = bytes_size
        for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
            if size < 1024.0:
                return f"{size:.1f} {unit}"
            size /= 1024.0
        # Anything that survived five divisions is expressed in petabytes.
        return f"{size:.1f} PB"

    @staticmethod
    def format_time(seconds):
        """Format time duration in human readable format.

        Args:
            seconds: Duration in seconds; a falsy value means "not known".

        Returns:
            ``"H:MM:SS"`` when the duration is at least one hour, ``"M:SS"``
            otherwise, or ``"Unknown"`` for a falsy duration.
        """
        if not seconds:
            return "Unknown"
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        return f"{minutes}:{secs:02d}"

    def ensure_ytdlp_installed(self):
        """Ensure yt-dlp is installed, installing it with pip if the import fails.

        Raises:
            subprocess.CalledProcessError: If the pip installation fails.
        """
        try:
            import yt_dlp  # noqa: F401 - imported only to probe availability
            logger.info("yt-dlp is available")
        except ImportError:
            logger.info("Installing yt-dlp...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'yt-dlp'])
            # A package installed while the interpreter is running is only
            # guaranteed to be importable after the finder caches are reset.
            import importlib
            importlib.invalidate_caches()
            logger.info("yt-dlp installed successfully")

    def update_yt_dlp(self):
        """Update yt-dlp to latest version.

        Runs ``pip install --upgrade yt-dlp`` and, on success, stores the
        reported version and the current time in the module globals
        ``yt_dlp_version`` and ``last_update_check``.

        Returns:
            True if both the upgrade and the version query succeeded,
            False otherwise (failures are logged, never raised).
        """
        global yt_dlp_version, last_update_check
        try:
            logger.info("Checking for yt-dlp updates...")
            upgrade = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', '--upgrade', 'yt-dlp'],
                capture_output=True, text=True, timeout=300,
            )
            if upgrade.returncode != 0:
                logger.warning(f"yt-dlp update failed: {upgrade.stderr}")
                return False

            # Get version info; bounded so a hung child cannot block forever.
            version = subprocess.run(
                [sys.executable, '-m', 'yt_dlp', '--version'],
                capture_output=True, text=True, timeout=60,
            )
            if version.returncode != 0:
                logger.warning("Could not get yt-dlp version after update")
                return False

            yt_dlp_version = version.stdout.strip()
            last_update_check = datetime.now()
            logger.info(f"yt-dlp updated to version: {yt_dlp_version}")
            return True
        except Exception as e:
            logger.error(f"Error updating yt-dlp: {e}")
        return False

    def get_formats(self, url):
        """Extract available formats from URL.

        Args:
            url: Media page URL handed to yt-dlp.

        Returns:
            A dict. On success ``success`` is True and ``formats`` lists every
            format with a video or audio codec. If extraction fails, one retry
            is made and, when it yields data, a ``fallback`` response with
            direct download links is returned. If that fails too, the dict
            carries the error text and a platform-specific suggestion.
        """
        try:
            info = self._extract_info(url)
            return {
                'success': True,
                'title': info.get('title', 'Unknown Title'),
                'uploader': info.get('uploader', 'Unknown Uploader'),
                'duration': info.get('duration'),
                'thumbnail': info.get('thumbnail'),
                'formats': self._collect_formats(info),
                'platform': self.detect_platform(url),
                'view_count': info.get('view_count'),
                'like_count': info.get('like_count')
            }
        except Exception as e:
            logger.error(f"Error extracting formats from {url}: {e}")
            # Try to provide direct file URLs as fallback
            try:
                info = self._extract_info(url, force_json=True)
                if info:
                    return self._build_fallback_response(url, info, e)
            except Exception as fallback_error:
                logger.error(f"Direct URL extraction also failed: {fallback_error}")
            # If all attempts fail, return basic error with platform-specific suggestions
            return self._failure_response(url, e)

    @staticmethod
    def _extract_info(url, **extra_opts):
        """Run a metadata-only yt-dlp extraction and return its info dict."""
        import yt_dlp
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        ydl_opts.update(extra_opts)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url, download=False)

    @staticmethod
    def _collect_formats(info):
        """Build the ``formats`` list of a successful response from *info*."""
        formats = []
        for fmt in info.get('formats') or []:
            # Skip entries that carry neither a video nor an audio stream.
            if fmt.get('vcodec') == 'none' and fmt.get('acodec') == 'none':
                continue
            formats.append({
                'id': fmt.get('format_id', 'unknown'),
                'ext': fmt.get('ext', 'unknown'),
                'vcodec': fmt.get('vcodec', 'none'),
                'acodec': fmt.get('acodec', 'none'),
                'width': fmt.get('width'),
                'height': fmt.get('height'),
                'fps': fmt.get('fps'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                'format_note': fmt.get('format_note', ''),
                'url': fmt.get('url', ''),
                'type': 'video' if fmt.get('vcodec') != 'none' else 'audio'
            })
        # Also include direct download if available
        if info.get('url'):
            formats.append({
                'id': 'direct',
                'ext': info.get('ext', 'mp4'),
                'vcodec': info.get('vcodec', 'none'),
                'acodec': info.get('acodec', 'none'),
                'width': info.get('width'),
                'height': info.get('height'),
                'fps': info.get('fps'),
                'filesize': info.get('filesize'),
                'format_note': 'Direct',
                'url': info.get('url'),
                'type': 'video' if info.get('vcodec') != 'none' else 'audio'
            })
        return formats

    def _build_fallback_response(self, url, info, error):
        """Build the "direct file URLs" response used when extraction failed once."""
        # Group direct file URLs by type
        buckets = {'video': [], 'audio': [], 'image': [], 'other': []}

        if info.get('thumbnail'):
            buckets['image'].append({
                'type': 'thumbnail',
                'quality': 'default',
                'url': info.get('thumbnail'),
                'description': 'Video Thumbnail'
            })

        for fmt in info.get('formats') or []:
            file_url = fmt.get('url')
            if not file_url:
                continue
            format_type = self.determine_format_type(fmt)
            buckets[format_type].append({
                'type': format_type,
                'quality': self.get_quality_label(fmt),
                'url': file_url,
                'format_id': fmt.get('format_id', 'unknown'),
                'ext': fmt.get('ext', 'unknown'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                # Carried along so the quality ranking can compare resolutions.
                'height': fmt.get('height'),
                'width': fmt.get('width'),
                'description': self.get_format_description(fmt, format_type)
            })

        # Also include direct URL if available
        if info.get('url'):
            direct_bucket = 'video' if info.get('vcodec') != 'none' else 'audio'
            buckets[direct_bucket].append({
                'type': 'direct',
                'quality': 'direct',
                'url': info.get('url'),
                'format_id': 'direct',
                'ext': info.get('ext', 'unknown'),
                'filesize': info.get('filesize'),
                'description': 'Direct Media File'
            })

        ranked_video = self._rank_by_quality(buckets['video'])
        ranked_audio = self._rank_by_quality(buckets['audio'])

        # Best video, best audio, then at most two thumbnails.
        direct_download_links = [ranked[0] for ranked in (ranked_video, ranked_audio) if ranked]
        direct_download_links.extend(buckets['image'][:2])

        # If we have limited options, show more links
        if len(direct_download_links) < 3:
            # Runner-up of each kind, taken from the ranked lists so it can
            # never repeat the entry that was already added as "best".
            direct_download_links.extend(
                ranked[1] for ranked in (ranked_video, ranked_audio) if len(ranked) > 1
            )
            direct_download_links.extend(buckets['other'][:2])  # Max 2 other formats

        return {
            'success': False,
            'fallback': True,
            'direct_files': True,
            'error': str(error),
            'message': 'Direct file URLs found. Click to download:',
            'basic_info': {
                'title': info.get('title', 'Unknown Title'),
                'uploader': info.get('uploader', 'Unknown Uploader'),
                'platform': self.detect_platform(url),
                'url': url
            },
            'download_options': direct_download_links,
            'formats': [],
            'instruction': 'Click on any direct download link above to download the file immediately.'
        }

    def _failure_response(self, url, error):
        """Build the final error response, with a hint chosen from the URL's host."""
        suggestion = self._lookup_by_host(url, self._FAILURE_HINTS, self._DEFAULT_FAILURE_HINT)
        return {
            'success': False,
            'fallback': False,
            'error': str(error),
            'message': 'Unable to extract direct download links.',
            'platform_suggestion': suggestion,
            'formats': [],
            'instruction': 'Please check if the content is public, accessible, and supports downloads.'
        }

    @staticmethod
    def _hostname(url):
        """Return the lower-cased hostname of *url*, or '' when it has none."""
        from urllib.parse import urlparse
        text = (url or '').strip().lower()
        try:
            host = urlparse(text).hostname
            if not host and '://' not in text:
                # Scheme-less input such as "youtu.be/abc": parse it as a netloc.
                host = urlparse('//' + text).hostname
        except ValueError:
            # urlparse rejects some malformed netlocs (e.g. broken IPv6 literals).
            return ''
        return host or ''

    @classmethod
    def _lookup_by_host(cls, url, table, default):
        """Return the *table* value whose domain key owns *url*'s host, else *default*."""
        host = cls._hostname(url)
        for domain, value in table.items():
            if host == domain or host.endswith('.' + domain):
                return value
        return default

    def detect_platform(self, url):
        """Detect the platform from URL.

        Only the hostname is inspected, so ``netflix.com`` is not mistaken for
        ``x.com`` and a known domain appearing in the path or query string
        does not count.

        Returns:
            The platform display name, or ``'Unknown'``.
        """
        return self._lookup_by_host(url, self._PLATFORMS, 'Unknown')

    def extract_youtube_id(self, url):
        """Extract YouTube video ID from URL.

        Returns:
            The ID captured by the first matching pattern, or None.
        """
        import re
        # Patterns for YouTube URLs, tried in order.
        patterns = [
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([^&\n?#]+)',
            r'youtube\.com/v/([^&\n?#]+)',
            r'youtube\.com/.*[?&]v=([^&\n?#]+)'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    def determine_format_type(self, fmt):
        """Determine the type of media from format information.

        Returns:
            ``'video'`` if a video codec is present, else ``'audio'`` if an
            audio codec is present, else ``'image'`` for common image
            extensions, else ``'other'``.
        """
        if fmt.get('vcodec', 'none') != 'none':
            return 'video'
        if fmt.get('acodec', 'none') != 'none':
            return 'audio'
        # Neither stream present: check for images.
        if fmt.get('ext', '').lower() in ('jpg', 'jpeg', 'png', 'webp', 'gif'):
            return 'image'
        return 'other'

    def get_quality_label(self, fmt):
        """Get a human-readable quality label.

        Returns:
            ``"<height>p"`` (with the frame rate appended when above 30 fps),
            else ``"<width>px"``, else the upper-cased extension, else
            ``'Unknown'``.
        """
        height = fmt.get('height')
        if height:
            fps = fmt.get('fps')
            label = f"{height}p"
            if fps and fps > 30:
                label += f"{int(fps)}"
            return label
        width = fmt.get('width')
        if width:
            return f"{width}px"
        ext = fmt.get('ext', '')
        if ext:
            return ext.upper()
        return 'Unknown'

    def get_format_description(self, fmt, format_type):
        """Get a description for the format.

        Returns:
            Text such as ``"Video 1080p60 (MP4) - 2.0 KB"``; the extension and
            size parts are omitted when unknown.
        """
        ext = fmt.get('ext', 'unknown')
        filesize = fmt.get('filesize') or fmt.get('filesize_approx')
        desc_parts = [f"{format_type.capitalize()} {self.get_quality_label(fmt)}"]
        if ext and ext != 'unknown':
            desc_parts.append(f"({ext.upper()})")
        if filesize:
            desc_parts.append(f"- {self.format_file_size(filesize)}")
        return " ".join(desc_parts)

    @staticmethod
    def _rank_by_quality(urls):
        """Return *urls* ordered best-first; the first entry's type picks the sort key."""
        if not urls:
            return []
        kind = urls[0]['type']
        if kind == 'video':
            # Resolution first, then file size (assuming larger is better quality).
            def sort_key(entry):
                return (entry.get('height') or 0, entry.get('filesize') or 0)
        elif kind == 'audio':
            # File size stands in for bitrate.
            def sort_key(entry):
                return entry.get('filesize') or 0
        elif kind == 'image':
            def sort_key(entry):
                return (entry.get('height') or 0, entry.get('width') or 0)
        else:
            # Unknown kind: keep the caller's order.
            return list(urls)
        return sorted(urls, key=sort_key, reverse=True)

    def select_best_quality(self, urls):
        """Select the best quality URL from a list.

        Video prefers greater height, then larger file size; audio prefers
        larger file size; images prefer greater height, then width. For any
        other type the first entry is returned.

        Returns:
            The chosen entry, or None for an empty list.
        """
        ranked = self._rank_by_quality(urls)
        return ranked[0] if ranked else None

    def start_download(self, url, format_id, download_id):
        """Start downloading with progress tracking.

        Blocks until yt-dlp finishes. While it runs, the latest progress is
        published in ``active_downloads[download_id]``; on completion the entry
        is moved to ``download_history``. Any exception is logged and recorded
        as an ``'error'`` status rather than raised.

        Args:
            url: Media page URL.
            format_id: yt-dlp format selector; ``'direct'`` is mapped to ``'best'``.
            download_id: Key under which progress is published.
        """
        started_at = datetime.now().isoformat()
        try:
            import yt_dlp
            # Create output directory
            output_dir = 'downloads'
            os.makedirs(output_dir, exist_ok=True)

            def progress_hook(d):
                status = d.get('status')
                if status == 'downloading':
                    downloaded = d.get('downloaded_bytes') or 0
                    # Either total may be absent, None or 0; never divide by those.
                    total = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
                    percentage = downloaded / total * 100 if total else 0
                    active_downloads[download_id] = {
                        'status': 'downloading',
                        'percentage': percentage,
                        'speed': d.get('speed', 0),
                        'eta': d.get('eta', 0),
                        'filename': d.get('filename', ''),
                        'downloaded_bytes': d.get('downloaded_bytes', 0),
                        'total_bytes': d.get('total_bytes', 0)
                    }
                elif status == 'finished':
                    active_downloads[download_id] = {
                        'status': 'finished',
                        'filename': d.get('filename', ''),
                        'completed': True
                    }

            ydl_opts = {
                'outtmpl': f'{output_dir}/%(title)s.%(ext)s',
                'format': format_id if format_id != 'direct' else 'best',
                'progress_hooks': [progress_hook],
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Move to history. pop() removes and reads in one step, so a
            # concurrent reader never sees a half-moved entry.
            finished = active_downloads.pop(download_id, None)
            if finished is not None:
                download_info = dict(finished)
                download_info.update({
                    'url': url,
                    'format_id': format_id,
                    'download_id': download_id,
                    'start_time': started_at,
                    'end_time': datetime.now().isoformat()
                })
                download_history.append(download_info)
        except Exception as e:
            logger.error(f"Download error: {e}")
            active_downloads[download_id] = {
                'status': 'error',
                'error': str(e)
            }
# Shared manager instance used by the views and the background updater.
# Constructing it runs the yt-dlp install check and upgrade at import time.
ytdlp_manager = YTDLPManager()
# Background update checker
def periodic_update_check():
    """Re-run the yt-dlp upgrade every 24 hours; never returns.

    The first upgrade happens only after the first full interval has elapsed.
    Failures are logged and the loop continues.
    """
    interval_seconds = 24 * 60 * 60  # 24 hours
    while True:
        try:
            time.sleep(interval_seconds)
            ytdlp_manager.update_yt_dlp()
        except Exception as e:
            logger.error(f"Periodic update check failed: {e}")
# Launch the updater as a daemon thread so it does not keep the process alive.
update_thread = threading.Thread(
    target=periodic_update_check,
    daemon=True,
)
update_thread.start()
# Frontend Routes
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def serve_frontend():
    """Return index.html, looked up in the '.' directory."""
    page = 'index.html'
    return send_from_directory('.', page)
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def serve_css():
    """Return style.css, looked up in the '.' directory."""
    stylesheet = 'style.css'
    return send_from_directory('.', stylesheet)
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def serve_js():
    """Return script.js, looked up in the '.' directory."""
    script = 'script.js'
    return send_from_directory('.', script)
# API Routes
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def health_check():
    """Report service status as JSON.

    The payload holds the yt-dlp version, the ISO timestamp of the last
    successful update (or null), and the sizes of the active-download table
    and the queue.
    """
    if last_update_check:
        checked_at = last_update_check.isoformat()
    else:
        checked_at = None
    payload = {
        'status': 'healthy',
        'yt_dlp_version': yt_dlp_version,
        'last_update_check': checked_at,
        'active_downloads': len(active_downloads),
        'queue_size': len(download_queue),
    }
    return jsonify(payload)
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def get_formats():
    """Extract available formats from URL.

    Expects a JSON object body with a string ``url`` starting with http:// or
    https://. Returns the manager's result as JSON, a 400 response for any
    malformed request, or a generic 500 response for unexpected errors.
    """
    try:
        # silent=True yields None for a missing, non-JSON or unparsable body,
        # so those cases take the 400 path below instead of surfacing as 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data:
            raise BadRequest("URL is required")

        raw_url = data['url']
        if not isinstance(raw_url, str):
            raise BadRequest("URL must be a string")
        url = raw_url.strip()
        if not url:
            raise BadRequest("URL cannot be empty")

        # Basic URL validation
        if not url.startswith(('http://', 'https://')):
            raise BadRequest("URL must start with http:// or https://")

        logger.info(f"Extracting formats from: {url}")
        result = ytdlp_manager.get_formats(url)
        return jsonify(result)
    except BadRequest as e:
        return jsonify({'success': False, 'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Format extraction error: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error',
            'message': 'An error occurred while processing your request.'
        }), 500
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def start_download():
    """Start a download in a background thread.

    Expects a JSON object body with a string ``url`` (http/https) and a
    ``format_id``; ``download_id`` is optional and generated when missing.
    Returns the download id as JSON, a 400 response for a malformed request,
    or a generic 500 response for unexpected errors.
    """
    try:
        # silent=True turns a missing or invalid JSON body into None, which is
        # rejected below with a 400 instead of escaping as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data or 'format_id' not in data:
            raise BadRequest("URL and format_id are required")

        raw_url = data['url']
        if not isinstance(raw_url, str):
            raise BadRequest("Invalid URL format")
        url = raw_url.strip()
        if not url.startswith(('http://', 'https://')):
            raise BadRequest("Invalid URL format")

        format_id = data['format_id']
        # Accept numeric ids sent as JSON numbers, e.g. 18 -> "18".
        if isinstance(format_id, int) and not isinstance(format_id, bool):
            format_id = str(format_id)
        if not isinstance(format_id, str) or not format_id.strip():
            raise BadRequest("format_id must be a non-empty string")

        # A timestamp with one-second resolution collides when two downloads
        # start within the same second; a random id does not. The id is kept
        # as a string so it compares equal to ids taken from URL paths.
        import uuid
        download_id = str(data.get('download_id') or f"dl_{uuid.uuid4().hex}")

        logger.info(f"Starting download: {url} with format: {format_id}")

        # Start download in background thread
        download_thread = threading.Thread(
            target=ytdlp_manager.start_download,
            args=(url, format_id, download_id)
        )
        download_thread.start()

        return jsonify({
            'success': True,
            'download_id': download_id,
            'message': 'Download started'
        })
    except BadRequest as e:
        return jsonify({'success': False, 'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Download start error: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error'
        }), 500
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def get_download_progress(download_id):
    """Get download progress.

    Looks the id up among active downloads first and, if it is no longer
    there, in the history, so a client polling after completion receives the
    final record instead of a 404.
    """
    # A single .get() avoids the check-then-index race with the download
    # thread, which removes the entry once the download completes.
    progress = active_downloads.get(download_id)
    if progress is None:
        progress = next(
            (
                entry for entry in reversed(download_history)
                if entry.get('download_id') == download_id
            ),
            None,
        )

    if progress is None:
        return jsonify({
            'success': False,
            'message': 'Download not found'
        }), 404

    return jsonify({
        'success': True,
        'download_id': download_id,
        'progress': progress
    })
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def get_active_downloads():
    """Return the status dict of every entry in active_downloads as JSON."""
    snapshot = list(active_downloads.values())
    return jsonify({'success': True, 'downloads': snapshot})
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def get_download_history():
    """Get download history.

    Query parameters:
        limit: Maximum number of most recent entries to return (default 50;
            a non-integer value falls back to the default). Zero or a
            negative value returns an empty list.
    """
    limit = request.args.get('limit', 50, type=int)
    # download_history[-0:] is the whole list and a negative limit would skip
    # entries from the front, so non-positive limits are handled explicitly.
    recent = download_history[-limit:] if limit > 0 else []
    return jsonify({
        'success': True,
        'history': recent
    })
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def manual_update():
    """Run a yt-dlp upgrade now and report the outcome as JSON.

    Responds with 500 and the error text if the upgrade call raises.
    """
    try:
        updated = ytdlp_manager.update_yt_dlp()
        if updated:
            message = 'yt-dlp updated successfully'
        else:
            message = 'Update check completed'
        # yt_dlp_version is read after the update call so it reflects the new value.
        body = {
            'success': updated,
            'version': yt_dlp_version,
            'message': message,
        }
        return jsonify(body)
    except Exception as e:
        failure = {'success': False, 'error': str(e)}
        return jsonify(failure), 500
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def get_supported_platforms():
    """Return a fixed list of platform names with their domains, plus the yt-dlp version."""
    catalog = (
        ('YouTube', ['youtube.com', 'youtu.be']),
        ('Vimeo', ['vimeo.com']),
        ('Dailymotion', ['dailymotion.com']),
        ('Twitch', ['twitch.tv']),
        ('TikTok', ['tiktok.com']),
        ('Instagram', ['instagram.com']),
        ('Twitter', ['twitter.com', 'x.com']),
        ('Facebook', ['facebook.com']),
        ('Reddit', ['reddit.com']),
        ('SoundCloud', ['soundcloud.com']),
        ('Spotify', ['spotify.com']),
        ('Bandcamp', ['bandcamp.com']),
    )
    platforms = [{'name': name, 'domains': domains} for name, domains in catalog]
    return jsonify({
        'success': True,
        'platforms': platforms,
        'yt_dlp_version': yt_dlp_version,
    })
# NOTE(review): no @app.route registration is visible for this view - confirm
# how it is wired to a URL.
def download_file(filename):
    """Serve downloaded files as attachments from the ``downloads`` directory.

    Args:
        filename: Path of the requested file relative to ``downloads``.

    Returns:
        The file response, a 404 JSON response when the file does not exist
        or the path points outside ``downloads``, or a 500 JSON response for
        any other error.
    """
    from werkzeug.exceptions import NotFound
    try:
        # Downloads are written relative to the working directory, so resolve
        # the directory the same way. send_from_directory joins the path
        # safely and raises NotFound for anything outside it (e.g. "../app.log").
        downloads_dir = os.path.abspath('downloads')
        return send_from_directory(downloads_dir, filename, as_attachment=True)
    except NotFound:
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Return a JSON body for 404 errors.

    Registered with ``app.errorhandler``; without registration Flask never
    calls this function.
    """
    return jsonify({'error': 'Endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
    """Return a JSON body for 500 errors.

    Registered with ``app.errorhandler``; without registration Flask never
    calls this function.
    """
    return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
    # PORT selects the listening port (default 5000). Debug mode is on only
    # when DEBUG equals "true", compared case-insensitively.
    port = int(os.getenv('PORT', 5000))
    debug = os.getenv('DEBUG', 'False').lower() == 'true'
    logger.info(f"Starting Universal Media Downloader API on port {port}")
    # Bind to all interfaces.
    app.run(host='0.0.0.0', port=port, debug=debug)