Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import threading | |
| import time | |
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| from moviepy.editor import VideoFileClip | |
| import yt_dlp | |
app = Flask(__name__)
# NOTE(review): CORS is enabled with no restrictions passed here; presumably
# this allows any origin. Confirm that is intended for this API.
CORS(app)
# Let deployments provide their own signing key through the environment
# instead of relying on a value committed to source control. The historical
# literal remains the fallback so existing setups keep working unchanged.
app.secret_key = os.environ.get("SECRET_KEY", "hugspace-backend-secret")

# Configuration
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Uploaded source files are stored here until a worker has processed them.
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
# Root of the per-session result folders (see get_user_folder).
USER_FOLDERS = os.path.join(BASE_DIR, "user_files")
# Lower-case extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {"mp4", "avi", "mov", "mkv", "webm", "flv", "3gp", "wmv"}

# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(USER_FOLDERS, exist_ok=True)

# Progress tracking
# Maps a session id to its latest status dict. Worker functions write to it
# and get_progress() reads from it.
progress_store = {}
def allowed_file(filename):
    """Tell whether *filename* ends in one of the ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively. A name without any dot is never accepted.
    """
    if "." not in filename:
        return False
    _, extension = filename.rsplit(".", 1)
    return extension.lower() in ALLOWED_EXTENSIONS
def get_user_folder(session_id):
    """Return the result folder for *session_id*, creating it if necessary.

    The session id is supplied by clients, so it is reduced to a single safe
    path component before being joined to USER_FOLDERS. Without this, values
    such as ``"../x"``, an absolute path, or an empty string would resolve
    outside of (or to the root of) the per-session storage area.

    Args:
        session_id: Client-provided session identifier.

    Returns:
        Path of the existing directory dedicated to this session.
    """
    import re  # local import keeps this change self-contained

    # Ids made only of letters, digits, "-" and "_" (e.g. UUIDs) map to the
    # same folder as before; every other character becomes "_".
    safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(session_id)) or "_"
    user_path = os.path.join(USER_FOLDERS, safe_id)
    os.makedirs(user_path, exist_ok=True)
    return user_path
| # --- TASK: COMPRESS VIDEO --- | |
def compress_video_task(input_path, output_path, quality, session_id):
    """Re-encode a video at a reduced bitrate and, if needed, resolution.

    Nothing is returned: progress and the final outcome are published in
    ``progress_store[session_id]``, and any exception is turned into an
    ``"error"`` status carrying the exception text.

    Args:
        input_path: Path of the source video. It is deleted when the task
            ends, whether it succeeded or failed.
        output_path: Path the compressed video is written to.
        quality: Preset name, ``"low"``, ``"medium"`` or ``"high"``. Any
            other value falls back to ``"medium"``.
        session_id: Key under which status updates are stored.
    """
    # Target bitrate and maximum frame height for each preset.
    presets = {
        "low": {"bitrate": "500k", "height": 480},
        "medium": {"bitrate": "1000k", "height": 720},
        "high": {"bitrate": "2500k", "height": 1080},
    }
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        clip = VideoFileClip(input_path)
        try:
            settings = presets.get(quality, presets["medium"])
            # Only downscale; sources already below the target height keep
            # their resolution.
            if clip.h > settings["height"]:
                clip = clip.resize(height=settings["height"])
            progress_store[session_id]["progress"] = 30
            clip.write_videofile(
                output_path,
                bitrate=settings["bitrate"],
                codec="libx264",
                audio_codec="aac",
                preset="ultrafast",
                logger=None,
            )
        finally:
            # Close the clip even when resizing or encoding fails, so the
            # source file is released before it is deleted below.
            clip.close()
        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        if os.path.exists(input_path):
            os.remove(input_path)
| # --- TASK: DOWNLOAD VIDEO --- | |
def download_video_task(url, output_path, session_id):
    """Download the video at *url* into *output_path* using yt-dlp.

    Nothing is returned: progress and the final outcome are published in
    ``progress_store[session_id]``, and any exception is turned into an
    ``"error"`` status carrying the exception text.

    Args:
        url: Address handed to yt-dlp.
        output_path: Directory the file is saved in, named
            ``<title>.<ext>`` by the output template.
        session_id: Key under which status updates are stored.
    """
    try:
        progress_store[session_id] = {"status": "processing", "progress": 0}

        def progress_hook(d):
            """Copy yt-dlp's download progress into progress_store."""
            if d.get("status") != "downloading":
                return
            downloaded = d.get("downloaded_bytes")
            total = d.get("total_bytes") or d.get("total_bytes_estimate")
            if downloaded is not None and total:
                # Prefer the numeric counters: the percent string is meant
                # for display and is not guaranteed to parse as a number.
                percent = downloaded / total * 100
            else:
                try:
                    percent = float(d.get("_percent_str", "0%").replace("%", ""))
                except ValueError:
                    # Unparseable display string: keep the previous value.
                    return
            progress_store[session_id]["progress"] = round(min(percent, 100.0), 1)

        ydl_opts = {
            'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
            'format': 'best[height<=1080]',
            'progress_hooks': [progress_hook],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
| # --- TASK: AUDIO EXTRACTION --- | |
def extract_audio_task(input_path, output_path, session_id):
    """Write the audio track of a video to *output_path*.

    Nothing is returned: progress and the final outcome are published in
    ``progress_store[session_id]``, and any exception is turned into an
    ``"error"`` status carrying the exception text.

    Args:
        input_path: Path of the source video. It is deleted when the task
            ends, whether it succeeded or failed.
        output_path: Path the audio file is written to.
        session_id: Key under which status updates are stored.
    """
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        clip = VideoFileClip(input_path)
        try:
            # Report a readable error instead of an AttributeError on None
            # when the video carries no audio track.
            if clip.audio is None:
                raise ValueError("Video has no audio track")
            progress_store[session_id]["progress"] = 50
            # Write audio
            clip.audio.write_audiofile(output_path, logger=None)
        finally:
            # Close the clip even when extraction fails, so the source file
            # is released before it is deleted below.
            clip.close()
        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        if os.path.exists(input_path):
            os.remove(input_path)
| # --- TASK: GIF MAKER --- | |
def create_gif_task(input_path, output_path, session_id):
    """Render (at most) the first 10 seconds of a video as a GIF.

    Nothing is returned: progress and the final outcome are published in
    ``progress_store[session_id]``, and any exception is turned into an
    ``"error"`` status carrying the exception text.

    Args:
        input_path: Path of the source video. It is deleted when the task
            ends, whether it succeeded or failed.
        output_path: Path the GIF is written to.
        session_id: Key under which status updates are stored.
    """
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        clip = VideoFileClip(input_path)
        try:
            # Optimize for Free Tier CPU (Limit resolution and FPS)
            if clip.w > 480:
                clip = clip.resize(width=480)
            # If video is longer than 10s, take first 10s to prevent crash
            if clip.duration > 10:
                clip = clip.subclip(0, 10)
            progress_store[session_id]["progress"] = 40
            clip.write_gif(output_path, fps=10, logger=None)
        finally:
            # Close the clip even when rendering fails, so the source file
            # is released before it is deleted below.
            clip.close()
        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        if os.path.exists(input_path):
            os.remove(input_path)
| # --- ENDPOINTS --- | |
def home():
    """Return a small JSON document stating that the service is active.

    NOTE(review): no route decorator is visible on this function; presumably
    it is registered as a Flask view elsewhere. Confirm.
    """
    payload = {"status": "active", "service": "Fugth Video Backend"}
    return jsonify(payload)
| # UNIVERSAL UPLOAD HANDLER | |
def process_media(task_type):
    """Accept an uploaded video and start the requested task in a thread.

    Reads the multipart field ``file`` plus the optional form fields
    ``session_id`` (a new UUID is generated when missing or empty) and
    ``quality`` (defaults to ``"medium"``; only used by ``compress``).

    NOTE(review): no route decorator is visible on this function; presumably
    ``task_type`` comes from the URL. Confirm.

    Args:
        task_type: ``"compress"``, ``"audio"`` or ``"gif"``.

    Returns:
        A JSON response with ``success`` and ``session_id`` once the worker
        thread has started, or a JSON error with status 400 when the file is
        missing, has a disallowed extension, or the task is unknown.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file"}), 400
    file = request.files['file']
    # `or` also replaces an empty session id, which would otherwise be shared
    # by every client that sends a blank value.
    session_id = request.form.get('session_id') or str(uuid.uuid4())
    quality = request.form.get('quality', 'medium')

    if not (file and allowed_file(file.filename)):
        return jsonify({"error": "Invalid file"}), 400

    workers = {
        'compress': compress_video_task,
        'audio': extract_audio_task,
        'gif': create_gif_task,
    }
    # Validate the task before saving anything, so that a bad request does
    # not leave an orphaned upload on disk.
    if task_type not in workers:
        return jsonify({"error": "Invalid task"}), 400

    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        # Sanitising can drop the dot (e.g. a non-ASCII stem leaves just
        # "mp4"); rebuild a name that keeps the already validated extension.
        extension = file.filename.rsplit(".", 1)[1].lower()
        filename = f"upload.{extension}"

    input_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}_{filename}")
    file.save(input_path)
    user_folder = get_user_folder(session_id)

    # Dispatch Thread based on Task Type
    stem = os.path.splitext(filename)[0]
    output_names = {
        'compress': f"compressed_{filename}",
        'audio': f"{stem}.mp3",
        'gif': f"{stem}.gif",
    }
    output_path = os.path.join(user_folder, output_names[task_type])
    if task_type == 'compress':
        args = (input_path, output_path, quality, session_id)
    else:
        args = (input_path, output_path, session_id)

    thread = threading.Thread(target=workers[task_type], args=args)
    thread.start()
    return jsonify({"success": True, "session_id": session_id})
def start_download():
    """Start a background download of the URL given in the JSON body.

    The body is parsed as JSON regardless of its content type and is expected
    to be an object with a ``url`` key and an optional ``session_id`` key (a
    new UUID is generated when it is missing or empty).

    Returns:
        A JSON response with ``success`` and ``session_id`` once the worker
        thread has started, or a JSON error with status 400 when no URL was
        supplied.
    """
    data = request.get_json(force=True)
    # Valid JSON that is not an object (a list, a string, null...) has no
    # .get(); treat it like a body without a URL instead of failing.
    if not isinstance(data, dict):
        return jsonify({"error": "No URL"}), 400

    url = data.get('url')
    if not url:
        return jsonify({"error": "No URL"}), 400

    # `or` also replaces an empty or null session id.
    session_id = data.get('session_id') or str(uuid.uuid4())
    user_folder = get_user_folder(session_id)
    thread = threading.Thread(
        target=download_video_task, args=(url, user_folder, session_id)
    )
    thread.start()
    return jsonify({"success": True, "session_id": session_id})
def get_progress(session_id):
    """Return the stored status for *session_id* as JSON.

    Sessions with no entry in progress_store are reported with the status
    ``"waiting"``.
    """
    fallback = {"status": "waiting"}
    state = progress_store.get(session_id, fallback)
    return jsonify(state)
def list_files(session_id):
    """Return the names and sizes of the entries in a session's folder.

    Args:
        session_id: Session whose folder is listed. The folder is created by
            get_user_folder() if it does not exist yet.

    Returns:
        A JSON response ``{"files": [{"name": ..., "size": "<x.xx> MB"}]}``.
    """
    user_folder = get_user_folder(session_id)
    files = []
    with os.scandir(user_folder) as entries:
        for entry in entries:
            try:
                size = entry.stat().st_size / (1024 * 1024)
            except FileNotFoundError:
                # Workers write into this folder concurrently; an entry can
                # be renamed or removed between listing and stat. Skip it
                # rather than failing the whole request.
                continue
            files.append({"name": entry.name, "size": f"{size:.2f} MB"})
    return jsonify({"files": files})
def get_file(session_id, filename):
    """Send one file from a session's folder as an attachment.

    Args:
        session_id: Session whose folder is searched.
        filename: Name of the requested file, as reported by list_files().

    Returns:
        The file as a download, or a JSON error with status 404 when no
        regular file of that name exists in the folder.
    """
    user_folder = get_user_folder(session_id)
    # Only names actually present in the folder are served, so a request can
    # never resolve outside of it. The exact name is tried first because
    # stored names may contain characters that secure_filename() rewrites
    # (e.g. spaces in downloaded titles); the sanitised form is kept as a
    # fallback for requests that relied on it.
    stored_names = set(os.listdir(user_folder))
    for candidate in (filename, secure_filename(filename)):
        if candidate in stored_names:
            path = os.path.join(user_folder, candidate)
            if os.path.isfile(path):
                return send_file(path, as_attachment=True)
    return jsonify({"error": "File not found"}), 404
if __name__ == "__main__":
    # Start Flask's built-in server only when this file is executed directly.
    bind_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**bind_options)