import base64
import os
import shutil
import subprocess
import uuid
import zipfile

import cv2
import numpy as np
from flask import (
    Flask,
    jsonify,
    render_template,
    request,
    send_file,
    send_from_directory,
)
from werkzeug.utils import secure_filename
|
|
# Flask application object that every route below is registered on.
app = Flask(__name__)

# Raw uploads (plus preview frames and brush masks) and ffmpeg outputs are
# kept in separate directories.
UPLOAD_FOLDER = "/tmp/uploads"
PROCESSED_FOLDER = "/tmp/processed"
app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    PROCESSED_FOLDER=PROCESSED_FOLDER,
)

# Create both working directories up front; existing ones are left untouched.
for _folder in (UPLOAD_FOLDER, PROCESSED_FOLDER):
    os.makedirs(_folder, exist_ok=True)
|
|
| |
# Locate the ffmpeg binary once at import time.
# Preference order: the binary shipped with the optional ``imageio_ffmpeg``
# package, then whatever ``ffmpeg`` is on PATH, then the bare command name
# (so the failure surfaces when a command is actually run).
try:
    import imageio_ffmpeg

    FFMPEG_EXE = imageio_ffmpeg.get_ffmpeg_exe()
except (ImportError, RuntimeError):
    # ImportError: the package is not installed.
    # RuntimeError: the package is installed but could not find a usable
    # binary; fall back to the system ffmpeg instead of failing the import.
    FFMPEG_EXE = shutil.which("ffmpeg") or "ffmpeg"
|
|
|
|
@app.route("/")
def index():
    """Render and return the ``index.html`` template for the site root."""
    page = render_template("index.html")
    return page
|
|
|
|
@app.route("/upload", methods=["POST"])
def upload_files():
    """Store the uploaded videos and extract a preview frame.

    Expects a multipart form with one or more parts named ``videos``. Each
    part that has a filename is saved in the upload folder under a
    collision-free name (``<uuid hex>_<sanitised original name>``).

    The first file that is actually saved is opened with OpenCV to read the
    frame size and to write its first frame as a JPEG next to it.

    Returns:
        JSON with ``filenames`` (stored names), ``frame_url`` (URL of the
        preview JPEG, or ``None`` when no frame could be read) and
        ``orig_w`` / ``orig_h`` (frame size reported by OpenCV, 0 when
        unknown). Responds with HTTP 400 and ``{"error": "No files"}`` when
        the form has no ``videos`` part or none of the parts has a filename.
    """
    if "videos" not in request.files:
        return jsonify({"error": "No files"}), 400

    upload_dir = app.config["UPLOAD_FOLDER"]
    saved_files = []
    frame_url = None
    orig_w, orig_h = 0, 0

    for file in request.files.getlist("videos"):
        if not file.filename:
            # Browsers send an empty part when no file was chosen.
            continue

        unique_filename = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
        filepath = os.path.join(upload_dir, unique_filename)
        file.save(filepath)
        saved_files.append(unique_filename)

        # Only the first *saved* file provides the preview. Keying this on
        # the position in the form would skip the preview entirely whenever
        # the first part is empty.
        if len(saved_files) > 1:
            continue

        cap = cv2.VideoCapture(filepath)
        try:
            orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            ret, frame = cap.read()
        finally:
            # Always release the capture, even if reading raised.
            cap.release()

        if ret:
            frame_filename = f"{unique_filename}.jpg"
            cv2.imwrite(os.path.join(upload_dir, frame_filename), frame)
            frame_url = f"/uploads/{frame_filename}"

    if not saved_files:
        # Every part was empty: same outcome as sending no files at all.
        return jsonify({"error": "No files"}), 400

    return jsonify(
        {
            "filenames": saved_files,
            "frame_url": frame_url,
            "orig_w": orig_w,
            "orig_h": orig_h,
        }
    )
|
|
|
|
@app.route("/uploads/<filename>")
def serve_upload(filename):
    """Serve a file (e.g. a preview frame) from the upload folder.

    ``send_from_directory`` is used instead of joining the path by hand: it
    refuses names that would resolve outside the upload folder and answers
    with 404 for a missing file rather than an unhandled exception.
    """
    return send_from_directory(app.config["UPLOAD_FOLDER"], filename)
|
|
|
|
# Redaction methods and selection tools accepted by /process.
_METHODS = ("delogo", "blur_light", "blur_heavy", "pixelate", "black_box")
_TOOLS = ("box", "brush")

# Gaussian blur strength per blur method.
_BLUR_SIGMA = {"blur_light": 10, "blur_heavy": 35}

# ffmpeg scale expressions per upscale option. Any other value that is not
# "none" falls back to the 2x entry.
_UPSCALE_FILTERS = {
    # Round to even numbers: libx264 with 4:2:0 chroma rejects odd
    # dimensions, which a plain iw*1.5 produces for many input sizes.
    "1.5x": "scale=trunc(iw*1.5/2)*2:trunc(ih*1.5/2)*2:flags=lanczos",
    "2x": "scale=iw*2:ih*2:flags=lanczos",
    "4k": (
        "scale=3840:2160:force_original_aspect_ratio=decrease:flags=lanczos,"
        "pad=3840:2160:(ow-iw)/2:(oh-ih)/2"
    ),
}


def _is_stored_name(folder, filename):
    """Return True if *filename* is a bare name of an existing file in *folder*."""
    return (
        isinstance(filename, str)
        and filename not in ("", ".", "..")
        and "\\" not in filename
        and os.path.basename(filename) == filename
        and os.path.isfile(os.path.join(folder, filename))
    )


def _box_region(data, orig_w, orig_h):
    """Convert the fractional ``px/py/pw/ph`` box into a clamped pixel rectangle."""
    px, py, pw, ph = (float(data[key]) for key in ("px", "py", "pw", "ph"))
    x = max(0, min(int(px * orig_w), orig_w - 2))
    y = max(0, min(int(py * orig_h), orig_h - 2))
    w = max(1, min(int(pw * orig_w), orig_w - x))
    h = max(1, min(int(ph * orig_h), orig_h - y))
    return x, y, w, h


def _brush_mask(mask_b64, orig_w, orig_h):
    """Decode the brush data URL and return its alpha channel at frame size.

    Raises ValueError when the payload is missing, is not valid base64, is
    not a decodable image or has no alpha channel.
    """
    if not isinstance(mask_b64, str):
        raise ValueError("mask_b64 must be a string")
    # A data URL looks like "data:image/png;base64,<payload>".
    _, _, payload = mask_b64.partition(",")
    img_bytes = base64.b64decode(payload)
    if not img_bytes:
        raise ValueError("mask_b64 carries no image data")
    try:
        mask_img = cv2.imdecode(
            np.frombuffer(img_bytes, np.uint8), cv2.IMREAD_UNCHANGED
        )
    except cv2.error as exc:
        raise ValueError("mask image could not be decoded") from exc
    if mask_img is None or mask_img.ndim != 3 or mask_img.shape[2] < 4:
        raise ValueError("mask image has no alpha channel")
    # The painted strokes live in the alpha channel; scale it to the frame.
    return cv2.resize(
        mask_img[:, :, 3], (orig_w, orig_h), interpolation=cv2.INTER_NEAREST
    )


def _clean_command(tool, method, region, input_path, output_path, mask_path):
    """Build the ffmpeg argument list that redacts *region* in one video."""
    x, y, w, h = region
    head = [FFMPEG_EXE, "-y", "-i", input_path]
    tail = ["-c:a", "copy", output_path]

    if method == "delogo":
        # delogo works on a rectangle only, so the brush tool uses the
        # bounding box of the strokes and needs no mask input.
        return head + ["-vf", f"delogo=x={x}:y={y}:w={w}:h={h}"] + tail

    # Filter chain that renders the effect on the cropped region.
    # ``None`` stands for black_box, which needs no crop.
    crop = f"crop={w}:{h}:{x}:{y}"
    if method in _BLUR_SIGMA:
        effect = f"{crop},gblur=sigma={_BLUR_SIGMA[method]}"
    elif method == "pixelate":
        pw_sq, ph_sq = max(1, w // 15), max(1, h // 15)
        effect = f"{crop},scale={pw_sq}:{ph_sq},scale={w}:{h}:flags=neighbor"
    else:
        effect = None

    if tool == "box":
        if effect is None:
            vf = f"drawbox=x={x}:y={y}:w={w}:h={h}:color=black:t=fill"
        else:
            vf = f"split[m][r];[r]{effect}[fx];[m][fx]overlay={x}:{y}"
        return head + ["-vf", vf] + tail

    # Brush: use the looped mask image as alpha for the effect, then overlay
    # the result so only the painted pixels are replaced.
    if effect is None:
        source = f"color=black:size={w}x{h}[fx];"
    else:
        source = f"[0:v]{effect}[fx];"
    graph = (
        source
        + f"[fx][1:v]alphamerge[al];[0:v][al]overlay={x}:{y}:shortest=1"
    )
    return (
        head
        + ["-loop", "1", "-i", mask_path, "-filter_complex", graph]
        + tail
    )


def _run_ffmpeg(command, label):
    """Run *command*; on failure print the reason under *label* and return False."""
    try:
        subprocess.run(
            command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except subprocess.CalledProcessError as e:
        # stderr may contain bytes that are not valid UTF-8.
        print(f"{label}:", e.stderr.decode(errors="replace"))
        return False
    except OSError as e:
        # The ffmpeg executable is missing or cannot be started.
        print(f"{label}:", e)
        return False
    return True


@app.route("/process", methods=["POST"])
def process_videos():
    """Redact a region in previously uploaded videos, optionally upscaling.

    JSON body fields:
        filenames: stored names as returned by ``/upload``.
        method: one of ``delogo``, ``blur_light``, ``blur_heavy``,
            ``pixelate``, ``black_box`` (default ``blur_heavy``).
        tool: ``box`` (default) or ``brush``.
        upscale: ``none`` (default), ``1.5x``, ``2x`` or ``4k``; any other
            value is treated as ``2x``.
        orig_w, orig_h: frame size in pixels.
        px, py, pw, ph: box position/size as fractions of the frame
            (``box`` tool only).
        mask_b64: data URL of an image whose alpha channel marks the
            painted area (``brush`` tool only).

    Returns:
        JSON with ``download_url`` and ``download_name``: the single output
        video, or a zip archive when more than one file was processed.
        Invalid input yields HTTP 400 and an ffmpeg failure HTTP 500, both
        with an ``error`` message.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON payload"}), 400

    filenames = data.get("filenames", [])
    method = data.get("method", "blur_heavy")
    tool = data.get("tool", "box")
    upscale = data.get("upscale", "none")

    upload_dir = app.config["UPLOAD_FOLDER"]
    processed_dir = app.config["PROCESSED_FOLDER"]

    if not isinstance(filenames, list) or not filenames:
        return jsonify({"error": "No files"}), 400
    if method not in _METHODS:
        return jsonify({"error": "Unknown method"}), 400
    if tool not in _TOOLS:
        return jsonify({"error": "Unknown tool"}), 400
    # The names come from the client: accept only bare names of files that
    # exist in the upload folder, so nothing outside it is read or written.
    if not all(_is_stored_name(upload_dir, name) for name in filenames):
        return jsonify({"error": "Unknown file"}), 400

    try:
        orig_w, orig_h = int(data["orig_w"]), int(data["orig_h"])
        if orig_w <= 0 or orig_h <= 0:
            raise ValueError("frame size must be positive")
        if tool == "box":
            region = _box_region(data, orig_w, orig_h)
            full_mask = None
        else:
            full_mask = _brush_mask(data.get("mask_b64", ""), orig_w, orig_h)
    except (KeyError, TypeError, ValueError, OverflowError):
        return jsonify({"error": "Invalid selection parameters"}), 400

    mask_path = None
    if full_mask is not None:
        # Reduce the painted area to its bounding box so ffmpeg only has to
        # process that part of the frame.
        coords = cv2.findNonZero(full_mask)
        if coords is None:
            return jsonify({"error": "No brush strokes detected."}), 400
        region = cv2.boundingRect(coords)
        x, y, w, h = region
        if method != "delogo":
            mask_path = os.path.join(upload_dir, f"mask_{uuid.uuid4().hex}.png")
            cv2.imwrite(mask_path, full_mask[y : y + h, x : x + w])

    processed_files = []
    try:
        for filename in filenames:
            # Strip the uuid prefix added on upload to get a readable name.
            original_name = (
                filename.split("_", 1)[1] if "_" in filename else filename
            )
            name_part, ext_part = os.path.splitext(original_name)
            friendly_name = f"{name_part}_Cle{ext_part}"
            output_filename = f"clean_{filename}"

            command = _clean_command(
                tool,
                method,
                region,
                os.path.join(upload_dir, filename),
                os.path.join(processed_dir, output_filename),
                mask_path,
            )
            if not _run_ffmpeg(command, "FFMPEG ERROR"):
                return jsonify({"error": "FFmpeg processing failed"}), 500
            processed_files.append((output_filename, friendly_name))
    finally:
        # The mask is only needed while ffmpeg runs; remove it best-effort.
        if mask_path is not None:
            try:
                os.remove(mask_path)
            except OSError:
                pass

    if upscale != "none":
        scale_expr = _UPSCALE_FILTERS.get(upscale, _UPSCALE_FILTERS["2x"])
        # Mild sharpening after scaling.
        vf_upscale = f"{scale_expr},unsharp=5:5:0.5:5:5:0.0"

        upscaled_files = []
        for internal_name, friendly in processed_files:
            up_filename = f"up_{internal_name}"
            up_cmd = [
                FFMPEG_EXE,
                "-y",
                "-i",
                os.path.join(processed_dir, internal_name),
                "-vf",
                vf_upscale,
                "-c:v",
                "libx264",
                "-crf",
                "18",
                "-preset",
                "slow",
                "-c:a",
                "copy",
                os.path.join(processed_dir, up_filename),
            ]
            if not _run_ffmpeg(up_cmd, "UPSCALE ERROR"):
                return jsonify({"error": "Video upscaling failed"}), 500

            fn_part, fn_ext = os.path.splitext(friendly)
            upscaled_files.append((up_filename, f"{fn_part}_HD{fn_ext}"))

        processed_files = upscaled_files

    if len(processed_files) == 1:
        internal_name, friendly = processed_files[0]
        return jsonify(
            {"download_url": f"/download/{internal_name}", "download_name": friendly}
        )

    # Several outputs: bundle them in one archive under their friendly names.
    zip_filename = f"bulk_processed_{uuid.uuid4().hex[:6]}.zip"
    with zipfile.ZipFile(os.path.join(processed_dir, zip_filename), "w") as zipf:
        for internal_name, friendly in processed_files:
            zipf.write(os.path.join(processed_dir, internal_name), friendly)
    return jsonify(
        {"download_url": f"/download/{zip_filename}", "download_name": zip_filename}
    )
|
|
|
|
@app.route("/download/<filename>")
def download_file(filename):
    """Send a processed file from the processed folder as an attachment.

    The optional ``name`` query parameter sets the filename offered to the
    browser; it defaults to the stored name.

    ``send_from_directory`` is used instead of joining the path by hand: it
    refuses names that would resolve outside the processed folder and
    answers with 404 for a missing file rather than an unhandled exception.
    """
    download_name = request.args.get("name", filename)
    return send_from_directory(
        app.config["PROCESSED_FOLDER"],
        filename,
        as_attachment=True,
        download_name=download_name,
    )
|
|
|
|
if __name__ == "__main__":
    # Script entry point: listen on every interface, port 7860, with the
    # Flask debugger switched off.
    app.run(debug=False, port=7860, host="0.0.0.0")
|
|