"""Flask backend that compresses, converts and downloads videos in background threads."""

import os
import re
import threading
import time
import uuid

import yt_dlp
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from moviepy.editor import VideoFileClip
from werkzeug.utils import secure_filename

app = Flask(__name__)
CORS(app)
# NOTE(review): the secret key is hard-coded in source; load it from the
# environment before relying on signed sessions or cookies.
app.secret_key = "hugspace-backend-secret"

# Configuration
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
USER_FOLDERS = os.path.join(BASE_DIR, "user_files")
ALLOWED_EXTENSIONS = {"mp4", "avi", "mov", "mkv", "webm", "flv", "3gp", "wmv"}

# Compression presets: target bitrate and maximum output height per quality.
COMPRESSION_PRESETS = {
    "low": {"bitrate": "500k", "height": 480},
    "medium": {"bitrate": "1000k", "height": 720},
    "high": {"bitrate": "2500k", "height": 1080},
}

# Matches ANSI colour sequences that may be embedded in yt-dlp's "_percent_str".
_ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")

# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(USER_FOLDERS, exist_ok=True)

# Progress tracking: session_id -> {"status": ..., "progress": ...} or
# {"status": "error", "error": ...}. Written by the worker threads below.
progress_store = {}


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    if not filename or "." not in filename:
        return False
    return filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def get_user_folder(session_id):
    """Return the output directory for *session_id*, creating it if needed.

    The id is passed through ``secure_filename`` so that a client-supplied
    value such as ``"../../etc"`` cannot escape USER_FOLDERS.

    Raises:
        ValueError: if nothing usable remains after sanitizing the id.
    """
    safe_id = "" if session_id is None else secure_filename(str(session_id))
    if not safe_id:
        raise ValueError("invalid session_id")
    user_path = os.path.join(USER_FOLDERS, safe_id)
    os.makedirs(user_path, exist_ok=True)
    return user_path


def _remove_input(path):
    """Delete the temporary upload at *path*; a file that is already gone is fine."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _download_percent(status):
    """Return the download percentage for a yt-dlp progress dict, or None if unknown."""
    total = status.get("total_bytes") or status.get("total_bytes_estimate")
    done = status.get("downloaded_bytes")
    if total and done is not None:
        return min(100.0, round(done / total * 100, 1))
    # Fall back to the preformatted string, which may carry colour codes.
    raw = _ANSI_ESCAPE.sub("", status.get("_percent_str") or "")
    try:
        return float(raw.replace("%", "").strip())
    except ValueError:
        return None


# --- TASK: COMPRESS VIDEO ---
def compress_video_task(input_path, output_path, quality, session_id):
    """Re-encode *input_path* to *output_path* using the preset named *quality*.

    Unknown quality names fall back to "medium". Progress and the final
    outcome are recorded in ``progress_store[session_id]``; exceptions are
    reported there rather than raised. The input file is always deleted.
    """
    source = None
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        source = VideoFileClip(input_path)

        settings = COMPRESSION_PRESETS.get(quality, COMPRESSION_PRESETS["medium"])

        # Only ever downscale; smaller sources keep their resolution.
        clip = source
        if clip.h > settings["height"]:
            clip = clip.resize(height=settings["height"])

        progress_store[session_id]["progress"] = 30

        clip.write_videofile(
            output_path,
            bitrate=settings["bitrate"],
            codec="libx264",
            audio_codec="aac",
            preset="ultrafast",
            logger=None,
        )

        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        # Close the clip before deleting the upload so the file is no longer
        # held open, and so the clip is released on the error path too.
        if source is not None:
            source.close()
        _remove_input(input_path)


# --- TASK: DOWNLOAD VIDEO ---
def download_video_task(url, output_path, session_id):
    """Download *url* with yt-dlp into the directory *output_path*.

    Progress and the final outcome are recorded in
    ``progress_store[session_id]``; exceptions are reported there rather
    than raised.
    """
    try:
        progress_store[session_id] = {"status": "processing", "progress": 0}

        def progress_hook(d):
            if d.get('status') != 'downloading':
                return
            percent = _download_percent(d)
            if percent is not None:
                progress_store[session_id]["progress"] = percent

        ydl_opts = {
            'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
            'format': 'best[height<=1080]',
            'progress_hooks': [progress_hook],
            # Keep output names free of spaces and special characters so they
            # survive being used in URLs and filename sanitizing.
            'restrictfilenames': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}


# --- TASK: AUDIO EXTRACTION ---
def extract_audio_task(input_path, output_path, session_id):
    """Write the audio track of *input_path* to *output_path*.

    Progress and the final outcome are recorded in
    ``progress_store[session_id]``; a video without an audio track is
    reported as an error. The input file is always deleted.
    """
    source = None
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        source = VideoFileClip(input_path)

        if source.audio is None:
            raise ValueError("Video has no audio track")

        progress_store[session_id]["progress"] = 50

        # Write audio
        source.audio.write_audiofile(output_path, logger=None)

        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        if source is not None:
            source.close()
        _remove_input(input_path)


# --- TASK: GIF MAKER ---
def create_gif_task(input_path, output_path, session_id):
    """Render at most the first 10 seconds of *input_path* as a GIF.

    The clip is narrowed to 480 px wide when larger and written at 10 fps.
    Progress and the final outcome are recorded in
    ``progress_store[session_id]``. The input file is always deleted.
    """
    source = None
    try:
        progress_store[session_id] = {"status": "processing", "progress": 10}
        source = VideoFileClip(input_path)

        # Optimize for Free Tier CPU (Limit resolution and FPS)
        clip = source
        if clip.w > 480:
            clip = clip.resize(width=480)

        # If video is longer than 10s, take first 10s to prevent crash
        if clip.duration > 10:
            clip = clip.subclip(0, 10)

        progress_store[session_id]["progress"] = 40

        clip.write_gif(output_path, fps=10, logger=None)

        progress_store[session_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        progress_store[session_id] = {"status": "error", "error": str(e)}
    finally:
        if source is not None:
            source.close()
        _remove_input(input_path)


# --- ENDPOINTS ---
@app.route('/')
def home():
    """Health-check endpoint."""
    return jsonify({"status": "active", "service": "Fugth Video Backend"})


# UNIVERSAL UPLOAD HANDLER
@app.route('/process/<task_type>', methods=['POST'])
def process_media(task_type):
    """Accept an uploaded video and start a background task on it.

    *task_type* is one of ``compress``, ``audio`` or ``gif``. The multipart
    form must contain ``file``; ``session_id`` and ``quality`` are optional.
    Responds with the session id to poll for progress, or a 400 error.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file"}), 400

    upload = request.files['file']
    if not upload or not allowed_file(upload.filename):
        return jsonify({"error": "Invalid file"}), 400

    filename = secure_filename(upload.filename)
    if not allowed_file(filename):
        # Sanitizing can strip the whole stem (e.g. a non-ASCII name), which
        # loses the extension; keep the validated extension on a neutral stem.
        extension = upload.filename.rsplit(".", 1)[1].lower()
        filename = f"video.{extension}"
    stem = os.path.splitext(filename)[0]

    # Resolve the task before touching the disk so an invalid task type does
    # not leave an orphaned upload behind.
    quality = request.form.get('quality', 'medium')
    if task_type == 'compress':
        target, out_name, extra_args = compress_video_task, f"compressed_{filename}", (quality,)
    elif task_type == 'audio':
        target, out_name, extra_args = extract_audio_task, f"{stem}.mp3", ()
    elif task_type == 'gif':
        target, out_name, extra_args = create_gif_task, f"{stem}.gif", ()
    else:
        return jsonify({"error": "Invalid task"}), 400

    # A missing or empty session_id gets a fresh one; the value ends up in
    # filesystem paths, so it is sanitized first.
    session_id = _clean_session_id(request.form.get('session_id') or str(uuid.uuid4()))
    if session_id is None:
        return jsonify({"error": "Invalid session_id"}), 400

    user_folder = get_user_folder(session_id)
    input_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}_{filename}")
    upload.save(input_path)

    # Reset any result left from an earlier job on this session so a poll
    # arriving before the worker starts does not report stale completion.
    progress_store[session_id] = {"status": "processing", "progress": 0}

    args = (input_path, os.path.join(user_folder, out_name), *extra_args, session_id)
    thread = threading.Thread(target=target, args=args)
    thread.start()

    return jsonify({"success": True, "session_id": session_id})


# --- HELPERS ---
def _clean_session_id(raw):
    """Return a filesystem-safe version of *raw*, or None when nothing usable remains."""
    if not isinstance(raw, str):
        return None
    return secure_filename(raw) or None


@app.route('/download_video', methods=['POST'])
def start_download():
    """Start a background download of the URL given in the JSON body.

    The body is a JSON object with ``url`` and an optional ``session_id``.
    """
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON body"}), 400

    url = data.get('url')
    if not url or not isinstance(url, str):
        return jsonify({"error": "No URL"}), 400

    session_id = _clean_session_id(data.get('session_id') or str(uuid.uuid4()))
    if session_id is None:
        return jsonify({"error": "Invalid session_id"}), 400

    user_folder = get_user_folder(session_id)
    thread = threading.Thread(target=download_video_task, args=(url, user_folder, session_id))
    thread.start()

    return jsonify({"success": True, "session_id": session_id})


@app.route('/progress/<session_id>', methods=['GET'])
def get_progress(session_id):
    """Return the recorded progress for *session_id*, or a "waiting" status."""
    key = _clean_session_id(session_id) or session_id
    data = progress_store.get(key, {"status": "waiting"})
    return jsonify(data)


@app.route('/files/<session_id>', methods=['GET'])
def list_files(session_id):
    """List the files in the session's folder with their sizes in MB."""
    clean_id = _clean_session_id(session_id)
    if clean_id is None:
        return jsonify({"error": "Invalid session_id"}), 400

    user_folder = get_user_folder(clean_id)
    files = []
    for name in sorted(os.listdir(user_folder)):
        path = os.path.join(user_folder, name)
        if not os.path.isfile(path):
            continue
        try:
            size = os.path.getsize(path) / (1024 * 1024)
        except OSError:
            # The file disappeared between listing and stat; skip it.
            continue
        files.append({"name": name, "size": f"{size:.2f} MB"})
    return jsonify({"files": files})


@app.route('/get_file/<session_id>/<filename>', methods=['GET'])
def get_file(session_id, filename):
    """Send *filename* from the session's folder as an attachment, or 404."""
    clean_id = _clean_session_id(session_id)
    if clean_id is not None:
        root = os.path.realpath(get_user_folder(clean_id))
        # Try the exact name first (stored names may contain characters that
        # sanitizing would alter), then the sanitized form.
        candidates = [filename]
        sanitized = secure_filename(filename)
        if sanitized and sanitized != filename:
            candidates.append(sanitized)
        for name in candidates:
            path = os.path.realpath(os.path.join(root, name))
            # Serve only regular files that sit directly inside the folder.
            if os.path.dirname(path) == root and os.path.isfile(path):
                return send_file(path, as_attachment=True)
    return jsonify({"error": "File not found"}), 404


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)