FeilongTang's picture
Surface per-frame patch breakdown in the chart legend + JSON
0c2f8f4
Raw
History Blame
57.2 kB
"""OneVision Encoder Codec View.
A simplified, dependency-light port of the codec_tools pipeline from
lmms-eval-ov2. The original tool relies on a bitcost-patched ffmpeg 5.1 to
score every macroblock by its actual encoding bit cost; we approximate that
saliency signal with a Sobel gradient magnitude per patch (high gradient =
high local complexity = roughly what the encoder would spend bits on).
Pipeline (mirrors codec_tools/pipeline/process_video_bitcost_readiness.py):
1. Uniformly sample N frames from the input video.
2. smart_resize each frame so dims are multiples of `patch` and the
total pixel count <= max_pixels.
3. Slice every frame into a patch grid; score each patch by its
Sobel gradient magnitude mean.
4. Pick the top-K highest-scoring patches per frame.
5. Render a "selection visualization" video: kept patches stay in
full color, dropped patches are faded to a gray-white wash so the
viewer can see exactly which patches the codec stage chose.
6. Pack the selected patches in time-order, raster scan, into a
single canvas image (the artifact LLaVA-OneVision2 consumes).
"""
import json
import math
import os
import shutil
import subprocess
import tempfile
import time
from typing import List, Tuple
import cv2
import gradio as gr
import imageio_ffmpeg
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
PATCH_CHOICES = [14, 16, 28]
DEMO_VIDEO_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"examples", "demo_codec_heatmap.mp4",
)
DEMO_PRESET = (
DEMO_VIDEO_PATH, # video_in
16, # sample_frames
14, # patch_size
1024, # total_patches
150000, # max_pixels
"sbs", # viz_mode
0.55, # heatmap_alpha
0.0, 0.0, # start_sec, end_sec
"combined", # saliency_signal
True, # score_log_scale
96.0, # bitcost_pct
0.55, # fade_strength
"dynamic", # gop
4, # target_canvases
)
def smart_resize(frame: np.ndarray, max_pixels: int, factor: int) -> np.ndarray:
"""Resize so h,w are multiples of `factor` and h*w <= max_pixels."""
h, w = frame.shape[:2]
pixels = h * w
if pixels > max_pixels:
scale = math.sqrt(max_pixels / pixels)
h = max(factor, int(h * scale))
w = max(factor, int(w * scale))
h = max(factor, (h // factor) * factor)
w = max(factor, (w // factor) * factor)
return cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA)
def sample_frame_ids(total: int, n: int) -> List[int]:
if total <= 0:
return []
if n >= total:
return list(range(total))
return [int(round(i)) for i in np.linspace(0, total - 1, n)]
def decode_frames(video_path: str, frame_ids: List[int]) -> List[np.ndarray]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return []
frames: List[np.ndarray] = []
for fid in frame_ids:
cap.set(cv2.CAP_PROP_POS_FRAMES, int(fid))
ok, fr = cap.read()
if ok:
frames.append(fr)
cap.release()
return frames
def video_metadata(video_path: str) -> dict:
cap = cv2.VideoCapture(video_path)
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
meta = {
"total_frames": total,
"fps": round(fps, 3),
"width": w,
"height": h,
}
if shutil.which("ffprobe"):
try:
r = subprocess.run(
[
"ffprobe", "-v", "quiet", "-select_streams", "v:0",
"-show_entries", "stream=codec_name,bit_rate,pix_fmt,profile",
"-of", "json", video_path,
],
capture_output=True, text=True, check=True, timeout=15,
)
data = json.loads(r.stdout).get("streams", [{}])[0]
meta["codec"] = data.get("codec_name")
meta["pix_fmt"] = data.get("pix_fmt")
meta["profile"] = data.get("profile")
meta["bitrate_bps"] = data.get("bit_rate")
except Exception as e:
meta["ffprobe_error"] = str(e)
return meta
def patch_score_grid(frame_bgr: np.ndarray, patch: int) -> np.ndarray:
"""Return [hb, wb] grid of Sobel gradient magnitude means per patch."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3)
mag = np.sqrt(gx * gx + gy * gy)
h, w = mag.shape
hb, wb = h // patch, w // patch
mag = mag[: hb * patch, : wb * patch]
grid = mag.reshape(hb, patch, wb, patch).mean(axis=(1, 3))
return grid.astype(np.float32)
def patch_score_frame_diff(
prev_bgr: np.ndarray, cur_bgr: np.ndarray, patch: int,
) -> np.ndarray:
"""Inter-frame absdiff per patch β€” proxy for motion / temporal complexity."""
if prev_bgr is None or prev_bgr.shape != cur_bgr.shape:
return patch_score_grid(cur_bgr, patch)
diff = cv2.absdiff(prev_bgr, cur_bgr).mean(axis=2).astype(np.float32)
h, w = diff.shape
hb, wb = h // patch, w // patch
diff = diff[: hb * patch, : wb * patch]
return diff.reshape(hb, patch, wb, patch).mean(axis=(1, 3))
def compute_score_grids(
frames: List[np.ndarray], patch: int, signal: str,
) -> List[np.ndarray]:
"""Build per-frame patch score grids from one of three signals:
- 'gradient' β€” Sobel magnitude only (intra-frame complexity)
- 'frame_diff' β€” absdiff vs previous frame (temporal motion)
- 'combined' β€” 0.5 * gradient_norm + 0.5 * frame_diff_norm
For 'combined', each component is independently shifted to [0,1] across
the whole sample so they contribute on equal footing."""
sig = (signal or "gradient").lower()
if sig == "gradient":
return [patch_score_grid(f, patch) for f in frames]
if sig == "frame_diff":
out = []
prev = None
for f in frames:
out.append(patch_score_frame_diff(prev, f, patch))
prev = f
return out
# combined
g = np.stack([patch_score_grid(f, patch) for f in frames], axis=0)
d_list = []
prev = None
for f in frames:
d_list.append(patch_score_frame_diff(prev, f, patch))
prev = f
d = np.stack(d_list, axis=0)
def _norm01(a: np.ndarray) -> np.ndarray:
a = a.astype(np.float32) - a.min()
m = a.max()
return a / m if m > 1e-8 else a
combined = 0.5 * _norm01(g) + 0.5 * _norm01(d)
return [combined[i] for i in range(combined.shape[0])]
def topk_mask(score: np.ndarray, k: int) -> np.ndarray:
"""Per-frame top-K mask (legacy helper, no longer used by process())."""
flat = score.flatten()
if k >= flat.size:
return np.ones_like(score, dtype=np.uint8)
if k <= 0:
return np.zeros_like(score, dtype=np.uint8)
thresh = np.partition(flat, -k)[-k]
return (score >= thresh).astype(np.uint8)
def global_topk_masks(
grids: List[np.ndarray], total_k: int,
) -> Tuple[List[np.ndarray], int]:
"""Pick the top `total_k` highest-scoring patches GLOBALLY across all
sampled frames, return one mask per frame plus the actual count.
Some frames may end up with zero patches (low energy throughout) while
others may contribute many β€” that's the whole point: the codec-style
saliency lets the budget concentrate where it matters."""
if not grids:
return [], 0
arr = np.stack(grids, axis=0).astype(np.float32) # [N, hb, wb]
N, hb, wb = arr.shape
flat = arr.reshape(-1)
if total_k >= flat.size:
masks = [np.ones((hb, wb), dtype=np.uint8) for _ in range(N)]
return masks, int(flat.size)
if total_k <= 0:
return [np.zeros((hb, wb), dtype=np.uint8) for _ in range(N)], 0
thresh = np.partition(flat, -total_k)[-total_k]
bool_mask = (arr >= thresh)
actual = int(bool_mask.sum())
return [bool_mask[i].astype(np.uint8) for i in range(N)], actual
def build_dynamic_groups(
grids: List[np.ndarray], target_groups: int = 4, min_group_frames: int = 1,
) -> List[Tuple[int, int]]:
"""Adaptive temporal grouping by cumulative saliency energy.
Walk sampled frames in time order, accumulate frame-level score sums,
and close the current group once the running total reaches
`total_energy / target_groups`. Groups end up roughly equal in
*information content* rather than equal in frame count β€” this is the
same intuition as codec_tools' readiness mode, simplified for the
demo (no temporal-coverage / marginal-gain refinement)."""
n = len(grids)
if n == 0:
return []
if n <= target_groups:
return [(i, i) for i in range(n)]
energies = np.array([float(g.sum()) for g in grids], dtype=np.float64)
total = energies.sum()
if total <= 1e-8:
# Degenerate: pure even split.
size = max(1, n // target_groups)
groups: List[Tuple[int, int]] = []
cursor = 0
while cursor < n and len(groups) < target_groups:
end = min(n - 1, cursor + size - 1)
if len(groups) == target_groups - 1:
end = n - 1
groups.append((cursor, end))
cursor = end + 1
return groups
target_per_group = total / target_groups
groups = []
start = 0
cum = 0.0
for i in range(n):
cum += energies[i]
groups_left = target_groups - len(groups) - 1
frames_left_after = n - i - 1
# Close this group if energy budget hit AND we still leave room for
# the remaining groups (each needs >= min_group_frames frames).
threshold_hit = cum >= target_per_group
room_ok = frames_left_after >= groups_left * min_group_frames
size_ok = (i - start + 1) >= min_group_frames
if threshold_hit and room_ok and size_ok and len(groups) < target_groups - 1:
groups.append((start, i))
start = i + 1
cum = 0.0
# Tail group (whatever frames remain).
if start <= n - 1:
groups.append((start, n - 1))
return groups
def grouped_topk_masks(
grids: List[np.ndarray], total_k: int, gop: str,
) -> Tuple[List[np.ndarray], int, List[Tuple[int, int]], str]:
"""Select patches under a GOP grouping strategy.
GOP modes:
- "global": one big group across the whole video β€” top-K global.
- "<int>" (e.g. "4"/"8"/"16"): fixed group size in frames; the
budget is split equally across groups, top-K picked within each.
- "dynamic": adaptive groups (see build_dynamic_groups), targeting
4 groups by default; each group gets an equal share of the budget.
Returns (per-frame masks, actual selected count, [(start,end),...] groups, resolved_label).
"""
n = len(grids)
if n == 0:
return [], 0, [], gop
mode = (gop or "global").strip().lower()
if mode in ("global", "none", "0", ""):
masks, actual = global_topk_masks(grids, int(total_k))
return masks, actual, [(0, n - 1)], "global"
if mode == "dynamic":
groups = build_dynamic_groups(grids, target_groups=min(4, max(1, n)))
else:
try:
g_size = max(1, int(mode))
except ValueError:
g_size = n
groups = []
cursor = 0
while cursor < n:
end = min(n - 1, cursor + g_size - 1)
groups.append((cursor, end))
cursor = end + 1
num_groups = max(1, len(groups))
per_group_budget = max(1, int(total_k) // num_groups)
# Initialize empty masks, then fill per-group selections.
out_masks = [np.zeros(g.shape, dtype=np.uint8) for g in grids]
actual_total = 0
for (s, e) in groups:
sub = grids[s:e + 1]
sub_masks, sub_actual = global_topk_masks(sub, per_group_budget)
for i, sm in enumerate(sub_masks):
out_masks[s + i] = sm
actual_total += sub_actual
return out_masks, actual_total, groups, mode
def faded_background(frame_bgr: np.ndarray, fade: float = 0.55) -> np.ndarray:
"""Convert to gray-white wash: gray * (1-fade) + white * fade."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
gray_bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR).astype(np.float32)
white = np.full_like(gray_bgr, 255.0)
out = gray_bgr * (1.0 - fade) + white * fade
return out.astype(np.uint8)
def overlay_selection(
frame_bgr: np.ndarray, mask_grid: np.ndarray, patch: int,
outline: bool = True, fade: float = 0.55,
) -> np.ndarray:
"""Composite: kept patches keep color; dropped patches become gray-white.
Optionally draw a thin outline around kept patches."""
h, w = frame_bgr.shape[:2]
hb, wb = mask_grid.shape
pix_mask = np.kron(mask_grid, np.ones((patch, patch), dtype=np.uint8))
pix_mask = pix_mask[:h, :w]
bg = faded_background(frame_bgr, fade=float(fade))
keep = pix_mask.astype(bool)[..., None]
out = np.where(keep, frame_bgr, bg)
if outline:
for i in range(hb):
for j in range(wb):
if mask_grid[i, j]:
y0, x0 = i * patch, j * patch
cv2.rectangle(
out, (x0, y0), (x0 + patch - 1, y0 + patch - 1),
(0, 220, 255), 1,
)
return out
def _normalize_scores(grids: List[np.ndarray], pct: float = 99.0) -> np.ndarray:
"""Stack into [N, hb, wb], shift by per-video min, divide by global pct.
Using the percentile (instead of max) suppresses outlier patches the same
way codec_tools does with bitcost_pct=99."""
arr = np.stack(grids, axis=0).astype(np.float32)
arr = arr - arr.min()
cap = np.percentile(arr, pct) if arr.size else 1.0
if cap <= 1e-8:
cap = float(arr.max() or 1.0)
arr = np.clip(arr / cap, 0.0, 1.0)
return arr
def overlay_heatmap(
frame_bgr: np.ndarray, score_grid: np.ndarray, patch: int,
alpha: float = 0.55,
) -> np.ndarray:
"""Render a continuous JET heatmap of patch scores blended over the frame.
Low score = blue, high score = red. `score_grid` is in [0, 1]."""
h, w = frame_bgr.shape[:2]
score = (np.clip(score_grid, 0.0, 1.0) * 255.0).astype(np.uint8)
pix = np.kron(score, np.ones((patch, patch), dtype=np.uint8))
pix = pix[:h, :w]
heat = cv2.applyColorMap(pix, cv2.COLORMAP_JET)
out = cv2.addWeighted(frame_bgr, 1.0 - alpha, heat, alpha, 0.0)
return out
def overlay_sbs(
frame_bgr: np.ndarray, mask_grid: np.ndarray, score_grid: np.ndarray,
patch: int, alpha: float = 0.55, fade: float = 0.55,
) -> np.ndarray:
"""Side-by-side: [selection | heatmap] with a thin separator."""
left = overlay_selection(frame_bgr, mask_grid, patch, outline=True, fade=fade)
right = overlay_heatmap(frame_bgr, score_grid, patch, alpha=alpha)
h, w = left.shape[:2]
sep = np.full((h, 4, 3), 30, dtype=np.uint8)
sbs = np.concatenate([left, sep, right], axis=1)
cv2.putText(sbs, "selection", (8, 22), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (255, 255, 255), 2, cv2.LINE_AA)
cv2.putText(sbs, "heatmap", (w + 12, 22), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (255, 255, 255), 2, cv2.LINE_AA)
return sbs
def write_mp4(frames: List[np.ndarray], path: str, fps: float) -> None:
"""Write H.264 mp4 via imageio-ffmpeg's bundled ffmpeg (browser-friendly)."""
if not frames:
raise ValueError("no frames to write")
h, w = frames[0].shape[:2]
ff = imageio_ffmpeg.get_ffmpeg_exe()
cmd = [
ff, "-y", "-loglevel", "error",
"-f", "rawvideo", "-vcodec", "rawvideo",
"-s", f"{w}x{h}", "-pix_fmt", "bgr24",
"-r", f"{fps:.3f}", "-i", "-",
"-an", "-vcodec", "libx264", "-pix_fmt", "yuv420p",
"-preset", "veryfast", "-crf", "23",
"-movflags", "+faststart",
path,
]
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
try:
for f in frames:
if f.shape[0] % 2 or f.shape[1] % 2:
f = f[: f.shape[0] // 2 * 2, : f.shape[1] // 2 * 2]
proc.stdin.write(np.ascontiguousarray(f).tobytes())
proc.stdin.close()
err = proc.stderr.read().decode("utf-8", errors="ignore")
rc = proc.wait()
if rc != 0:
raise RuntimeError(f"ffmpeg failed (rc={rc}): {err}")
finally:
if proc.poll() is None:
proc.kill()
def _build_ippp_canvas(
frames: List[np.ndarray], masks: List[np.ndarray],
i_idx: int, p_range: range, patch: int,
) -> Tuple[np.ndarray, int]:
"""Build one IPPP canvas at the *same dimensions as the I-frame*.
Codec convention: every frame in a group shares the picture size; a
P-frame only encodes the macroblocks that need to change. So:
1. Initialise the canvas to the I-frame's full image.
2. For each P-frame in time order, replace each saliency-selected
patch position with the P-frame's pixels at that position.
3. The canvas now reads as 'what the encoder would have reconstructed
at the end of this group' β€” same shape as the I-frame, with the
high-energy regions updated by later P-frames.
Returns (canvas, n_overlays) where n_overlays is the count of P-frame
patches that overwrote a position (a position may be hit multiple
times by different P-frames; we count each hit)."""
i_frame = frames[i_idx]
h, w = i_frame.shape[:2]
hb, wb = h // patch, w // patch
canvas_h, canvas_w = hb * patch, wb * patch
canvas = i_frame[:canvas_h, :canvas_w].copy()
n_overlays = 0
for k in p_range:
if k >= len(frames):
break
f, m = frames[k], masks[k]
for i in range(m.shape[0]):
for j in range(m.shape[1]):
if m[i, j]:
canvas[
i * patch:(i + 1) * patch,
j * patch:(j + 1) * patch,
] = f[
i * patch:(i + 1) * patch,
j * patch:(j + 1) * patch,
]
n_overlays += 1
return canvas, n_overlays
def _allocate_canvases_per_group(
target_canvases: int, num_groups: int,
) -> List[int]:
"""Split a total target canvas count across N groups as evenly as
possible; the first `remainder` groups get +1 each."""
target = max(1, int(target_canvases))
n = max(1, int(num_groups))
base, rem = divmod(target, n)
out = [base + (1 if i < rem else 0) for i in range(n)]
# Floor to at least 1 canvas per group so no group is invisible.
return [max(1, x) for x in out]
def pack_canvases_per_group(
frames: List[np.ndarray],
masks: List[np.ndarray],
groups: List[Tuple[int, int]],
patch: int,
target_canvases: int = 4,
) -> Tuple[List[np.ndarray], List[Tuple[int, int, int]], int]:
"""Pack exactly `target_canvases` IPPP canvases for the whole video,
distributing them across GOP groups as evenly as possible.
Each group's frame range [s..e] is split into K consecutive sub-ranges
(K = canvases allocated to that group). Each sub-range [ss..ee] becomes
one canvas:
- frame ss is the I-frame: its whole image goes to the canvas top.
- frames ss+1..ee are P-frames: only saliency-selected patches go
below the I-frame, packed time-major in a wb-wide raster grid.
Returns:
canvases β€” list of np.ndarray, length == target_canvases
(or fewer if some groups have only 1 frame).
sub_ranges β€” list of (group_idx, sub_start, sub_end) parallel to
canvases, for caption / debugging.
total_selected β€” I-frame patches (counted as full grid) + P-frame
selected patches across all canvases.
"""
canvases: List[np.ndarray] = []
sub_ranges: List[Tuple[int, int, int]] = []
total_selected = 0
if not groups or not frames:
return [np.full((patch, patch, 3), 255, dtype=np.uint8)], [(0, 0, 0)], 0
per_group_counts = _allocate_canvases_per_group(target_canvases, len(groups))
for g_idx, (s, e) in enumerate(groups):
if s >= len(frames):
continue
group_len = e - s + 1
k = max(1, min(per_group_counts[g_idx], group_len))
# Split [s..e] into k consecutive sub-ranges of (almost) equal size.
base, rem = divmod(group_len, k)
cursor = s
for sub_i in range(k):
sub_len = base + (1 if sub_i < rem else 0)
ss = cursor
ee = min(e, cursor + sub_len - 1)
cursor = ee + 1
canvas, n_p_overlays = _build_ippp_canvas(
frames, masks, i_idx=ss, p_range=range(ss + 1, ee + 1),
patch=patch,
)
canvases.append(canvas)
sub_ranges.append((g_idx, ss, ee))
# Accounting:
# - I-frame counts as the full grid (anchor, every position
# starts from it).
# - Each P-frame overlay is +1 (positions may be overlaid
# multiple times by later P-frames; we count each hit).
hb, wb = frames[ss].shape[0] // patch, frames[ss].shape[1] // patch
total_selected += hb * wb + n_p_overlays
if not canvases:
canvases = [np.full((patch, patch, 3), 255, dtype=np.uint8)]
sub_ranges = [(0, 0, 0)]
return canvases, sub_ranges, total_selected
def make_charts(
grids: List[np.ndarray],
masks: List[np.ndarray],
frame_ids: List[int],
fps: float,
total_duration_sec: float,
total_patches_budget: int,
saliency_signal: str,
groups: List[Tuple[int, int]] = None,
gop_label: str = "global",
):
"""One overlaid step chart: cumulative patches selected vs time, for
the codec saliency curve and a uniform-sampling baseline at the same
total budget.
X = time (s)
Y = cumulative count of selected patches
Both curves end near the budget (codec: == total selected; uniform:
n_uniform_frames Γ— grid_size, ≀ budget). The codec curve rises in
bursts where saliency is high; uniform rises in equal steps."""
fig, ax = plt.subplots(figsize=(9.2, 3.6), constrained_layout=True)
fps_safe = float(fps) if fps and fps > 0 else 25.0
if grids:
hb, wb = grids[0].shape
else:
hb = wb = 1
grid_size = hb * wb
duration = float(total_duration_sec) if total_duration_sec and total_duration_sec > 0 else (
(max(frame_ids) / fps_safe) if frame_ids else 1.0
)
# ─── Build step curves ──────────────────────────────────────────────
def _step(xs, cum):
"""Return (xx, yy) for a left-continuous step plot through (xs, cum)."""
if not xs:
return [0.0, duration], [0.0, 0.0]
xx, yy = [0.0], [0.0]
prev = 0.0
for x, c in zip(xs, cum):
xx.extend([x, x]); yy.extend([prev, c])
prev = c
xx.append(duration); yy.append(prev)
return xx, yy
times = [fid / fps_safe for fid in frame_ids]
counts = [int(m.sum()) for m in masks]
codec_cum = list(np.cumsum(counts)) if counts else []
codec_total = int(codec_cum[-1]) if codec_cum else 0
xx_c, yy_c = _step(times, codec_cum)
# Uniform baseline: same N frames as codec (at the same timestamps),
# but the patch budget is split equally across them. Both curves now
# reach the same budget β€” what differs is *which* patches each method
# picks within each frame (saliency vs equal-allocation).
n_uniform = len(times) if times else 1
budget_int = int(total_patches_budget)
if n_uniform > 0 and budget_int > 0:
base = budget_int // n_uniform
rem = budget_int - base * n_uniform
uni_per_step = [base + (1 if i < rem else 0) for i in range(n_uniform)]
else:
uni_per_step = []
uni_cum = list(np.cumsum(uni_per_step)) if uni_per_step else []
uni_total = int(uni_cum[-1]) if uni_cum else 0
uni_times = times if times else [duration * 0.5]
xx_u, yy_u = _step(uni_times, uni_cum)
# ─── Plot ───────────────────────────────────────────────────────────
# Per-frame breakdown for the legend.
if counts:
c_min, c_max = int(min(counts)), int(max(counts))
c_avg = codec_total / max(1, len(counts))
codec_lbl = (
f"Codec Β· {saliency_signal} ({codec_total:,} total Β· "
f"per-frame min {c_min} Β· avg {c_avg:.1f} Β· max {c_max})"
)
else:
codec_lbl = f"Codec Β· {saliency_signal} ({codec_total:,} patches)"
if uni_per_step:
u_per = uni_per_step[0]
u_extra = sum(1 for x in uni_per_step if x != u_per)
if u_extra == 0:
uni_lbl = f"Uniform baseline ({uni_total:,} total Β· {u_per}/frame)"
else:
uni_lbl = (
f"Uniform baseline ({uni_total:,} total Β· "
f"~{budget_int // max(1, n_uniform)}/frame, Β±1)"
)
else:
uni_lbl = f"Uniform baseline ({uni_total:,} patches)"
ax.fill_between(xx_c, yy_c, step=None, alpha=0.12, color="#4f46e5")
ax.plot(xx_c, yy_c, color="#4f46e5", linewidth=2.2, label=codec_lbl)
ax.fill_between(xx_u, yy_u, step=None, alpha=0.10, color="#06b6d4")
ax.plot(
xx_u, yy_u, color="#06b6d4", linewidth=2.2, linestyle="--",
label=uni_lbl,
)
# Budget reference line
budget = int(total_patches_budget)
ax.axhline(budget, color="#94a3b8", linestyle=":", linewidth=1.1, alpha=0.85)
ax.text(
duration * 0.995, budget * 1.015,
f"budget {budget:,}", color="#475569",
fontsize=8.5, va="bottom", ha="right",
)
# Group boundaries
if groups and len(groups) > 1 and times:
for (_, end_idx) in groups[:-1]:
if end_idx + 1 < len(times):
bx = (times[end_idx] + times[end_idx + 1]) / 2.0
else:
bx = times[end_idx]
ax.axvline(
bx, color="#cbd5e1", linestyle=(0, (3, 3)),
alpha=0.55, linewidth=0.8,
)
n_groups = len(groups) if groups else 1
gop_str = gop_label if gop_label in ("global", "dynamic") else f"GOP={gop_label}"
ax.set_title(
f"Cumulative patches selected over time Β· {saliency_signal} Β· "
f"{gop_str} ({n_groups} groups)",
fontsize=11, color="#1e293b",
)
ax.set_xlabel("time (s)", fontsize=9.5)
ax.set_ylabel("# patches selected (cumulative)", fontsize=9.5)
ax.set_xlim(-duration * 0.02, duration * 1.02)
ymax = max(budget, codec_total, uni_total) * 1.08 + 1
ax.set_ylim(0, ymax)
ax.tick_params(axis="both", labelsize=8.5)
ax.grid(True, alpha=0.25, linestyle="--", axis="y")
ax.spines[["top", "right"]].set_visible(False)
ax.legend(loc="upper left", fontsize=9, frameon=False)
fig.patch.set_facecolor("white")
return fig
def process(
video_path,
sample_frames: int,
patch_size: int,
total_patches: int,
max_pixels: int,
viz_mode: str = "selection",
heatmap_alpha: float = 0.55,
start_sec: float = 0.0,
end_sec: float = 0.0,
saliency_signal: str = "gradient",
score_log_scale: bool = False,
bitcost_pct: float = 99.0,
fade_strength: float = 0.55,
gop: str = "global",
target_canvases: int = 4,
progress=gr.Progress(track_tqdm=False),
):
if not video_path:
return None, [], "Please upload a video.", None
t0 = time.time()
progress(0.05, desc="Reading metadata")
meta = video_metadata(video_path)
total = meta.get("total_frames") or 0
if total <= 0:
return None, [], json.dumps(
{"error": "Could not read frame count.", "metadata": meta},
indent=2, ensure_ascii=False,
), None
progress(0.10, desc="Sampling frames")
fps = float(meta.get("fps") or 0.0)
s_sec = max(0.0, float(start_sec or 0.0))
e_sec = float(end_sec or 0.0)
if fps > 0 and (s_sec > 0 or e_sec > 0):
f_start = max(0, int(round(s_sec * fps)))
f_end = (
min(total - 1, int(round(e_sec * fps)) - 1)
if e_sec > 0 else total - 1
)
if f_end <= f_start:
f_end = total - 1
window_total = f_end - f_start + 1
if int(sample_frames) >= window_total:
fids = list(range(f_start, f_end + 1))
else:
fids = [
int(round(x))
for x in np.linspace(f_start, f_end, int(sample_frames))
]
else:
f_start, f_end = 0, total - 1
fids = sample_frame_ids(total, int(sample_frames))
raw = decode_frames(video_path, fids)
if not raw:
return None, [], json.dumps(
{"error": "Failed to decode frames.", "metadata": meta},
indent=2, ensure_ascii=False,
), None
progress(0.25, desc="smart_resize")
resized = [smart_resize(f, int(max_pixels), int(patch_size)) for f in raw]
th, tw = resized[0].shape[:2]
resized = [
cv2.resize(f, (tw, th), interpolation=cv2.INTER_AREA)
if f.shape[:2] != (th, tw) else f
for f in resized
]
progress(0.40, desc=f"Scoring patches ({saliency_signal})")
grids = compute_score_grids(resized, int(patch_size), saliency_signal)
if score_log_scale:
grids = [np.log1p(np.clip(g, 0.0, None)) for g in grids]
masks, actual_selected, groups, gop_resolved = grouped_topk_masks(
grids, int(total_patches), str(gop or "global"),
)
norm_scores = _normalize_scores(grids, pct=float(bitcost_pct))
mode = (viz_mode or "selection").lower()
if mode not in ("selection", "heatmap", "sbs"):
mode = "selection"
progress(0.60, desc=f"Rendering {mode} video")
if mode == "heatmap":
vis = [
overlay_heatmap(f, s, int(patch_size), alpha=float(heatmap_alpha))
for f, s in zip(resized, norm_scores)
]
elif mode == "sbs":
vis = [
overlay_sbs(
f, m, s, int(patch_size),
alpha=float(heatmap_alpha), fade=float(fade_strength),
)
for f, m, s in zip(resized, masks, norm_scores)
]
else:
vis = [
overlay_selection(f, m, int(patch_size), fade=float(fade_strength))
for f, m in zip(resized, masks)
]
out_dir = tempfile.mkdtemp(prefix="codec_view_")
vis_path = os.path.join(out_dir, f"{mode}_vis.mp4")
vis_fps = max(2.0, min(8.0, (meta.get("fps") or 25.0) / 4.0))
write_mp4(vis, vis_path, vis_fps)
progress(0.85, desc="Packing canvases (IPPP)")
canvases, sub_ranges, n_selected = pack_canvases_per_group(
resized, masks, groups, int(patch_size),
target_canvases=int(target_canvases),
)
canvas_items: List[Tuple[str, str]] = []
for idx, canv in enumerate(canvases):
cp = os.path.join(out_dir, f"canvas_{idx:03d}.png")
cv2.imwrite(cp, canv)
g_idx, ss, ee = sub_ranges[idx] if idx < len(sub_ranges) else (0, idx, idx)
n_p = max(0, ee - ss)
caption = (
f"Canvas {idx + 1}/{len(canvases)} Β· group {g_idx + 1} Β· "
f"I@#{ss} + {n_p} P-frame{'s' if n_p != 1 else ''}"
)
canvas_items.append((cp, caption))
hb, wb = grids[0].shape
grid_size = int(grids[0].shape[0] * grids[0].shape[1]) if grids else 0
# Uniform baseline samples the SAME number of frames as codec, evenly
# spaced in time; the budget is split equally across them.
n_uniform = max(1, len(fids))
uniform_per_frame = (
int(int(total_patches)) // n_uniform if n_uniform > 0 else 0
)
info = {
"input": meta,
"params": {
"sample_frames": int(sample_frames),
"patch_size": int(patch_size),
"total_patches_budget": int(total_patches),
"max_pixels": int(max_pixels),
"start_sec": float(s_sec),
"end_sec": float(e_sec) if e_sec > 0 else None,
"saliency_signal": saliency_signal,
"score_log_scale": bool(score_log_scale),
"bitcost_pct": float(bitcost_pct),
"fade_strength": float(fade_strength),
"gop": gop_resolved,
"target_canvases": int(target_canvases),
},
"gop_groups": [
{
"start_frame_idx": int(s),
"end_frame_idx": int(e),
"n_frames": int(e - s + 1),
"selected": int(sum(int(m.sum()) for m in masks[s:e + 1])),
}
for (s, e) in groups
],
"frame_window": {
"first_decoded": int(f_start),
"last_decoded": int(f_end),
"actual_frame_ids": [int(x) for x in fids],
},
"codec_per_frame_patches": [int(m.sum()) for m in masks],
"uniform_baseline": {
"frames": int(n_uniform),
"patches_per_frame": int(uniform_per_frame),
"total_patches": int(uniform_per_frame * n_uniform),
"explanation": (
"Same N frames as codec, evenly spaced in time. The patch "
"budget is split equally per frame ({budget} Γ· {n} = "
"{per}); the codec, by contrast, concentrates the same "
"budget on high-saliency patches across those frames."
).format(
budget=int(total_patches),
n=int(n_uniform),
per=int(uniform_per_frame),
),
},
"resized_frame_size": f"{tw}x{th}",
"patch_grid_per_frame": f"{hb}x{wb} = {hb * wb} patches",
"actual_selected_total": int(actual_selected),
"total_selected_patches_incl_i_frames": int(n_selected),
"canvases": [
{
"index": i,
"size": f"{canvases[i].shape[1]}x{canvases[i].shape[0]}",
"group": int(sub_ranges[i][0]) if i < len(sub_ranges) else None,
"sub_range": list(sub_ranges[i][1:3]) if i < len(sub_ranges) else None,
"structure": "IPPP β€” first frame full (I), rest contribute "
"only their selected patches (P).",
}
for i in range(len(canvases))
],
"n_canvases": int(len(canvases)),
"vis_video_fps": round(vis_fps, 2),
"viz_mode": mode,
"heatmap_alpha": float(heatmap_alpha) if mode != "selection" else None,
"score_normalization": f"shift-min, /p{bitcost_pct:.1f}, clip"
+ (" (log1p applied)" if score_log_scale else ""),
"elapsed_sec": round(time.time() - t0, 2),
}
progress(0.95, desc="Building charts")
duration_sec = (total / fps) if fps > 0 else 0.0
chart_fig = make_charts(
grids, masks, fids, fps, duration_sec,
int(total_patches), saliency_signal,
groups=groups, gop_label=gop_resolved,
)
progress(1.0, desc="Done")
return (
vis_path, canvas_items,
json.dumps(info, indent=2, ensure_ascii=False),
chart_fig,
)
CUSTOM_CSS = """
:root, .gradio-container, .gradio-container.dark {
--ovc-grad: linear-gradient(135deg, #4f46e5 0%, #2563eb 50%, #06b6d4 100%);
--ovc-grad-soft: linear-gradient(135deg, rgba(79,70,229,0.10), rgba(6,182,212,0.10));
--ovc-ring: rgba(99,102,241,0.32);
--ovc-ring-strong: rgba(99,102,241,0.55);
}
.gradio-container { max-width: 1320px !important; margin: 0 auto !important; }
@keyframes ovc-shift {
0% { background-position: 0% 50%; }
50% { background-position: 100% 50%; }
100% { background-position: 0% 50%; }
}
@keyframes ovc-pulse {
0%, 100% { box-shadow: 0 6px 18px rgba(37, 99, 235, 0.32); }
50% { box-shadow: 0 8px 26px rgba(37, 99, 235, 0.50); }
}
@keyframes ovc-fade-in {
from { opacity: 0; transform: translateY(4px); }
to { opacity: 1; transform: translateY(0); }
}
/* Hero */
#ovc-hero {
text-align: center;
padding: 44px 16px 22px;
border-radius: 22px;
background:
radial-gradient(120% 80% at 50% -10%, rgba(79,70,229,0.20), transparent 60%),
linear-gradient(180deg, rgba(79,70,229,0.06), rgba(6,182,212,0.03)),
repeating-linear-gradient(0deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px),
repeating-linear-gradient(90deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px);
border: 1px solid rgba(99,102,241,0.22);
margin-bottom: 18px;
position: relative;
overflow: hidden;
}
#ovc-hero::after {
content: "";
position: absolute; inset: auto -20% -40% -20%;
height: 60%;
background: radial-gradient(60% 80% at 50% 0%, rgba(6,182,212,0.22), transparent 70%);
pointer-events: none;
}
#ovc-hero h1 {
font-size: 2.7rem;
font-weight: 800;
background: var(--ovc-grad);
background-size: 200% 200%;
animation: ovc-shift 9s ease-in-out infinite;
-webkit-background-clip: text;
background-clip: text;
color: transparent;
margin: 0 0 6px;
letter-spacing: -0.028em;
line-height: 1.04;
}
#ovc-hero p.tagline {
font-size: 1.05rem;
color: var(--body-text-color-subdued);
margin: 0 auto 16px;
max-width: 760px;
line-height: 1.6;
}
.ovc-links {
display: flex; flex-wrap: wrap; gap: 10px;
justify-content: center; margin: 14px auto 6px;
position: relative; z-index: 1;
}
.ovc-links a {
text-decoration: none;
font-weight: 600;
font-size: 0.9rem;
padding: 7px 14px;
border-radius: 999px;
background: var(--background-fill-primary, #fff);
border: 1px solid rgba(99,102,241,0.32);
color: #4338ca;
transition: transform 0.12s ease, box-shadow 0.18s ease,
background 0.18s ease, color 0.18s ease, border-color 0.18s ease;
display: inline-flex; align-items: center;
box-shadow: 0 1px 2px rgba(15,23,42,0.04);
}
.ovc-links a:hover {
background: var(--ovc-grad);
color: #fff;
border-color: transparent;
transform: translateY(-1px);
box-shadow: 0 6px 16px rgba(79,70,229,0.32);
}
.gradio-container.dark .ovc-links a {
background: rgba(30,41,59,0.7);
color: #c7d2fe;
border-color: rgba(99,102,241,0.4);
}
/* Cards */
.ovc-card {
border-radius: 16px !important;
padding: 16px 18px !important;
border: 1px solid rgba(148,163,184,0.26) !important;
background: var(--background-fill-primary) !important;
box-shadow: 0 1px 3px rgba(15,23,42,0.04);
transition: box-shadow 0.18s ease, border-color 0.18s ease, transform 0.18s ease;
animation: ovc-fade-in 0.32s ease-out;
}
.ovc-card:hover {
border-color: rgba(99,102,241,0.32) !important;
box-shadow: 0 6px 22px rgba(15,23,42,0.07);
}
/* Primary outputs: subtle accent ring + lift */
.ovc-card-primary {
border: 1px solid var(--ovc-ring) !important;
background:
linear-gradient(180deg, rgba(79,70,229,0.025), rgba(6,182,212,0.012)),
var(--background-fill-primary) !important;
box-shadow: 0 4px 18px rgba(79,70,229,0.08) !important;
}
.ovc-card-primary:hover {
border-color: var(--ovc-ring-strong) !important;
box-shadow: 0 10px 28px rgba(79,70,229,0.14) !important;
}
.ovc-card h3 {
display: inline-flex;
align-items: center;
gap: 8px;
font-size: 0.74rem !important;
font-weight: 700 !important;
text-transform: uppercase;
letter-spacing: 0.10em;
color: #4f46e5 !important;
background: rgba(79,70,229,0.08);
padding: 4px 10px !important;
border-radius: 999px;
margin: 0 0 12px !important;
}
.ovc-card h3::before {
content: "";
display: inline-block;
width: 6px; height: 6px; border-radius: 50%;
background: var(--ovc-grad);
transform: translateY(0);
}
/* Run button */
#ovc-run button {
width: 100%;
height: 54px !important;
font-size: 1.06rem !important;
font-weight: 700 !important;
letter-spacing: 0.01em;
background: var(--ovc-grad) !important;
background-size: 200% 200% !important;
animation: ovc-shift 6s ease-in-out infinite, ovc-pulse 2.6s ease-in-out infinite;
border: none !important;
color: #fff !important;
border-radius: 14px !important;
transition: transform 0.06s ease;
}
#ovc-run button:hover {
transform: translateY(-1px);
animation-play-state: paused;
}
#ovc-run button:active { transform: translateY(0); }
/* Preset buttons */
.ovc-preset button {
background: var(--ovc-grad-soft) !important;
color: #4338ca !important;
border: 1px solid rgba(79,70,229,0.25) !important;
border-radius: 10px !important;
font-weight: 600 !important;
transition: all 0.15s ease;
}
.ovc-preset button:hover {
background: var(--ovc-grad) !important;
color: #fff !important;
border-color: transparent !important;
}
/* Footer */
#ovc-footer {
text-align: center;
color: var(--body-text-color-subdued);
font-size: 0.80rem;
padding: 22px 8px 10px;
margin-top: 14px;
border-top: 1px solid rgba(148,163,184,0.18);
}
#ovc-footer code {
background: rgba(79,70,229,0.08);
padding: 1px 6px;
border-radius: 4px;
}
/* Tighter spacing for sliders inside cards */
.ovc-card .gradio-slider { margin-bottom: 4px !important; }
/* Tame Gradio's dark default placeholders inside our cards: blanket-override
any background on the inner wrappers, then paint a brand-tinted gradient on
the canonical containers. This lights up the empty Video/Image/Plot zones
so they no longer look like black holes. */
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .image-frame,
.ovc-card .preview,
.ovc-card .plot-container,
.ovc-card .empty,
.ovc-card video,
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card .icon-button,
.ovc-card .options,
.ovc-card .source-selection,
.ovc-card .upload-container {
background: transparent !important;
background-color: transparent !important;
}
.ovc-card .container,
.ovc-card .wrap,
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
border-radius: 12px !important;
}
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container,
.ovc-card-primary .video-container,
.ovc-card-primary .image-container,
.ovc-card-primary .plot-container {
background: linear-gradient(180deg, rgba(99,102,241,0.05), rgba(6,182,212,0.02)) !important;
border: 1px dashed rgba(148,163,184,0.32) !important;
}
.ovc-card .gradio-video, .ovc-card .gradio-image, .ovc-card .gradio-plot {
border-color: rgba(148,163,184,0.22) !important;
background: transparent !important;
}
/* Empty placeholder text inside Gradio components */
.ovc-card .empty, .ovc-card .empty p, .ovc-card .empty span {
color: #94a3b8 !important;
}
/* Stats tile grid (rendered into a gr.HTML by render_stats_html) */
.ovc-stats {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 10px;
}
.ovc-stat {
padding: 12px 14px;
border-radius: 14px;
background: linear-gradient(135deg, rgba(79,70,229,0.07), rgba(6,182,212,0.04));
border: 1px solid rgba(99,102,241,0.18);
transition: transform 0.18s ease, box-shadow 0.18s ease;
}
.ovc-stat:hover {
transform: translateY(-1px);
box-shadow: 0 6px 18px rgba(79,70,229,0.10);
}
.ovc-stat .value {
font-size: 1.55rem; font-weight: 800;
background: var(--ovc-grad);
-webkit-background-clip: text; background-clip: text; color: transparent;
letter-spacing: -0.02em;
line-height: 1.1;
word-break: break-word;
}
.ovc-stat .label {
font-size: 0.74rem; color: #64748b;
text-transform: uppercase; letter-spacing: 0.06em;
margin-top: 4px;
font-weight: 600;
}
/* ─── Mobile / narrow viewport adjustments ─────────────────────────── */
@media (max-width: 768px) {
.gradio-container { padding: 6px !important; }
/* Force the controls/outputs row to stack vertically on phones */
.gradio-container .ovc-main {
flex-direction: column !important;
gap: 12px !important;
}
.gradio-container .ovc-main > div {
width: 100% !important;
min-width: 0 !important;
max-width: 100% !important;
flex: 1 1 100% !important;
}
/* Hero scales down */
#ovc-hero { padding: 28px 14px 16px; border-radius: 16px; margin-bottom: 12px; }
#ovc-hero h1 { font-size: 2.05rem; letter-spacing: -0.02em; }
#ovc-hero p.tagline { font-size: 0.96rem; line-height: 1.5; margin-bottom: 12px; }
.ovc-links { gap: 6px; margin-top: 10px; }
.ovc-links a { font-size: 0.78rem; padding: 5px 10px; }
/* Cards tighter */
.ovc-card { padding: 12px 14px !important; border-radius: 14px !important; }
.ovc-card h3 { font-size: 0.70rem !important; margin-bottom: 8px !important; }
/* Run button */
#ovc-run button { height: 48px !important; font-size: 0.98rem !important; }
/* Stats tile sizing */
.ovc-stats { grid-template-columns: repeat(auto-fit, minmax(115px, 1fr)); gap: 8px; }
.ovc-stat { padding: 10px 12px; }
.ovc-stat .value { font-size: 1.25rem; }
.ovc-stat .label { font-size: 0.68rem; }
/* Outputs: shorter video so it does not dominate the screen */
.ovc-card video { max-height: 280px !important; }
}
@media (max-width: 480px) {
#ovc-hero { padding: 22px 12px 14px; }
#ovc-hero h1 { font-size: 1.7rem; }
#ovc-hero p.tagline { font-size: 0.9rem; }
/* Put each link on a row of two (browsers will pack 2 per row at this size) */
.ovc-links a { font-size: 0.74rem; padding: 4px 9px; }
#ovc-run button { height: 46px !important; font-size: 0.94rem !important; }
}
"""
THEME = gr.themes.Soft(
primary_hue="indigo",
secondary_hue="blue",
neutral_hue="slate",
font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"],
).set(
body_background_fill="*neutral_50",
block_radius="14px",
button_primary_background_fill="*primary_500",
button_primary_background_fill_hover="*primary_600",
)
HERO_HTML = """
<div id="ovc-hero">
<h1>OneVision Encoder</h1>
<p class="tagline">
Codec-style patch saliency for video understanding &mdash; see which
patches the encoder picks from your video and pack them into the
canvas LLaVA-OneVision consumes.
</p>
<div class="ovc-links">
<a href="https://www.lmms-lab.com/onevision-encoder/index.html" target="_blank" rel="noopener">πŸ“&nbsp;Homepage</a>
<a href="https://huggingface.co/collections/lmms-lab-encoder/onevision-encoder" target="_blank" rel="noopener">πŸ€—&nbsp;Models</a>
<a href="https://arxiv.org/abs/2602.08683" target="_blank" rel="noopener">πŸ“„&nbsp;Tech Report</a>
<a href="docs/model_card.md" target="_blank" rel="noopener">πŸ“‹&nbsp;Model Card</a>
<a href="docs/data_card.md" target="_blank" rel="noopener">πŸ“Š&nbsp;Data Card</a>
</div>
</div>
"""
try:
_GR_MAJOR = int(gr.__version__.split(".")[0])
except Exception:
_GR_MAJOR = 4
_BLOCK_KW: dict = {"title": "OneVision Encoder"}
_LAUNCH_KW: dict = {}
if _GR_MAJOR >= 6:
# In Gradio 6.0 these moved off Blocks(...) onto launch(...).
_LAUNCH_KW["theme"] = THEME
_LAUNCH_KW["css"] = CUSTOM_CSS
else:
_BLOCK_KW["theme"] = THEME
_BLOCK_KW["css"] = CUSTOM_CSS
VIZ_CHOICES = [
("Selection β€” kept patches in color, others fade to gray-white", "selection"),
("Heatmap β€” full-frame JET overlay (blue=low, red=high)", "heatmap"),
("Both", "sbs"),
]
SIGNAL_CHOICES = [
("Gradient β€” intra-frame Sobel (sharp edges, textures, text)", "gradient"),
("Frame diff β€” inter-frame motion (movers, action)", "frame_diff"),
("Combined β€” 0.5Β·gradient + 0.5Β·frame_diff (general purpose)", "combined"),
]
with gr.Blocks(**_BLOCK_KW) as demo:
gr.HTML(HERO_HTML)
with gr.Row(equal_height=False, elem_classes="ovc-main"):
# ─── Controls (narrow column) ────────────────────────────────────
with gr.Column(scale=4, min_width=320):
with gr.Group(elem_classes="ovc-card"):
gr.Markdown("### Input")
video_in = gr.Video(label="Video", sources=["upload"], height=240)
with gr.Row(elem_classes="ovc-preset"):
btn_demo = gr.Button(
"Load demo video", size="sm",
visible=os.path.exists(DEMO_VIDEO_PATH),
)
with gr.Group(elem_classes="ovc-card"):
gr.Markdown("### Pipeline")
viz_mode = gr.Radio(
VIZ_CHOICES, value="selection",
label="Visualization mode",
)
sample_frames = gr.Slider(
4, 64, value=16, step=1, label="Sampled frames",
)
top_k = gr.Slider(
64, 8192, value=1024, step=32,
label="Total patches budget (whole video)",
info="The single budget shared across the whole video. "
"The codec saliency picks these patches GLOBALLY β€” "
"high-energy frames may contribute many, low-energy "
"frames may contribute zero.",
)
patch_size = gr.Radio(
PATCH_CHOICES, value=14, label="Patch size (px)",
)
gop = gr.Radio(
[
("GOP = 4 β€” fixed 4-frame groups", "4"),
("GOP = 8 β€” fixed 8-frame groups", "8"),
("GOP = 16 β€” fixed 16-frame groups", "16"),
("Dynamic β€” adaptive groups by saliency energy", "dynamic"),
],
value="8",
label="GOP (group of pictures)",
info="Splits sampled frames into groups; the patch budget "
"is allocated equally across groups, top-K within "
"each. Dynamic mode mirrors codec_tools' readiness "
"grouping (equal-energy groups).",
)
target_canvases = gr.Slider(
1, 16, value=4, step=1,
label="Target canvases (total per video)",
info="Fixed canvas count regardless of GOP. The budget is "
"split across groups; each group is further sliced "
"into sub-ranges of consecutive frames, one IPPP "
"canvas per sub-range.",
)
with gr.Accordion("Time window", open=False):
with gr.Row():
start_sec = gr.Number(value=0.0, precision=2, label="Start (s)")
end_sec = gr.Number(value=0.0, precision=2, label="End (s)")
gr.Markdown(
"<small>Set both to 0 to use the full video.</small>",
)
with gr.Accordion("Saliency", open=False):
saliency_signal = gr.Radio(
SIGNAL_CHOICES, value="gradient",
label="Scoring signal",
)
score_log_scale = gr.Checkbox(
value=False,
label="Apply log1p to scores",
info="Compresses dynamic range β€” brings up mid-energy patches.",
)
bitcost_pct = gr.Slider(
80.0, 99.9, value=99.0, step=0.1,
label="Heatmap normalization percentile",
info="Higher = harder to saturate red; lower = more vivid.",
)
with gr.Accordion("Visual style", open=False):
heatmap_alpha = gr.Slider(
0.1, 0.9, value=0.55, step=0.05,
label="Heatmap blend Ξ±",
)
fade_strength = gr.Slider(
0.0, 0.9, value=0.55, step=0.05,
label="Selection fade strength",
)
max_pixels = gr.Slider(
40000, 400000, value=150000, step=10000,
label="Max pixels per frame",
)
with gr.Row(elem_id="ovc-run"):
run_btn = gr.Button("Run pipeline", variant="primary")
# ─── Outputs (wide column) ───────────────────────────────────────
with gr.Column(scale=6, min_width=420):
with gr.Group(elem_classes="ovc-card ovc-card-primary"):
gr.Markdown("### Patch selection visualization")
vis_out = gr.Video(
label="", show_label=False, autoplay=True, height=420,
)
with gr.Group(elem_classes="ovc-card ovc-card-primary"):
gr.Markdown("### Cumulative patches over time")
gr.Markdown(
"<small>Same number of sampled frames and the same total "
"patch budget for both methods. <b>Indigo</b>: codec "
"saliency β€” rises in bursts where the frames carry more "
"information. <b>Cyan (dashed)</b>: uniform baseline β€” "
"the same budget split equally per frame, so each step "
"has the same height. Both curves end exactly at the "
"dotted <b>budget</b> reference line.</small>"
)
chart_out = gr.Plot(label="", show_label=False)
with gr.Row():
with gr.Column(scale=1):
with gr.Group(elem_classes="ovc-card"):
gr.Markdown("### Packed canvases (one per GOP group)")
gr.Markdown(
"<small>Each canvas is one GOP group rendered in "
"<b>IPPP order</b>: the group's first frame is the "
"<b>I-frame</b> kept whole (top), followed by the "
"<b>P-frame</b> selected patches packed below.</small>"
)
canvas_out = gr.Gallery(
label="", show_label=False,
columns=2, rows=2, height=380,
object_fit="contain",
preview=True,
)
with gr.Column(scale=1):
with gr.Group(elem_classes="ovc-card"):
gr.Markdown("### Raw JSON")
gr.Markdown(
"<small>Full reproducible record of this run "
"(params, frame ids, group spans). Collapsed by "
"default β€” click to expand.</small>"
)
with gr.Accordion("Show full JSON", open=False):
info_out = gr.Code(
label="", language="json", lines=18,
)
gr.HTML(
'<div id="ovc-footer">'
'<b>OneVision Encoder</b> Β· codec-style patch saliency demo Β· '
'Sobel + frame-diff stand in for the ffmpeg bitcost patch Β· '
'global top-K selection across all sampled frames.'
'</div>'
)
run_btn.click(
process,
inputs=[
video_in, sample_frames, patch_size, top_k, max_pixels,
viz_mode, heatmap_alpha,
start_sec, end_sec,
saliency_signal, score_log_scale, bitcost_pct, fade_strength,
gop, target_canvases,
],
outputs=[vis_out, canvas_out, info_out, chart_out],
)
btn_demo.click(
lambda: DEMO_PRESET,
inputs=None,
outputs=[
video_in, sample_frames, patch_size, top_k, max_pixels,
viz_mode, heatmap_alpha, start_sec, end_sec,
saliency_signal, score_log_scale, bitcost_pct, fade_strength,
gop, target_canvases,
],
)
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", 7860)),
**_LAUNCH_KW,
)