| """OneVision Encoder Codec View. |
| |
| A simplified, dependency-light port of the codec_tools pipeline from |
| lmms-eval-ov2. The original tool relies on a bitcost-patched ffmpeg 5.1 to |
| score every macroblock by its actual encoding bit cost; we approximate that |
| saliency signal with a Sobel gradient magnitude per patch (high gradient = |
| high local complexity = roughly what the encoder would spend bits on). |
| |
| Pipeline (mirrors codec_tools/pipeline/process_video_bitcost_readiness.py): |
| 1. Uniformly sample N frames from the input video. |
| 2. smart_resize each frame so dims are multiples of `patch` and the |
| total pixel count <= max_pixels. |
| 3. Slice every frame into a patch grid; score each patch by its |
| Sobel gradient magnitude mean. |
| 4. Pick the top-K highest-scoring patches, either globally across all sampled frames or within each GOP group (see grouped_topk_masks). |
| 5. Render a "selection visualization" video: kept patches stay in |
| full color, dropped patches are faded to a gray-white wash so the |
| viewer can see exactly which patches the codec stage chose. |
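| 6. Pack the selected patches, in time order, into one or more IPPP |
| canvas images: each canvas starts as a full I-frame and the selected patches of the following P-frames are overlaid in place (the artifact LLaVA-OneVision2 consumes). |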
| """ |
|
|
| import json |
| import math |
| import os |
| import shutil |
| import subprocess |
| import tempfile |
| import time |
| from typing import List, Tuple |
|
|
| import cv2 |
| import gradio as gr |
| import imageio_ffmpeg |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| import numpy as np |
|
|
|
|
# Patch edge lengths in pixels.  Presumably the choices offered by the UI;
# the consumer is not visible in this part of the file -- TODO confirm.
PATCH_CHOICES = [14, 16, 28]


# Location of the bundled demo clip, resolved relative to this file's directory.
DEMO_VIDEO_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "examples", "demo_codec_heatmap.mp4",
)
# Demo settings as a positional tuple.  The 15 values appear to line up, in
# order, with the parameters of process() (video_path ... target_canvases);
# confirm against the caller before reordering either side.
DEMO_PRESET = (
    DEMO_VIDEO_PATH,  # video_path
    16,  # sample_frames
    14,  # patch_size
    1024,  # total_patches
    150000,  # max_pixels
    "sbs",  # viz_mode
    0.55,  # heatmap_alpha
    0.0, 0.0,  # start_sec, end_sec
    "combined",  # saliency_signal
    True,  # score_log_scale
    96.0,  # bitcost_pct
    0.55,  # fade_strength
    "dynamic",  # gop
    4,  # target_canvases
)
|
|
|
|
def smart_resize(frame: np.ndarray, max_pixels: int, factor: int) -> np.ndarray:
    """Resize `frame` so both sides are multiples of `factor`.

    When the frame has more than `max_pixels` pixels, both sides are first
    scaled by sqrt(max_pixels / pixels).  Each side is then floored to a
    multiple of `factor` and never allowed to drop below `factor` itself.
    """
    src_h, src_w = frame.shape[:2]
    area = src_h * src_w
    # Only shrink when the pixel budget is exceeded; never enlarge here.
    shrink = math.sqrt(max_pixels / area) if area > max_pixels else None

    def _snap(side: int) -> int:
        if shrink is not None:
            side = max(factor, int(side * shrink))
        return max(factor, (side // factor) * factor)

    # cv2.resize takes the target size as (width, height).
    target = (_snap(src_w), _snap(src_h))
    return cv2.resize(frame, target, interpolation=cv2.INTER_AREA)
|
|
|
|
def sample_frame_ids(total: int, n: int) -> List[int]:
    """Return up to `n` frame indices spread evenly over `[0, total - 1]`.

    Args:
        total: number of frames available.
        n: number of frames wanted.

    Returns:
        An empty list when `total` or `n` is not positive, every index when
        `n >= total`, otherwise `n` indices from an evenly spaced linspace
        (first and last frame included), rounded to the nearest integer.
    """
    # A negative sample count would make np.linspace raise; treat any
    # non-positive request as "no frames".
    if total <= 0 or n <= 0:
        return []
    if n >= total:
        return list(range(total))
    return [int(round(pos)) for pos in np.linspace(0, total - 1, n)]
|
|
|
|
def decode_frames(video_path: str, frame_ids: List[int]) -> List[np.ndarray]:
    """Decode the requested frames from `video_path`.

    The capture is positioned on each id in `frame_ids` before reading.
    Frames whose read fails are skipped, so the result may be shorter than
    `frame_ids`.  An empty list is returned when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    frames: List[np.ndarray] = []
    try:
        if not cap.isOpened():
            return frames
        for fid in frame_ids:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(fid))
            ok, frame = cap.read()
            if ok:
                frames.append(frame)
    finally:
        # Release the handle on every path, including not-opened and errors.
        cap.release()
    return frames
|
|
|
|
def video_metadata(video_path: str) -> dict:
    """Collect basic properties of a video file.

    The frame count, fps (rounded to 3 decimals), width and height come from
    OpenCV.  If an `ffprobe` executable is on PATH, the first video stream's
    codec, pixel format, profile and bit rate are added; any failure while
    probing is recorded under "ffprobe_error" instead of being raised.
    """
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_rate = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()

    meta = {
        "total_frames": frame_count,
        "fps": round(frame_rate, 3),
        "width": width,
        "height": height,
    }
    if not shutil.which("ffprobe"):
        return meta

    probe_cmd = [
        "ffprobe", "-v", "quiet", "-select_streams", "v:0",
        "-show_entries", "stream=codec_name,bit_rate,pix_fmt,profile",
        "-of", "json", video_path,
    ]
    try:
        probe = subprocess.run(
            probe_cmd, capture_output=True, text=True, check=True, timeout=15,
        )
        stream = json.loads(probe.stdout).get("streams", [{}])[0]
        meta.update(
            codec=stream.get("codec_name"),
            pix_fmt=stream.get("pix_fmt"),
            profile=stream.get("profile"),
            bitrate_bps=stream.get("bit_rate"),
        )
    except Exception as exc:
        # Probing is best-effort: keep the OpenCV fields and note the error.
        meta["ffprobe_error"] = str(exc)
    return meta
|
|
|
|
def patch_score_grid(frame_bgr: np.ndarray, patch: int) -> np.ndarray:
    """Score each `patch` x `patch` tile by its mean Sobel gradient magnitude.

    Returns a float32 array of shape [h // patch, w // patch]; rows and
    columns that do not fill a whole tile are ignored.
    """
    luma = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
    dx = cv2.Sobel(luma, cv2.CV_32F, 1, 0, ksize=3)
    dy = cv2.Sobel(luma, cv2.CV_32F, 0, 1, ksize=3)
    magnitude = np.sqrt(dx * dx + dy * dy)

    rows, cols = (side // patch for side in magnitude.shape)
    # View the cropped image as [rows, patch, cols, patch] and average tiles.
    tiles = magnitude[: rows * patch, : cols * patch].reshape(
        rows, patch, cols, patch,
    )
    return tiles.mean(axis=(1, 3)).astype(np.float32)
|
|
|
|
def patch_score_frame_diff(
    prev_bgr: np.ndarray, cur_bgr: np.ndarray, patch: int,
) -> np.ndarray:
    """Inter-frame absdiff per patch — proxy for motion / temporal complexity.

    The absolute difference is averaged over the colour channels and then
    over each `patch` x `patch` tile.  When there is no previous frame, or
    its shape differs from the current one, the Sobel score of the current
    frame (patch_score_grid) is returned instead.
    """
    comparable = prev_bgr is not None and prev_bgr.shape == cur_bgr.shape
    if not comparable:
        return patch_score_grid(cur_bgr, patch)

    delta = cv2.absdiff(prev_bgr, cur_bgr).mean(axis=2).astype(np.float32)
    rows, cols = delta.shape[0] // patch, delta.shape[1] // patch
    tiles = delta[: rows * patch, : cols * patch].reshape(
        rows, patch, cols, patch,
    )
    return tiles.mean(axis=(1, 3))
|
|
|
|
def compute_score_grids(
    frames: List[np.ndarray], patch: int, signal: str,
) -> List[np.ndarray]:
    """Build one patch score grid per frame from the chosen signal.

    Signals:
      - 'gradient'   — Sobel magnitude only (intra-frame complexity)
      - 'frame_diff' — absdiff vs previous frame (temporal motion)
      - 'combined'   — 0.5 * gradient_norm + 0.5 * frame_diff_norm

    For 'combined', each component is independently shifted and scaled to
    [0, 1] across the whole sample so both contribute on equal footing.
    A falsy `signal` means 'gradient'; any other unrecognised value is
    treated as 'combined'.  An empty `frames` list yields an empty list for
    every signal.
    """
    sig = (signal or "gradient").lower()
    # np.stack cannot stack zero arrays, so answer the empty case up front.
    if not frames:
        return []
    if sig == "gradient":
        return [patch_score_grid(f, patch) for f in frames]

    # Both remaining signals need the frame-to-frame difference grids.
    diffs: List[np.ndarray] = []
    prev = None
    for frame in frames:
        diffs.append(patch_score_frame_diff(prev, frame, patch))
        prev = frame
    if sig == "frame_diff":
        return diffs

    def _norm01(a: np.ndarray) -> np.ndarray:
        """Shift to a zero minimum and divide by the max (if non-trivial)."""
        a = a.astype(np.float32) - a.min()
        peak = a.max()
        return a / peak if peak > 1e-8 else a

    gradients = np.stack([patch_score_grid(f, patch) for f in frames], axis=0)
    combined = 0.5 * _norm01(gradients) + 0.5 * _norm01(np.stack(diffs, axis=0))
    return list(combined)
|
|
|
|
def topk_mask(score: np.ndarray, k: int) -> np.ndarray:
    """Per-frame top-K mask (legacy helper, no longer used by process()).

    Returns a uint8 array shaped like `score` with 1 where the score is at
    least the k-th largest value.  Ties at the threshold are all kept, so
    more than `k` cells can be set.  `k` at or above the cell count selects
    everything; `k <= 0` selects nothing.
    """
    if k >= score.size:
        return np.ones_like(score, dtype=np.uint8)
    if k <= 0:
        return np.zeros_like(score, dtype=np.uint8)
    # After partitioning, position -k holds the k-th largest value.
    cutoff = np.partition(score.ravel(), -k)[-k]
    return np.greater_equal(score, cutoff).astype(np.uint8)
|
|
|
|
def global_topk_masks(
    grids: List[np.ndarray], total_k: int,
) -> Tuple[List[np.ndarray], int]:
    """Select the `total_k` highest-scoring patches across ALL given frames.

    Returns one uint8 mask per frame plus the number of patches actually
    selected.  Ties at the threshold are all kept, so the count can exceed
    `total_k`.  Frames with low scores throughout may receive no patches
    while others receive many — the budget goes where the scores are.
    """
    if not grids:
        return [], 0

    volume = np.stack(grids, axis=0).astype(np.float32)
    n_frames, hb, wb = volume.shape
    n_cells = volume.size

    if total_k >= n_cells:
        everything = [np.ones((hb, wb), dtype=np.uint8) for _ in range(n_frames)]
        return everything, int(n_cells)
    if total_k <= 0:
        return [np.zeros((hb, wb), dtype=np.uint8) for _ in range(n_frames)], 0

    # After partitioning, position -total_k holds the k-th largest score.
    cutoff = np.partition(volume.reshape(-1), -total_k)[-total_k]
    chosen = volume >= cutoff
    return [plane.astype(np.uint8) for plane in chosen], int(chosen.sum())
|
|
|
|
def build_dynamic_groups(
    grids: List[np.ndarray], target_groups: int = 4, min_group_frames: int = 1,
) -> List[Tuple[int, int]]:
    """Adaptive temporal grouping by cumulative saliency energy.

    Walk the sampled frames in time order, accumulate frame-level score
    sums, and close the current group once the running total reaches
    `total_energy / target_groups`.  Groups end up roughly equal in
    *information content* rather than equal in frame count.

    Args:
        grids: one score grid per sampled frame, in time order.
        target_groups: desired number of groups; values below 1 are
            treated as 1.
        min_group_frames: minimum frames a group needs before it may close.

    Returns:
        Inclusive (start, end) frame-index pairs covering every frame.
        Fewer than `target_groups` groups may be produced.
    """
    n = len(grids)
    if n == 0:
        return []
    # A zero or negative target would divide by zero below.
    target_groups = max(1, int(target_groups))
    if n <= target_groups:
        return [(i, i) for i in range(n)]

    energies = np.array([float(g.sum()) for g in grids], dtype=np.float64)
    total = energies.sum()

    if total <= 1e-8:
        # No usable energy: fall back to equal-sized groups, letting the
        # last group absorb whatever frames remain.
        size = max(1, n // target_groups)
        groups: List[Tuple[int, int]] = []
        cursor = 0
        while cursor < n and len(groups) < target_groups:
            is_last = len(groups) == target_groups - 1
            end = n - 1 if is_last else min(n - 1, cursor + size - 1)
            groups.append((cursor, end))
            cursor = end + 1
        return groups

    target_per_group = total / target_groups
    groups = []
    start = 0
    cum = 0.0
    for i in range(n):
        # The final group is always "everything that is left".
        if len(groups) >= target_groups - 1:
            break
        cum += energies[i]
        groups_left = target_groups - len(groups) - 1
        frames_left_after = n - i - 1
        threshold_hit = cum >= target_per_group
        # Keep enough frames for the groups that still have to be formed.
        room_ok = frames_left_after >= groups_left * min_group_frames
        size_ok = (i - start + 1) >= min_group_frames
        if threshold_hit and room_ok and size_ok:
            groups.append((start, i))
            start = i + 1
            cum = 0.0

    if start <= n - 1:
        groups.append((start, n - 1))
    return groups
|
|
|
|
def grouped_topk_masks(
    grids: List[np.ndarray], total_k: int, gop: str,
) -> Tuple[List[np.ndarray], int, List[Tuple[int, int]], str]:
    """Select patches under a GOP grouping strategy.

    GOP modes:
      - "global" (also "none", "0", blank): one group spanning every
        frame — plain global top-K.
      - "<int>" (e.g. "4"/"8"/"16"): fixed group size in frames; a value
        that does not parse as an integer makes one group of all frames.
      - "dynamic": adaptive groups from build_dynamic_groups, aiming for
        at most 4 groups.

    In the grouped modes every group gets `total_k // num_groups` patches
    (at least 1) and top-K is picked inside each group.

    Returns (per-frame masks, actual selected count, [(start, end), ...]
    groups, resolved mode label).
    """
    n_frames = len(grids)
    if n_frames == 0:
        return [], 0, [], gop

    mode = (gop or "global").strip().lower()
    if mode in ("global", "none", "0", ""):
        masks, picked = global_topk_masks(grids, int(total_k))
        return masks, picked, [(0, n_frames - 1)], "global"

    if mode == "dynamic":
        groups = build_dynamic_groups(
            grids, target_groups=min(4, max(1, n_frames)),
        )
    else:
        try:
            span = max(1, int(mode))
        except ValueError:
            span = n_frames
        # Consecutive windows of `span` frames; the last one may be shorter.
        groups = [
            (lo, min(n_frames - 1, lo + span - 1))
            for lo in range(0, n_frames, span)
        ]

    per_group_budget = max(1, int(total_k) // max(1, len(groups)))

    out_masks = [np.zeros(g.shape, dtype=np.uint8) for g in grids]
    picked_total = 0
    for lo, hi in groups:
        group_masks, group_picked = global_topk_masks(
            grids[lo:hi + 1], per_group_budget,
        )
        for frame_no, frame_mask in enumerate(group_masks, start=lo):
            out_masks[frame_no] = frame_mask
        picked_total += group_picked
    return out_masks, picked_total, groups, mode
|
|
|
|
def faded_background(frame_bgr: np.ndarray, fade: float = 0.55) -> np.ndarray:
    """Turn a frame into a gray-white wash: gray * (1 - fade) + white * fade.

    The result is a 3-channel uint8 image of the same height and width.
    """
    mono = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    mono_bgr = cv2.cvtColor(mono, cv2.COLOR_GRAY2BGR).astype(np.float32)
    all_white = np.full_like(mono_bgr, 255.0)
    washed = mono_bgr * (1.0 - fade) + all_white * fade
    return washed.astype(np.uint8)
|
|
|
|
def overlay_selection(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, patch: int,
    outline: bool = True, fade: float = 0.55,
) -> np.ndarray:
    """Composite a frame so kept patches keep colour and the rest fade.

    Patches flagged in `mask_grid` show the original pixels; every other
    pixel comes from faded_background().  With `outline` set, a 1-px
    rectangle is drawn around each kept patch.
    """
    height, width = frame_bgr.shape[:2]
    # Blow the patch grid up to pixel resolution, then crop to the frame.
    tile = np.ones((patch, patch), dtype=np.uint8)
    keep_px = np.kron(mask_grid, tile)[:height, :width].astype(bool)

    washed = faded_background(frame_bgr, fade=float(fade))
    out = np.where(keep_px[..., None], frame_bgr, washed)
    if not outline:
        return out

    # np.nonzero walks the grid in the same row-major order as nested loops.
    for row, col in zip(*np.nonzero(mask_grid)):
        top, left = int(row) * patch, int(col) * patch
        cv2.rectangle(
            out, (left, top), (left + patch - 1, top + patch - 1),
            (0, 220, 255), 1,
        )
    return out
|
|
|
|
def _normalize_scores(grids: List[np.ndarray], pct: float = 99.0) -> np.ndarray:
    """Stack grids into [N, hb, wb] and rescale the scores to [0, 1].

    The stack is shifted by its overall minimum, divided by its `pct`-th
    percentile and clipped.  Using the percentile instead of the max
    suppresses outlier patches.  If the percentile is (near) zero the max
    is used, and if that is zero too the scores are left at zero.

    Returns a float32 array.  An empty `grids` list gives an empty
    (0, 0, 0) array; a stack with no cells is returned unchanged.
    """
    # np.stack cannot stack zero arrays and min() fails on empty arrays,
    # so both empty shapes are answered before any arithmetic.
    if len(grids) == 0:
        return np.zeros((0, 0, 0), dtype=np.float32)
    arr = np.stack(grids, axis=0).astype(np.float32)
    if arr.size == 0:
        return arr

    arr = arr - arr.min()
    cap = np.percentile(arr, pct)
    if cap <= 1e-8:
        cap = float(arr.max() or 1.0)
    scaled = np.clip(arr / cap, 0.0, 1.0)
    # Pin the dtype: the division may promote depending on NumPy's rules.
    return scaled.astype(np.float32, copy=False)
|
|
|
|
def overlay_heatmap(
    frame_bgr: np.ndarray, score_grid: np.ndarray, patch: int,
    alpha: float = 0.55,
) -> np.ndarray:
    """Blend a JET heatmap of the patch scores over the frame.

    `score_grid` is expected in [0, 1] and is clipped to that range.  Low
    scores map to blue, high scores to red.  `alpha` is the heatmap weight
    and the frame gets `1 - alpha`.
    """
    height, width = frame_bgr.shape[:2]
    levels = (np.clip(score_grid, 0.0, 1.0) * 255.0).astype(np.uint8)
    # Repeat every grid cell over its patch, then crop to the frame size.
    tile = np.ones((patch, patch), dtype=np.uint8)
    per_pixel = np.kron(levels, tile)[:height, :width]
    colored = cv2.applyColorMap(per_pixel, cv2.COLORMAP_JET)
    return cv2.addWeighted(frame_bgr, 1.0 - alpha, colored, alpha, 0.0)
|
|
|
|
def overlay_sbs(
    frame_bgr: np.ndarray, mask_grid: np.ndarray, score_grid: np.ndarray,
    patch: int, alpha: float = 0.55, fade: float = 0.55,
) -> np.ndarray:
    """Side-by-side view: [selection | heatmap] with a thin separator.

    The left panel is overlay_selection() with outlines, the right panel is
    overlay_heatmap().  A 4-px dark strip separates them and each panel is
    labelled in its top-left corner.
    """
    selection_view = overlay_selection(
        frame_bgr, mask_grid, patch, outline=True, fade=fade,
    )
    heatmap_view = overlay_heatmap(frame_bgr, score_grid, patch, alpha=alpha)

    height, width = selection_view.shape[:2]
    divider = np.full((height, 4, 3), 30, dtype=np.uint8)
    panel = np.concatenate([selection_view, divider, heatmap_view], axis=1)

    # The right label starts just past the left panel and the separator.
    for text, x_pos in (("selection", 8), ("heatmap", width + 12)):
        cv2.putText(
            panel, text, (x_pos, 22), cv2.FONT_HERSHEY_SIMPLEX,
            0.6, (255, 255, 255), 2, cv2.LINE_AA,
        )
    return panel
|
|
|
|
def write_mp4(frames: List[np.ndarray], path: str, fps: float) -> None:
    """Write H.264 mp4 via imageio-ffmpeg's bundled ffmpeg (browser-friendly).

    Frames are piped to ffmpeg as raw BGR24.  The output uses yuv420p, which
    needs even dimensions, so an odd width or height is cropped by one
    pixel.  The size announced to ffmpeg is the cropped size, so it always
    matches the bytes actually written.

    Args:
        frames: BGR frames; all are assumed to share the first frame's size.
        path: output file path.
        fps: input/output frame rate.

    Raises:
        ValueError: if `frames` is empty.
        RuntimeError: if ffmpeg exits with an error or closes its input
            early; ffmpeg's stderr is included in the message.
    """
    if not frames:
        raise ValueError("no frames to write")

    height, width = frames[0].shape[:2]
    even_h, even_w = height - height % 2, width - width % 2

    ff = imageio_ffmpeg.get_ffmpeg_exe()
    cmd = [
        ff, "-y", "-loglevel", "error",
        "-f", "rawvideo", "-vcodec", "rawvideo",
        "-s", f"{even_w}x{even_h}", "-pix_fmt", "bgr24",
        "-r", f"{fps:.3f}", "-i", "-",
        "-an", "-vcodec", "libx264", "-pix_fmt", "yuv420p",
        "-preset", "veryfast", "-crf", "23",
        "-movflags", "+faststart",
        path,
    ]
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        pipe_broken = False
        try:
            for frame in frames:
                cropped = frame[:even_h, :even_w]
                proc.stdin.write(np.ascontiguousarray(cropped).tobytes())
            proc.stdin.close()
        except BrokenPipeError:
            # ffmpeg stopped reading; fall through so its stderr is reported
            # instead of an uninformative pipe error.
            pipe_broken = True
        err = proc.stderr.read().decode("utf-8", errors="ignore")
        rc = proc.wait()
        if rc != 0 or pipe_broken:
            raise RuntimeError(f"ffmpeg failed (rc={rc}): {err}")
    finally:
        if proc.poll() is None:
            proc.kill()
            proc.wait()
|
|
|
|
def _build_ippp_canvas(
    frames: List[np.ndarray], masks: List[np.ndarray],
    i_idx: int, p_range: range, patch: int,
) -> Tuple[np.ndarray, int]:
    """Build one IPPP canvas with the same patch-aligned size as the I-frame.

    Steps:
      1. Start from a copy of the I-frame (`frames[i_idx]`), cropped to a
         whole number of patches.
      2. For every P-frame index in `p_range`, in order, copy each patch
         selected in that frame's mask onto the same canvas position.
         Indices at or beyond `len(frames)` end the walk.

    Later P-frames overwrite earlier ones where they select the same patch.

    Returns (canvas, n_overlays), where n_overlays counts every patch copy,
    including repeated hits on one position.
    """
    key_frame = frames[i_idx]
    rows = key_frame.shape[0] // patch
    cols = key_frame.shape[1] // patch
    canvas = key_frame[: rows * patch, : cols * patch].copy()

    hits = 0
    for frame_no in p_range:
        if frame_no >= len(frames):
            break
        source, grid = frames[frame_no], masks[frame_no]
        # np.nonzero walks the grid in row-major (raster) order.
        for r, c in zip(*np.nonzero(grid)):
            ys = slice(r * patch, (r + 1) * patch)
            xs = slice(c * patch, (c + 1) * patch)
            canvas[ys, xs] = source[ys, xs]
            hits += 1
    return canvas, hits
|
|
|
|
def _allocate_canvases_per_group(
    target_canvases: int, num_groups: int,
) -> List[int]:
    """Spread a target canvas count over the groups as evenly as possible.

    Both inputs are clamped to at least 1.  The first `remainder` groups
    get one canvas more than the rest, and every group gets at least one,
    so the sum can exceed the target when there are more groups than
    canvases.
    """
    wanted = max(1, int(target_canvases))
    slots = max(1, int(num_groups))
    share, extra = divmod(wanted, slots)
    return [
        max(1, share + 1) if idx < extra else max(1, share)
        for idx in range(slots)
    ]
|
|
|
|
def pack_canvases_per_group(
    frames: List[np.ndarray],
    masks: List[np.ndarray],
    groups: List[Tuple[int, int]],
    patch: int,
    target_canvases: int = 4,
) -> Tuple[List[np.ndarray], List[Tuple[int, int, int]], int]:
    """Pack about `target_canvases` IPPP canvases for the whole video,
    distributing them across GOP groups as evenly as possible.

    Each group's frame range [s..e] is split into K consecutive sub-ranges
    (K = canvases allocated to that group, capped at the group's length).
    Each sub-range [ss..ee] becomes one canvas via _build_ippp_canvas:
      - frame ss is the I-frame: the canvas starts as that whole image.
      - frames ss+1..ee are P-frames: their selected patches are copied
        onto the canvas at their own positions, in time order.

    Returns:
        canvases       — list of canvases.  Every group gets at least one,
                         and a group cannot yield more canvases than it has
                         frames, so the length may differ from
                         `target_canvases`.
        sub_ranges     — list of (group_idx, sub_start, sub_end) parallel
                         to canvases, for captions / debugging.
        total_selected — I-frame patches (counted as the full grid) plus
                         P-frame patch copies across all canvases.

    With no groups or no frames, a single white `patch` x `patch`
    placeholder canvas is returned with sub-range (0, 0, 0) and count 0.
    """
    canvases: List[np.ndarray] = []
    sub_ranges: List[Tuple[int, int, int]] = []
    total_selected = 0
    if not groups or not frames:
        return [np.full((patch, patch, 3), 255, dtype=np.uint8)], [(0, 0, 0)], 0

    per_group_counts = _allocate_canvases_per_group(target_canvases, len(groups))

    for g_idx, (s, e) in enumerate(groups):
        if s >= len(frames):
            continue
        group_len = e - s + 1
        k = max(1, min(per_group_counts[g_idx], group_len))
        # Split the group into k nearly equal consecutive sub-ranges.
        base, rem = divmod(group_len, k)
        cursor = s
        for sub_i in range(k):
            sub_len = base + (1 if sub_i < rem else 0)
            ss = cursor
            # A group may extend past the decoded frames; stop once the
            # I-frame itself is missing instead of indexing out of range.
            if ss >= len(frames):
                break
            ee = min(e, cursor + sub_len - 1)
            cursor = ee + 1
            canvas, n_p_overlays = _build_ippp_canvas(
                frames, masks, i_idx=ss, p_range=range(ss + 1, ee + 1),
                patch=patch,
            )
            canvases.append(canvas)
            sub_ranges.append((g_idx, ss, ee))
            # The I-frame contributes its full patch grid to the count.
            hb, wb = frames[ss].shape[0] // patch, frames[ss].shape[1] // patch
            total_selected += hb * wb + n_p_overlays

    if not canvases:
        canvases = [np.full((patch, patch, 3), 255, dtype=np.uint8)]
        sub_ranges = [(0, 0, 0)]
    return canvases, sub_ranges, total_selected
|
|
|
|
def make_charts(
    grids: List[np.ndarray],
    masks: List[np.ndarray],
    frame_ids: List[int],
    fps: float,
    total_duration_sec: float,
    total_patches_budget: int,
    saliency_signal: str,
    groups: List[Tuple[int, int]] = None,
    gop_label: str = "global",
):
    """Draw one overlaid step chart: cumulative patches selected vs time.

    Two curves are plotted at the same total budget:
      - codec:   cumulative sum of the per-frame mask counts.
      - uniform: the budget split equally over the same frames, with the
                 first `remainder` frames getting one extra patch.

    X = time (s), Y = cumulative count of selected patches.  A dotted line
    marks the budget, and faint vertical lines mark GOP group boundaries
    when there is more than one group.

    Args:
        grids: per-frame score grids (accepted for interface
            compatibility; not read here).
        masks: per-frame selection masks.
        frame_ids: source frame index of each sampled frame.
        fps: source frame rate; 25.0 is assumed when it is not positive.
        total_duration_sec: video duration; when not positive the last
            frame id / fps is used, falling back to 1.0.
        total_patches_budget: requested total patch budget.
        saliency_signal: signal name shown in the labels.
        groups: inclusive (start, end) indices into the sampled frames.
        gop_label: resolved GOP mode shown in the title.

    Returns:
        The matplotlib figure.
    """
    fig, ax = plt.subplots(figsize=(9.2, 3.6), constrained_layout=True)

    fps_safe = float(fps) if fps and fps > 0 else 25.0
    if total_duration_sec and total_duration_sec > 0:
        duration = float(total_duration_sec)
    elif frame_ids:
        duration = max(frame_ids) / fps_safe
    else:
        duration = 1.0
    # A zero-length axis (e.g. only frame 0 sampled) cannot be plotted.
    if duration <= 0:
        duration = 1.0

    def _step(xs, cum):
        """Return (xx, yy) for a left-continuous step plot through (xs, cum)."""
        if not xs:
            return [0.0, duration], [0.0, 0.0]
        xx, yy = [0.0], [0.0]
        prev = 0.0
        for x, c in zip(xs, cum):
            # Vertical riser at x: from the previous level up to c.
            xx.extend([x, x])
            yy.extend([prev, c])
            prev = c
        xx.append(duration)
        yy.append(prev)
        return xx, yy

    # Codec curve: what the saliency selection actually picked per frame.
    times = [fid / fps_safe for fid in frame_ids]
    counts = [int(m.sum()) for m in masks]
    codec_cum = list(np.cumsum(counts)) if counts else []
    codec_total = int(codec_cum[-1]) if codec_cum else 0
    xx_c, yy_c = _step(times, codec_cum)

    # Uniform baseline: same frames, budget split as evenly as possible.
    budget = int(total_patches_budget)
    n_uniform = len(times) if times else 1
    if budget > 0:
        base, rem = divmod(budget, n_uniform)
        uni_per_step = [base + (1 if i < rem else 0) for i in range(n_uniform)]
    else:
        uni_per_step = []
    uni_cum = list(np.cumsum(uni_per_step)) if uni_per_step else []
    uni_total = int(uni_cum[-1]) if uni_cum else 0
    uni_times = times if times else [duration * 0.5]
    xx_u, yy_u = _step(uni_times, uni_cum)

    # Legend labels.
    if counts:
        c_min, c_max = int(min(counts)), int(max(counts))
        c_avg = codec_total / max(1, len(counts))
        codec_lbl = (
            f"Codec · {saliency_signal} ({codec_total:,} total · "
            f"per-frame min {c_min} · avg {c_avg:.1f} · max {c_max})"
        )
    else:
        codec_lbl = f"Codec · {saliency_signal} ({codec_total:,} patches)"
    if uni_per_step:
        u_per = uni_per_step[0]
        u_extra = sum(1 for x in uni_per_step if x != u_per)
        if u_extra == 0:
            uni_lbl = f"Uniform baseline ({uni_total:,} total · {u_per}/frame)"
        else:
            uni_lbl = (
                f"Uniform baseline ({uni_total:,} total · "
                f"~{budget // max(1, n_uniform)}/frame, ±1)"
            )
    else:
        uni_lbl = f"Uniform baseline ({uni_total:,} patches)"

    ax.fill_between(xx_c, yy_c, step=None, alpha=0.12, color="#4f46e5")
    ax.plot(xx_c, yy_c, color="#4f46e5", linewidth=2.2, label=codec_lbl)
    ax.fill_between(xx_u, yy_u, step=None, alpha=0.10, color="#06b6d4")
    ax.plot(
        xx_u, yy_u, color="#06b6d4", linewidth=2.2, linestyle="--",
        label=uni_lbl,
    )

    # Budget reference line.
    ax.axhline(budget, color="#94a3b8", linestyle=":", linewidth=1.1, alpha=0.85)
    ax.text(
        duration * 0.995, budget * 1.015,
        f"budget {budget:,}", color="#475569",
        fontsize=8.5, va="bottom", ha="right",
    )

    # GOP boundaries, drawn halfway between a group's last frame and the
    # next group's first frame.
    if groups and len(groups) > 1 and times:
        for (_, end_idx) in groups[:-1]:
            if end_idx + 1 < len(times):
                bx = (times[end_idx] + times[end_idx + 1]) / 2.0
            elif end_idx < len(times):
                bx = times[end_idx]
            else:
                # Group refers to a frame with no timestamp; nothing to draw.
                continue
            ax.axvline(
                bx, color="#cbd5e1", linestyle=(0, (3, 3)),
                alpha=0.55, linewidth=0.8,
            )

    n_groups = len(groups) if groups else 1
    gop_str = gop_label if gop_label in ("global", "dynamic") else f"GOP={gop_label}"
    ax.set_title(
        f"Cumulative patches selected over time · {saliency_signal} · "
        f"{gop_str} ({n_groups} groups)",
        fontsize=11, color="#1e293b",
    )
    ax.set_xlabel("time (s)", fontsize=9.5)
    ax.set_ylabel("# patches selected (cumulative)", fontsize=9.5)
    ax.set_xlim(-duration * 0.02, duration * 1.02)
    ymax = max(budget, codec_total, uni_total) * 1.08 + 1
    ax.set_ylim(0, ymax)
    ax.tick_params(axis="both", labelsize=8.5)
    ax.grid(True, alpha=0.25, linestyle="--", axis="y")
    ax.spines[["top", "right"]].set_visible(False)
    ax.legend(loc="upper left", fontsize=9, frameon=False)

    fig.patch.set_facecolor("white")
    return fig
|
|
|
|
def process(
    video_path,
    sample_frames: int,
    patch_size: int,
    total_patches: int,
    max_pixels: int,
    viz_mode: str = "selection",
    heatmap_alpha: float = 0.55,
    start_sec: float = 0.0,
    end_sec: float = 0.0,
    saliency_signal: str = "gradient",
    score_log_scale: bool = False,
    bitcost_pct: float = 99.0,
    fade_strength: float = 0.55,
    gop: str = "global",
    target_canvases: int = 4,
    progress=gr.Progress(track_tqdm=False),
):
    """Run the whole pipeline on one video.

    Stages: read metadata, sample frames (optionally inside the
    [start_sec, end_sec] window), smart_resize, score patches, select
    patches under the GOP mode, render the visualization video, pack IPPP
    canvases, and build the summary JSON and chart.

    Returns:
        A 4-tuple (visualization mp4 path, list of (canvas png path,
        caption), JSON info string, matplotlib figure).  On failure the
        tuple is (None, [], message, None), where the message is plain text
        or a JSON string with "error" and "metadata" keys.
    """
    if not video_path:
        return None, [], "Please upload a video.", None

    def _fail(message: str):
        """Build the failure tuple with the metadata read so far."""
        return None, [], json.dumps(
            {"error": message, "metadata": meta},
            indent=2, ensure_ascii=False,
        ), None

    t0 = time.time()
    progress(0.05, desc="Reading metadata")
    meta = video_metadata(video_path)
    total = meta.get("total_frames") or 0
    if total <= 0:
        return _fail("Could not read frame count.")

    progress(0.10, desc="Sampling frames")
    fps = float(meta.get("fps") or 0.0)
    s_sec = max(0.0, float(start_sec or 0.0))
    e_sec = float(end_sec or 0.0)
    if fps > 0 and (s_sec > 0 or e_sec > 0):
        f_start = max(0, int(round(s_sec * fps)))
        # A start beyond the last frame leaves nothing to sample; say so
        # instead of reporting a misleading decode failure later.
        if f_start >= total:
            return _fail("start_sec is past the end of the video.")
        f_end = (
            min(total - 1, int(round(e_sec * fps)) - 1)
            if e_sec > 0 else total - 1
        )
        # An end at or before the start falls back to the end of the video.
        if f_end <= f_start:
            f_end = total - 1
        window_total = f_end - f_start + 1
        if int(sample_frames) >= window_total:
            fids = list(range(f_start, f_end + 1))
        else:
            fids = [
                int(round(x))
                for x in np.linspace(f_start, f_end, int(sample_frames))
            ]
    else:
        f_start, f_end = 0, total - 1
        fids = sample_frame_ids(total, int(sample_frames))
    raw = decode_frames(video_path, fids)
    if not raw:
        return _fail("Failed to decode frames.")

    progress(0.25, desc="smart_resize")
    resized = [smart_resize(f, int(max_pixels), int(patch_size)) for f in raw]
    # Force every frame to the first frame's size so the grids can stack.
    th, tw = resized[0].shape[:2]
    resized = [
        cv2.resize(f, (tw, th), interpolation=cv2.INTER_AREA)
        if f.shape[:2] != (th, tw) else f
        for f in resized
    ]

    progress(0.40, desc=f"Scoring patches ({saliency_signal})")
    grids = compute_score_grids(resized, int(patch_size), saliency_signal)
    if score_log_scale:
        grids = [np.log1p(np.clip(g, 0.0, None)) for g in grids]
    masks, actual_selected, groups, gop_resolved = grouped_topk_masks(
        grids, int(total_patches), str(gop or "global"),
    )
    norm_scores = _normalize_scores(grids, pct=float(bitcost_pct))

    mode = (viz_mode or "selection").lower()
    if mode not in ("selection", "heatmap", "sbs"):
        mode = "selection"
    progress(0.60, desc=f"Rendering {mode} video")
    if mode == "heatmap":
        vis = [
            overlay_heatmap(f, s, int(patch_size), alpha=float(heatmap_alpha))
            for f, s in zip(resized, norm_scores)
        ]
    elif mode == "sbs":
        vis = [
            overlay_sbs(
                f, m, s, int(patch_size),
                alpha=float(heatmap_alpha), fade=float(fade_strength),
            )
            for f, m, s in zip(resized, masks, norm_scores)
        ]
    else:
        vis = [
            overlay_selection(f, m, int(patch_size), fade=float(fade_strength))
            for f, m in zip(resized, masks)
        ]

    # Outputs are written to a fresh temp directory that is handed back to
    # the caller by path.
    out_dir = tempfile.mkdtemp(prefix="codec_view_")
    vis_path = os.path.join(out_dir, f"{mode}_vis.mp4")
    # Play back at a quarter of the source rate, limited to 2..8 fps.
    vis_fps = max(2.0, min(8.0, (meta.get("fps") or 25.0) / 4.0))
    write_mp4(vis, vis_path, vis_fps)

    progress(0.85, desc="Packing canvases (IPPP)")
    canvases, sub_ranges, n_selected = pack_canvases_per_group(
        resized, masks, groups, int(patch_size),
        target_canvases=int(target_canvases),
    )
    canvas_items: List[Tuple[str, str]] = []
    for idx, canv in enumerate(canvases):
        cp = os.path.join(out_dir, f"canvas_{idx:03d}.png")
        cv2.imwrite(cp, canv)
        g_idx, ss, ee = sub_ranges[idx] if idx < len(sub_ranges) else (0, idx, idx)
        n_p = max(0, ee - ss)
        caption = (
            f"Canvas {idx + 1}/{len(canvases)} · group {g_idx + 1} · "
            f"I@#{ss} + {n_p} P-frame{'s' if n_p != 1 else ''}"
        )
        canvas_items.append((cp, caption))

    hb, wb = grids[0].shape
    # Uniform baseline: the same frames, budget split equally per frame.
    n_uniform = max(1, len(fids))
    uniform_per_frame = int(total_patches) // n_uniform
    info = {
        "input": meta,
        "params": {
            "sample_frames": int(sample_frames),
            "patch_size": int(patch_size),
            "total_patches_budget": int(total_patches),
            "max_pixels": int(max_pixels),
            "start_sec": float(s_sec),
            "end_sec": float(e_sec) if e_sec > 0 else None,
            "saliency_signal": saliency_signal,
            "score_log_scale": bool(score_log_scale),
            "bitcost_pct": float(bitcost_pct),
            "fade_strength": float(fade_strength),
            "gop": gop_resolved,
            "target_canvases": int(target_canvases),
        },
        "gop_groups": [
            {
                "start_frame_idx": int(s),
                "end_frame_idx": int(e),
                "n_frames": int(e - s + 1),
                "selected": int(sum(int(m.sum()) for m in masks[s:e + 1])),
            }
            for (s, e) in groups
        ],
        "frame_window": {
            "first_decoded": int(f_start),
            "last_decoded": int(f_end),
            "actual_frame_ids": [int(x) for x in fids],
        },
        "codec_per_frame_patches": [int(m.sum()) for m in masks],
        "uniform_baseline": {
            "frames": int(n_uniform),
            "patches_per_frame": int(uniform_per_frame),
            "total_patches": int(uniform_per_frame * n_uniform),
            "explanation": (
                "Same N frames as codec, evenly spaced in time. The patch "
                "budget is split equally per frame ({budget} ÷ {n} = "
                "{per}); the codec, by contrast, concentrates the same "
                "budget on high-saliency patches across those frames."
            ).format(
                budget=int(total_patches),
                n=int(n_uniform),
                per=int(uniform_per_frame),
            ),
        },
        "resized_frame_size": f"{tw}x{th}",
        "patch_grid_per_frame": f"{hb}x{wb} = {hb * wb} patches",
        "actual_selected_total": int(actual_selected),
        "total_selected_patches_incl_i_frames": int(n_selected),
        "canvases": [
            {
                "index": i,
                "size": f"{canvases[i].shape[1]}x{canvases[i].shape[0]}",
                "group": int(sub_ranges[i][0]) if i < len(sub_ranges) else None,
                "sub_range": list(sub_ranges[i][1:3]) if i < len(sub_ranges) else None,
                "structure": "IPPP — first frame full (I), rest contribute "
                             "only their selected patches (P).",
            }
            for i in range(len(canvases))
        ],
        "n_canvases": int(len(canvases)),
        "vis_video_fps": round(vis_fps, 2),
        "viz_mode": mode,
        "heatmap_alpha": float(heatmap_alpha) if mode != "selection" else None,
        "score_normalization": f"shift-min, /p{bitcost_pct:.1f}, clip"
                               + (" (log1p applied)" if score_log_scale else ""),
        "elapsed_sec": round(time.time() - t0, 2),
    }
    progress(0.95, desc="Building charts")
    duration_sec = (total / fps) if fps > 0 else 0.0
    chart_fig = make_charts(
        grids, masks, fids, fps, duration_sec,
        int(total_patches), saliency_signal,
        groups=groups, gop_label=gop_resolved,
    )

    progress(1.0, desc="Done")
    return (
        vis_path, canvas_items,
        json.dumps(info, indent=2, ensure_ascii=False),
        chart_fig,
    )
|
|
|
|
# Stylesheet for the whole app. Every selector is namespaced with `ovc-`
# (ids/classes set through elem_id / elem_classes on the Gradio components)
# or scoped under `.gradio-container`. The sections are: design tokens and
# keyframes, hero, cards, run/preset buttons, footer, overrides for Gradio's
# media placeholders, the stats tiles, the responsive breakpoints, and a
# reduced-motion opt-out for the looping animations.
CUSTOM_CSS = """
:root, .gradio-container, .gradio-container.dark {
  --ovc-grad: linear-gradient(135deg, #4f46e5 0%, #2563eb 50%, #06b6d4 100%);
  --ovc-grad-soft: linear-gradient(135deg, rgba(79,70,229,0.10), rgba(6,182,212,0.10));
  --ovc-ring: rgba(99,102,241,0.32);
  --ovc-ring-strong: rgba(99,102,241,0.55);
}
.gradio-container { max-width: 1320px !important; margin: 0 auto !important; }
@keyframes ovc-shift {
  0% { background-position: 0% 50%; }
  50% { background-position: 100% 50%; }
  100% { background-position: 0% 50%; }
}
@keyframes ovc-pulse {
  0%, 100% { box-shadow: 0 6px 18px rgba(37, 99, 235, 0.32); }
  50% { box-shadow: 0 8px 26px rgba(37, 99, 235, 0.50); }
}
@keyframes ovc-fade-in {
  from { opacity: 0; transform: translateY(4px); }
  to { opacity: 1; transform: translateY(0); }
}

/* Hero */
#ovc-hero {
  text-align: center;
  padding: 44px 16px 22px;
  border-radius: 22px;
  background:
    radial-gradient(120% 80% at 50% -10%, rgba(79,70,229,0.20), transparent 60%),
    linear-gradient(180deg, rgba(79,70,229,0.06), rgba(6,182,212,0.03)),
    repeating-linear-gradient(0deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px),
    repeating-linear-gradient(90deg, rgba(99,102,241,0.05) 0 1px, transparent 1px 28px);
  border: 1px solid rgba(99,102,241,0.22);
  margin-bottom: 18px;
  position: relative;
  overflow: hidden;
}
#ovc-hero::after {
  content: "";
  position: absolute; inset: auto -20% -40% -20%;
  height: 60%;
  background: radial-gradient(60% 80% at 50% 0%, rgba(6,182,212,0.22), transparent 70%);
  pointer-events: none;
}
#ovc-hero h1 {
  font-size: 2.7rem;
  font-weight: 800;
  background: var(--ovc-grad);
  background-size: 200% 200%;
  animation: ovc-shift 9s ease-in-out infinite;
  -webkit-background-clip: text;
  background-clip: text;
  color: transparent;
  margin: 0 0 6px;
  letter-spacing: -0.028em;
  line-height: 1.04;
}
#ovc-hero p.tagline {
  font-size: 1.05rem;
  color: var(--body-text-color-subdued);
  margin: 0 auto 16px;
  max-width: 760px;
  line-height: 1.6;
}
.ovc-links {
  display: flex; flex-wrap: wrap; gap: 10px;
  justify-content: center; margin: 14px auto 6px;
  position: relative; z-index: 1;
}
.ovc-links a {
  text-decoration: none;
  font-weight: 600;
  font-size: 0.9rem;
  padding: 7px 14px;
  border-radius: 999px;
  background: var(--background-fill-primary, #fff);
  border: 1px solid rgba(99,102,241,0.32);
  color: #4338ca;
  transition: transform 0.12s ease, box-shadow 0.18s ease,
    background 0.18s ease, color 0.18s ease, border-color 0.18s ease;
  display: inline-flex; align-items: center;
  box-shadow: 0 1px 2px rgba(15,23,42,0.04);
}
.ovc-links a:hover {
  background: var(--ovc-grad);
  color: #fff;
  border-color: transparent;
  transform: translateY(-1px);
  box-shadow: 0 6px 16px rgba(79,70,229,0.32);
}
.gradio-container.dark .ovc-links a {
  background: rgba(30,41,59,0.7);
  color: #c7d2fe;
  border-color: rgba(99,102,241,0.4);
}

/* Cards */
.ovc-card {
  border-radius: 16px !important;
  padding: 16px 18px !important;
  border: 1px solid rgba(148,163,184,0.26) !important;
  background: var(--background-fill-primary) !important;
  box-shadow: 0 1px 3px rgba(15,23,42,0.04);
  transition: box-shadow 0.18s ease, border-color 0.18s ease, transform 0.18s ease;
  animation: ovc-fade-in 0.32s ease-out;
}
.ovc-card:hover {
  border-color: rgba(99,102,241,0.32) !important;
  box-shadow: 0 6px 22px rgba(15,23,42,0.07);
}
/* Primary outputs: subtle accent ring + lift */
.ovc-card-primary {
  border: 1px solid var(--ovc-ring) !important;
  background:
    linear-gradient(180deg, rgba(79,70,229,0.025), rgba(6,182,212,0.012)),
    var(--background-fill-primary) !important;
  box-shadow: 0 4px 18px rgba(79,70,229,0.08) !important;
}
.ovc-card-primary:hover {
  border-color: var(--ovc-ring-strong) !important;
  box-shadow: 0 10px 28px rgba(79,70,229,0.14) !important;
}
.ovc-card h3 {
  display: inline-flex;
  align-items: center;
  gap: 8px;
  font-size: 0.74rem !important;
  font-weight: 700 !important;
  text-transform: uppercase;
  letter-spacing: 0.10em;
  color: #4f46e5 !important;
  background: rgba(79,70,229,0.08);
  padding: 4px 10px !important;
  border-radius: 999px;
  margin: 0 0 12px !important;
}
.ovc-card h3::before {
  content: "";
  display: inline-block;
  width: 6px; height: 6px; border-radius: 50%;
  background: var(--ovc-grad);
  transform: translateY(0);
}

/* Run button */
#ovc-run button {
  width: 100%;
  height: 54px !important;
  font-size: 1.06rem !important;
  font-weight: 700 !important;
  letter-spacing: 0.01em;
  background: var(--ovc-grad) !important;
  background-size: 200% 200% !important;
  animation: ovc-shift 6s ease-in-out infinite, ovc-pulse 2.6s ease-in-out infinite;
  border: none !important;
  color: #fff !important;
  border-radius: 14px !important;
  transition: transform 0.06s ease;
}
#ovc-run button:hover {
  transform: translateY(-1px);
  animation-play-state: paused;
}
#ovc-run button:active { transform: translateY(0); }

/* Preset buttons */
.ovc-preset button {
  background: var(--ovc-grad-soft) !important;
  color: #4338ca !important;
  border: 1px solid rgba(79,70,229,0.25) !important;
  border-radius: 10px !important;
  font-weight: 600 !important;
  transition: all 0.15s ease;
}
.ovc-preset button:hover {
  background: var(--ovc-grad) !important;
  color: #fff !important;
  border-color: transparent !important;
}

/* Footer */
#ovc-footer {
  text-align: center;
  color: var(--body-text-color-subdued);
  font-size: 0.80rem;
  padding: 22px 8px 10px;
  margin-top: 14px;
  border-top: 1px solid rgba(148,163,184,0.18);
}
#ovc-footer code {
  background: rgba(79,70,229,0.08);
  padding: 1px 6px;
  border-radius: 4px;
}

/* Tighter spacing for sliders inside cards */
.ovc-card .gradio-slider { margin-bottom: 4px !important; }

/* Tame Gradio's dark default placeholders inside our cards: blanket-override
   any background on the inner wrappers, then paint a brand-tinted gradient on
   the canonical containers. This lights up the empty Video/Image/Plot zones
   so they no longer look like black holes. */
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .image-frame,
.ovc-card .preview,
.ovc-card .plot-container,
.ovc-card .empty,
.ovc-card video,
.ovc-card [data-testid="video"],
.ovc-card [data-testid="image"],
.ovc-card .icon-button,
.ovc-card .options,
.ovc-card .source-selection,
.ovc-card .upload-container {
  background: transparent !important;
  background-color: transparent !important;
}
.ovc-card .container,
.ovc-card .wrap,
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container {
  border-radius: 12px !important;
}
.ovc-card .video-container,
.ovc-card .image-container,
.ovc-card .plot-container,
.ovc-card-primary .video-container,
.ovc-card-primary .image-container,
.ovc-card-primary .plot-container {
  background: linear-gradient(180deg, rgba(99,102,241,0.05), rgba(6,182,212,0.02)) !important;
  border: 1px dashed rgba(148,163,184,0.32) !important;
}
.ovc-card .gradio-video, .ovc-card .gradio-image, .ovc-card .gradio-plot {
  border-color: rgba(148,163,184,0.22) !important;
  background: transparent !important;
}
/* Empty placeholder text inside Gradio components */
.ovc-card .empty, .ovc-card .empty p, .ovc-card .empty span {
  color: #94a3b8 !important;
}

/* Stats tile grid (rendered into a gr.HTML by render_stats_html) */
.ovc-stats {
  display: grid;
  grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
  gap: 10px;
}
.ovc-stat {
  padding: 12px 14px;
  border-radius: 14px;
  background: linear-gradient(135deg, rgba(79,70,229,0.07), rgba(6,182,212,0.04));
  border: 1px solid rgba(99,102,241,0.18);
  transition: transform 0.18s ease, box-shadow 0.18s ease;
}
.ovc-stat:hover {
  transform: translateY(-1px);
  box-shadow: 0 6px 18px rgba(79,70,229,0.10);
}
.ovc-stat .value {
  font-size: 1.55rem; font-weight: 800;
  background: var(--ovc-grad);
  -webkit-background-clip: text; background-clip: text; color: transparent;
  letter-spacing: -0.02em;
  line-height: 1.1;
  word-break: break-word;
}
.ovc-stat .label {
  font-size: 0.74rem; color: #64748b;
  text-transform: uppercase; letter-spacing: 0.06em;
  margin-top: 4px;
  font-weight: 600;
}

/* --- Mobile / narrow viewport adjustments --- */
@media (max-width: 768px) {
  .gradio-container { padding: 6px !important; }

  /* Force the controls/outputs row to stack vertically on phones */
  .gradio-container .ovc-main {
    flex-direction: column !important;
    gap: 12px !important;
  }
  .gradio-container .ovc-main > div {
    width: 100% !important;
    min-width: 0 !important;
    max-width: 100% !important;
    flex: 1 1 100% !important;
  }

  /* Hero scales down */
  #ovc-hero { padding: 28px 14px 16px; border-radius: 16px; margin-bottom: 12px; }
  #ovc-hero h1 { font-size: 2.05rem; letter-spacing: -0.02em; }
  #ovc-hero p.tagline { font-size: 0.96rem; line-height: 1.5; margin-bottom: 12px; }
  .ovc-links { gap: 6px; margin-top: 10px; }
  .ovc-links a { font-size: 0.78rem; padding: 5px 10px; }
  /* Cards tighter */
  .ovc-card { padding: 12px 14px !important; border-radius: 14px !important; }
  .ovc-card h3 { font-size: 0.70rem !important; margin-bottom: 8px !important; }

  /* Run button */
  #ovc-run button { height: 48px !important; font-size: 0.98rem !important; }

  /* Stats tile sizing */
  .ovc-stats { grid-template-columns: repeat(auto-fit, minmax(115px, 1fr)); gap: 8px; }
  .ovc-stat { padding: 10px 12px; }
  .ovc-stat .value { font-size: 1.25rem; }
  .ovc-stat .label { font-size: 0.68rem; }

  /* Outputs: shorter video so it does not dominate the screen */
  .ovc-card video { max-height: 280px !important; }
}

@media (max-width: 480px) {
  #ovc-hero { padding: 22px 12px 14px; }
  #ovc-hero h1 { font-size: 1.7rem; }
  #ovc-hero p.tagline { font-size: 0.9rem; }
  /* Put each link on a row of two (browsers will pack 2 per row at this size) */
  .ovc-links a { font-size: 0.74rem; padding: 4px 9px; }
  #ovc-run button { height: 46px !important; font-size: 0.94rem !important; }
}

/* Honour the OS-level "reduce motion" preference: stop the looping gradient
   shift / pulse animations and the card fade-in, and drop hover transitions. */
@media (prefers-reduced-motion: reduce) {
  #ovc-hero h1,
  #ovc-run button,
  .ovc-card {
    animation: none !important;
  }
  .ovc-links a,
  .ovc-card,
  .ovc-stat,
  .ovc-preset button,
  #ovc-run button {
    transition: none !important;
  }
}
"""
|
|
# Base look: Gradio's Soft theme in an indigo / blue / slate palette, set in
# Inter with generic system fallbacks.
_SOFT_THEME = gr.themes.Soft(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("Inter"), "system-ui", "sans-serif"],
)

# Token overrides layered on the base theme: page background, block corner
# radius, and the primary button fill (resting and hover).
THEME = _SOFT_THEME.set(
    body_background_fill="*neutral_50",
    block_radius="14px",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
)
|
|
# Hero banner rendered at the top of the page through gr.HTML. The ids and
# classes (#ovc-hero, .tagline, .ovc-links) are the hooks the app stylesheet
# targets.
# NOTE(review): the two docs/*.md links are relative, so the browser resolves
# them against the app's own URL — confirm the app actually serves those files
# or swap in absolute repository URLs.
HERO_HTML = """
<div id="ovc-hero">
  <h1>OneVision Encoder</h1>
  <p class="tagline">
    Codec-style patch saliency for video understanding — see which
    patches the encoder picks from your video and pack them into the
    canvas LLaVA-OneVision consumes.
  </p>
  <div class="ovc-links">
    <a href="https://www.lmms-lab.com/onevision-encoder/index.html" target="_blank" rel="noopener">🏠 Homepage</a>
    <a href="https://huggingface.co/collections/lmms-lab-encoder/onevision-encoder" target="_blank" rel="noopener">🤗 Models</a>
    <a href="https://arxiv.org/abs/2602.08683" target="_blank" rel="noopener">📄 Tech Report</a>
    <a href="docs/model_card.md" target="_blank" rel="noopener">📋 Model Card</a>
    <a href="docs/data_card.md" target="_blank" rel="noopener">📊 Data Card</a>
  </div>
</div>
"""
|
|
# Decide where the theme and stylesheet are handed to Gradio. From major
# version 6 on they are routed into the launch() keyword arguments; on older
# versions they go to the gr.Blocks() constructor instead.
try:
    _GR_MAJOR = int(gr.__version__.split(".")[0])
except (AttributeError, TypeError, ValueError):
    # Version attribute missing or not parseable as "<int>.<...>": fall back
    # to the pre-6 behaviour.
    _GR_MAJOR = 4

_BLOCK_KW: dict = {"title": "OneVision Encoder"}
_LAUNCH_KW: dict = {}
# Exactly one of the two dicts receives the styling options.
(_LAUNCH_KW if _GR_MAJOR >= 6 else _BLOCK_KW).update(
    theme=THEME,
    css=CUSTOM_CSS,
)
|
|
|
|
# (label, value) pairs for the "Visualization mode" radio. The value is what
# the pipeline callback receives; the label is display text only.
VIZ_CHOICES = [
    ("Selection — kept patches in color, others fade to gray-white", "selection"),
    ("Heatmap — full-frame JET overlay (blue=low, red=high)", "heatmap"),
    ("Both", "sbs"),
]

# (label, value) pairs for the "Scoring signal" radio.
SIGNAL_CHOICES = [
    ("Gradient — intra-frame Sobel (sharp edges, textures, text)", "gradient"),
    ("Frame diff — inter-frame motion (movers, action)", "frame_diff"),
    ("Combined — 0.5·gradient + 0.5·frame_diff (general purpose)", "combined"),
]
|
|
|
|
|
|
|
|
# Page layout and event wiring.
#
# Structure: hero banner, then one "ovc-main" row with a controls column
# (input video, pipeline settings, collapsible advanced settings, run button)
# and an outputs column (visualization video, cumulative-patch chart, packed
# canvases gallery and the raw JSON record), then the footer.
with gr.Blocks(**_BLOCK_KW) as demo:
    gr.HTML(HERO_HTML)

    with gr.Row(equal_height=False, elem_classes="ovc-main"):
        # ---- Left column: input and controls -------------------------------
        with gr.Column(scale=4, min_width=320):
            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Input")
                video_in = gr.Video(label="Video", sources=["upload"], height=240)
                with gr.Row(elem_classes="ovc-preset"):
                    # Hidden when the bundled demo clip is not on disk.
                    btn_demo = gr.Button(
                        "Load demo video", size="sm",
                        visible=os.path.exists(DEMO_VIDEO_PATH),
                    )

            with gr.Group(elem_classes="ovc-card"):
                gr.Markdown("### Pipeline")
                viz_mode = gr.Radio(
                    VIZ_CHOICES, value="selection",
                    label="Visualization mode",
                )
                sample_frames = gr.Slider(
                    4, 64, value=16, step=1, label="Sampled frames",
                )
                top_k = gr.Slider(
                    64, 8192, value=1024, step=32,
                    label="Total patches budget (whole video)",
                    info="The single budget shared across the whole video. "
                         "The codec saliency picks these patches GLOBALLY — "
                         "high-energy frames may contribute many, low-energy "
                         "frames may contribute zero.",
                )
                patch_size = gr.Radio(
                    PATCH_CHOICES, value=14, label="Patch size (px)",
                )
                # Radio values are strings ("4", "8", "16", "dynamic").
                gop = gr.Radio(
                    [
                        ("GOP = 4 — fixed 4-frame groups", "4"),
                        ("GOP = 8 — fixed 8-frame groups", "8"),
                        ("GOP = 16 — fixed 16-frame groups", "16"),
                        ("Dynamic — adaptive groups by saliency energy", "dynamic"),
                    ],
                    value="8",
                    label="GOP (group of pictures)",
                    info="Splits sampled frames into groups; the patch budget "
                         "is allocated equally across groups, top-K within "
                         "each. Dynamic mode mirrors codec_tools' readiness "
                         "grouping (equal-energy groups).",
                )
                target_canvases = gr.Slider(
                    1, 16, value=4, step=1,
                    label="Target canvases (total per video)",
                    info="Fixed canvas count regardless of GOP. The budget is "
                         "split across groups; each group is further sliced "
                         "into sub-ranges of consecutive frames, one IPPP "
                         "canvas per sub-range.",
                )

                # NOTE(review): the source's indentation was lost; the three
                # accordions are assumed to sit inside the Pipeline card —
                # confirm against the intended layout.
                with gr.Accordion("Time window", open=False):
                    with gr.Row():
                        start_sec = gr.Number(value=0.0, precision=2, label="Start (s)")
                        end_sec = gr.Number(value=0.0, precision=2, label="End (s)")
                    gr.Markdown(
                        "<small>Set both to 0 to use the full video.</small>",
                    )

                with gr.Accordion("Saliency", open=False):
                    saliency_signal = gr.Radio(
                        SIGNAL_CHOICES, value="gradient",
                        label="Scoring signal",
                    )
                    score_log_scale = gr.Checkbox(
                        value=False,
                        label="Apply log1p to scores",
                        info="Compresses dynamic range — brings up mid-energy patches.",
                    )
                    bitcost_pct = gr.Slider(
                        80.0, 99.9, value=99.0, step=0.1,
                        label="Heatmap normalization percentile",
                        info="Higher = harder to saturate red; lower = more vivid.",
                    )

                with gr.Accordion("Visual style", open=False):
                    heatmap_alpha = gr.Slider(
                        0.1, 0.9, value=0.55, step=0.05,
                        label="Heatmap blend α",
                    )
                    fade_strength = gr.Slider(
                        0.0, 0.9, value=0.55, step=0.05,
                        label="Selection fade strength",
                    )
                    max_pixels = gr.Slider(
                        40000, 400000, value=150000, step=10000,
                        label="Max pixels per frame",
                    )

            with gr.Row(elem_id="ovc-run"):
                run_btn = gr.Button("Run pipeline", variant="primary")

        # ---- Right column: outputs -----------------------------------------
        with gr.Column(scale=6, min_width=420):
            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Patch selection visualization")
                vis_out = gr.Video(
                    label="", show_label=False, autoplay=True, height=420,
                )

            with gr.Group(elem_classes="ovc-card ovc-card-primary"):
                gr.Markdown("### Cumulative patches over time")
                gr.Markdown(
                    "<small>Same number of sampled frames and the same total "
                    "patch budget for both methods. <b>Indigo</b>: codec "
                    "saliency — rises in bursts where the frames carry more "
                    "information. <b>Cyan (dashed)</b>: uniform baseline — "
                    "the same budget split equally per frame, so each step "
                    "has the same height. Both curves end exactly at the "
                    "dotted <b>budget</b> reference line.</small>"
                )
                chart_out = gr.Plot(label="", show_label=False)

            # NOTE(review): this row is assumed to live inside the outputs
            # column (indentation was lost in the source) — confirm.
            with gr.Row():
                with gr.Column(scale=1):
                    with gr.Group(elem_classes="ovc-card"):
                        # NOTE(review): the heading says "one per GOP group",
                        # while the "Target canvases" help text describes one
                        # canvas per sub-range — confirm which wording is
                        # current.
                        gr.Markdown("### Packed canvases (one per GOP group)")
                        gr.Markdown(
                            "<small>Each canvas is one GOP group rendered in "
                            "<b>IPPP order</b>: the group's first frame is the "
                            "<b>I-frame</b> kept whole (top), followed by the "
                            "<b>P-frame</b> selected patches packed below.</small>"
                        )
                        canvas_out = gr.Gallery(
                            label="", show_label=False,
                            columns=2, rows=2, height=380,
                            object_fit="contain",
                            preview=True,
                        )
                with gr.Column(scale=1):
                    with gr.Group(elem_classes="ovc-card"):
                        gr.Markdown("### Raw JSON")
                        gr.Markdown(
                            "<small>Full reproducible record of this run "
                            "(params, frame ids, group spans). Collapsed by "
                            "default — click to expand.</small>"
                        )
                        with gr.Accordion("Show full JSON", open=False):
                            info_out = gr.Code(
                                label="", language="json", lines=18,
                            )

    gr.HTML(
        '<div id="ovc-footer">'
        '<b>OneVision Encoder</b> · codec-style patch saliency demo · '
        'Sobel + frame-diff stand in for the ffmpeg bitcost patch · '
        'global top-K selection across all sampled frames.'
        '</div>'
    )

    # One ordered list of controls drives both handlers: it is the positional
    # argument order passed to `process`, and the order in which DEMO_PRESET's
    # values are written back. Sharing it keeps the two from drifting apart.
    _controls = [
        video_in, sample_frames, patch_size, top_k, max_pixels,
        viz_mode, heatmap_alpha,
        start_sec, end_sec,
        saliency_signal, score_log_scale, bitcost_pct, fade_strength,
        gop, target_canvases,
    ]

    # Run the pipeline; its four return values fill the four output widgets.
    run_btn.click(
        process,
        inputs=_controls,
        outputs=[vis_out, canvas_out, info_out, chart_out],
    )

    # Load the bundled demo clip together with its preset parameter values.
    btn_demo.click(
        lambda: DEMO_PRESET,
        inputs=None,
        outputs=_controls,
    )
|
|
|
|
if __name__ == "__main__":
    # Listen on every interface; the port comes from the PORT environment
    # variable when set, otherwise 7860. Any version-dependent launch options
    # collected in _LAUNCH_KW are forwarded as well.
    _port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=_port, **_LAUNCH_KW)
|
|