""" app.py ------ Gradio UI for the Video Watermark Remover Space (ZeroGPU). Flow: 1. Upload video → extract first frame → display in ImageEditor for mask drawing 2. User brushes over the watermark 3. Preview Crop → shows crop region + mask overlay 4. Mode: Fast (LaMa) | Quality (VACE-14B) 5. Remove Watermark → runs pipeline → output video """ from __future__ import annotations import os import shutil import tempfile from dataclasses import asdict, fields, replace from pathlib import Path import gradio as gr import numpy as np from PIL import Image, ImageDraw # --------------------------------------------------------------------------- # ZeroGPU spaces shim — import succeeds locally too # --------------------------------------------------------------------------- try: import spaces # type: ignore except ImportError: class spaces: # type: ignore @staticmethod def GPU(duration=60): def decorator(fn): return fn return decorator # --------------------------------------------------------------------------- # Pipeline imports # --------------------------------------------------------------------------- from pipeline.composite import composite_and_save, feathered_alpha from pipeline.crop import ( DEFAULT_DILATE_PX, CropRegion, build_inpaint_mask, compute_crop_region, mask_to_bbox, ) from pipeline.lama import inpaint_frames_lama_stream from pipeline.vace import ( get_prewarm_error, inpaint_frames_vace_stream, is_prewarm_done, prewarm_vace_cache, wait_for_prewarm, ) from pipeline.video import ( VideoMeta, VideoWorkspace, attach_audio, extract_first_frame_array, extract_frames, frames_to_video, probe, ) # --------------------------------------------------------------------------- # Tunables # --------------------------------------------------------------------------- # Working window the pipeline actually inpaints. Longer uploads are # accepted (up to UPLOAD_DURATION_S below) and silently trimmed during # frame extraction. PROCESS_DURATION_S = 15.0 # Maximum accepted clip length. Anything between PROCESS_DURATION_S and # UPLOAD_DURATION_S is auto-trimmed to PROCESS_DURATION_S during processing. UPLOAD_DURATION_S = 60.0 # Maximum accepted source resolution. Larger uploads are rejected outright. MAX_UPLOAD_W = 1920 MAX_UPLOAD_H = 1080 # Cap working fps for both LaMa per-frame and VACE chunked inference. At # 60 fps × 15 s the per-frame LaMa budget overflows the 240 s @spaces.GPU # lease (~270 s estimated), and VACE chunking produces ~13 chunks worth # >300 s. At 30 fps both fit comfortably. Sources above this rate are # extracted *and encoded* at PROCESS_FPS_MAX so the output mp4 duration # matches the trimmed input. PROCESS_FPS_MAX = 30.0 # Mode labels used in both the UI radio choices and the dispatch logic. # Defining them once prevents drift between the two sites. MODE_FAST = "Fast (LaMa)" MODE_QUALITY = "Quality (VACE-14B)" ALL_MODES = (MODE_FAST, MODE_QUALITY) # Kick off the ~75 GB VACE checkpoint download in a background thread so # the cache is populated before the first Quality-mode click. Idempotent # and non-blocking — the app starts serving immediately. Skip with # VACE_PREWARM=0 if you only ever use Fast/LaMa mode. prewarm_vace_cache() # --------------------------------------------------------------------------- # CSS — dark theme; lives in static/style.css for syntax highlighting & diffability # --------------------------------------------------------------------------- CSS = (Path(__file__).resolve().parent / "static" / "style.css").read_text() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _get_mask_from_editor(editor_value: dict | None) -> np.ndarray | None: """Extract a uint8 mask (H×W) from gr.ImageEditor output (Gradio 4.x). Returns ``None`` if the editor is empty *or* the user has only drawn blank pixels — callers don't need to distinguish; both mean "no usable mask drawn". The returned array is at *display* resolution (the editor canvas size), not source-frame resolution; always rescale via :func:`_rescale_mask_to_frame` before interpreting against VideoMeta. Unions all layers so drawings across multiple paint layers are preserved. Gradio 4.x layers are RGBA ndarrays. Gradio 5+ changed the format to dicts with a 'url' key — if requirements.txt ever lifts the gradio<5 cap, this function needs to be rewritten. """ if editor_value is None: return None layers = [ layer for layer in (editor_value.get("layers") or []) if isinstance(layer, np.ndarray) ] if not layers: return None combined: np.ndarray | None = None for arr in layers: if arr.ndim == 3 and arr.shape[2] == 4: channel = arr[:, :, 3] # RGBA → alpha elif arr.ndim == 3: channel = arr.max(axis=2) # RGB → luminance max elif arr.ndim == 2: channel = arr.astype(np.uint8) # already single-channel else: continue # unexpected shape — skip combined = channel if combined is None else np.maximum(combined, channel) if combined is None or combined.max() == 0: return None return combined def _rescale_mask_to_frame( raw_mask: np.ndarray, target_w: int, target_h: int, ) -> np.ndarray: """Rescale a mask from editor/display resolution to full source-frame resolution. gr.ImageEditor renders at a fixed CSS height, so the returned layer may be e.g. 854×480 for a 1920×1080 source. Using display-resolution coordinates against full-frame VideoMeta dimensions would place the crop in the wrong position. Also enforces uint8 binary (0 / 255) output regardless of the layer dtype returned by Gradio — guards against future API changes where the layer becomes float32 (0–1) rather than uint8 (0–255). """ if raw_mask.shape[0] == target_h and raw_mask.shape[1] == target_w: result = raw_mask else: result = np.array( Image.fromarray(raw_mask).resize((target_w, target_h), Image.NEAREST) ) # Normalise to binary uint8 regardless of incoming dtype return (result > 0).astype(np.uint8) * 255 def _create_crop_preview( first_frame: np.ndarray, crop_region: CropRegion, inpaint_mask: np.ndarray, ) -> np.ndarray: """Overlay crop rectangle and mask on the first frame for preview.""" img = Image.fromarray(first_frame).convert("RGBA") overlay = Image.new("RGBA", img.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(overlay) cr = crop_region # Semi-transparent teal fill for crop region draw.rectangle( list(cr.pil_box), fill=(13, 148, 136, 40), outline=(45, 212, 191, 200), width=2, ) # Inpaint mask overlay (red, semi-transparent) if inpaint_mask is not None: mask_full = np.zeros((img.height, img.width), dtype=np.uint8) mask_full[cr.frame_y : cr.frame_y2, cr.frame_x : cr.frame_x2] = inpaint_mask rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8) rgba[mask_full > 0] = (239, 68, 68, 140) overlay = Image.alpha_composite(overlay, Image.fromarray(rgba, mode="RGBA")) composite = Image.alpha_composite(img, overlay).convert("RGB") return np.array(composite) _VIDEO_META_FIELDS = frozenset(f.name for f in fields(VideoMeta)) def _meta_from_state(d: dict) -> VideoMeta: """Reconstruct a VideoMeta from a Gradio State dict, ignoring unknown keys. Cached browser sessions across app versions may carry extra fields that no longer exist on VideoMeta — passing them via ``**d`` would raise TypeError, so we filter to known fields. Optional fields use the dataclass defaults if missing. """ return VideoMeta(**{k: v for k, v in d.items() if k in _VIDEO_META_FIELDS}) def _compute_crop_and_mask( raw_mask: np.ndarray, meta_state: dict, context_px: int, ): """Run the mask → bbox → crop_region → inpaint_mask chain. Shared by :func:`on_preview_crop` and :func:`run_pipeline`; returns ``(meta, bbox, crop_region, inpaint_mask)``. ``raw_mask`` is at editor display resolution (whatever Gradio gave back); this function rescales it to the source-frame resolution defined by *meta_state*. """ meta = _meta_from_state(meta_state) full_mask = _rescale_mask_to_frame(raw_mask, meta.width, meta.height) bbox = mask_to_bbox(full_mask) crop_region = compute_crop_region( bbox, meta.width, meta.height, context_px=context_px, ) inpaint_mask = build_inpaint_mask( crop_region, source_mask=full_mask, dilate_px=DEFAULT_DILATE_PX, ) return meta, bbox, crop_region, inpaint_mask # --------------------------------------------------------------------------- # Callbacks # --------------------------------------------------------------------------- def on_video_upload(video_path: str | None): """Extract first frame and populate the ImageEditor.""" # Tuple shape: (editor_update, crop_preview_update, meta_state, status). def _msg(text: str): return gr.update(), gr.update(), None, text if not video_path: return _msg("Upload a video to begin.") try: meta = probe(video_path) # ── Input validation — guard against disk exhaustion on ZeroGPU ── # MAX_FRAMES catches VFR / container-less sources where ffprobe # returns N/A for duration; duration_s would be 0.0 after our # parse-fallback, so the duration check alone could let an # arbitrarily long clip through. max_frames = round(UPLOAD_DURATION_S * max(meta.fps, 1.0)) if meta.duration_s > UPLOAD_DURATION_S: return _msg( f"❌ Clip too long ({meta.duration_s:.1f}s). " f"Max {UPLOAD_DURATION_S:.0f}s; only the first " f"{PROCESS_DURATION_S:.0f}s would be processed anyway." ) if meta.frame_count > max_frames: return _msg( f"❌ Clip too long ({meta.frame_count} frames at " f"{meta.fps:.2f} fps). Max {UPLOAD_DURATION_S:.0f} seconds." ) if meta.width * meta.height > MAX_UPLOAD_W * MAX_UPLOAD_H: return _msg( f"❌ Resolution too high ({meta.width}×{meta.height}). " f"Max {MAX_UPLOAD_W}×{MAX_UPLOAD_H}." ) will_trim = meta.duration_s > PROCESS_DURATION_S will_cap_fps = meta.fps > PROCESS_FPS_MAX first_frame = extract_first_frame_array(video_path) meta_str = ( f"{meta.width}×{meta.height} · {meta.fps:.3g} fps · " f"{meta.duration_s:.1f}s · {meta.frame_count} frames" ) if meta.color_trc: meta_str += f" · {meta.color_trc}" editor_val = { "background": first_frame, "layers": [], "composite": None, } notes = [] if will_trim: notes.append( f"⚠️ Clip is {meta.duration_s:.1f}s — only the first " f"{PROCESS_DURATION_S:.0f}s will be processed." ) if will_cap_fps: notes.append( f"⚠️ Source is {meta.fps:.0f} fps — output will be " f"{PROCESS_FPS_MAX:.0f} fps to fit GPU budget." ) # Double newline between notes so they don't visually run together # in the Gradio status box. notes_str = ("\n\n" + "\n\n".join(notes)) if notes else "" return ( gr.update(value=editor_val), gr.update(value=None), asdict(meta), f"✓ Loaded — {meta_str}{notes_str}" f"\n\nNow draw over the watermark with the brush tool.", ) except Exception as e: return _msg(f"❌ Error: {e}") def on_preview_crop(editor_value: dict | None, meta_state: dict | None, context_px: int): """Compute crop region from mask and render a preview overlay.""" if meta_state is None or editor_value is None: return gr.update(), "Upload a video first." raw_mask = _get_mask_from_editor(editor_value) if raw_mask is None: return gr.update(), "⚠️ No drawing detected. Use the brush to paint over the watermark." try: meta, bbox, crop_region, inpaint_mask = _compute_crop_and_mask( raw_mask, meta_state, context_px, ) bg = editor_value.get("background") if bg is None: first_frame = np.zeros((meta.height, meta.width, 3), dtype=np.uint8) else: first_frame = np.array(Image.fromarray(np.asarray(bg)).convert("RGB")) # Ensure first_frame is at full source resolution for the overlay if first_frame.shape[1] != meta.width or first_frame.shape[0] != meta.height: first_frame = np.array( Image.fromarray(first_frame).resize( (meta.width, meta.height), Image.LANCZOS ) ) preview = _create_crop_preview(first_frame, crop_region, inpaint_mask) status = ( f"✓ Crop computed\n" f" Watermark bbox : {bbox.width}×{bbox.height} px\n" f" Crop region : {crop_region.frame_w}×{crop_region.frame_h} " f"@ ({crop_region.frame_x}, {crop_region.frame_y})\n" f" VACE target : {crop_region.target_w}×{crop_region.target_h}\n" f"\nLooks good? Hit Remove Watermark." ) return gr.update(value=preview), status except Exception as e: return gr.update(), f"❌ {e}" def on_clear_mask(editor_value: dict | None): """Clear all paint layers from the editor while preserving the loaded frame.""" if editor_value is None: return gr.update(), "Upload a video to begin." bg = editor_value.get("background") return ( gr.update(value={"background": bg, "layers": [], "composite": None}), "Mask cleared. Draw over the watermark to start again.", ) def on_snap_to_rectangle(editor_value: dict | None): """Replace the user's freehand scribble with a clean rectangle covering its bbox. Most watermarks are rectangular (corner logos, channel bugs, subtitle bars), so this gives the same coverage as careful brush-filling but in one click after a rough scribble. Reuses the existing brush — no custom tool needed. """ if editor_value is None: return gr.update(), "Upload a video to begin." raw_mask = _get_mask_from_editor(editor_value) if raw_mask is None: return ( gr.update(), "⚠️ Draw a rough scribble over the watermark first, then snap.", ) bg = editor_value.get("background") if bg is None: return gr.update(), "Upload a video first." bg_arr = np.asarray(bg) H, W = bg_arr.shape[:2] # raw_mask comes back at editor (display) resolution — same as bg ys, xs = np.where(raw_mask > 0) y1, y2 = int(ys.min()), int(ys.max()) x1, x2 = int(xs.min()), int(xs.max()) # Build an RGBA layer painted with the brush colour (#ef4444) inside the bbox. # Matching the brush colour means the editor renders it identically to a # carefully-painted rectangle. new_layer = np.zeros((H, W, 4), dtype=np.uint8) new_layer[y1:y2 + 1, x1:x2 + 1] = (239, 68, 68, 255) return ( gr.update(value={ "background": bg, "layers": [new_layer], "composite": None, }), f"✓ Snapped to {x2 - x1 + 1}×{y2 - y1 + 1} px rectangle " f"at ({x1},{y1})→({x2},{y2}). Adjust with brush/eraser if needed.", ) @spaces.GPU(duration=240) def _gpu_inpaint_lama( frame_paths: list, crop_region: CropRegion, inpaint_mask: np.ndarray, out_dir, total: int, progress, ) -> None: """LaMa branch — streams one frame at a time, never holds the full list.""" alpha = feathered_alpha(inpaint_mask) out_dir = Path(out_dir) def _prog(i: int) -> None: progress( 0.20 + 0.65 * ((i + 1) / total), desc=f"LaMa {i + 1}/{total}…", ) crops_iter = inpaint_frames_lama_stream( frame_paths, crop_region, inpaint_mask, _prog, ) composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir) @spaces.GPU(duration=300) def _gpu_inpaint_vace( frame_paths: list, crop_region: CropRegion, inpaint_mask: np.ndarray, out_dir, progress, ) -> None: """VACE branch — chunked temporal inference with streaming output. Memory footprint per chunk (~250 MB) is independent of clip length — see :func:`pipeline.vace.inpaint_frames_vace_stream` for the chunking rationale. The chunk loop emits each frame as it's ready, so this function composites + saves frame-by-frame instead of buffering all inpainted crops in RAM. """ alpha = feathered_alpha(inpaint_mask) out_dir = Path(out_dir) progress(0.20, desc="Loading VACE-14B (first run ~30-60s; cached after)…") def _prog_chunk(ci: int, n_chunks: int) -> None: # Map chunk completion to overall pipeline progress 0.25 → 0.90. frac = (ci + 1) / max(n_chunks, 1) progress(0.25 + 0.65 * frac, desc=f"VACE chunk {ci + 1}/{n_chunks}…") crops_iter = inpaint_frames_vace_stream( frame_paths, crop_region, inpaint_mask, progress_fn=_prog_chunk, ) composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir) def run_pipeline( video_path: str | None, editor_value: dict | None, mode: str, context_px: int, meta_state: dict | None, progress=gr.Progress(), ): """ Pipeline orchestrator — CPU work only. GPU allocation is acquired and released inside the per-mode @spaces.GPU functions (_gpu_inpaint_lama / _gpu_inpaint_vace). Frame extraction and video encoding/muxing are pure CPU/disk I/O and do not consume GPU quota. """ if video_path is None: raise gr.Error("Upload a video first.") if meta_state is None: raise gr.Error("Video metadata missing — re-upload the video.") if mode not in ALL_MODES: raise gr.Error(f"Unknown mode '{mode}'. Choose from: {ALL_MODES}") raw_mask = _get_mask_from_editor(editor_value) if raw_mask is None: raise gr.Error("Draw over the watermark before processing.") progress(0.05, desc="Computing crop region…") try: meta, _bbox, crop_region, inpaint_mask = _compute_crop_and_mask( raw_mask, meta_state, context_px, ) except ValueError as e: # mask_to_bbox / compute_crop_region raise ValueError with user- # facing messages ("drawn area too small", "watermark too large", # "frame too small", etc.). Surface as gr.Error so the UI shows a # clean red toast instead of a generic stack-traced exception. raise gr.Error(str(e)) from e # Cap working fps so per-frame LaMa and per-chunk VACE both fit within # their @spaces.GPU duration budgets at the worst-case input rate. # ``working_meta`` is what frames_to_video uses to set the output's # encode framerate — must match what extract_frames was given so the # output mp4's duration equals the trimmed input duration. working_fps = min(meta.fps, PROCESS_FPS_MAX) working_meta = replace(meta, fps=working_fps) with VideoWorkspace() as ws: try: # Preserve the original file extension so FFmpeg can detect the # container format. Gradio always adds an extension for video # uploads, but fall back to .mp4 if the path somehow has none. src_suffix = Path(video_path).suffix or ".mp4" safe_video = ws.path("source" + src_suffix) shutil.copy2(video_path, safe_video) # ── Extract frames (CFR-forced for VFR safety) ───────────── progress(0.10, desc="Extracting frames…") frame_paths = extract_frames( safe_video, ws.frames_dir, fps=working_fps, max_duration_s=PROCESS_DURATION_S, ) total = len(frame_paths) # ── GPU: inpaint + composite + save ──────────────────────── progress(0.15, desc="Starting inpainting…") if mode == MODE_FAST: _gpu_inpaint_lama( frame_paths, crop_region, inpaint_mask, ws.out_frames_dir, total, progress, ) else: # MODE_QUALITY (already validated above) # If the prewarm thread is still downloading, wait for it # CPU-side rather than burning the @spaces.GPU(duration=300) # budget on the wait. On a fresh deploy where the user # clicks Quality before prewarm finishes, this could be # several minutes; the progress message tells them what's # happening. if not is_prewarm_done(): progress(0.16, desc="Waiting for VACE checkpoint cache to finish prewarming…") wait_for_prewarm() # If prewarm raised, the cache is incomplete and the # local_files_only=True from_pretrained calls inside # _get_pipe would fail with a confusing cache-miss error. # Surface the real cause and route the user to Fast mode. err = get_prewarm_error() if err is not None: raise gr.Error( f"VACE checkpoint download failed: {err}. " f"Use Fast (LaMa) mode, or restart the Space to retry the download." ) _gpu_inpaint_vace( frame_paths, crop_region, inpaint_mask, ws.out_frames_dir, progress, ) # ── CPU: encode + mux ─────────────────────────────────────── progress(0.95, desc="Encoding video…") silent_path = ws.path("silent.mp4") frames_to_video(ws.out_frames_dir, silent_path, working_meta) # The final mp4 outlives the VideoWorkspace (returned to Gradio # for download), so it goes to the system tempdir, not ``ws``. fd, final_path_str = tempfile.mkstemp(suffix=".mp4", prefix="wm_out_") os.close(fd) final_path = Path(final_path_str) try: attach_audio(safe_video, silent_path, final_path) except Exception: final_path.unlink(missing_ok=True) raise except gr.Error: # Already a user-facing red toast; let it through unchanged. raise except (ValueError, RuntimeError) as e: # Pipeline operations (ffmpeg, validation) raise these with # readable messages — surface as gr.Error for a clean toast. raise gr.Error(f"❌ {e}") from e except Exception as e: # Unexpected errors (CUDA OOM, model crashes, OSError on disk # full, etc.) — keep the exception type prefix so debugging # signal isn't lost, but still wrap as gr.Error so the user # sees a styled toast instead of a raw stack trace. raise gr.Error(f"❌ {type(e).__name__}: {e}") from e progress(1.0, desc="Done!") return str(final_path), f"✓ Done — {total} frames processed ({mode})" # --------------------------------------------------------------------------- # UI # --------------------------------------------------------------------------- def _card_title(text: str, step: int | None = None, top_margin: bool = False) -> gr.HTML: """Render a card heading. ``step`` adds the numbered badge prefix.""" margin = ' style="margin-top:16px"' if top_margin else "" badge = f'{step}' if step is not None else "" return gr.HTML(f'
Draw over the watermark · choose a mode · get clean footage