Taf2023's picture
Upload 8 files
463ade3 verified
#!/usr/bin/env python3
"""
Universal Media Downloader Backend API
Built with Flask and yt-dlp for platform-agnostic media downloading
"""
import os
import sys
import json
import time
import threading
import subprocess
from datetime import datetime
from flask import Flask, request, jsonify, send_file, send_from_directory
from flask_cors import CORS
from werkzeug.exceptions import BadRequest, InternalServerError
import logging
# Logging setup: every record goes both to the file app.log and to stdout.
logging.basicConfig(
    handlers=[logging.FileHandler('app.log'), logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Flask application; CORS is enabled with no extra options.
app = Flask(__name__)
CORS(app)

# Shared module-level state, read and written by the manager and the routes.
yt_dlp_version = None       # output of `yt_dlp --version` after the last successful update
last_update_check = None    # datetime of the last successful update
download_queue = list()
download_history = list()
active_downloads = dict()
class YTDLPManager:
    """Manage yt-dlp: installation, upgrades, format probing and downloads.

    Instantiating the class checks that yt-dlp is importable (installing it
    with pip when it is not) and then runs a pip upgrade.
    """

    # Seconds allowed for the pip upgrade and for the version query.
    _PIP_TIMEOUT = 300
    _VERSION_TIMEOUT = 60

    # Registered domain -> display name, used by detect_platform().
    _PLATFORM_DOMAINS = {
        'youtube.com': 'YouTube',
        'youtu.be': 'YouTube',
        'vimeo.com': 'Vimeo',
        'dailymotion.com': 'Dailymotion',
        'twitch.tv': 'Twitch',
        'tiktok.com': 'TikTok',
        'instagram.com': 'Instagram',
        'twitter.com': 'Twitter',
        'x.com': 'Twitter',
        'facebook.com': 'Facebook',
        'reddit.com': 'Reddit',
        'soundcloud.com': 'SoundCloud',
        'spotify.com': 'Spotify',
        'bandcamp.com': 'Bandcamp'
    }

    # Registered domain -> hint returned when extraction fails completely.
    _FAILURE_SUGGESTIONS = {
        'youtube.com': 'YouTube: Content may be private, restricted, or geo-blocked',
        'youtu.be': 'YouTube: Try the full YouTube URL or check content availability',
        'vimeo.com': 'Vimeo: Content may require download permissions or login',
        'tiktok.com': 'TikTok: Content may be private or restricted',
        'instagram.com': 'Instagram: Content may be private or have download restrictions',
        'twitter.com': 'Twitter/X: Content may be private or deleted',
        'x.com': 'Twitter/X: Content may be private or deleted',
        'facebook.com': 'Facebook: Content may require login or permissions',
    }
    _DEFAULT_SUGGESTION = (
        'Content may be private, restricted, deleted, or the platform may not support downloads'
    )

    def __init__(self):
        self.ensure_ytdlp_installed()
        self.update_yt_dlp()

    @staticmethod
    def format_file_size(bytes_size):
        """Format a byte count as a human readable string.

        Returns "Unknown" for a missing or zero size; otherwise a value with
        one decimal and a unit from B up to PB.
        """
        if not bytes_size:
            return "Unknown"
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_size < 1024.0:
                return f"{bytes_size:.1f} {unit}"
            bytes_size /= 1024.0
        return f"{bytes_size:.1f} PB"

    @staticmethod
    def format_time(seconds):
        """Format a duration in seconds as H:MM:SS, or M:SS below one hour.

        Returns "Unknown" for a missing or zero duration.
        """
        if not seconds:
            return "Unknown"
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        return f"{minutes}:{secs:02d}"

    def ensure_ytdlp_installed(self):
        """Install yt-dlp with pip when it cannot be imported.

        Raises:
            subprocess.CalledProcessError: if the pip install fails.
        """
        try:
            import yt_dlp  # noqa: F401 - imported only to probe availability
            logger.info("yt-dlp is available")
        except ImportError:
            logger.info("Installing yt-dlp...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'yt-dlp'])
            logger.info("yt-dlp installed successfully")

    def update_yt_dlp(self):
        """Upgrade yt-dlp through pip and record the resulting version.

        On success the module globals ``yt_dlp_version`` and
        ``last_update_check`` are refreshed.

        Returns:
            True when both the upgrade and the version query succeeded,
            False otherwise (failures are logged, never raised).
        """
        global yt_dlp_version, last_update_check
        try:
            logger.info("Checking for yt-dlp updates...")
            upgrade = subprocess.run(
                [sys.executable, '-m', 'pip', 'install', '--upgrade', 'yt-dlp'],
                capture_output=True, text=True, timeout=self._PIP_TIMEOUT,
            )
            if upgrade.returncode != 0:
                logger.warning(f"yt-dlp update failed: {upgrade.stderr}")
                return False

            # Bounded as well, so a hung interpreter cannot block the caller forever.
            version_result = subprocess.run(
                [sys.executable, '-m', 'yt_dlp', '--version'],
                capture_output=True, text=True, timeout=self._VERSION_TIMEOUT,
            )
            if version_result.returncode != 0:
                logger.warning("Could not get yt-dlp version after update")
                return False

            yt_dlp_version = version_result.stdout.strip()
            last_update_check = datetime.now()
            logger.info(f"yt-dlp updated to version: {yt_dlp_version}")
            return True
        except Exception as e:
            logger.error(f"Error updating yt-dlp: {e}")
        return False

    def get_formats(self, url):
        """Extract the available formats for ``url``.

        Three outcomes are possible, distinguished by the returned dict:

        * ``success: True`` - metadata plus the full ``formats`` list.
        * ``success: False, fallback: True`` - the first attempt failed but a
          second extraction produced a short list of direct links in
          ``download_options``.
        * ``success: False, fallback: False`` - both attempts failed; the dict
          carries the first error and a platform-specific suggestion.
        """
        try:
            import yt_dlp
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            if not info:
                raise ValueError("No metadata returned for URL")
            return {
                'success': True,
                'title': info.get('title', 'Unknown Title'),
                'uploader': info.get('uploader', 'Unknown Uploader'),
                'duration': info.get('duration'),
                'thumbnail': info.get('thumbnail'),
                'formats': self._build_format_list(info),
                'platform': self.detect_platform(url),
                'view_count': info.get('view_count'),
                'like_count': info.get('like_count')
            }
        except Exception as e:
            logger.error(f"Error extracting formats from {url}: {e}")
            # Keep the text: the exception name is unbound once this block ends.
            error_text = str(e)

        # Try to provide direct file URLs as fallback
        try:
            import yt_dlp
            # NOTE(review): 'force_json' is kept from the original options;
            # confirm that yt-dlp actually recognises this key.
            fallback_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
                'force_json': True
            }
            with yt_dlp.YoutubeDL(fallback_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            if info:
                return {
                    'success': False,
                    'fallback': True,
                    'direct_files': True,
                    'error': error_text,
                    'message': 'Direct file URLs found. Click to download:',
                    'basic_info': {
                        'title': info.get('title', 'Unknown Title'),
                        'uploader': info.get('uploader', 'Unknown Uploader'),
                        'platform': self.detect_platform(url),
                        'url': url
                    },
                    'download_options': self._collect_direct_links(info),
                    'formats': [],
                    'instruction': 'Click on any direct download link above to download the file immediately.'
                }
        except Exception as fallback_error:
            logger.error(f"Direct URL extraction also failed: {fallback_error}")

        return self._failure_response(url, error_text)

    @staticmethod
    def _build_format_list(info):
        """Turn a yt-dlp info dict into the API's list of format entries."""
        formats = []
        for fmt in info.get('formats') or []:
            # Entries with neither a video nor an audio stream are skipped.
            if fmt.get('vcodec') == 'none' and fmt.get('acodec') == 'none':
                continue
            formats.append({
                'id': fmt.get('format_id', 'unknown'),
                'ext': fmt.get('ext', 'unknown'),
                'vcodec': fmt.get('vcodec', 'none'),
                'acodec': fmt.get('acodec', 'none'),
                'width': fmt.get('width'),
                'height': fmt.get('height'),
                'fps': fmt.get('fps'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                'format_note': fmt.get('format_note', ''),
                'url': fmt.get('url', ''),
                'type': 'video' if fmt.get('vcodec') != 'none' else 'audio'
            })
        # Also include direct download if available
        if info.get('url'):
            formats.append({
                'id': 'direct',
                'ext': info.get('ext', 'mp4'),
                'vcodec': info.get('vcodec', 'none'),
                'acodec': info.get('acodec', 'none'),
                'width': info.get('width'),
                'height': info.get('height'),
                'fps': info.get('fps'),
                'filesize': info.get('filesize'),
                'format_note': 'Direct',
                'url': info.get('url'),
                'type': 'video' if info.get('vcodec') != 'none' else 'audio'
            })
        return formats

    def _collect_direct_links(self, info):
        """Pick a short list of direct links (best video, best audio, thumbnail).

        When fewer than three links result, the runner-up video and audio and
        up to two uncategorised formats are appended.
        """
        video_urls, audio_urls, image_urls, other_urls = [], [], [], []
        if info.get('thumbnail'):
            image_urls.append({
                'type': 'thumbnail',
                'quality': 'default',
                'url': info.get('thumbnail'),
                'description': 'Video Thumbnail'
            })

        buckets = {'video': video_urls, 'audio': audio_urls, 'image': image_urls}
        for fmt in info.get('formats') or []:
            file_url = fmt.get('url')
            if not file_url:
                continue
            format_type = self.determine_format_type(fmt)
            buckets.get(format_type, other_urls).append({
                'type': format_type,
                'quality': self.get_quality_label(fmt),
                'url': file_url,
                'format_id': fmt.get('format_id', 'unknown'),
                'ext': fmt.get('ext', 'unknown'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                # Carried along so select_best_quality() can rank by resolution.
                'height': fmt.get('height'),
                'width': fmt.get('width'),
                'description': self.get_format_description(fmt, format_type)
            })

        # Also include direct URL if available
        if info.get('url'):
            target = video_urls if info.get('vcodec') != 'none' else audio_urls
            target.append({
                'type': 'direct',
                'quality': 'direct',
                'url': info.get('url'),
                'format_id': 'direct',
                'ext': info.get('ext', 'unknown'),
                'filesize': info.get('filesize'),
                'height': info.get('height'),
                'width': info.get('width'),
                'description': 'Direct Media File'
            })

        best_video = self.select_best_quality(video_urls)
        best_audio = self.select_best_quality(audio_urls)
        links = [item for item in (best_video, best_audio) if item]
        links.extend(image_urls[:2])  # Max 2 thumbnail options

        # If we have limited options, show more links
        if len(links) < 3:
            for pool, best in ((video_urls, best_video), (audio_urls, best_audio)):
                # Exclude the entry already listed so it is not offered twice.
                runner_up = self.select_best_quality(
                    [candidate for candidate in pool if candidate is not best]
                )
                if runner_up:
                    links.append(runner_up)
            links.extend(other_urls[:2])  # Max 2 other formats
        return links

    def _failure_response(self, url, error_text):
        """Build the final error payload with a platform-specific suggestion."""
        domain = self._match_domain(url)
        return {
            'success': False,
            'fallback': False,
            'error': error_text,
            'message': 'Unable to extract direct download links.',
            'platform_suggestion': self._FAILURE_SUGGESTIONS.get(domain, self._DEFAULT_SUGGESTION),
            'formats': [],
            'instruction': 'Please check if the content is public, accessible, and supports downloads.'
        }

    @classmethod
    def _match_domain(cls, url):
        """Return the known domain that the URL's host belongs to, or None."""
        from urllib.parse import urlparse

        candidate = url.strip().lower() if isinstance(url, str) else ''
        if '://' not in candidate:
            # Let scheme-less input such as "youtube.com/watch" parse as a host.
            candidate = '//' + candidate.lstrip('/')
        try:
            host = urlparse(candidate).hostname or ''
        except ValueError:
            return None
        for domain in cls._PLATFORM_DOMAINS:
            # Match the host itself or any subdomain, never a mere substring
            # (otherwise "x.com" would match "netflix.com").
            if host == domain or host.endswith('.' + domain):
                return domain
        return None

    def detect_platform(self, url):
        """Return the display name of the platform hosting ``url``, or 'Unknown'."""
        domain = self._match_domain(url)
        if domain is None:
            return 'Unknown'
        return self._PLATFORM_DOMAINS[domain]

    def extract_youtube_id(self, url):
        """Extract the YouTube video ID from ``url``; None when no pattern matches."""
        import re
        # Patterns for YouTube URLs
        patterns = [
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([^&\n?#]+)',
            r'youtube\.com/v/([^&\n?#]+)',
            r'youtube\.com/.*[?&]v=([^&\n?#]+)'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return None

    def determine_format_type(self, fmt):
        """Classify a format dict as 'video', 'audio', 'image' or 'other'."""
        if fmt.get('vcodec', 'none') != 'none':
            return 'video'
        if fmt.get('acodec', 'none') != 'none':
            return 'audio'
        # "or ''" guards against an explicit None extension.
        ext = (fmt.get('ext') or '').lower()
        if ext in ('jpg', 'jpeg', 'png', 'webp', 'gif'):
            return 'image'
        return 'other'

    def get_quality_label(self, fmt):
        """Return a short quality label such as '1080p60', '640px' or 'MP3'."""
        width = fmt.get('width')
        height = fmt.get('height')
        fps = fmt.get('fps')
        ext = fmt.get('ext', '')
        if height:
            quality = f"{height}p"
            # The frame rate is appended only above 30 fps.
            if fps and fps > 30:
                quality += f"{int(fps)}"
            return quality
        if width:
            return f"{width}px"
        if ext:
            return ext.upper()
        return 'Unknown'

    def get_format_description(self, fmt, format_type):
        """Return a description like 'Video 720p (MP4) - 2.0 KB'."""
        quality = self.get_quality_label(fmt)
        ext = fmt.get('ext', 'unknown')
        filesize = fmt.get('filesize') or fmt.get('filesize_approx')
        desc_parts = [f"{format_type.capitalize()} {quality}"]
        if ext and ext != 'unknown':
            desc_parts.append(f"({ext.upper()})")
        if filesize:
            desc_parts.append(f"- {self.format_file_size(filesize)}")
        return " ".join(desc_parts)

    def select_best_quality(self, urls):
        """Select the best entry from a list of link dicts.

        The ranking is chosen from the first entry's ``type``: video ranks by
        (height, filesize), audio by filesize, image by (height, width). Any
        other type returns the first entry. Returns None for an empty list.
        """
        if not urls:
            return None
        kind = urls[0].get('type')
        if kind == 'video':
            def rank(item):
                return (item.get('height') or 0, item.get('filesize') or 0)
        elif kind == 'audio':
            def rank(item):
                return item.get('filesize') or 0
        elif kind == 'image':
            def rank(item):
                return (item.get('height') or 0, item.get('width') or 0)
        else:
            return urls[0]
        # max() keeps the earliest entry on ties.
        return max(urls, key=rank)

    @staticmethod
    def _progress_snapshot(d):
        """Convert a yt-dlp 'downloading' hook dict into a progress entry."""
        downloaded = d.get('downloaded_bytes') or 0
        # Either total may be absent or None; fall back to 0% in that case.
        total = d.get('total_bytes') or d.get('total_bytes_estimate') or 0
        percentage = downloaded / total * 100 if total else 0
        return {
            'status': 'downloading',
            'percentage': percentage,
            'speed': d.get('speed', 0),
            'eta': d.get('eta', 0),
            'filename': d.get('filename', ''),
            'downloaded_bytes': d.get('downloaded_bytes', 0),
            'total_bytes': d.get('total_bytes', 0)
        }

    def start_download(self, url, format_id, download_id):
        """Download ``url`` into ./downloads while publishing progress.

        Progress is written to the module-level ``active_downloads`` under
        ``download_id``. After a successful download the entry is moved to
        ``download_history``; on failure it is replaced by an error entry.
        Exceptions are logged and never propagate to the caller.
        """
        started_at = datetime.now().isoformat()
        try:
            import yt_dlp
            # Create output directory
            output_dir = 'downloads'
            os.makedirs(output_dir, exist_ok=True)

            def progress_hook(d):
                status = d.get('status')
                if status == 'downloading':
                    active_downloads[download_id] = self._progress_snapshot(d)
                elif status == 'finished':
                    active_downloads[download_id] = {
                        'status': 'finished',
                        'filename': d.get('filename', ''),
                        'completed': True
                    }

            ydl_opts = {
                'outtmpl': f'{output_dir}/%(title)s.%(ext)s',
                # 'direct' is this API's own pseudo-format, mapped to 'best'.
                'format': format_id if format_id != 'direct' else 'best',
                'progress_hooks': [progress_hook],
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Move to history
            download_info = active_downloads.pop(download_id, None)
            if download_info is not None:
                download_info.update({
                    'url': url,
                    'format_id': format_id,
                    'download_id': download_id,
                    'start_time': started_at,
                    'end_time': datetime.now().isoformat()
                })
                download_history.append(download_info)
        except Exception as e:
            logger.error(f"Download error: {e}")
            active_downloads[download_id] = {
                'status': 'error',
                'error': str(e)
            }
# Initialize manager
# Constructing YTDLPManager runs its __init__, which calls
# ensure_ytdlp_installed() and update_yt_dlp(); both may invoke pip, so this
# line does that work at import time, before any route is registered.
ytdlp_manager = YTDLPManager()
# Background update checker
def periodic_update_check():
    """Loop forever: sleep for 24 hours, then ask the manager to update yt-dlp.

    Any exception raised during a cycle is logged and the loop continues.
    """
    interval_seconds = 24 * 60 * 60  # 24 hours
    while True:
        try:
            time.sleep(interval_seconds)
            ytdlp_manager.update_yt_dlp()
        except Exception as exc:
            logger.error(f"Periodic update check failed: {exc}")


# Run the checker on a daemon thread, started as soon as the module loads.
update_thread = threading.Thread(daemon=True, target=periodic_update_check)
update_thread.start()
# Frontend Routes
@app.route('/', methods=['GET'])
def serve_frontend():
    """Return index.html as the landing page."""
    page = 'index.html'
    return send_from_directory('.', page)
@app.route('/style.css', methods=['GET'])
def serve_css():
    """Return the frontend stylesheet, style.css."""
    stylesheet = 'style.css'
    return send_from_directory('.', stylesheet)
@app.route('/script.js', methods=['GET'])
def serve_js():
    """Return the frontend script, script.js."""
    script = 'script.js'
    return send_from_directory('.', script)
# API Routes
@app.route('/api/health', methods=['GET'])
def health_check():
    """Report service status: yt-dlp version, last update time and queue sizes."""
    if last_update_check:
        checked_at = last_update_check.isoformat()
    else:
        checked_at = None
    payload = {
        'status': 'healthy',
        'yt_dlp_version': yt_dlp_version,
        'last_update_check': checked_at,
        'active_downloads': len(active_downloads),
        'queue_size': len(download_queue)
    }
    return jsonify(payload)
@app.route('/api/formats', methods=['POST'])
def get_formats():
    """Extract available formats from the URL given in the JSON body.

    Expects ``{"url": "<http(s) URL>"}``. Returns the manager's result dict as
    JSON, a 400 response for any invalid request body, or a 500 response for
    an unexpected failure.
    """
    try:
        # silent=True returns None for a malformed or non-JSON body, so every
        # bad request takes the same 400 path below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data:
            raise BadRequest("URL is required")
        raw_url = data['url']
        # A non-string value (number, list, null) has no .strip() and would
        # otherwise surface as a 500.
        if not isinstance(raw_url, str):
            raise BadRequest("URL must be a string")
        url = raw_url.strip()
        if not url:
            raise BadRequest("URL cannot be empty")
        # Basic URL validation
        if not url.startswith(('http://', 'https://')):
            raise BadRequest("URL must start with http:// or https://")
        logger.info(f"Extracting formats from: {url}")
        result = ytdlp_manager.get_formats(url)
        return jsonify(result)
    except BadRequest as e:
        return jsonify({'success': False, 'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Format extraction error: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error',
            'message': 'An error occurred while processing your request.'
        }), 500
@app.route('/api/download', methods=['POST'])
def start_download():
    """Start a download on a background thread.

    Expects ``{"url": ..., "format_id": ..., "download_id": <optional>}``.
    Returns the ``download_id`` to poll, a 400 response for an invalid
    request, or a 500 response for an unexpected failure.
    """
    import uuid

    try:
        # silent=True returns None for a malformed or non-JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'url' not in data or 'format_id' not in data:
            raise BadRequest("URL and format_id are required")
        raw_url = data['url']
        format_id = data['format_id']
        if not isinstance(raw_url, str) or not isinstance(format_id, str) or not format_id.strip():
            raise BadRequest("URL and format_id must be non-empty strings")
        url = raw_url.strip()
        if not url.startswith(('http://', 'https://')):
            raise BadRequest("Invalid URL format")

        # A timestamp alone collides when two requests arrive within the same
        # second; the random suffix keeps generated ids unique.
        generated_id = f"dl_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        download_id = str(data.get('download_id') or generated_id)

        logger.info(f"Starting download: {url} with format: {format_id}")
        # Publish a placeholder so the progress endpoint can answer before the
        # first progress callback arrives.
        active_downloads[download_id] = {'status': 'starting', 'percentage': 0}

        # Start download in background thread
        download_thread = threading.Thread(
            target=ytdlp_manager.start_download,
            args=(url, format_id, download_id)
        )
        download_thread.start()
        return jsonify({
            'success': True,
            'download_id': download_id,
            'message': 'Download started'
        })
    except BadRequest as e:
        return jsonify({'success': False, 'error': str(e)}), 400
    except Exception as e:
        logger.error(f"Download start error: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error'
        }), 500
@app.route('/api/progress/<download_id>', methods=['GET'])
def get_download_progress(download_id):
    """Return the latest known state of a download.

    Active downloads are looked up first. A download that has already been
    moved to the history is reported from there, so a polling client still
    sees the final state instead of a 404.
    """
    # Single .get() instead of "in" followed by indexing: the worker thread may
    # remove the entry between the two steps.
    progress = active_downloads.get(download_id)
    if progress is None:
        # Newest first, in case an id was reused.
        progress = next(
            (entry for entry in reversed(download_history)
             if entry.get('download_id') == download_id),
            None,
        )
    if progress is None:
        return jsonify({
            'success': False,
            'message': 'Download not found'
        }), 404
    return jsonify({
        'success': True,
        'download_id': download_id,
        'progress': progress
    })
@app.route('/api/downloads/active', methods=['GET'])
def get_active_downloads():
    """Return the progress entries of all downloads currently tracked as active."""
    entries = list(active_downloads.values())
    payload = {'success': True, 'downloads': entries}
    return jsonify(payload)
@app.route('/api/downloads/history', methods=['GET'])
def get_download_history():
    """Return the most recent download history entries.

    Query parameters:
        limit: maximum number of entries (default 50). Zero or a negative
            value yields an empty list.
    """
    limit = request.args.get('limit', 50, type=int)
    # Guard the slice: [-0:] would return the whole list and a negative limit
    # would slice from the front instead of the back.
    recent = download_history[-limit:] if limit > 0 else []
    return jsonify({
        'success': True,
        'history': recent
    })
@app.route('/api/update', methods=['POST'])
def manual_update():
    """Run a yt-dlp update on demand and report the outcome and current version."""
    try:
        updated = ytdlp_manager.update_yt_dlp()
        if updated:
            note = 'yt-dlp updated successfully'
        else:
            note = 'Update check completed'
        return jsonify({
            'success': updated,
            'version': yt_dlp_version,
            'message': note
        })
    except Exception as exc:
        failure = {'success': False, 'error': str(exc)}
        return jsonify(failure), 500
@app.route('/api/supported-platforms', methods=['GET'])
def get_supported_platforms():
    """Return the advertised platforms with their domains and the yt-dlp version."""
    catalogue = (
        ('YouTube', ['youtube.com', 'youtu.be']),
        ('Vimeo', ['vimeo.com']),
        ('Dailymotion', ['dailymotion.com']),
        ('Twitch', ['twitch.tv']),
        ('TikTok', ['tiktok.com']),
        ('Instagram', ['instagram.com']),
        ('Twitter', ['twitter.com', 'x.com']),
        ('Facebook', ['facebook.com']),
        ('Reddit', ['reddit.com']),
        ('SoundCloud', ['soundcloud.com']),
        ('Spotify', ['spotify.com']),
        ('Bandcamp', ['bandcamp.com']),
    )
    platforms = [{'name': label, 'domains': hosts} for label, hosts in catalogue]
    return jsonify({
        'success': True,
        'platforms': platforms,
        'yt_dlp_version': yt_dlp_version
    })
@app.route('/api/file/<filename>', methods=['GET'])
def download_file(filename):
    """Serve a previously downloaded file from the downloads directory.

    Returns the file as an attachment, a 404 JSON response when the name does
    not refer to a regular file directly inside ``downloads``, or a 500 JSON
    response on an unexpected error.
    """
    try:
        # Accept plain file names only: no directory parts and no "." / "..".
        is_plain_name = (
            filename not in ('', '.', '..')
            and '\\' not in filename
            and os.path.basename(filename) == filename
        )
        if not is_plain_name:
            return jsonify({'error': 'File not found'}), 404

        # An absolute path keeps the existence check and send_file pointed at
        # the same file, however a relative path would be resolved.
        file_path = os.path.abspath(os.path.join('downloads', filename))
        # isfile (not exists): a directory must not reach send_file.
        if os.path.isfile(file_path):
            return send_file(file_path, as_attachment=True)
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body instead of the default HTML page."""
    body = {'error': 'Endpoint not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Answer 500 errors with a JSON body instead of the default HTML page."""
    body = {'error': 'Internal server error'}
    return jsonify(body), 500
if __name__ == '__main__':
    # PORT selects the listening port (default 5000); DEBUG enables Flask's
    # debug mode only when set to "true" (case-insensitive).
    debug = os.environ.get('DEBUG', 'False').lower() == 'true'
    port = int(os.environ.get('PORT', 5000))
    logger.info(f"Starting Universal Media Downloader API on port {port}")
    app.run(debug=debug, host='0.0.0.0', port=port)