| from flask import Flask, send_file, request, make_response, Response |
| import yt_dlp |
| import os |
| from urllib.parse import urlparse, unquote |
| from gallery_dl import job |
| import requests |
| from datetime import datetime |
| import tempfile |
| import shutil |
| import mimetypes |
| import uuid |
| import re |
|
|
# Flask application object; the @app.route handlers in this module register on it.
app = Flask(__name__)
|
|
class MediaDownloader:
    """Download a video, or failing that a set of images, from a URL.

    Every instance creates its own temporary directory containing an
    ``images`` and a ``videos`` sub-directory. Nothing removes that
    directory automatically: the caller must invoke :meth:`cleanup` once the
    returned files are no longer needed.
    """

    # User-Agent header sent with every outgoing download request.
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    # Image extensions kept as-is; any other (or missing) extension becomes ".jpg".
    IMAGE_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.gif', '.webp'})

    # Suffixes of in-progress / bookkeeping files that are not a finished download.
    _PARTIAL_SUFFIXES = ('.part', '.ytdl')

    # Chunk size used when streaming a remote image to disk.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self):
        self.temp_dir = tempfile.mkdtemp()
        self.create_directories()

    def create_directories(self):
        """Create the ``images`` and ``videos`` folders inside the temp dir."""
        self.image_path = os.path.join(self.temp_dir, "images")
        self.video_path = os.path.join(self.temp_dir, "videos")
        os.makedirs(self.image_path, exist_ok=True)
        os.makedirs(self.video_path, exist_ok=True)

    def cleanup(self):
        """Delete the temp dir and everything in it.

        Best effort and safe to call more than once: a missing directory is
        ignored and filesystem errors are logged instead of raised.
        """
        try:
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
        except OSError as e:
            print(f"Cleanup error: {str(e)}")

    def is_valid_url(self, url):
        """Return True when *url* parses with both a scheme and a network location.

        Unparseable input (for example a malformed IPv6 literal or a
        non-string value) yields False rather than an exception.
        """
        try:
            result = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            return False
        return bool(result.scheme and result.netloc)

    def sanitize_filename(self, filename):
        """Replace characters that are illegal in file names with ``_``."""
        return re.sub(r'[<>:"/\\|?*]', '_', filename)

    def download_video(self, url):
        """Download *url* with yt-dlp into the videos folder.

        Returns the path of the downloaded file, or None when the URL is
        invalid, yt-dlp extracts nothing, or no output file can be found.
        """
        if not self.is_valid_url(url):
            return None

        # A random stem keeps the file name independent of remote metadata.
        unique_id = str(uuid.uuid4())
        ydl_opts = {
            'format': 'best',
            'outtmpl': os.path.join(self.video_path, f'{unique_id}.%(ext)s'),
            'ignoreerrors': True,
            'no_warnings': True,
            'quiet': True,
            'extract_flat': False,
            'http_headers': {'User-Agent': self.USER_AGENT},
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if not info:
                    return None
                expected = ydl.prepare_filename(info)
        except Exception as e:
            print(f"Video download error: {str(e)}")
            return None

        if os.path.exists(expected):
            return expected
        # The name predicted from the metadata can differ from what was
        # actually written, so fall back to looking for our unique stem.
        return self._find_download(self.video_path, unique_id)

    def _find_download(self, directory, prefix):
        """Return the first finished file in *directory* named ``prefix*``, else None."""
        try:
            names = sorted(os.listdir(directory))
        except OSError:
            return None
        for name in names:
            if name.startswith(prefix) and not name.endswith(self._PARTIAL_SUFFIXES):
                return os.path.join(directory, name)
        return None

    def download_images(self, url):
        """Resolve *url* with gallery-dl and download every image it reports.

        Returns a list of local file paths, or None when the URL is invalid,
        no image URLs are found, or none of them could be downloaded. A
        failure on one image is logged and does not abort the others.
        """
        if not self.is_valid_url(url):
            return None

        try:
            image_urls = self._collect_image_urls(url)
            if not image_urls:
                return None

            downloaded_files = []
            for img_url in image_urls:
                try:
                    downloaded_files.append(self._fetch_image(img_url))
                except Exception as e:
                    print(f"Error downloading image {img_url}: {str(e)}")
            return downloaded_files or None
        except Exception as e:
            print(f"Error in download_images: {str(e)}")
            return None

    def _collect_image_urls(self, url):
        """Run a gallery-dl job that records, without downloading, each URL it finds."""

        class UrlDl(job.Job):
            def __init__(self, url, parent=None):
                job.Job.__init__(self, url, parent)
                self.urls = []

            def handle_url(self, url, _):
                self.urls.append(url)

        collector = UrlDl(url)
        collector.run()
        return collector.urls

    def _fetch_image(self, img_url):
        """Stream one image into the images folder and return its local path.

        Raises whatever the HTTP request or the file write raises; a
        partially written file is removed before the error propagates.
        """
        ext = os.path.splitext(urlparse(img_url).path)[1]
        if ext.lower() not in self.IMAGE_EXTENSIONS:
            ext = '.jpg'
        filename = self.sanitize_filename(f"{uuid.uuid4()}{ext}")
        filepath = os.path.join(self.image_path, filename)

        try:
            # stream=True avoids holding the whole image in memory at once.
            with requests.get(
                img_url,
                headers={'User-Agent': self.USER_AGENT},
                timeout=30,
                stream=True,
            ) as response:
                response.raise_for_status()
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        f.write(chunk)
        except Exception:
            # Never hand a truncated file back to the caller.
            if os.path.exists(filepath):
                os.remove(filepath)
            raise
        return filepath

    def download_media(self, url):
        """Try *url* as a video first, then as an image gallery.

        Returns a non-empty list of local file paths, or None when the URL is
        empty/invalid or neither strategy produced a file.
        """
        if not url or not self.is_valid_url(url):
            return None

        video_path = self.download_video(url)
        if video_path:
            return [video_path]

        return self.download_images(url) or None
|
|
@app.route('/health')
def health_check():
    """Liveness probe: always answers with the body ``OK`` and HTTP status 200."""
    body, status = 'OK', 200
    return body, status
|
|
@app.route('/')
def home():
    """Serve a minimal HTML landing page that shows how to call /download."""
    landing_page = """
    <h1>Downloader</h1>
    <p>Use: /download?url=url</p>
    """
    return landing_page
|
|
def _iter_multipart(paths, chunk_size=64 * 1024):
    """Yield the multipart body for *paths*, one part per readable file.

    Files are streamed in *chunk_size* pieces rather than loaded whole.
    Files that cannot be opened are skipped.
    """
    for path in paths:
        try:
            fh = open(path, 'rb')
        except OSError:
            continue
        with fh:
            name = os.path.basename(path)
            # guess_type returns None for unknown extensions; never emit "None".
            content_type = mimetypes.guess_type(path)[0] or 'application/octet-stream'
            size = os.fstat(fh.fileno()).st_size
            # NOTE(review): the framing bytes (bare "\n", fixed "boundary") are
            # kept exactly as before so existing clients keep parsing it.
            yield b'--boundary\n'
            yield f'Content-Disposition: attachment; filename="{name}"\n'.encode()
            yield f'Content-Type: {content_type}\n'.encode()
            yield f'Content-Length: {size}\n\n'.encode()
            for chunk in iter(lambda: fh.read(chunk_size), b''):
                yield chunk
        yield b'\n'
    yield b'--boundary--\n'


@app.route('/download')
def download():
    """Download the media behind ``?url=...`` and send it to the client.

    Responses:
        400 "No URL"          -- the ``url`` query parameter is missing/empty.
        400 "Download failed" -- nothing could be downloaded from the URL.
        200 attachment        -- exactly one file was downloaded.
        200 multipart stream  -- several files were downloaded.
        500                   -- an unexpected error occurred.

    The downloader's temp dir is removed when the response is closed, or
    immediately if the request fails.
    """
    try:
        url = request.args.get('url')
        if not url:
            return "No URL", 400

        # request.args is already percent-decoded. Decoding again would
        # corrupt URLs that legitimately contain escapes such as %20, so only
        # do it when the value still looks encoded (no scheme separator).
        if '://' not in url:
            url = unquote(url)

        downloader = MediaDownloader()
        try:
            files = downloader.download_media(url)
            if not files:
                downloader.cleanup()
                return "Download failed", 400

            if len(files) == 1:
                response = make_response(send_file(
                    files[0],
                    as_attachment=True,
                    download_name=os.path.basename(files[0]),
                ))
            else:
                response = Response(
                    _iter_multipart(files),
                    mimetype='multipart/x-mixed-replace; boundary=boundary',
                    direct_passthrough=True,
                )
                response.headers['Content-Type'] = 'multipart/x-mixed-replace; boundary=boundary'

            # Registered on the response rather than inside the generator, so
            # cleanup still runs if the body is never iterated.
            response.call_on_close(downloader.cleanup)
            return response
        except Exception as e:
            downloader.cleanup()
            return f"Download error: {str(e)}", 500
    except Exception as e:
        return f"Server error: {str(e)}", 500
|
|
if __name__ == '__main__':
    # Script entry point: listen on every interface, port 7860, debugger off.
    app.run(
        debug=False,
        port=7860,
        host='0.0.0.0',
    )