Jack Wu
refactor: modernize type hinting by using collection ABCs and built-in generic types across the codebase
b6ba6fc
"""
app.py
------
Gradio UI for the Video Watermark Remover Space (ZeroGPU).
Flow:
1. Upload video → extract first frame → display in ImageEditor for mask drawing
2. User brushes over the watermark
3. Preview Crop → shows crop region + mask overlay
4. Mode: Fast (LaMa) | Quality (VACE-14B)
5. Remove Watermark → runs pipeline → output video
"""
from __future__ import annotations
import os
import shutil
import tempfile
from dataclasses import asdict, fields, replace
from pathlib import Path
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
# ---------------------------------------------------------------------------
# ZeroGPU spaces shim — import succeeds locally too
# ---------------------------------------------------------------------------
try:
    import spaces  # type: ignore
except ImportError:
    class spaces:  # type: ignore
        """No-op stand-in used when the ``spaces`` package is not installed."""

        @staticmethod
        def GPU(duration=60):
            """Return a decorator that hands the function back unchanged.

            ``duration`` is accepted so call sites written as
            ``@spaces.GPU(duration=...)`` keep working; it is otherwise ignored.
            """
            return lambda fn: fn
# ---------------------------------------------------------------------------
# Pipeline imports
# ---------------------------------------------------------------------------
from pipeline.composite import composite_and_save, feathered_alpha
from pipeline.crop import (
DEFAULT_DILATE_PX, CropRegion, build_inpaint_mask, compute_crop_region,
mask_to_bbox,
)
from pipeline.lama import inpaint_frames_lama_stream
from pipeline.vace import (
get_prewarm_error, inpaint_frames_vace_stream, is_prewarm_done,
prewarm_vace_cache, wait_for_prewarm,
)
from pipeline.video import (
VideoMeta, VideoWorkspace,
attach_audio, extract_first_frame_array, extract_frames, frames_to_video, probe,
)
# ---------------------------------------------------------------------------
# Tunables
# ---------------------------------------------------------------------------
# Working window (seconds) the pipeline actually inpaints. Longer uploads are
# accepted (up to UPLOAD_DURATION_S below) and silently trimmed during
# frame extraction.
PROCESS_DURATION_S = 15.0
# Maximum accepted clip length (seconds). Anything between PROCESS_DURATION_S
# and UPLOAD_DURATION_S is auto-trimmed to PROCESS_DURATION_S during processing.
UPLOAD_DURATION_S = 60.0
# Maximum accepted source resolution. Larger uploads are rejected outright.
# The upload check compares pixel area (width * height) against the product
# of these two values.
MAX_UPLOAD_W = 1920
MAX_UPLOAD_H = 1080
# Cap working fps for both LaMa per-frame and VACE chunked inference. At
# 60 fps × 15 s the per-frame LaMa budget overflows the 240 s @spaces.GPU
# lease (~270 s estimated), and VACE chunking produces ~13 chunks worth
# >300 s. At 30 fps both fit comfortably. Sources above this rate are
# extracted *and encoded* at PROCESS_FPS_MAX so the output mp4 duration
# matches the trimmed input.
PROCESS_FPS_MAX = 30.0
# Mode labels used in both the UI radio choices and the dispatch logic.
# Defining them once prevents drift between the two sites.
MODE_FAST = "Fast (LaMa)"
MODE_QUALITY = "Quality (VACE-14B)"
ALL_MODES = (MODE_FAST, MODE_QUALITY)
# Kick off the ~75 GB VACE checkpoint download in a background thread so
# the cache is populated before the first Quality-mode click. Idempotent
# and non-blocking — the app starts serving immediately. Skip with
# VACE_PREWARM=0 if you only ever use Fast/LaMa mode.
# NOTE: this runs as an import-time side effect of the module.
prewarm_vace_cache()
# ---------------------------------------------------------------------------
# CSS — dark theme; lives in static/style.css for syntax highlighting & diffability
# ---------------------------------------------------------------------------
# Decode explicitly as UTF-8: without an ``encoding`` argument, read_text()
# falls back to the locale encoding, which is not guaranteed to be UTF-8 on
# every host and would garble any non-ASCII characters in the stylesheet.
CSS = (Path(__file__).resolve().parent / "static" / "style.css").read_text(
    encoding="utf-8"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_mask_from_editor(editor_value: dict | None) -> np.ndarray | None:
    """Collapse the paint layers of a gr.ImageEditor value into one 2-D mask.

    Every ``np.ndarray`` entry under ``editor_value["layers"]`` contributes a
    single plane, and the planes are merged with an element-wise maximum so
    strokes drawn on separate layers are all kept:

    * ``(H, W, 4)`` arrays contribute their alpha channel,
    * other 3-D arrays contribute the per-pixel maximum over channels,
    * 2-D arrays are cast to ``uint8`` and used directly,
    * arrays of any other rank, and non-array entries, are ignored.

    Returns ``None`` when there is no editor value, no usable layer, or the
    merged mask is entirely zero — callers treat all of these as "nothing
    drawn". The mask is at whatever resolution the editor supplied; rescale
    it with :func:`_rescale_mask_to_frame` before using it against frame
    dimensions.

    Only ndarray layers (the Gradio 4.x format) are understood. Layers
    delivered in any other form (for example dicts) are skipped, so a change
    of layer format would require rewriting this function.
    """
    if editor_value is None:
        return None

    def _coverage(layer: np.ndarray) -> np.ndarray | None:
        # Reduce one layer to a 2-D plane; None means "unsupported rank".
        if layer.ndim == 2:
            return layer.astype(np.uint8)
        if layer.ndim != 3:
            return None
        if layer.shape[2] == 4:
            return layer[:, :, 3]
        return layer.max(axis=2)

    union: np.ndarray | None = None
    for candidate in editor_value.get("layers") or []:
        if not isinstance(candidate, np.ndarray):
            continue
        plane = _coverage(candidate)
        if plane is None:
            continue
        union = plane if union is None else np.maximum(union, plane)

    if union is None or union.max() == 0:
        return None
    return union
def _rescale_mask_to_frame(
    raw_mask: np.ndarray,
    target_w: int,
    target_h: int,
) -> np.ndarray:
    """Return *raw_mask* as a binary uint8 mask of size ``target_h`` x ``target_w``.

    The editor may hand back a mask smaller (or larger) than the source frame;
    interpreting those coordinates against full-frame dimensions would put the
    crop in the wrong place, so the mask is resized with nearest-neighbour
    sampling whenever its shape differs from the target.

    Whatever the incoming dtype or value range, every value greater than zero
    becomes 255 and everything else becomes 0 in the returned uint8 array.
    """
    needs_resize = raw_mask.shape[0] != target_h or raw_mask.shape[1] != target_w
    if needs_resize:
        resized = Image.fromarray(raw_mask).resize(
            (target_w, target_h), Image.NEAREST
        )
        raw_mask = np.array(resized)

    # Threshold last so the output is binary regardless of the input dtype.
    painted = raw_mask > 0
    return painted.astype(np.uint8) * 255
def _create_crop_preview(
    first_frame: np.ndarray,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
) -> np.ndarray:
    """Render the crop rectangle and the inpaint mask on top of *first_frame*.

    The crop region is drawn as a translucent teal box with a brighter teal
    outline; pixels where *inpaint_mask* is non-zero are tinted translucent
    red. *inpaint_mask* is written into the frame at the crop region's
    ``frame_x``/``frame_y`` .. ``frame_x2``/``frame_y2`` window. Returns an
    RGB array the same size as *first_frame*.
    """
    base = Image.fromarray(first_frame).convert("RGBA")
    width, height = base.size

    # Crop rectangle goes on its own transparent layer.
    layer = Image.new("RGBA", base.size, (0, 0, 0, 0))
    ImageDraw.Draw(layer).rectangle(
        list(crop_region.pil_box),
        fill=(13, 148, 136, 40),
        outline=(45, 212, 191, 200),
        width=2,
    )

    if inpaint_mask is not None:
        # Place the crop-local mask at its position in the full frame.
        placed = np.zeros((height, width), dtype=np.uint8)
        rows = slice(crop_region.frame_y, crop_region.frame_y2)
        cols = slice(crop_region.frame_x, crop_region.frame_x2)
        placed[rows, cols] = inpaint_mask

        tint = np.zeros((height, width, 4), dtype=np.uint8)
        tint[placed > 0] = (239, 68, 68, 140)
        layer = Image.alpha_composite(layer, Image.fromarray(tint, mode="RGBA"))

    flattened = Image.alpha_composite(base, layer).convert("RGB")
    return np.array(flattened)
# Names of the dataclass fields declared on VideoMeta, computed once at import.
_VIDEO_META_FIELDS = frozenset(spec.name for spec in fields(VideoMeta))


def _meta_from_state(d: dict) -> VideoMeta:
    """Build a VideoMeta from a state dict, dropping keys VideoMeta lacks.

    A state dict may carry keys that are not VideoMeta fields (for instance
    one produced by a different version of the app); forwarding those through
    ``**d`` would raise TypeError, so only known field names are passed on.
    Fields absent from *d* are left to the dataclass constructor.
    """
    known = {}
    for key, value in d.items():
        if key in _VIDEO_META_FIELDS:
            known[key] = value
    return VideoMeta(**known)
def _compute_crop_and_mask(
    raw_mask: np.ndarray,
    meta_state: dict,
    context_px: int,
):
    """Derive the crop region and inpaint mask from a drawn mask.

    Steps: rebuild VideoMeta from *meta_state*, rescale *raw_mask* to the
    frame size recorded there, take its bounding box, compute the crop region
    around it with *context_px* of context, and build the inpaint mask for
    that region using ``DEFAULT_DILATE_PX``.

    Returns ``(meta, bbox, crop_region, inpaint_mask)``. Used by both
    :func:`on_preview_crop` and :func:`run_pipeline`.
    """
    video_meta = _meta_from_state(meta_state)
    frame_w, frame_h = video_meta.width, video_meta.height

    frame_mask = _rescale_mask_to_frame(raw_mask, frame_w, frame_h)
    watermark_bbox = mask_to_bbox(frame_mask)
    region = compute_crop_region(
        watermark_bbox, frame_w, frame_h, context_px=context_px,
    )
    region_mask = build_inpaint_mask(
        region, source_mask=frame_mask, dilate_px=DEFAULT_DILATE_PX,
    )
    return video_meta, watermark_bbox, region, region_mask
# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------
def on_video_upload(video_path: str | None):
    """Validate an uploaded video and load its first frame into the editor.

    Returns a 4-tuple ``(editor_update, crop_preview_update, meta_state,
    status)``. On success the editor receives the first frame with no paint
    layers, the crop preview is cleared, ``meta_state`` is the probed
    VideoMeta as a dict, and ``status`` summarises the clip. On any failure
    (no path, validation rejection, or an exception while probing/decoding)
    both component updates are no-ops, ``meta_state`` is ``None``, and
    ``status`` carries the message.
    """
    def _msg(text: str):
        # Failure shape: leave components untouched and reset the metadata.
        return gr.update(), gr.update(), None, text

    if not video_path:
        return _msg("Upload a video to begin.")

    try:
        meta = probe(video_path)

        # ── Input validation — guard against disk exhaustion on ZeroGPU ──
        # The frame-count limit catches sources whose duration could not be
        # determined (duration_s falls back to 0.0), where the duration
        # check alone would let an arbitrarily long clip through.
        max_frames = round(UPLOAD_DURATION_S * max(meta.fps, 1.0))
        if meta.duration_s > UPLOAD_DURATION_S:
            return _msg(
                f"❌ Clip too long ({meta.duration_s:.1f}s). "
                f"Max {UPLOAD_DURATION_S:.0f}s; only the first "
                f"{PROCESS_DURATION_S:.0f}s would be processed anyway."
            )
        if meta.frame_count > max_frames:
            return _msg(
                f"❌ Clip too long ({meta.frame_count} frames at "
                f"{meta.fps:.2f} fps). Max {UPLOAD_DURATION_S:.0f} seconds."
            )
        # Pixel-area comparison: any orientation is accepted as long as the
        # total pixel count fits the MAX_UPLOAD_W x MAX_UPLOAD_H budget.
        if meta.width * meta.height > MAX_UPLOAD_W * MAX_UPLOAD_H:
            return _msg(
                f"❌ Resolution too high ({meta.width}×{meta.height}). "
                f"Max {MAX_UPLOAD_W}×{MAX_UPLOAD_H}."
            )

        will_trim = meta.duration_s > PROCESS_DURATION_S
        will_cap_fps = meta.fps > PROCESS_FPS_MAX

        first_frame = extract_first_frame_array(video_path)

        meta_str = (
            f"{meta.width}×{meta.height} · {meta.fps:.3g} fps · "
            f"{meta.duration_s:.1f}s · {meta.frame_count} frames"
        )
        if meta.color_trc:
            meta_str += f" · {meta.color_trc}"

        editor_val = {
            "background": first_frame,
            "layers": [],
            "composite": None,
        }

        notes = []
        if will_trim:
            notes.append(
                f"⚠️ Clip is {meta.duration_s:.1f}s — only the first "
                f"{PROCESS_DURATION_S:.0f}s will be processed."
            )
        if will_cap_fps:
            notes.append(
                f"⚠️ Source is {meta.fps:.0f} fps — output will be "
                f"{PROCESS_FPS_MAX:.0f} fps to fit GPU budget."
            )
        # Double newline between notes so they don't visually run together
        # in the Gradio status box.
        notes_str = ("\n\n" + "\n\n".join(notes)) if notes else ""

        return (
            gr.update(value=editor_val),
            gr.update(value=None),
            asdict(meta),
            f"✓ Loaded — {meta_str}{notes_str}"
            "\n\nNow draw over the watermark with the brush tool.",
        )
    except Exception as e:
        # UI boundary: report any probe/decode failure in the status box
        # instead of letting the callback crash.
        return _msg(f"❌ Error: {e}")
def on_preview_crop(editor_value: dict | None, meta_state: dict | None, context_px: int):
    """Compute the crop region from the drawn mask and render a preview.

    Returns ``(crop_preview_update, status)``. The preview is the editor's
    background frame (or a black frame if none is present), brought to
    source resolution, with the crop rectangle and inpaint mask overlaid.
    If there is no video, no drawing, or the computation fails, the preview
    update is a no-op and ``status`` explains why.
    """
    if meta_state is None or editor_value is None:
        return gr.update(), "Upload a video first."

    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        return gr.update(), "⚠️ No drawing detected. Use the brush to paint over the watermark."

    try:
        meta, bbox, crop_region, inpaint_mask = _compute_crop_and_mask(
            raw_mask, meta_state, context_px,
        )

        bg = editor_value.get("background")
        if bg is None:
            first_frame = np.zeros((meta.height, meta.width, 3), dtype=np.uint8)
        else:
            first_frame = np.array(Image.fromarray(np.asarray(bg)).convert("RGB"))

        # Ensure first_frame is at full source resolution for the overlay,
        # since the crop region and mask are in source-frame coordinates.
        if first_frame.shape[1] != meta.width or first_frame.shape[0] != meta.height:
            first_frame = np.array(
                Image.fromarray(first_frame).resize(
                    (meta.width, meta.height), Image.LANCZOS
                )
            )

        preview = _create_crop_preview(first_frame, crop_region, inpaint_mask)
        status = (
            "✓ Crop computed\n"
            f" Watermark bbox : {bbox.width}×{bbox.height} px\n"
            f" Crop region : {crop_region.frame_w}×{crop_region.frame_h} "
            f"@ ({crop_region.frame_x}, {crop_region.frame_y})\n"
            f" VACE target : {crop_region.target_w}×{crop_region.target_h}\n"
            "\nLooks good? Hit Remove Watermark."
        )
        return gr.update(value=preview), status
    except Exception as e:
        # UI boundary: show the failure in the status box.
        return gr.update(), f"❌ {e}"
def on_clear_mask(editor_value: dict | None):
    """Drop every paint layer from the editor but keep its background frame.

    Returns ``(editor_update, status)``. With no editor value the editor is
    left untouched and the status prompts for an upload.
    """
    if editor_value is None:
        return gr.update(), "Upload a video to begin."

    fresh_value = {
        "background": editor_value.get("background"),
        "layers": [],
        "composite": None,
    }
    status = "Mask cleared. Draw over the watermark to start again."
    return gr.update(value=fresh_value), status
def on_snap_to_rectangle(editor_value: dict | None):
    """Replace the user's freehand scribble with a solid rectangle over its bbox.

    Most watermarks are rectangular (corner logos, channel bugs, subtitle
    bars), so a rough scribble followed by one click gives the same coverage
    as careful brush-filling. Reuses the existing brush — no custom tool.

    Returns ``(editor_update, status)``. The editor is left untouched when
    there is no editor value, no drawing, or no background frame.
    """
    if editor_value is None:
        return gr.update(), "Upload a video to begin."

    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        return (
            gr.update(),
            "⚠️ Draw a rough scribble over the watermark first, then snap.",
        )

    bg = editor_value.get("background")
    if bg is None:
        return gr.update(), "Upload a video first."

    bg_arr = np.asarray(bg)
    H, W = bg_arr.shape[:2]

    # The new layer is sized from the background, so the bbox must be in
    # background coordinates too. The mask normally already has that shape
    # (the helper is then a pure threshold); if it does not, rescale it
    # rather than painting the rectangle at the wrong position.
    mask = _rescale_mask_to_frame(raw_mask, W, H)
    ys, xs = np.where(mask > 0)
    if ys.size == 0:
        # Only reachable when a rescale dropped every painted pixel.
        return (
            gr.update(),
            "⚠️ Draw a rough scribble over the watermark first, then snap.",
        )
    y1, y2 = int(ys.min()), int(ys.max())
    x1, x2 = int(xs.min()), int(xs.max())

    # Build an RGBA layer painted with the brush colour (#ef4444) inside the
    # bbox, so the editor renders it like a carefully painted rectangle.
    new_layer = np.zeros((H, W, 4), dtype=np.uint8)
    new_layer[y1:y2 + 1, x1:x2 + 1] = (239, 68, 68, 255)

    return (
        gr.update(value={
            "background": bg,
            "layers": [new_layer],
            "composite": None,
        }),
        f"✓ Snapped to {x2 - x1 + 1}×{y2 - y1 + 1} px rectangle "
        f"at ({x1},{y1})→({x2},{y2}). Adjust with brush/eraser if needed.",
    )
@spaces.GPU(duration=240)
def _gpu_inpaint_lama(
    frame_paths: list,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
    out_dir,
    total: int,
    progress,
) -> None:
    """LaMa branch — inpaint the frames and composite the results into *out_dir*.

    The inpainted crops come from ``inpaint_frames_lama_stream`` as an
    iterator that is handed straight to ``composite_and_save``, so this
    function itself never collects them into a list.

    ``total`` is the number of frames, used only for progress reporting;
    ``progress`` is called as ``progress(fraction, desc=...)`` with fractions
    mapped into the 0.20–0.85 band of the overall pipeline.
    """
    alpha = feathered_alpha(inpaint_mask)
    out_dir = Path(out_dir)
    # Guard the divisor the same way the VACE branch guards n_chunks, so a
    # zero total can never turn a progress update into ZeroDivisionError.
    denom = max(total, 1)

    def _prog(i: int) -> None:
        progress(
            0.20 + 0.65 * ((i + 1) / denom),
            desc=f"LaMa {i + 1}/{total}…",
        )

    crops_iter = inpaint_frames_lama_stream(
        frame_paths, crop_region, inpaint_mask, _prog,
    )
    composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir)
@spaces.GPU(duration=300)
def _gpu_inpaint_vace(
    frame_paths: list,
    crop_region: CropRegion,
    inpaint_mask: np.ndarray,
    out_dir,
    progress,
) -> None:
    """VACE branch — inpaint the frames and composite the results into *out_dir*.

    The inpainted crops come from ``inpaint_frames_vace_stream`` as an
    iterator that is handed straight to ``composite_and_save``, so this
    function does not buffer them itself. See
    :func:`pipeline.vace.inpaint_frames_vace_stream` for the chunking
    rationale and memory characteristics.

    ``progress`` is called as ``progress(fraction, desc=...)``; chunk
    completion is mapped into the 0.25–0.90 band of the overall pipeline.
    """
    alpha = feathered_alpha(inpaint_mask)
    out_dir = Path(out_dir)

    progress(0.20, desc="Loading VACE-14B (first run ~30-60s; cached after)…")

    def _prog_chunk(ci: int, n_chunks: int) -> None:
        # Map chunk completion to overall pipeline progress 0.25 → 0.90;
        # max() keeps the division safe if n_chunks is reported as 0.
        frac = (ci + 1) / max(n_chunks, 1)
        progress(0.25 + 0.65 * frac, desc=f"VACE chunk {ci + 1}/{n_chunks}…")

    crops_iter = inpaint_frames_vace_stream(
        frame_paths, crop_region, inpaint_mask, progress_fn=_prog_chunk,
    )
    composite_and_save(frame_paths, crops_iter, crop_region, alpha, out_dir)
def run_pipeline(
    video_path: str | None,
    editor_value: dict | None,
    mode: str,
    context_px: int,
    meta_state: dict | None,
    progress=gr.Progress(),
):
    """Pipeline orchestrator — validates inputs, then runs the full removal.

    Steps: compute the crop region and inpaint mask from the drawn mask,
    copy the upload into a VideoWorkspace, extract frames (capped at
    ``PROCESS_FPS_MAX`` and ``PROCESS_DURATION_S``), dispatch to the
    per-mode inpainting function, encode the output frames and attach the
    source audio.

    GPU allocation is requested by the ``@spaces.GPU`` decorators on
    ``_gpu_inpaint_lama`` / ``_gpu_inpaint_vace``; everything done directly
    in this function is outside those decorated calls.

    Returns ``(output_video_path, status_message)``.

    Raises:
        gr.Error: for every failure — missing inputs, unknown mode, crop
            computation errors, a failed VACE prewarm, no extractable
            frames, and any exception raised while processing.
    """
    # ``not video_path`` also rejects an empty string, matching the check
    # in on_video_upload.
    if not video_path:
        raise gr.Error("Upload a video first.")
    if meta_state is None:
        raise gr.Error("Video metadata missing — re-upload the video.")
    if mode not in ALL_MODES:
        raise gr.Error(f"Unknown mode '{mode}'. Choose from: {ALL_MODES}")

    raw_mask = _get_mask_from_editor(editor_value)
    if raw_mask is None:
        raise gr.Error("Draw over the watermark before processing.")

    progress(0.05, desc="Computing crop region…")
    try:
        meta, _bbox, crop_region, inpaint_mask = _compute_crop_and_mask(
            raw_mask, meta_state, context_px,
        )
    except ValueError as e:
        # mask_to_bbox / compute_crop_region raise ValueError with user-
        # facing messages; surface them as gr.Error so the UI shows a clean
        # toast instead of a generic exception.
        raise gr.Error(str(e)) from e

    # Cap working fps so per-frame LaMa and per-chunk VACE both fit within
    # their @spaces.GPU duration budgets at the worst-case input rate.
    # ``working_meta`` is what frames_to_video uses for the output's encode
    # framerate — it must match what extract_frames was given so the output
    # duration equals the trimmed input duration.
    working_fps = min(meta.fps, PROCESS_FPS_MAX)
    working_meta = replace(meta, fps=working_fps)

    with VideoWorkspace() as ws:
        try:
            # Preserve the original file extension so FFmpeg can detect the
            # container format; fall back to .mp4 if the path has none.
            src_suffix = Path(video_path).suffix or ".mp4"
            safe_video = ws.path("source" + src_suffix)
            shutil.copy2(video_path, safe_video)

            # ── Extract frames (CFR-forced for VFR safety) ─────────────
            progress(0.10, desc="Extracting frames…")
            frame_paths = extract_frames(
                safe_video, ws.frames_dir, fps=working_fps,
                max_duration_s=PROCESS_DURATION_S,
            )
            total = len(frame_paths)
            if total == 0:
                # Nothing to inpaint or encode — fail with a clear message
                # rather than an opaque error from a later stage.
                raise gr.Error("No frames could be extracted from the video.")

            # ── GPU: inpaint + composite + save ────────────────────────
            progress(0.15, desc="Starting inpainting…")
            if mode == MODE_FAST:
                _gpu_inpaint_lama(
                    frame_paths, crop_region, inpaint_mask,
                    ws.out_frames_dir, total, progress,
                )
            else:  # MODE_QUALITY (already validated above)
                # If the prewarm thread is still downloading, wait for it
                # here rather than inside the @spaces.GPU(duration=300)
                # call, where the wait would eat into the GPU budget.
                if not is_prewarm_done():
                    progress(0.16, desc="Waiting for VACE checkpoint cache to finish prewarming…")
                    wait_for_prewarm()
                # If prewarm raised, the cache is incomplete; surface the
                # real cause and route the user to Fast mode instead of
                # failing later with a confusing cache-miss error.
                err = get_prewarm_error()
                if err is not None:
                    raise gr.Error(
                        f"VACE checkpoint download failed: {err}. "
                        f"Use Fast (LaMa) mode, or restart the Space to retry the download."
                    )
                _gpu_inpaint_vace(
                    frame_paths, crop_region, inpaint_mask,
                    ws.out_frames_dir, progress,
                )

            # ── CPU: encode + mux ───────────────────────────────────────
            progress(0.95, desc="Encoding video…")
            silent_path = ws.path("silent.mp4")
            frames_to_video(ws.out_frames_dir, silent_path, working_meta)

            # The final mp4 outlives the VideoWorkspace (returned to Gradio
            # for download), so it goes to the system tempdir, not ``ws``.
            fd, final_path_str = tempfile.mkstemp(suffix=".mp4", prefix="wm_out_")
            os.close(fd)
            final_path = Path(final_path_str)
            try:
                attach_audio(safe_video, silent_path, final_path)
            except Exception:
                # Don't leave an orphaned temp file behind on failure.
                final_path.unlink(missing_ok=True)
                raise
        except gr.Error:
            # Already a user-facing error; let it through unchanged.
            raise
        except (ValueError, RuntimeError) as e:
            # Pipeline operations raise these with readable messages —
            # surface as gr.Error for a clean toast.
            raise gr.Error(f"❌ {e}") from e
        except Exception as e:
            # Unexpected errors — keep the exception type in the message so
            # the debugging signal isn't lost, but still wrap as gr.Error.
            raise gr.Error(f"❌ {type(e).__name__}: {e}") from e

    progress(1.0, desc="Done!")
    return str(final_path), f"✓ Done — {total} frames processed ({mode})"
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
def _card_title(text: str, step: int | None = None, top_margin: bool = False) -> gr.HTML:
    """Create the ``gr.HTML`` heading for a card.

    ``step`` (when not ``None``) is rendered as a ``step-badge`` span in
    front of *text*; ``top_margin`` adds a 16px top margin to the heading.
    """
    style_attr = ""
    if top_margin:
        style_attr = ' style="margin-top:16px"'

    badge_html = "" if step is None else f'<span class="step-badge">{step}</span>'
    return gr.HTML(f'<div class="card-title"{style_attr}>{badge_html}{text}</div>')
# Build the Gradio UI. Components are created inside the Blocks context and
# the event wiring at the bottom connects them to the callbacks above.
with gr.Blocks(title="Video Watermark Remover", css=CSS) as demo:
    # State: probed VideoMeta as a dict (None until a valid upload).
    meta_state = gr.State(None)

    # ── Header ──────────────────────────────────────────────────────────────
    gr.HTML("""
    <div class="header-block">
    <h1>🦎 Video Watermark Remover</h1>
    <p>Draw over the watermark · choose a mode · get clean footage</p>
    </div>
    """)

    # ── Step 1 + 2 side by side ─────────────────────────────────────────────
    with gr.Row(equal_height=False):
        with gr.Column(scale=1):
            _card_title("Upload Video", step=1)
            video_input = gr.Video(
                label=(
                    f"Source clip (up to {UPLOAD_DURATION_S:.0f}s, "
                    f"≤{MAX_UPLOAD_W}×{MAX_UPLOAD_H}; "
                    f"first {PROCESS_DURATION_S:.0f}s processed)"
                ),
                elem_id="video-input",
            )
            _card_title("Mode", step=2, top_margin=True)
            mode_radio = gr.Radio(
                choices=list(ALL_MODES),
                value=MODE_FAST,
                label="",
                elem_classes=["mode-radio"],
            )
            _card_title("⚙️ Advanced", top_margin=True)
            context_slider = gr.Slider(
                minimum=32,
                maximum=192,
                value=64,
                step=16,
                label="Context padding (px)",
                info="Extra scene context around the watermark given to the model",
            )
        with gr.Column(scale=2):
            _card_title("Draw Over the Watermark", step=3)
            editor = gr.ImageEditor(
                label="Paint over the watermark (brush tool)",
                type="numpy",
                height=480,
                brush=gr.Brush(colors=["#ef4444"], default_size=12),
                eraser=gr.Eraser(default_size=12),
            )

    # ── Action buttons ───────────────────────────────────────────────────────
    with gr.Row():
        clear_btn = gr.Button(
            "🧹 Clear Mask",
            elem_classes=["btn-secondary"],
        )
        snap_btn = gr.Button(
            "⬛ Snap to Rectangle",
            elem_classes=["btn-secondary"],
        )
        preview_btn = gr.Button(
            "🔍 Preview Crop Region",
            elem_classes=["btn-secondary"],
        )
        process_btn = gr.Button(
            "✨ Remove Watermark",
            variant="primary",
            elem_classes=["btn-primary"],
        )

    # ── Status ───────────────────────────────────────────────────────────────
    status_box = gr.Textbox(
        label="Status",
        value="Upload a video to begin.",
        lines=4,
        interactive=False,
        elem_classes=["status-box"],
    )

    # ── Outputs ──────────────────────────────────────────────────────────────
    with gr.Row():
        with gr.Column():
            _card_title("Crop Preview")
            crop_preview = gr.Image(
                label="",
                type="numpy",
                show_label=False,
            )
        with gr.Column():
            _card_title("Output Video")
            video_output = gr.Video(
                label="",
                show_label=False,
            )

    # ── Wiring ───────────────────────────────────────────────────────────────
    # Output lists must match the tuple shapes returned by each callback.
    video_input.upload(
        fn=on_video_upload,
        inputs=[video_input],
        outputs=[editor, crop_preview, meta_state, status_box],
    )
    clear_btn.click(
        fn=on_clear_mask,
        inputs=[editor],
        outputs=[editor, status_box],
    )
    snap_btn.click(
        fn=on_snap_to_rectangle,
        inputs=[editor],
        outputs=[editor, status_box],
    )
    preview_btn.click(
        fn=on_preview_crop,
        inputs=[editor, meta_state, context_slider],
        outputs=[crop_preview, status_box],
    )
    process_btn.click(
        fn=run_pipeline,
        inputs=[video_input, editor, mode_radio, context_slider, meta_state],
        outputs=[video_output, status_box],
    )

if __name__ == "__main__":
    demo.launch()