fugthchat's picture
Update app.py
a7bd8c9 verified
import os
import uuid
import threading
import time
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
from moviepy.editor import VideoFileClip
import yt_dlp
app = Flask(__name__)
CORS(app)
app.secret_key = "hugspace-backend-secret"
# Configuration
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
USER_FOLDERS = os.path.join(BASE_DIR, "user_files")
ALLOWED_EXTENSIONS = {"mp4", "avi", "mov", "mkv", "webm", "flv", "3gp", "wmv"}
# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(USER_FOLDERS, exist_ok=True)
# Progress tracking
progress_store = {}
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def get_user_folder(session_id):
user_path = os.path.join(USER_FOLDERS, session_id)
os.makedirs(user_path, exist_ok=True)
return user_path
# --- TASK: COMPRESS VIDEO ---
def compress_video_task(input_path, output_path, quality, session_id):
try:
progress_store[session_id] = {"status": "processing", "progress": 10}
clip = VideoFileClip(input_path)
# Presets
presets = {
"low": {"bitrate": "500k", "height": 480},
"medium": {"bitrate": "1000k", "height": 720},
"high": {"bitrate": "2500k", "height": 1080},
}
settings = presets.get(quality, presets["medium"])
if clip.h > settings["height"]:
clip = clip.resize(height=settings["height"])
progress_store[session_id]["progress"] = 30
clip.write_videofile(
output_path,
bitrate=settings["bitrate"],
codec="libx264",
audio_codec="aac",
preset="ultrafast",
logger=None
)
clip.close()
progress_store[session_id] = {"status": "completed", "progress": 100}
except Exception as e:
progress_store[session_id] = {"status": "error", "error": str(e)}
finally:
if os.path.exists(input_path): os.remove(input_path)
# --- TASK: DOWNLOAD VIDEO ---
def download_video_task(url, output_path, session_id):
try:
progress_store[session_id] = {"status": "processing", "progress": 0}
def progress_hook(d):
if d['status'] == 'downloading':
p = d.get('_percent_str', '0%').replace('%','')
try:
progress_store[session_id]["progress"] = float(p)
except: pass
ydl_opts = {
'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
'format': 'best[height<=1080]',
'progress_hooks': [progress_hook],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
progress_store[session_id] = {"status": "completed", "progress": 100}
except Exception as e:
progress_store[session_id] = {"status": "error", "error": str(e)}
# --- TASK: AUDIO EXTRACTION ---
def extract_audio_task(input_path, output_path, session_id):
try:
progress_store[session_id] = {"status": "processing", "progress": 10}
clip = VideoFileClip(input_path)
progress_store[session_id]["progress"] = 50
# Write audio
clip.audio.write_audiofile(output_path, logger=None)
clip.close()
progress_store[session_id] = {"status": "completed", "progress": 100}
except Exception as e:
progress_store[session_id] = {"status": "error", "error": str(e)}
finally:
if os.path.exists(input_path): os.remove(input_path)
# --- TASK: GIF MAKER ---
def create_gif_task(input_path, output_path, session_id):
try:
progress_store[session_id] = {"status": "processing", "progress": 10}
clip = VideoFileClip(input_path)
# Optimize for Free Tier CPU (Limit resolution and FPS)
if clip.w > 480:
clip = clip.resize(width=480)
# If video is longer than 10s, take first 10s to prevent crash
if clip.duration > 10:
clip = clip.subclip(0, 10)
progress_store[session_id]["progress"] = 40
clip.write_gif(output_path, fps=10, logger=None)
clip.close()
progress_store[session_id] = {"status": "completed", "progress": 100}
except Exception as e:
progress_store[session_id] = {"status": "error", "error": str(e)}
finally:
if os.path.exists(input_path): os.remove(input_path)
# --- ENDPOINTS ---
@app.route('/')
def home():
return jsonify({"status": "active", "service": "Fugth Video Backend"})
# UNIVERSAL UPLOAD HANDLER
@app.route('/process/<task_type>', methods=['POST'])
def process_media(task_type):
if 'file' not in request.files:
return jsonify({"error": "No file"}), 400
file = request.files['file']
session_id = request.form.get('session_id', str(uuid.uuid4()))
quality = request.form.get('quality', 'medium')
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
input_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}_{filename}")
file.save(input_path)
user_folder = get_user_folder(session_id)
# Dispatch Thread based on Task Type
if task_type == 'compress':
out_name = f"compressed_{filename}"
target = compress_video_task
args = (input_path, os.path.join(user_folder, out_name), quality, session_id)
elif task_type == 'audio':
out_name = f"{os.path.splitext(filename)[0]}.mp3"
target = extract_audio_task
args = (input_path, os.path.join(user_folder, out_name), session_id)
elif task_type == 'gif':
out_name = f"{os.path.splitext(filename)[0]}.gif"
target = create_gif_task
args = (input_path, os.path.join(user_folder, out_name), session_id)
else:
return jsonify({"error": "Invalid task"}), 400
thread = threading.Thread(target=target, args=args)
thread.start()
return jsonify({"success": True, "session_id": session_id})
return jsonify({"error": "Invalid file"}), 400
@app.route('/download_video', methods=['POST'])
def start_download():
data = request.get_json(force=True)
url = data.get('url')
session_id = data.get('session_id', str(uuid.uuid4()))
if not url: return jsonify({"error": "No URL"}), 400
user_folder = get_user_folder(session_id)
thread = threading.Thread(target=download_video_task, args=(url, user_folder, session_id))
thread.start()
return jsonify({"success": True, "session_id": session_id})
@app.route('/progress/<session_id>', methods=['GET'])
def get_progress(session_id):
data = progress_store.get(session_id, {"status": "waiting"})
return jsonify(data)
@app.route('/files/<session_id>', methods=['GET'])
def list_files(session_id):
user_folder = get_user_folder(session_id)
files = []
if os.path.exists(user_folder):
for f in os.listdir(user_folder):
path = os.path.join(user_folder, f)
size = os.path.getsize(path) / (1024 * 1024)
files.append({"name": f, "size": f"{size:.2f} MB"})
return jsonify({"files": files})
@app.route('/get_file/<session_id>/<filename>', methods=['GET'])
def get_file(session_id, filename):
user_folder = get_user_folder(session_id)
path = os.path.join(user_folder, secure_filename(filename))
if os.path.exists(path):
return send_file(path, as_attachment=True)
return jsonify({"error": "File not found"}), 404
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)