"""
app.py
------
Gradio UI for the Video Watermark Remover Space (ZeroGPU).

Flow:
1. Upload video → extract first frame → display in ImageEditor for mask drawing
2. User brushes over the watermark
3. Preview Crop → shows crop region + mask overlay
4. Mode: Fast (LaMa) | Quality (VACE-14B)
5. Remove Watermark → runs pipeline → output video
"""
from __future__ import annotations

import os
import shutil
import tempfile
from dataclasses import asdict, fields, replace
from pathlib import Path

import gradio as gr
import numpy as np
from PIL import Image, ImageDraw

# ---------------------------------------------------------------------------
# ZeroGPU spaces shim – import succeeds locally too
# ---------------------------------------------------------------------------
try:
    import spaces  # type: ignore
except ImportError:
    class spaces:  # type: ignore
        def GPU(duration=60):
            def decorator(fn):
                return fn
            return decorator
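
# Illustrative usage of the decorator (works both with the real `spaces` package
# and with the no-op shim above; `heavy_step` and the duration are made up here):
#
#     @spaces.GPU(duration=120)
#     def heavy_step(...): ...
#
# On ZeroGPU this requests a GPU lease for the call; locally it is a pass-through.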

# ---------------------------------------------------------------------------
# Pipeline imports
# ---------------------------------------------------------------------------
from pipeline.composite import composite_and_save, feathered_alpha
from pipeline.crop import (
    DEFAULT_DILATE_PX, CropRegion, build_inpaint_mask, compute_crop_region,
    mask_to_bbox,
)
from pipeline.lama import inpaint_frames_lama_stream
from pipeline.vace import (
    get_prewarm_error, inpaint_frames_vace_stream, is_prewarm_done,
    prewarm_vace_cache, wait_for_prewarm,
)
from pipeline.video import (
    VideoMeta, VideoWorkspace,
    attach_audio, extract_first_frame_array, extract_frames, frames_to_video, probe,
)

# ---------------------------------------------------------------------------
# Tunables
# ---------------------------------------------------------------------------
# Working window the pipeline actually inpaints. Longer uploads are
# accepted (up to UPLOAD_DURATION_S below) and silently trimmed during
# frame extraction.
PROCESS_DURATION_S = 15.0

# Maximum accepted clip length. Anything between PROCESS_DURATION_S and
# UPLOAD_DURATION_S is auto-trimmed to PROCESS_DURATION_S during processing.
UPLOAD_DURATION_S = 60.0

# Maximum accepted source resolution. Larger uploads are rejected outright.
MAX_UPLOAD_W = 1920
MAX_UPLOAD_H = 1080

# Cap working fps for both LaMa per-frame and VACE chunked inference. At
# 60 fps × 15 s the per-frame LaMa budget overflows the 240 s @spaces.GPU
# lease (~270 s estimated), and VACE chunking produces ~13 chunks worth
# >300 s. At 30 fps both fit comfortably. Sources above this rate are
# extracted *and encoded* at PROCESS_FPS_MAX so the output mp4 duration
# matches the trimmed input.
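# Rough arithmetic behind that estimate (illustrative, not a measured benchmark):
# ~270 s for 60 fps × 15 s = 900 frames is roughly 0.3 s per LaMa frame, so
# 30 fps × 15 s = 450 frames lands around ~135 s, comfortably inside the lease.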
PROCESS_FPS_MAX = 30.0

# Mode labels used in both the UI radio choices and the dispatch logic.
# Defining them once prevents drift between the two sites.
MODE_FAST = "Fast (LaMa)"
MODE_QUALITY = "Quality (VACE-14B)"
ALL_MODES = (MODE_FAST, MODE_QUALITY)

# Kick off the ~75 GB VACE checkpoint download in a background thread so
# the cache is populated before the first Quality-mode click. Idempotent
# and non-blocking – the app starts serving immediately. Skip with
# VACE_PREWARM=0 if you only ever use Fast/LaMa mode.
prewarm_vace_cache()

# ---------------------------------------------------------------------------
# CSS – dark theme; lives in static/style.css for syntax highlighting & diffability
# ---------------------------------------------------------------------------
CSS = (Path(__file__).resolve().parent / "static" / "style.css").read_text()


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_mask_from_editor(editor_value: dict | None) -> np.ndarray | None:
    """Extract a uint8 mask (H×W) from gr.ImageEditor output (Gradio 4.x).

    Returns ``None`` if the editor is empty *or* the user has only drawn
    blank pixels – callers don't need to distinguish; both mean "no usable
    mask drawn". The returned array is at *display* resolution (the editor
    canvas size), not source-frame resolution; always rescale via
    :func:`_rescale_mask_to_frame` before interpreting against VideoMeta.

    Unions all layers so drawings across multiple paint layers are preserved.
    Gradio 4.x layers are RGBA ndarrays. Gradio 5+ changed the format to dicts
    with a 'url' key – if requirements.txt ever lifts the gradio<5 cap, this
    function needs to be rewritten.
    """
    if editor_value is None:
        return None
    layers = [
        layer for layer in (editor_value.get("layers") or [])
        if isinstance(layer, np.ndarray)
    ]
    if not layers:
        return None
    combined: np.ndarray | None = None
    for arr in layers:
        if arr.ndim == 3 and arr.shape[2] == 4:
            channel = arr[:, :, 3]          # RGBA → alpha
        elif arr.ndim == 3:
            channel = arr.max(axis=2)       # RGB → luminance max
        elif arr.ndim == 2:
            channel = arr.astype(np.uint8)  # already single-channel
        else:
            continue                        # unexpected shape → skip
        combined = channel if combined is None else np.maximum(combined, channel)
    if combined is None or combined.max() == 0:
        return None
    return combined


def _rescale_mask_to_frame(
    raw_mask: np.ndarray,
    target_w: int,
    target_h: int,
) -> np.ndarray:
    """Rescale a mask from editor/display resolution to full source-frame resolution.

    gr.ImageEditor renders at a fixed CSS height, so the returned layer may be
    e.g. 854×480 for a 1920×1080 source. Using display-resolution coordinates
    against full-frame VideoMeta dimensions would place the crop in the wrong
    position.

    Also enforces uint8 binary (0 / 255) output regardless of the layer dtype
    returned by Gradio – guards against future API changes where the layer
    becomes float32 (0–1) rather than uint8 (0–255).
    """
    if raw_mask.shape[0] == target_h and raw_mask.shape[1] == target_w:
        result = raw_mask
    else:
        result = np.array(
            Image.fromarray(raw_mask).resize((target_w, target_h), Image.NEAREST)
        )
    # Normalise to binary uint8 regardless of incoming dtype
    return (result > 0).astype(np.uint8) * 255
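
# Example of the rescale described above (sizes assumed for illustration): an
# 854×480 editor layer for a 1920×1080 source comes back as a (1080, 1920) uint8
# array containing only 0 and 255 after the nearest-neighbour resize.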

def _create_crop_preview(
    first_frame: np.ndarray,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
) -> np.ndarray:
    """Overlay crop rectangle and mask on the first frame for preview."""
    img = Image.fromarray(first_frame).convert("RGBA")
    overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)
    cr = crop_region
    # Semi-transparent teal fill for crop region
    draw.rectangle(
        list(cr.pil_box),
        fill=(13, 148, 136, 40),
        outline=(45, 212, 191, 200),
        width=2,
    )
    # Inpaint mask overlay (red, semi-transparent)
    if inpaint_mask is not None:
        mask_full = np.zeros((img.height, img.width), dtype=np.uint8)
        mask_full[cr.frame_y : cr.frame_y2, cr.frame_x : cr.frame_x2] = inpaint_mask
        rgba = np.zeros((img.height, img.width, 4), dtype=np.uint8)
        rgba[mask_full > 0] = (239, 68, 68, 140)
        overlay = Image.alpha_composite(overlay, Image.fromarray(rgba, mode="RGBA"))
    composite = Image.alpha_composite(img, overlay).convert("RGB")
    return np.array(composite)


_VIDEO_META_FIELDS = frozenset(f.name for f in fields(VideoMeta))


def _meta_from_state(d: dict) -> VideoMeta:
    """Reconstruct a VideoMeta from a Gradio State dict, ignoring unknown keys.

    Cached browser sessions across app versions may carry extra fields that
    no longer exist on VideoMeta – passing them via ``**d`` would raise
    TypeError, so we filter to known fields. Optional fields use the
    dataclass defaults if missing.
    """
    return VideoMeta(**{k: v for k, v in d.items() if k in _VIDEO_META_FIELDS})
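
# Minimal sketch of the filtering (the stale key name is hypothetical): a cached
# state dict that carries {"legacy_field": 1} alongside the real VideoMeta fields
# simply has that key dropped, instead of VideoMeta(**d) raising TypeError.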

def _compute_crop_and_mask(
    raw_mask: np.ndarray,
    meta_state: dict,
    context_px: int,
):
    """Run the mask → bbox → crop_region → inpaint_mask chain.

    Shared by :func:`on_preview_crop` and :func:`run_pipeline`; returns
    ``(meta, bbox, crop_region, inpaint_mask)``. ``raw_mask`` is at editor
    display resolution (whatever Gradio gave back); this function rescales
    it to the source-frame resolution defined by *meta_state*.
    """
    meta = _meta_from_state(meta_state)
    full_mask = _rescale_mask_to_frame(raw_mask, meta.width, meta.height)
    bbox = mask_to_bbox(full_mask)
    crop_region = compute_crop_region(
        bbox, meta.width, meta.height, context_px=context_px,
    )
    inpaint_mask = build_inpaint_mask(
        crop_region, source_mask=full_mask, dilate_px=DEFAULT_DILATE_PX,
    )
    return meta, bbox, crop_region, inpaint_mask


# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------
def on_video_upload(video_path: str | None):
    """Extract first frame and populate the ImageEditor."""
    # Tuple shape: (editor_update, crop_preview_update, meta_state, status).
    def _msg(text: str):
        return gr.update(), gr.update(), None, text

    if not video_path:
        return _msg("Upload a video to begin.")
    try:
        meta = probe(video_path)
        # ── Input validation – guard against disk exhaustion on ZeroGPU ──
        # MAX_FRAMES catches VFR / container-less sources where ffprobe
        # returns N/A for duration; duration_s would be 0.0 after our
        # parse-fallback, so the duration check alone could let an
        # arbitrarily long clip through.
        max_frames = round(UPLOAD_DURATION_S * max(meta.fps, 1.0))
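        # e.g. (pure arithmetic from the constants above) a 29.97 fps source
        # is capped at round(60.0 * 29.97) = 1798 frames.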
        if meta.duration_s > UPLOAD_DURATION_S:
            return _msg(
                f"❌ Clip too long ({meta.duration_s:.1f}s). "
                f"Max {UPLOAD_DURATION_S:.0f}s; only the first "
                f"{PROCESS_DURATION_S:.0f}s would be processed anyway."
            )
        if meta.frame_count > max_frames:
            return _msg(
                f"❌ Clip too long ({meta.frame_count} frames at "
                f"{meta.fps:.2f} fps). Max {UPLOAD_DURATION_S:.0f} seconds."
            )
        if meta.width * meta.height > MAX_UPLOAD_W * MAX_UPLOAD_H:
            return _msg(
                f"❌ Resolution too high ({meta.width}×{meta.height}). "
                f"Max {MAX_UPLOAD_W}×{MAX_UPLOAD_H}."
            )
        will_trim = meta.duration_s > PROCESS_DURATION_S
        will_cap_fps = meta.fps > PROCESS_FPS_MAX
        first_frame = extract_first_frame_array(video_path)
        meta_str = (
            f"{meta.width}×{meta.height} · {meta.fps:.3g} fps · "
            f"{meta.duration_s:.1f}s · {meta.frame_count} frames"
        )
        if meta.color_trc:
            meta_str += f" · {meta.color_trc}"
        editor_val = {
            "background": first_frame,
            "layers": [],
            "composite": None,
        }
        notes = []
        if will_trim:
            notes.append(
                f"⚠️ Clip is {meta.duration_s:.1f}s – only the first "
                f"{PROCESS_DURATION_S:.0f}s will be processed."
            )
        if will_cap_fps:
            notes.append(
                f"⚠️ Source is {meta.fps:.0f} fps – output will be "
                f"{PROCESS_FPS_MAX:.0f} fps to fit GPU budget."
            )
        # Double newline between notes so they don't visually run together
        # in the Gradio status box.
        notes_str = ("\n\n" + "\n\n".join(notes)) if notes else ""
        return (
            gr.update(value=editor_val),
            gr.update(value=None),
            asdict(meta),
            f"✅ Loaded – {meta_str}{notes_str}"
            f"\n\nNow draw over the watermark with the brush tool.",
        )
    except Exception as e:
        return _msg(f"❌ Error: {e}")


def on_preview_crop(editor_value: dict | None, meta_state: dict | None, context_px: int):
    """Compute crop region from mask and render a preview overlay."""
    if meta_state is None or editor_value is None:
        return gr.update(), "Upload a video first."
    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        return gr.update(), "⚠️ No drawing detected. Use the brush to paint over the watermark."
    try:
        meta, bbox, crop_region, inpaint_mask = _compute_crop_and_mask(
            raw_mask, meta_state, context_px,
        )
        bg = editor_value.get("background")
        if bg is None:
            first_frame = np.zeros((meta.height, meta.width, 3), dtype=np.uint8)
        else:
            first_frame = np.array(Image.fromarray(np.asarray(bg)).convert("RGB"))
        # Ensure first_frame is at full source resolution for the overlay
        if first_frame.shape[1] != meta.width or first_frame.shape[0] != meta.height:
            first_frame = np.array(
                Image.fromarray(first_frame).resize(
                    (meta.width, meta.height), Image.LANCZOS
                )
            )
        preview = _create_crop_preview(first_frame, crop_region, inpaint_mask)
        status = (
            f"✅ Crop computed\n"
            f"  Watermark bbox : {bbox.width}×{bbox.height} px\n"
            f"  Crop region    : {crop_region.frame_w}×{crop_region.frame_h} "
            f"@ ({crop_region.frame_x}, {crop_region.frame_y})\n"
            f"  VACE target    : {crop_region.target_w}×{crop_region.target_h}\n"
            f"\nLooks good? Hit Remove Watermark."
        )
        return gr.update(value=preview), status
    except Exception as e:
        return gr.update(), f"❌ {e}"


def on_clear_mask(editor_value: dict | None):
    """Clear all paint layers from the editor while preserving the loaded frame."""
    if editor_value is None:
        return gr.update(), "Upload a video to begin."
    bg = editor_value.get("background")
    return (
        gr.update(value={"background": bg, "layers": [], "composite": None}),
        "Mask cleared. Draw over the watermark to start again.",
    )


def on_snap_to_rectangle(editor_value: dict | None):
    """Replace the user's freehand scribble with a clean rectangle covering its bbox.

    Most watermarks are rectangular (corner logos, channel bugs, subtitle bars),
    so this gives the same coverage as careful brush-filling but in one click
    after a rough scribble. Reuses the existing brush – no custom tool needed.
    """
    if editor_value is None:
        return gr.update(), "Upload a video to begin."
    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        return (
            gr.update(),
            "⚠️ Draw a rough scribble over the watermark first, then snap.",
        )
    bg = editor_value.get("background")
    if bg is None:
        return gr.update(), "Upload a video first."
    bg_arr = np.asarray(bg)
    H, W = bg_arr.shape[:2]
    # raw_mask comes back at editor (display) resolution – same as bg
    ys, xs = np.where(raw_mask > 0)
    y1, y2 = int(ys.min()), int(ys.max())
    x1, x2 = int(xs.min()), int(xs.max())
    # Build an RGBA layer painted with the brush colour (#ef4444) inside the bbox.
    # Matching the brush colour means the editor renders it identically to a
    # carefully-painted rectangle.
    new_layer = np.zeros((H, W, 4), dtype=np.uint8)
    new_layer[y1:y2 + 1, x1:x2 + 1] = (239, 68, 68, 255)
    return (
        gr.update(value={
            "background": bg,
            "layers": [new_layer],
            "composite": None,
        }),
        f"✅ Snapped to {x2 - x1 + 1}×{y2 - y1 + 1} px rectangle "
        f"at ({x1},{y1})→({x2},{y2}). Adjust with brush/eraser if needed.",
    )

@spaces.GPU(duration=240)  # per-frame LaMa budget – see the PROCESS_FPS_MAX rationale above
def _gpu_inpaint_lama(
    frame_paths: list,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
    out_dir,
    total: int,
    progress,
) -> None:
| """LaMa branch β streams one frame at a time, never holds the full list.""" | |
| alpha = feathered_alpha(inpaint_mask) | |
| out_dir = Path(out_dir) | |
| def _prog(i: int) -> None: | |
| progress( | |
| 0.20 + 0.65 * ((i + 1) / total), | |
| desc=f"LaMa {i + 1}/{total}β¦", | |
| ) | |
| crops_iter = inpaint_frames_lama_stream( | |
| frame_paths, crop_region, inpaint_mask, _prog, | |
| ) | |
| composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir) | |

@spaces.GPU(duration=300)  # chunked VACE budget – the duration referenced in run_pipeline below
def _gpu_inpaint_vace(
    frame_paths: list,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
    out_dir,
    progress,
) -> None:
| """VACE branch β chunked temporal inference with streaming output. | |
| Memory footprint per chunk (~250 MB) is independent of clip length β | |
| see :func:`pipeline.vace.inpaint_frames_vace_stream` for the chunking | |
| rationale. The chunk loop emits each frame as it's ready, so this | |
| function composites + saves frame-by-frame instead of buffering all | |
| inpainted crops in RAM. | |
| """ | |
| alpha = feathered_alpha(inpaint_mask) | |
| out_dir = Path(out_dir) | |
| progress(0.20, desc="Loading VACE-14B (first run ~30-60s; cached after)β¦") | |
| def _prog_chunk(ci: int, n_chunks: int) -> None: | |
| # Map chunk completion to overall pipeline progress 0.25 β 0.90. | |
| frac = (ci + 1) / max(n_chunks, 1) | |
| progress(0.25 + 0.65 * frac, desc=f"VACE chunk {ci + 1}/{n_chunks}β¦") | |
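    # Worked example of the mapping above (the chunk count is illustrative):
    # with 10 chunks, chunk 1 reports 0.25 + 0.65 * (1/10) = 0.315 and the
    # final chunk reports 0.90.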
    crops_iter = inpaint_frames_vace_stream(
        frame_paths, crop_region, inpaint_mask, progress_fn=_prog_chunk,
    )
    composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir)

def run_pipeline(
    video_path: str | None,
    editor_value: dict | None,
    mode: str,
    context_px: int,
    meta_state: dict | None,
    progress=gr.Progress(),
):
    """
    Pipeline orchestrator – CPU work only.

    GPU allocation is acquired and released inside the per-mode @spaces.GPU
    functions (_gpu_inpaint_lama / _gpu_inpaint_vace). Frame extraction
    and video encoding/muxing are pure CPU/disk I/O and do not consume
    GPU quota.
    """
    if video_path is None:
        raise gr.Error("Upload a video first.")
    if meta_state is None:
        raise gr.Error("Video metadata missing – re-upload the video.")
    if mode not in ALL_MODES:
        raise gr.Error(f"Unknown mode '{mode}'. Choose from: {ALL_MODES}")
    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        raise gr.Error("Draw over the watermark before processing.")

    progress(0.05, desc="Computing crop region…")
    try:
        meta, _bbox, crop_region, inpaint_mask = _compute_crop_and_mask(
            raw_mask, meta_state, context_px,
        )
    except ValueError as e:
        # mask_to_bbox / compute_crop_region raise ValueError with user-
        # facing messages ("drawn area too small", "watermark too large",
        # "frame too small", etc.). Surface as gr.Error so the UI shows a
        # clean red toast instead of a generic stack-traced exception.
        raise gr.Error(str(e)) from e

    # Cap working fps so per-frame LaMa and per-chunk VACE both fit within
    # their @spaces.GPU duration budgets at the worst-case input rate.
    # ``working_meta`` is what frames_to_video uses to set the output's
    # encode framerate – must match what extract_frames was given so the
    # output mp4's duration equals the trimmed input duration.
    working_fps = min(meta.fps, PROCESS_FPS_MAX)
    working_meta = replace(meta, fps=working_fps)
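    # e.g. a 59.94 fps source is extracted and encoded at 30 fps, while a
    # 23.976 fps source keeps its native rate (min() leaves it unchanged).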
    with VideoWorkspace() as ws:
        try:
            # Preserve the original file extension so FFmpeg can detect the
            # container format. Gradio always adds an extension for video
            # uploads, but fall back to .mp4 if the path somehow has none.
            src_suffix = Path(video_path).suffix or ".mp4"
            safe_video = ws.path("source" + src_suffix)
            shutil.copy2(video_path, safe_video)

            # ── Extract frames (CFR-forced for VFR safety) ─────────────
            progress(0.10, desc="Extracting frames…")
            frame_paths = extract_frames(
                safe_video, ws.frames_dir, fps=working_fps,
                max_duration_s=PROCESS_DURATION_S,
            )
            total = len(frame_paths)

            # ── GPU: inpaint + composite + save ────────────────────────
            progress(0.15, desc="Starting inpainting…")
            if mode == MODE_FAST:
                _gpu_inpaint_lama(
                    frame_paths, crop_region, inpaint_mask,
                    ws.out_frames_dir, total, progress,
                )
            else:  # MODE_QUALITY (already validated above)
                # If the prewarm thread is still downloading, wait for it
                # CPU-side rather than burning the @spaces.GPU(duration=300)
                # budget on the wait. On a fresh deploy where the user
                # clicks Quality before prewarm finishes, this could be
                # several minutes; the progress message tells them what's
                # happening.
                if not is_prewarm_done():
                    progress(0.16, desc="Waiting for VACE checkpoint cache to finish prewarming…")
                    wait_for_prewarm()
                # If prewarm raised, the cache is incomplete and the
                # local_files_only=True from_pretrained calls inside
                # _get_pipe would fail with a confusing cache-miss error.
                # Surface the real cause and route the user to Fast mode.
                err = get_prewarm_error()
                if err is not None:
                    raise gr.Error(
                        f"VACE checkpoint download failed: {err}. "
                        f"Use Fast (LaMa) mode, or restart the Space to retry the download."
                    )
                _gpu_inpaint_vace(
                    frame_paths, crop_region, inpaint_mask,
                    ws.out_frames_dir, progress,
                )

            # ── CPU: encode + mux ──────────────────────────────────────
            progress(0.95, desc="Encoding video…")
            silent_path = ws.path("silent.mp4")
            frames_to_video(ws.out_frames_dir, silent_path, working_meta)
            # The final mp4 outlives the VideoWorkspace (returned to Gradio
            # for download), so it goes to the system tempdir, not ``ws``.
            fd, final_path_str = tempfile.mkstemp(suffix=".mp4", prefix="wm_out_")
            os.close(fd)
            final_path = Path(final_path_str)
            try:
                attach_audio(safe_video, silent_path, final_path)
            except Exception:
                final_path.unlink(missing_ok=True)
                raise
        except gr.Error:
            # Already a user-facing red toast; let it through unchanged.
            raise
        except (ValueError, RuntimeError) as e:
            # Pipeline operations (ffmpeg, validation) raise these with
            # readable messages – surface as gr.Error for a clean toast.
            raise gr.Error(f"❌ {e}") from e
        except Exception as e:
            # Unexpected errors (CUDA OOM, model crashes, OSError on disk
            # full, etc.) – keep the exception type prefix so debugging
            # signal isn't lost, but still wrap as gr.Error so the user
            # sees a styled toast instead of a raw stack trace.
            raise gr.Error(f"❌ {type(e).__name__}: {e}") from e

    progress(1.0, desc="Done!")
    return str(final_path), f"✅ Done – {total} frames processed ({mode})"

# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
def _card_title(text: str, step: int | None = None, top_margin: bool = False) -> gr.HTML:
    """Render a card heading. ``step`` adds the numbered badge prefix."""
    margin = ' style="margin-top:16px"' if top_margin else ""
    badge = f'<span class="step-badge">{step}</span>' if step is not None else ""
    return gr.HTML(f'<div class="card-title"{margin}>{badge}{text}</div>')


with gr.Blocks(title="Video Watermark Remover", css=CSS) as demo:
    # State
    meta_state = gr.State(None)
    # ── Header ──────────────────────────────────────────────────────────────
    gr.HTML("""
        <div class="header-block">
            <h1>Video Watermark Remover</h1>
            <p>Draw over the watermark · choose a mode · get clean footage</p>
        </div>
    """)
    # ── Step 1 + 2 side by side ─────────────────────────────────────────────
    with gr.Row(equal_height=False):
        with gr.Column(scale=1):
            _card_title("Upload Video", step=1)
            video_input = gr.Video(
                label=(
                    f"Source clip (up to {UPLOAD_DURATION_S:.0f}s, "
                    f"≤{MAX_UPLOAD_W}×{MAX_UPLOAD_H}; "
                    f"first {PROCESS_DURATION_S:.0f}s processed)"
                ),
                elem_id="video-input",
            )
            _card_title("Mode", step=2, top_margin=True)
            mode_radio = gr.Radio(
                choices=list(ALL_MODES),
                value=MODE_FAST,
                label="",
                elem_classes=["mode-radio"],
            )
            _card_title("⚙️ Advanced", top_margin=True)
            context_slider = gr.Slider(
                minimum=32,
                maximum=192,
                value=64,
                step=16,
                label="Context padding (px)",
                info="Extra scene context around the watermark given to the model",
            )
        with gr.Column(scale=2):
            _card_title("Draw Over the Watermark", step=3)
            editor = gr.ImageEditor(
                label="Paint over the watermark (brush tool)",
                type="numpy",
                height=480,
                brush=gr.Brush(colors=["#ef4444"], default_size=12),
                eraser=gr.Eraser(default_size=12),
            )
    # ── Action buttons ──────────────────────────────────────────────────────
    with gr.Row():
        clear_btn = gr.Button(
            "🧹 Clear Mask",
            elem_classes=["btn-secondary"],
        )
        snap_btn = gr.Button(
            "▬ Snap to Rectangle",
            elem_classes=["btn-secondary"],
        )
        preview_btn = gr.Button(
            "🔍 Preview Crop Region",
            elem_classes=["btn-secondary"],
        )
        process_btn = gr.Button(
            "✨ Remove Watermark",
            variant="primary",
            elem_classes=["btn-primary"],
        )
    # ── Status ──────────────────────────────────────────────────────────────
    status_box = gr.Textbox(
        label="Status",
        value="Upload a video to begin.",
        lines=4,
        interactive=False,
        elem_classes=["status-box"],
    )

    # ── Outputs ─────────────────────────────────────────────────────────────
    with gr.Row():
        with gr.Column():
            _card_title("Crop Preview")
            crop_preview = gr.Image(
                label="",
                type="numpy",
                show_label=False,
            )
        with gr.Column():
            _card_title("Output Video")
            video_output = gr.Video(
                label="",
                show_label=False,
            )

    # ── Wiring ──────────────────────────────────────────────────────────────
    video_input.upload(
        fn=on_video_upload,
        inputs=[video_input],
        outputs=[editor, crop_preview, meta_state, status_box],
    )
    clear_btn.click(
        fn=on_clear_mask,
        inputs=[editor],
        outputs=[editor, status_box],
    )
    snap_btn.click(
        fn=on_snap_to_rectangle,
        inputs=[editor],
        outputs=[editor, status_box],
    )
    preview_btn.click(
        fn=on_preview_crop,
        inputs=[editor, meta_state, context_slider],
        outputs=[crop_preview, status_box],
    )
    process_btn.click(
        fn=run_pipeline,
        inputs=[video_input, editor, mode_radio, context_slider, meta_state],
        outputs=[video_output, status_box],
    )


if __name__ == "__main__":
    demo.launch()