|
|
import os |
|
|
import json |
|
|
import uuid |
|
|
import zipfile |
|
|
import threading |
|
|
import time |
|
|
from flask import Flask, request, jsonify, send_file |
|
|
from yt_dlp import YoutubeDL |
|
|
|
|
|
# The Flask application object. Static files are looked up in "." and are
# published without any URL prefix (static_url_path is empty).
# NOTE(review): this configuration looks like it makes every file beside this
# module reachable over HTTP, including the task output directory — confirm
# that this exposure is intended.
app = Flask(
    __name__,
    static_url_path="",
    static_folder=".",
)
|
|
|
|
|
|
|
|
@app.route("/")
def index():
    """Serve the front-end page ``index.html`` from the static folder."""
    landing_page = "index.html"
    return app.send_static_file(landing_page)
|
|
|
|
|
|
|
|
# Directory that holds, per task, the status JSON file, the download folder
# and the final zip archive.
TASKS_DIR = "tasks"

# exist_ok=True avoids the check-then-create race of testing for the directory
# first: creation succeeds whether or not the directory is already there.
os.makedirs(TASKS_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
def download_youtube(url, task_id):
    """Download the audio of *url* as MP3 and bundle the result into a zip.

    Progress is reported through ``<TASKS_DIR>/<task_id>.json``. Audio files
    are written to ``<TASKS_DIR>/<task_id>/`` and, once every download has
    finished, the ``.mp3`` files found there are archived (flat, by file
    name) into ``<TASKS_DIR>/<task_id>.zip``.

    Args:
        url: Address of a single video or of a playlist. For a playlist at
            most the first 50 entries are downloaded.
        task_id: Identifier used to name the status file, the download
            directory and the zip archive.

    Returns:
        None. Any exception raised while working is caught and recorded in
        the status file as status ``"error"`` together with its message.
    """
    max_entries = 50
    task_file = os.path.join(TASKS_DIR, f"{task_id}.json")
    download_dir = os.path.join(TASKS_DIR, task_id)

    def update_status(status, progress=None, total=None, error=None):
        """Replace the task's status file with a new snapshot."""
        snapshot = {
            "status": status,
            "progress": progress,
            "total": total,
            "error": error,
            "url": url,
        }
        # Write to a sibling temp file and swap it in, so a reader polling
        # the status file never sees a truncated or half-written document.
        tmp_file = f"{task_file}.tmp"
        with open(tmp_file, "w") as f:
            json.dump(snapshot, f)
        os.replace(tmp_file, task_file)

    try:
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": os.path.join(download_dir, "%(title)s.%(ext)s"),
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "192",
                }
            ],
            "quiet": True,
            "no_warnings": True,
            "extract_flat": "in_playlist",
            "max_downloads": max_entries,
            "socket_timeout": 30,
            "retries": 10,
            "fragment_retries": 10,
        }

        update_status("analyzing")

        # Metadata only: decide between playlist and single-video handling.
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        if "entries" in info:
            entries = info["entries"][:max_entries]
            total = len(entries)
            update_status("downloading", progress=0, total=total)

            for done, entry in enumerate(entries, start=1):
                # Fresh copy of the options per video so one downloader
                # instance cannot affect the next through a shared dict.
                with YoutubeDL(dict(ydl_opts)) as video_ydl:
                    video_ydl.download([entry["url"]])
                update_status("downloading", progress=done, total=total)
        else:
            update_status("downloading", progress=1, total=1)
            with YoutubeDL(ydl_opts) as ydl_single:
                ydl_single.download([url])

        # Archive every MP3 under the task directory, stored by bare file
        # name so the zip has no directory structure.
        zip_path = os.path.join(TASKS_DIR, f"{task_id}.zip")
        with zipfile.ZipFile(zip_path, "w") as zipf:
            for root, _dirs, files in os.walk(download_dir):
                for file_name in files:
                    if file_name.endswith(".mp3"):
                        zipf.write(os.path.join(root, file_name), file_name)

        update_status("completed")

    except Exception as e:
        # Top-level boundary of the worker: surface the failure to the
        # client through the status file.
        update_status("error", error=str(e))
|
|
|
|
|
|
|
|
@app.route("/download", methods=["POST"])
def start_download():
    """Start a background download for the URL given in the JSON body.

    The request body is expected to be a JSON object with a non-empty string
    field ``"url"``. On success a new task id is generated, the task's
    download directory is created and ``download_youtube`` is started in a
    separate thread.

    Returns:
        ``{"task_id": <id>}`` on success, or ``{"error": "URL required"}``
        with status 400 when no usable URL was supplied.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising; the isinstance check also covers valid JSON that is not an
    # object (null, list, number), which has no .get().
    data = request.get_json(silent=True)
    url = data.get("url") if isinstance(data, dict) else None

    if not isinstance(url, str) or not url:
        return jsonify({"error": "URL required"}), 400

    task_id = str(uuid.uuid4())
    os.makedirs(os.path.join(TASKS_DIR, task_id), exist_ok=True)

    thread = threading.Thread(target=download_youtube, args=(url, task_id))
    thread.start()

    return jsonify({"task_id": task_id})
|
|
|
|
|
|
|
|
@app.route("/status/<task_id>", methods=["GET"])
def get_status(task_id):
    """Return the contents of a task's status file as JSON.

    Args:
        task_id: Task identifier taken from the URL.

    Returns:
        The parsed status document, or ``{"error": "Task not found"}`` with
        status 404 when the id is not a UUID or no status file exists.
    """
    # task_id comes from the client and becomes part of a file path; task ids
    # are created with uuid.uuid4(), so anything that does not parse as a
    # UUID cannot name a real task. Using the normalised form keeps
    # path-significant characters out of the file name.
    try:
        task_id = str(uuid.UUID(task_id))
    except ValueError:
        return jsonify({"error": "Task not found"}), 404

    task_file = os.path.join(TASKS_DIR, f"{task_id}.json")

    # Open directly rather than testing for existence first, so there is no
    # gap between the check and the read.
    try:
        with open(task_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return jsonify({"error": "Task not found"}), 404

    return jsonify(data)
|
|
|
|
|
|
|
|
@app.route("/download/<task_id>", methods=["GET"])
def download_zip(task_id):
    """Send the zip archive of a completed task as a file download.

    Args:
        task_id: Task identifier taken from the URL.

    Returns:
        The archive as an attachment named ``youtube_mp3.zip``;
        ``{"error": "Task not found"}`` with status 404 when the id is not a
        UUID or no status file exists; ``{"error": "Download not completed"}``
        with status 400 when the recorded status is not ``"completed"``.
    """
    # task_id comes from the client and becomes part of a file path; task ids
    # are created with uuid.uuid4(), so anything that does not parse as a
    # UUID cannot name a real task.
    try:
        task_id = str(uuid.UUID(task_id))
    except ValueError:
        return jsonify({"error": "Task not found"}), 404

    task_file = os.path.join(TASKS_DIR, f"{task_id}.json")

    # Open directly rather than testing for existence first, so there is no
    # gap between the check and the read.
    try:
        with open(task_file, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return jsonify({"error": "Task not found"}), 404

    if data.get("status") != "completed":
        return jsonify({"error": "Download not completed"}), 400

    # Hand send_file an absolute path so the archive is located relative to
    # the same working directory that was used when it was written,
    # independent of how send_file resolves relative paths.
    zip_path = os.path.abspath(os.path.join(TASKS_DIR, f"{task_id}.zip"))

    return send_file(zip_path, as_attachment=True, download_name="youtube_mp3.zip")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: start the server on port 7860, bound to 0.0.0.0.
    app.run(port=7860, host="0.0.0.0")
|
|
|