MaxonML's picture
Upload app.py
bba6d0a verified
import base64
import os
import shutil
import subprocess
import uuid
import zipfile
import cv2
import numpy as np
from flask import Flask, jsonify, render_template, request, send_file
from werkzeug.utils import secure_filename
app = Flask(__name__)
UPLOAD_FOLDER = "/tmp/uploads"
PROCESSED_FOLDER = "/tmp/processed"
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["PROCESSED_FOLDER"] = PROCESSED_FOLDER
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)
# Try imageio_ffmpeg (local dev), fall back to system ffmpeg (HF Spaces / Docker)
try:
import imageio_ffmpeg
FFMPEG_EXE = imageio_ffmpeg.get_ffmpeg_exe()
except ImportError:
FFMPEG_EXE = shutil.which("ffmpeg") or "ffmpeg"
@app.route("/")
def index():
return render_template("index.html")
@app.route("/upload", methods=["POST"])
def upload_files():
if "videos" not in request.files:
return jsonify({"error": "No files"}), 400
files = request.files.getlist("videos")
saved_files = []
frame_url = None
orig_w, orig_h = 0, 0
for idx, file in enumerate(files):
if file.filename:
filename = secure_filename(file.filename)
unique_filename = f"{uuid.uuid4().hex}_{filename}"
filepath = os.path.join(app.config["UPLOAD_FOLDER"], unique_filename)
file.save(filepath)
saved_files.append(unique_filename)
if idx == 0:
cap = cv2.VideoCapture(filepath)
orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
ret, frame = cap.read()
cap.release()
if ret:
frame_filename = f"{unique_filename}.jpg"
frame_path = os.path.join(
app.config["UPLOAD_FOLDER"], frame_filename
)
cv2.imwrite(frame_path, frame)
frame_url = f"/uploads/{frame_filename}"
return jsonify(
{
"filenames": saved_files,
"frame_url": frame_url,
"orig_w": orig_w,
"orig_h": orig_h,
}
)
@app.route("/uploads/<filename>")
def serve_upload(filename):
return send_file(os.path.join(app.config["UPLOAD_FOLDER"], filename))
@app.route("/process", methods=["POST"])
def process_videos():
data = request.json
filenames = data.get("filenames", [])
method = data.get("method", "blur_heavy")
tool = data.get("tool", "box")
upscale = data.get("upscale", "none")
orig_w, orig_h = int(data["orig_w"]), int(data["orig_h"])
processed_files = []
# Handle coordinates based on the tool used
if tool == "box":
px, py, pw, ph = (
float(data["px"]),
float(data["py"]),
float(data["pw"]),
float(data["ph"]),
)
x = max(0, min(int(px * orig_w), orig_w - 2))
y = max(0, min(int(py * orig_h), orig_h - 2))
w = max(1, min(int(pw * orig_w), orig_w - x))
h = max(1, min(int(ph * orig_h), orig_h - y))
mask_path = None
elif tool == "brush":
# Decode the painted canvas into an image
mask_data = data.get("mask_b64", "").split(",")[1]
img_bytes = base64.b64decode(mask_data)
nparr = np.frombuffer(img_bytes, np.uint8)
mask_img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED) # Reads RGBA
# Extract the alpha channel (where the user painted)
alpha_channel = mask_img[:, :, 3]
full_mask = cv2.resize(
alpha_channel, (orig_w, orig_h), interpolation=cv2.INTER_NEAREST
)
# Find the exact bounding box of their painting
coords = cv2.findNonZero(full_mask)
if coords is None:
return jsonify({"error": "No brush strokes detected."}), 400
x, y, w, h = cv2.boundingRect(coords)
# Crop the mask to the exact bounding box to map it to FFmpeg
mask_crop = full_mask[y : y + h, x : x + w]
mask_path = os.path.join(
app.config["UPLOAD_FOLDER"], f"mask_{uuid.uuid4().hex}.png"
)
cv2.imwrite(mask_path, mask_crop)
for filename in filenames:
input_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
# Extract original name by removing the uuid prefix (32 hex chars + underscore)
original_name = filename.split("_", 1)[1] if "_" in filename else filename
name_part, ext_part = os.path.splitext(original_name)
friendly_name = f"{name_part}_Cle{ext_part}"
output_filename = f"clean_{filename}"
output_path = os.path.join(app.config["PROCESSED_FOLDER"], output_filename)
# Build FFmpeg commands depending on the tool and method
if tool == "box":
if method == "delogo":
vf = f"delogo=x={x}:y={y}:w={w}:h={h}"
elif method == "blur_light":
vf = f"split[m][r];[r]crop={w}:{h}:{x}:{y},gblur=sigma=10[b];[m][b]overlay={x}:{y}"
elif method == "blur_heavy":
vf = f"split[m][r];[r]crop={w}:{h}:{x}:{y},gblur=sigma=35[b];[m][b]overlay={x}:{y}"
elif method == "pixelate":
pw_sq, ph_sq = max(1, w // 15), max(1, h // 15)
vf = f"split[m][r];[r]crop={w}:{h}:{x}:{y},scale={pw_sq}:{ph_sq},scale={w}:{h}:flags=neighbor[p];[m][p]overlay={x}:{y}"
elif method == "black_box":
vf = f"drawbox=x={x}:y={y}:w={w}:h={h}:color=black:t=fill"
command = [
FFMPEG_EXE,
"-y",
"-i",
input_path,
"-vf",
vf,
"-c:a",
"copy",
output_path,
]
elif tool == "brush":
# Brush uses an advanced alpha-merge complex filter to shape the blur to the paint strokes
if method == "delogo":
# Delogo strictly requires a box, so we fall back to the bounding box of the paint
command = [
FFMPEG_EXE,
"-y",
"-i",
input_path,
"-vf",
f"delogo=x={x}:y={y}:w={w}:h={h}",
"-c:a",
"copy",
output_path,
]
else:
if method == "blur_light":
effect = f"[0:v]crop={w}:{h}:{x}:{y},gblur=sigma=10[fx];"
elif method == "blur_heavy":
effect = f"[0:v]crop={w}:{h}:{x}:{y},gblur=sigma=35[fx];"
elif method == "pixelate":
pw_sq, ph_sq = max(1, w // 15), max(1, h // 15)
effect = f"[0:v]crop={w}:{h}:{x}:{y},scale={pw_sq}:{ph_sq},scale={w}:{h}:flags=neighbor[fx];"
elif method == "black_box":
effect = f"color=black:size={w}x{h}[fx];"
# Combine the cropped effect with the painted alpha mask
filter_chain = (
effect
+ f"[fx][1:v]alphamerge[al];[0:v][al]overlay={x}:{y}:shortest=1"
)
command = [
FFMPEG_EXE,
"-y",
"-i",
input_path,
"-loop",
"1",
"-i",
mask_path, # Load the painted mask as an infinite video layer
"-filter_complex",
filter_chain,
"-c:a",
"copy",
output_path,
]
try:
subprocess.run(
command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
processed_files.append((output_filename, friendly_name))
except subprocess.CalledProcessError as e:
print("FFMPEG ERROR:", e.stderr.decode())
return jsonify({"error": "FFmpeg processing failed"}), 500
# --- Optional Upscale Pass ---
if upscale != "none":
upscaled_files = []
for internal_name, friendly in processed_files:
src = os.path.join(app.config["PROCESSED_FOLDER"], internal_name)
up_filename = f"up_{internal_name}"
up_path = os.path.join(app.config["PROCESSED_FOLDER"], up_filename)
# Determine target resolution
if upscale == "1.5x":
scale_expr = "scale=iw*1.5:ih*1.5:flags=lanczos"
elif upscale == "2x":
scale_expr = "scale=iw*2:ih*2:flags=lanczos"
elif upscale == "4k":
scale_expr = "scale=3840:2160:force_original_aspect_ratio=decrease:flags=lanczos,pad=3840:2160:(ow-iw)/2:(oh-ih)/2"
else:
scale_expr = "scale=iw*2:ih*2:flags=lanczos"
# Lanczos upscale + light unsharp mask to sharpen
vf_upscale = f"{scale_expr},unsharp=5:5:0.5:5:5:0.0"
up_cmd = [
FFMPEG_EXE,
"-y",
"-i",
src,
"-vf",
vf_upscale,
"-c:v",
"libx264",
"-crf",
"18",
"-preset",
"slow",
"-c:a",
"copy",
up_path,
]
try:
subprocess.run(
up_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Update friendly name to indicate upscale
fn_part, fn_ext = os.path.splitext(friendly)
upscaled_friendly = f"{fn_part}_HD{fn_ext}"
upscaled_files.append((up_filename, upscaled_friendly))
except subprocess.CalledProcessError as e:
print("UPSCALE ERROR:", e.stderr.decode())
return jsonify({"error": "Video upscaling failed"}), 500
processed_files = upscaled_files
if len(processed_files) == 1:
internal_name, friendly = processed_files[0]
return jsonify(
{"download_url": f"/download/{internal_name}", "download_name": friendly}
)
else:
zip_filename = f"bulk_processed_{uuid.uuid4().hex[:6]}.zip"
zip_path = os.path.join(app.config["PROCESSED_FOLDER"], zip_filename)
with zipfile.ZipFile(zip_path, "w") as zipf:
for internal_name, friendly in processed_files:
zipf.write(
os.path.join(app.config["PROCESSED_FOLDER"], internal_name),
friendly,
)
return jsonify(
{"download_url": f"/download/{zip_filename}", "download_name": zip_filename}
)
@app.route("/download/<filename>")
def download_file(filename):
download_name = request.args.get("name", filename)
return send_file(
os.path.join(app.config["PROCESSED_FOLDER"], filename),
as_attachment=True,
download_name=download_name,
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=False)