"""Flask app that hides a region of uploaded videos using FFmpeg filters.

Workflow: ``/upload`` stores the videos and returns a preview frame,
``/process`` applies the chosen effect (box or brush-shaped) to every
uploaded video and optionally upscales the result, and ``/download/<name>``
serves the processed file (or a zip when several videos were processed).
"""

import base64
import os
import shutil
import subprocess
import uuid
import zipfile

import cv2
import numpy as np
from flask import (  # noqa: F401  (send_file kept for backward compatibility)
    Flask,
    jsonify,
    render_template,
    request,
    send_file,
    send_from_directory,
)
from werkzeug.utils import secure_filename

app = Flask(__name__)

UPLOAD_FOLDER = "/tmp/uploads"
PROCESSED_FOLDER = "/tmp/processed"
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["PROCESSED_FOLDER"] = PROCESSED_FOLDER
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)

# Try imageio_ffmpeg (local dev), fall back to system ffmpeg (HF Spaces / Docker)
try:
    import imageio_ffmpeg

    FFMPEG_EXE = imageio_ffmpeg.get_ffmpeg_exe()
except ImportError:
    FFMPEG_EXE = shutil.which("ffmpeg") or "ffmpeg"

# Closed sets of values accepted from the client; anything else is a 400.
TOOLS = ("box", "brush")
METHODS = ("delogo", "blur_light", "blur_heavy", "pixelate", "black_box")

# gblur sigma used by each blur method.
BLUR_SIGMA = {"blur_light": 10, "blur_heavy": 35}


def _json_error(message, status=400):
    """Return a ``({"error": message}, status)`` Flask response tuple."""
    return jsonify({"error": message}), status


def _is_safe_stored_name(name):
    """Return True if *name* is a plain file name with no path components.

    Names arrive from the client, so anything that could escape the upload
    folder once joined to it (separators, ``..``, NUL) is rejected.
    """
    return (
        isinstance(name, str)
        and name not in ("", ".", "..")
        and "/" not in name
        and "\\" not in name
        and "\x00" not in name
    )


def _extract_preview(filepath, unique_filename):
    """Save the first frame of *filepath* as a JPEG in the upload folder.

    Returns ``(frame_url, width, height)``; ``frame_url`` is None when no
    frame could be read.
    """
    cap = cv2.VideoCapture(filepath)
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        ret, frame = cap.read()
    finally:
        # Always release the capture handle, even if reading raised.
        cap.release()

    if not ret:
        return None, width, height

    frame_filename = f"{unique_filename}.jpg"
    frame_path = os.path.join(app.config["UPLOAD_FOLDER"], frame_filename)
    cv2.imwrite(frame_path, frame)
    return f"/uploads/{frame_filename}", width, height


def _box_region(data, orig_w, orig_h):
    """Convert the fractional box in *data* to a pixel ``(x, y, w, h)`` tuple.

    ``px``/``py``/``pw``/``ph`` are multiplied by the frame size and clamped
    so the box stays inside the frame. Raises ValueError on missing or
    non-numeric coordinates.
    """
    try:
        px, py, pw, ph = (
            float(data["px"]),
            float(data["py"]),
            float(data["pw"]),
            float(data["ph"]),
        )
        x = max(0, min(int(px * orig_w), orig_w - 2))
        y = max(0, min(int(py * orig_h), orig_h - 2))
        w = max(1, min(int(pw * orig_w), orig_w - x))
        h = max(1, min(int(ph * orig_h), orig_h - y))
    except (KeyError, TypeError, ValueError, OverflowError):
        # int(nan) raises ValueError and int(inf) raises OverflowError.
        raise ValueError("Invalid box coordinates.") from None
    return x, y, w, h


def _brush_region(mask_b64, orig_w, orig_h):
    """Decode the painted canvas and locate the painted area.

    *mask_b64* is expected to be a data URL (``<prefix>,<base64>``) of an
    image with an alpha channel. Returns ``((x, y, w, h), mask_crop)`` where
    the tuple is the bounding box of the non-zero alpha pixels at frame
    resolution and ``mask_crop`` is the alpha mask cropped to that box.
    Raises ValueError with a user-facing message on any invalid input.
    """
    if not isinstance(mask_b64, str):
        raise ValueError("Invalid brush mask.")
    _, sep, payload = mask_b64.partition(",")
    if not sep or not payload:
        raise ValueError("Invalid brush mask.")

    try:
        img_bytes = base64.b64decode(payload)
    except ValueError:  # binascii.Error is a ValueError subclass
        raise ValueError("Invalid brush mask.") from None
    if not img_bytes:
        raise ValueError("Invalid brush mask.")

    nparr = np.frombuffer(img_bytes, np.uint8)
    mask_img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)  # Reads RGBA
    if mask_img is None or mask_img.ndim != 3 or mask_img.shape[2] < 4:
        raise ValueError("Brush mask must be an image with an alpha channel.")

    # Extract the alpha channel (where the user painted)
    alpha_channel = mask_img[:, :, 3]
    full_mask = cv2.resize(
        alpha_channel, (orig_w, orig_h), interpolation=cv2.INTER_NEAREST
    )

    # Find the exact bounding box of their painting
    coords = cv2.findNonZero(full_mask)
    if coords is None:
        raise ValueError("No brush strokes detected.")
    x, y, w, h = cv2.boundingRect(coords)

    # Crop the mask to the exact bounding box to map it to FFmpeg
    return (x, y, w, h), full_mask[y : y + h, x : x + w]


def _box_filter(method, x, y, w, h):
    """Return the ``-vf`` filter string applying *method* to a rectangle."""
    if method == "delogo":
        return f"delogo=x={x}:y={y}:w={w}:h={h}"
    if method == "black_box":
        return f"drawbox=x={x}:y={y}:w={w}:h={h}:color=black:t=fill"
    if method == "pixelate":
        pw_sq, ph_sq = max(1, w // 15), max(1, h // 15)
        return (
            f"split[m][r];[r]crop={w}:{h}:{x}:{y},scale={pw_sq}:{ph_sq},"
            f"scale={w}:{h}:flags=neighbor[p];[m][p]overlay={x}:{y}"
        )
    sigma = BLUR_SIGMA[method]
    return (
        f"split[m][r];[r]crop={w}:{h}:{x}:{y},gblur=sigma={sigma}[b];"
        f"[m][b]overlay={x}:{y}"
    )


def _brush_filter_complex(method, x, y, w, h):
    """Return the ``-filter_complex`` graph shaping *method* to the mask.

    The effect is rendered for the bounding box, given the painted mask
    (input 1) as its alpha channel, and overlaid on the source video.
    """
    if method == "black_box":
        effect = f"color=black:size={w}x{h}[fx];"
    elif method == "pixelate":
        pw_sq, ph_sq = max(1, w // 15), max(1, h // 15)
        effect = (
            f"[0:v]crop={w}:{h}:{x}:{y},scale={pw_sq}:{ph_sq},"
            f"scale={w}:{h}:flags=neighbor[fx];"
        )
    else:
        sigma = BLUR_SIGMA[method]
        effect = f"[0:v]crop={w}:{h}:{x}:{y},gblur=sigma={sigma}[fx];"

    # Combine the cropped effect with the painted alpha mask
    return effect + f"[fx][1:v]alphamerge[al];[0:v][al]overlay={x}:{y}:shortest=1"


def _build_ffmpeg_command(tool, method, region, input_path, output_path, mask_path):
    """Return the FFmpeg argument list for one video."""
    x, y, w, h = region
    # Delogo strictly requires a box, so the brush tool falls back to the
    # bounding box of the paint and shares the plain -vf path.
    if tool == "box" or method == "delogo":
        return [
            FFMPEG_EXE,
            "-y",
            "-i",
            input_path,
            "-vf",
            _box_filter(method, x, y, w, h),
            "-c:a",
            "copy",
            output_path,
        ]
    return [
        FFMPEG_EXE,
        "-y",
        "-i",
        input_path,
        "-loop",
        "1",
        "-i",
        mask_path,  # Load the painted mask as an infinite video layer
        "-filter_complex",
        _brush_filter_complex(method, x, y, w, h),
        "-c:a",
        "copy",
        output_path,
    ]


def _upscale_filter(upscale):
    """Return the ``-vf`` string for the requested upscale mode.

    Unrecognised modes fall back to 2x.
    """
    if upscale == "1.5x":
        scale_expr = "scale=iw*1.5:ih*1.5:flags=lanczos"
    elif upscale == "4k":
        scale_expr = (
            "scale=3840:2160:force_original_aspect_ratio=decrease:flags=lanczos,"
            "pad=3840:2160:(ow-iw)/2:(oh-ih)/2"
        )
    else:
        scale_expr = "scale=iw*2:ih*2:flags=lanczos"
    # Lanczos upscale + light unsharp mask to sharpen
    return f"{scale_expr},unsharp=5:5:0.5:5:5:0.0"


def _run_ffmpeg(command, error_label):
    """Run *command*; on failure log under *error_label* and return False."""
    try:
        subprocess.run(
            command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except subprocess.CalledProcessError as exc:
        # FFmpeg output is not guaranteed to be valid UTF-8.
        stderr = (exc.stderr or b"").decode(errors="replace")
        app.logger.error("%s: %s", error_label, stderr)
        return False
    except OSError as exc:
        # Raised when the FFmpeg executable is missing or not runnable.
        app.logger.error("%s: %s", error_label, exc)
        return False
    return True


def _unique_arcname(name, used):
    """Return *name*, with a numeric suffix if it is already in *used*.

    Prevents two uploads with the same original name from colliding inside
    the zip archive. The returned name is added to *used*.
    """
    candidate = name
    if candidate in used:
        stem, ext = os.path.splitext(name)
        counter = 2
        while f"{stem}_{counter}{ext}" in used:
            counter += 1
        candidate = f"{stem}_{counter}{ext}"
    used.add(candidate)
    return candidate


@app.route("/")
def index():
    """Render the single-page UI."""
    return render_template("index.html")


@app.route("/upload", methods=["POST"])
def upload_files():
    """Store the uploaded videos and return a preview of the first one.

    Responds with the stored file names, the URL of a JPEG of the first
    video's first frame (None if it could not be read) and that video's
    frame size.
    """
    if "videos" not in request.files:
        return jsonify({"error": "No files"}), 400

    saved_files = []
    frame_url = None
    orig_w, orig_h = 0, 0

    for file in request.files.getlist("videos"):
        if not file.filename:
            continue
        filename = secure_filename(file.filename)
        unique_filename = f"{uuid.uuid4().hex}_{filename}"
        filepath = os.path.join(app.config["UPLOAD_FOLDER"], unique_filename)
        file.save(filepath)

        # Preview the first file actually saved, so an empty leading form
        # entry does not suppress the preview.
        if not saved_files:
            frame_url, orig_w, orig_h = _extract_preview(filepath, unique_filename)
        saved_files.append(unique_filename)

    return jsonify(
        {
            "filenames": saved_files,
            "frame_url": frame_url,
            "orig_w": orig_w,
            "orig_h": orig_h,
        }
    )


# The route needs a <filename> URL variable because the view takes a
# ``filename`` argument; without it Flask cannot call the view.
@app.route("/uploads/<filename>")
def serve_upload(filename):
    """Serve a file from the upload folder (404 if missing or unsafe)."""
    # send_from_directory refuses paths that escape the directory.
    return send_from_directory(app.config["UPLOAD_FOLDER"], filename)


@app.route("/process", methods=["POST"])
def process_videos():
    """Apply the selected effect to each uploaded video.

    Expects a JSON body with ``filenames``, ``orig_w``, ``orig_h`` and either
    box coordinates (``px``, ``py``, ``pw``, ``ph`` as fractions of the
    frame) or a painted ``mask_b64``; ``method``, ``tool`` and ``upscale``
    are optional. Responds with a download URL for the single result or for
    a zip of all results. Invalid input yields a 400, FFmpeg failure a 500.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return _json_error("Invalid JSON body.")

    filenames = data.get("filenames", [])
    method = data.get("method", "blur_heavy")
    tool = data.get("tool", "box")
    upscale = data.get("upscale", "none")

    if tool not in TOOLS:
        return _json_error("Unknown tool.")
    if method not in METHODS:
        return _json_error("Unknown method.")
    if not isinstance(filenames, list) or not filenames:
        return _json_error("No files to process.")
    if not all(_is_safe_stored_name(name) for name in filenames):
        return _json_error("Invalid filename.")

    try:
        orig_w, orig_h = int(data["orig_w"]), int(data["orig_h"])
    except (KeyError, TypeError, ValueError):
        return _json_error("Invalid video dimensions.")
    if orig_w <= 0 or orig_h <= 0:
        return _json_error("Invalid video dimensions.")

    # Handle coordinates based on the tool used
    mask_path = None
    try:
        if tool == "box":
            region = _box_region(data, orig_w, orig_h)
        else:
            region, mask_crop = _brush_region(
                data.get("mask_b64", ""), orig_w, orig_h
            )
            mask_path = os.path.join(
                app.config["UPLOAD_FOLDER"], f"mask_{uuid.uuid4().hex}.png"
            )
            cv2.imwrite(mask_path, mask_crop)
    except ValueError as exc:
        return _json_error(str(exc))

    processed_files = []
    try:
        for filename in filenames:
            input_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
            if not os.path.isfile(input_path):
                return _json_error("File not found.", 404)

            # Extract original name by removing the uuid prefix (32 hex chars + underscore)
            original_name = filename.split("_", 1)[1] if "_" in filename else filename
            name_part, ext_part = os.path.splitext(original_name)
            friendly_name = f"{name_part}_Cle{ext_part}"

            output_filename = f"clean_{filename}"
            output_path = os.path.join(
                app.config["PROCESSED_FOLDER"], output_filename
            )

            command = _build_ffmpeg_command(
                tool, method, region, input_path, output_path, mask_path
            )
            if not _run_ffmpeg(command, "FFMPEG ERROR"):
                return _json_error("FFmpeg processing failed", 500)
            processed_files.append((output_filename, friendly_name))
    finally:
        # The mask is only needed while FFmpeg runs; do not leave it behind.
        if mask_path is not None and os.path.exists(mask_path):
            os.remove(mask_path)

    # --- Optional Upscale Pass ---
    if upscale != "none":
        vf_upscale = _upscale_filter(upscale)
        upscaled_files = []
        for internal_name, friendly in processed_files:
            src = os.path.join(app.config["PROCESSED_FOLDER"], internal_name)
            up_filename = f"up_{internal_name}"
            up_path = os.path.join(app.config["PROCESSED_FOLDER"], up_filename)
            up_cmd = [
                FFMPEG_EXE,
                "-y",
                "-i",
                src,
                "-vf",
                vf_upscale,
                "-c:v",
                "libx264",
                "-crf",
                "18",
                "-preset",
                "slow",
                "-c:a",
                "copy",
                up_path,
            ]
            if not _run_ffmpeg(up_cmd, "UPSCALE ERROR"):
                return _json_error("Video upscaling failed", 500)

            # Update friendly name to indicate upscale
            fn_part, fn_ext = os.path.splitext(friendly)
            upscaled_files.append((up_filename, f"{fn_part}_HD{fn_ext}"))
        processed_files = upscaled_files

    if len(processed_files) == 1:
        internal_name, friendly = processed_files[0]
        return jsonify(
            {"download_url": f"/download/{internal_name}", "download_name": friendly}
        )

    zip_filename = f"bulk_processed_{uuid.uuid4().hex[:6]}.zip"
    zip_path = os.path.join(app.config["PROCESSED_FOLDER"], zip_filename)
    used_names = set()
    with zipfile.ZipFile(zip_path, "w") as zipf:
        for internal_name, friendly in processed_files:
            zipf.write(
                os.path.join(app.config["PROCESSED_FOLDER"], internal_name),
                _unique_arcname(friendly, used_names),
            )
    return jsonify(
        {"download_url": f"/download/{zip_filename}", "download_name": zip_filename}
    )


# As with /uploads, the <filename> URL variable is required by the view.
@app.route("/download/<filename>")
def download_file(filename):
    """Send a processed file as an attachment.

    The optional ``name`` query parameter sets the name offered to the
    browser; it defaults to the stored file name.
    """
    download_name = request.args.get("name", filename)
    return send_from_directory(
        app.config["PROCESSED_FOLDER"],
        filename,
        as_attachment=True,
        download_name=download_name,
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False)