# Spaces: Sleeping | File size: 4,639 Bytes | 6ade7fe
from flask import Flask, render_template, request, send_file, jsonify, after_this_request
import yt_dlp
import io
import tempfile
import os
import threading
import uuid
import glob
import shutil
# Flask application object; the route decorators in this file register their handlers on it.
app = Flask(__name__)
# In-memory registry of download tasks, keyed by task id (a UUID4 string).
# Each value is a dict with the keys 'progress', 'done', 'error', 'filepath',
# 'filename' and 'tmpdir'. It is written by the background download thread
# and the progress hook, and read by the request handlers.
tasks = {}
def progress_update(d, task_id):
    """Record download progress for a task from a yt-dlp progress event.

    Registered as a yt-dlp progress hook by the download worker. Updates
    ``tasks[task_id]['progress']`` as a percentage capped at 100.

    Args:
        d: Progress dict supplied by yt-dlp. Reads ``status``,
            ``total_bytes`` / ``total_bytes_estimate`` and
            ``downloaded_bytes``; any of the byte counters may be absent.
        task_id: Key of the task entry in the module-level ``tasks`` dict.

    Returns:
        None. Events that carry no usable information are ignored.
    """
    task = tasks.get(task_id)
    if task is None:
        # Unknown task: nothing to update. An exception raised here would
        # surface inside the yt-dlp download call that invoked the hook.
        return

    status = d.get('status')
    if status == 'finished':
        task['progress'] = 100
        return
    if status != 'downloading':
        return

    total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
    downloaded_bytes = d.get('downloaded_bytes')
    if not total_bytes or downloaded_bytes is None:
        # Size not known yet (or counter missing): keep the previous value.
        return

    percent = (downloaded_bytes / total_bytes) * 100
    task['progress'] = min(100, percent)
@app.route('/')
def index():
    """Serve the landing page by rendering the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
def _run_download(task_id, video_url):
    """Download ``video_url`` into a fresh temp directory and record the result.

    Runs on the background thread started by ``download``. On success the
    task's ``filepath`` and ``filename`` point at the downloaded file; on
    failure ``error`` is set and the temp directory is removed. In both cases
    ``done`` is set last, so a poller that sees ``done`` also sees the final
    ``error`` / ``filepath`` values.
    """
    task = tasks[task_id]
    tmpdir = None
    try:
        tmpdir = tempfile.mkdtemp()
        task['tmpdir'] = tmpdir
        ydl_opts = {
            'format': 'best',
            'outtmpl': os.path.join(tmpdir, '%(title)s.%(ext)s'),
            'noplaylist': True,
            'progress_hooks': [lambda d: progress_update(d, task_id)],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl_download:
            ydl_download.download([video_url])

        # The final name depends on the output template, so locate the result
        # by scanning the directory; the largest regular file is taken as the
        # finished download.
        candidates = [
            path for path in glob.glob(os.path.join(tmpdir, '*'))
            if os.path.isfile(path)
        ]
        if not candidates:
            # Without this the task would be reported as done with neither an
            # error nor a file to serve.
            raise RuntimeError('no file was produced')

        filepath = max(candidates, key=os.path.getsize)
        task['filepath'] = filepath
        task['filename'] = os.path.basename(filepath)
        task['progress'] = 100
    except Exception as e:
        task['error'] = f'An error occurred during download: {str(e)}'
        # A failed task is never served, so nothing else would remove its
        # temp directory.
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
            task['tmpdir'] = None
    finally:
        task['done'] = True


@app.route('/download', methods=['POST'])
def download():
    """Start a background download for the URL posted in the ``url`` form field.

    Video metadata is fetched synchronously first, so an unusable URL is
    reported in this response; the download itself runs on a daemon thread.

    Returns:
        JSON ``{'task_id': ...}`` on success. JSON ``{'error': ...}`` with
        status 400 when no URL was supplied, or status 500 when metadata
        extraction or task start-up fails.
    """
    video_url = (request.form.get('url') or '').strip()
    if not video_url:
        return jsonify({'error': 'Please provide a video URL.'}), 400

    task_id = str(uuid.uuid4())
    try:
        # Metadata-only pass: validates the URL and yields a provisional name.
        with yt_dlp.YoutubeDL({'noplaylist': True}) as ydl:
            info_dict = ydl.extract_info(video_url, download=False)

        # `or` rather than a .get() default: the keys may be present but None.
        video_title = info_dict.get('title') or 'video'
        video_ext = info_dict.get('ext') or 'mp4'

        tasks[task_id] = {
            'progress': 0,
            'done': False,
            'error': None,
            'filepath': None,
            'filename': f"{video_title}.{video_ext}",
            'tmpdir': None,
        }

        worker = threading.Thread(
            target=_run_download,
            args=(task_id, video_url),
            daemon=True,
        )
        worker.start()
        return jsonify({'task_id': task_id})
    except Exception as e:
        # Do not leave behind a task entry that no worker will ever complete.
        tasks.pop(task_id, None)
        return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500
@app.route('/progress/<task_id>')
def get_progress(task_id):
    """Report the current state of a download task as JSON.

    Responds with the task's ``progress``, ``done``, ``error`` and
    ``filename`` values, or with a 404 error payload when ``task_id`` is not
    in the task registry.
    """
    entry = tasks.get(task_id)
    if entry:
        status = {field: entry[field] for field in ('progress', 'done', 'error')}
        status['filename'] = entry.get('filename')
        return jsonify(status)
    return jsonify({'error': 'Task not found'}), 404
@app.route('/file/<task_id>')
def get_file(task_id):
    """Send the finished download as an attachment, then discard the task.

    Returns:
        The file response on success. JSON ``{'error': ...}`` with status 500
        when the task recorded an error or sending fails, or status 404 when
        the task is unknown, still running, or has no file.
    """
    task = tasks.get(task_id)
    if task and task['error']:
        return jsonify({'error': task['error']}), 500
    if not task or not task['done'] or not task['filepath']:
        return jsonify({'error': 'Download not ready or failed'}), 404

    filepath = task['filepath']
    tmpdir = task.get('tmpdir')

    @after_this_request
    def cleanup(response):
        # Best-effort removal of the on-disk data and the registry entry.
        # Removing the directory also removes the file, so one failure cannot
        # cause the other step to be skipped.
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
        else:
            try:
                os.unlink(filepath)
            except OSError:
                pass
        tasks.pop(task_id, None)
        return response

    try:
        # No explicit mimetype: send_file derives it from the file name.
        # Building "video/<ext>" by hand gives invalid types for several
        # containers (e.g. "video/mkv").
        return send_file(
            filepath,
            as_attachment=True,
            download_name=task['filename'],
        )
    except Exception as e:
        return jsonify({'error': f'An unexpected error occurred: {str(e)}'}), 500
if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode; confirm this entry
    # point is only used for local development before exposing the server.
    app.run(debug=True)