import os import json import uuid import zipfile import threading import time from flask import Flask, request, jsonify, send_file from yt_dlp import YoutubeDL app = Flask(__name__, static_folder=".", static_url_path="") @app.route("/") def index(): return app.send_static_file("index.html") TASKS_DIR = "tasks" if not os.path.exists(TASKS_DIR): os.makedirs(TASKS_DIR) def download_youtube(url, task_id): task_file = os.path.join(TASKS_DIR, f"{task_id}.json") def update_status(status, progress=None, total=None, error=None): with open(task_file, "w") as f: json.dump( { "status": status, "progress": progress, "total": total, "error": error, "url": url, }, f, ) try: ydl_opts = { "format": "bestaudio/best", "outtmpl": os.path.join(TASKS_DIR, f"{task_id}/%(title)s.%(ext)s"), "postprocessors": [ { "key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "192", } ], "quiet": True, "no_warnings": True, "extract_flat": "in_playlist", "max_downloads": 50, "socket_timeout": 30, "retries": 10, "fragment_retries": 10, } update_status("analyzing") with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) if "entries" in info: total = len(info["entries"][:50]) update_status("downloading", progress=0, total=total) for idx, entry in enumerate(info["entries"][:50]): video_url = entry["url"] video_opts = ydl_opts.copy() video_opts["outtmpl"] = os.path.join( TASKS_DIR, f"{task_id}/%(title)s.%(ext)s" ) with YoutubeDL(video_opts) as video_ydl: video_ydl.download([video_url]) update_status("downloading", progress=idx + 1, total=total) else: update_status("downloading", progress=1, total=1) ydl_opts["outtmpl"] = os.path.join( TASKS_DIR, f"{task_id}/%(title)s.%(ext)s" ) with YoutubeDL(ydl_opts) as ydl_single: ydl_single.download([url]) zip_path = os.path.join(TASKS_DIR, f"{task_id}.zip") with zipfile.ZipFile(zip_path, "w") as zipf: for root, dirs, files in os.walk(os.path.join(TASKS_DIR, task_id)): for file in files: if file.endswith(".mp3"): file_path = os.path.join(root, file) arcname = file zipf.write(file_path, arcname) update_status("completed") except Exception as e: update_status("error", error=str(e)) @app.route("/download", methods=["POST"]) def start_download(): data = request.get_json() url = data.get("url") if not url: return jsonify({"error": "URL required"}), 400 task_id = str(uuid.uuid4()) os.makedirs(os.path.join(TASKS_DIR, task_id), exist_ok=True) thread = threading.Thread(target=download_youtube, args=(url, task_id)) thread.start() return jsonify({"task_id": task_id}) @app.route("/status/", methods=["GET"]) def get_status(task_id): task_file = os.path.join(TASKS_DIR, f"{task_id}.json") if not os.path.exists(task_file): return jsonify({"error": "Task not found"}), 404 with open(task_file, "r") as f: data = json.load(f) return jsonify(data) @app.route("/download/", methods=["GET"]) def download_zip(task_id): task_file = os.path.join(TASKS_DIR, f"{task_id}.json") if not os.path.exists(task_file): return jsonify({"error": "Task not found"}), 404 with open(task_file, "r") as f: data = json.load(f) if data["status"] != "completed": return jsonify({"error": "Download not completed"}), 400 zip_path = os.path.join(TASKS_DIR, f"{task_id}.zip") return send_file(zip_path, as_attachment=True, download_name="youtube_mp3.zip") if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)