yiyexy commited on
Commit
e5fed46
·
verified ·
1 Parent(s): 8ce9c58

Add codec video backend & docs (codec_video_processing_llava_onevision2.py)

Browse files
codec_video_processing_llava_onevision2.py ADDED
@@ -0,0 +1,391 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Codec-based video preprocessing for LlavaOnevision2 (trust_remote_code).
2
+
3
+ This module is the codec analogue of ``video_processing_llava_onevision2.py``.
4
+ It is invoked when a user calls::
5
+
6
+ processor(messages=..., video_backend="codec", max_pixels=...)
7
+
8
+ and is responsible for:
9
+
10
+ - Decoding the video and assembling canvas images via ``cv-preinfer``
11
+ (PyPI: ``codec-video-prep``, requires ``ffmpeg`` on PATH).
12
+ - Running the bundled ``Qwen2VLImageProcessor`` on those canvases with a
13
+ pixel budget that is *aligned* to the canvas dimensions (so the
14
+ smart_resize step never desynchronises ``image_grid_thw`` from the
15
+ codec-emitted ``src_patch_position`` array).
16
+ - Producing the per-patch ``patch_positions`` table that
17
+ ``modeling_llava_onevision2.py`` reads for the 2D-MRoPE block layout.
18
+
19
+ The result is a ``BatchFeature``-shaped dict containing the same keys that
20
+ the frame-sampling video path produces (``pixel_values`` /
21
+ ``image_grid_thw`` / ``patch_positions``), so downstream
22
+ ``modeling_llava_onevision2.py`` consumes it without changes.
23
+ """
24
+
25
+ from __future__ import annotations
26
+
27
+ import hashlib
28
+ import json
29
+ import os
30
+ import shutil
31
+ import subprocess
32
+ import tempfile
33
+ import warnings
34
+ from dataclasses import dataclass, field
35
+ from pathlib import Path
36
+ from typing import Optional
37
+
38
+ try:
39
+ import fcntl
40
+ except ImportError:
41
+ fcntl = None # type: ignore
42
+
43
+ import numpy as np
44
+ import torch
45
+ from PIL import Image
46
+
47
+
48
+ VISION_START = "<|vision_start|>"
49
+ VISION_END = "<|vision_end|>"
50
+ IMAGE_PAD = "<|image_pad|>"
51
+
52
+
53
+ # ----------------------------------------------------------------- config
54
+
55
+ @dataclass
56
+ class CodecConfig:
57
+ """All knobs for the codec preprocessing pipeline.
58
+
59
+ ``max_pixels`` is shared with the image_processor / video_processor pixel
60
+ budget. The processor sets it from the user's ``max_pixels=`` kwarg, so
61
+ canvas size and HF smart_resize budget stay consistent.
62
+ """
63
+
64
+ target_canvas: int = 32
65
+ group_size: int = 32
66
+ images_per_group: int = 4
67
+ patch: int = 14
68
+ max_pixels: int = 150000
69
+ min_group_frames: int = 8
70
+ max_group_frames: int = 64
71
+ spatial_mask_mode: str = "off"
72
+ cache_root: Path = field(default_factory=lambda: Path(
73
+ os.getenv(
74
+ "ONLINE_CODEC_CACHE_DIR",
75
+ os.path.join(
76
+ os.getenv("HF_HOME", os.path.expanduser("~/.cache/huggingface")),
77
+ "online_codec_v2",
78
+ ),
79
+ )
80
+ ))
81
+ timeout_seconds: int = int(os.getenv("ONLINE_CODEC_TIMEOUT", "7200"))
82
+
83
+ def validate(self) -> None:
84
+ if self.target_canvas <= 0:
85
+ raise ValueError("CodecConfig.target_canvas must be > 0")
86
+ if self.target_canvas % self.images_per_group != 0:
87
+ raise ValueError(
88
+ "CodecConfig.target_canvas must be divisible by images_per_group"
89
+ )
90
+ if self.group_size % self.images_per_group != 0:
91
+ raise ValueError(
92
+ "CodecConfig.group_size must be divisible by images_per_group"
93
+ )
94
+
95
+ def num_sampled_frames(self) -> int:
96
+ return (self.target_canvas // self.images_per_group) * self.group_size
97
+
98
+
99
+ # ---------------------------------------------------------- text/position
100
+
101
+ def _format_timestamp(seconds: float, decimals: int) -> str:
102
+ return f"<{seconds:.{decimals}f} seconds>"
103
+
104
+
105
+ def convert_positions_to_block_layout(
106
+ positions: torch.Tensor, t: int, h: int, w: int, spatial_merge_size: int = 2,
107
+ ) -> torch.Tensor:
108
+ """Reorder a (T*H*W, 3) patch position table into 2D-MRoPE block layout."""
109
+ sms = int(spatial_merge_size)
110
+ if sms == 1:
111
+ return positions
112
+ total = int(t) * int(h) * int(w)
113
+ indices = torch.arange(total, device=positions.device).view(t, h, w)
114
+ h_m, w_m = int(h) // sms, int(w) // sms
115
+ indices = (
116
+ indices.view(t, h_m, sms, w_m, sms)
117
+ .permute(0, 1, 3, 2, 4).contiguous().view(total)
118
+ )
119
+ return positions[indices]
120
+
121
+
122
+ def codec_positions_for_processor(
123
+ src_positions: np.ndarray, image_grid_thw: torch.Tensor, device: torch.device,
124
+ ) -> torch.Tensor:
125
+ positions = torch.from_numpy(src_positions).long().to(device)
126
+ expected_total = int(image_grid_thw.prod(dim=1).sum().item())
127
+ if expected_total != positions.shape[0]:
128
+ raise ValueError(
129
+ "codec patch position length mismatch: "
130
+ f"thw_total={expected_total}, positions={positions.shape[0]}"
131
+ )
132
+ chunks, offset = [], 0
133
+ for row in image_grid_thw:
134
+ t, h, w = int(row[0]), int(row[1]), int(row[2])
135
+ n = t * h * w
136
+ chunks.append(convert_positions_to_block_layout(positions[offset: offset + n], t, h, w))
137
+ offset += n
138
+ return torch.cat(chunks, dim=0)
139
+
140
+
141
+ def _timestamp_runs(
142
+ patch_positions: torch.Tensor, fps: float, decimals: int, spatial_merge_size: int = 2,
143
+ ) -> list[tuple[str, int]]:
144
+ t_values = patch_positions[:, 0]
145
+ unique_t, counts = torch.unique_consecutive(t_values, return_counts=True)
146
+ merge_factor = int(spatial_merge_size) ** 2
147
+ runs = []
148
+ for t_val, count in zip(unique_t.tolist(), counts.tolist()):
149
+ if int(t_val) < 0:
150
+ continue
151
+ token_count = int(count) // merge_factor
152
+ if token_count <= 0:
153
+ continue
154
+ runs.append((_format_timestamp(float(t_val) / float(fps), decimals), token_count))
155
+ return runs
156
+
157
+
158
+ def rewrite_text_with_codec_positions(
159
+ text: str, patch_positions: torch.Tensor, fps: float, decimals: int,
160
+ ) -> str:
161
+ """Replace the vision span in a chat-template string with codec-aware tokens."""
162
+ parts = []
163
+ for timestamp, token_count in _timestamp_runs(patch_positions, fps, decimals):
164
+ parts.extend([timestamp, VISION_START, IMAGE_PAD * token_count, VISION_END, "\n"])
165
+ vision_text = "".join(parts)
166
+ first_vs, last_ve = text.find(VISION_START), text.rfind(VISION_END)
167
+ if first_vs == -1 or last_ve == -1:
168
+ return text
169
+ tail_start = last_ve + len(VISION_END)
170
+ if tail_start < len(text) and text[tail_start] == "\n":
171
+ tail_start += 1
172
+ return text[:first_vs] + vision_text + text[tail_start:]
173
+
174
+
175
+ def drop_padding_canvases(
176
+ images: list[Image.Image], src_positions: np.ndarray,
177
+ ) -> tuple[list[Image.Image], np.ndarray, int]:
178
+ """Drop fully-padding canvases (all-negative timestamps) and their patches."""
179
+ n_canvas = len(images)
180
+ if n_canvas == 0:
181
+ return images, src_positions, 0
182
+ total_patches = src_positions.shape[0]
183
+ if total_patches % n_canvas != 0:
184
+ raise ValueError(
185
+ f"src_positions length {total_patches} not divisible by canvas count {n_canvas}"
186
+ )
187
+ ppc = total_patches // n_canvas
188
+ positions = src_positions.reshape(n_canvas, ppc, 3)
189
+ canvas_t = positions[..., 0]
190
+ keep_mask = (canvas_t >= 0).any(axis=1)
191
+ if bool((keep_mask & ~((canvas_t >= 0).all(axis=1))).any()):
192
+ raise ValueError("encountered half-padding canvas; padding is expected to be canvas-granular")
193
+ dropped = int(n_canvas - int(keep_mask.sum()))
194
+ if dropped == 0:
195
+ return images, src_positions, 0
196
+ kept_images = [img for img, keep in zip(images, keep_mask.tolist()) if keep]
197
+ kept_positions = positions[keep_mask].reshape(-1, 3)
198
+ return kept_images, kept_positions, dropped
199
+
200
+
201
+ # ------------------------------------------------------- cv-preinfer driver
202
+
203
+ def _get_video_total_frames(video_url: str) -> int:
204
+ import cv2
205
+ cap = cv2.VideoCapture(video_url)
206
+ try:
207
+ total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
208
+ finally:
209
+ cap.release()
210
+ return max(1, total)
211
+
212
+
213
+ def _cache_dir_for(video_url: str, cfg: CodecConfig) -> Path:
214
+ raw = (
215
+ f"{video_url}|tc={cfg.target_canvas}|gs={cfg.group_size}"
216
+ f"|ipg={cfg.images_per_group}|patch={cfg.patch}"
217
+ f"|mp={cfg.max_pixels}|mask={cfg.spatial_mask_mode}"
218
+ )
219
+ key = hashlib.md5(raw.encode()).hexdigest()
220
+ return cfg.cache_root / f"{Path(video_url).stem}_{key}"
221
+
222
+
223
+ def _load_codec_result(out_dir: Path) -> dict:
224
+ with open(out_dir / "meta.json", "r", encoding="utf-8") as f:
225
+ meta = json.load(f)
226
+ canvas_files = meta.get("canvas_files")
227
+ if not canvas_files:
228
+ for ext in ("npy", "jpg", "png"):
229
+ hits = sorted(p.name for p in out_dir.glob(f"canvas_*.{ext}"))
230
+ if hits:
231
+ canvas_files = hits
232
+ break
233
+ canvas_files = canvas_files or []
234
+ images = []
235
+ for name in canvas_files:
236
+ fp = out_dir / name
237
+ if name.endswith(".npy"):
238
+ images.append(Image.fromarray(np.load(fp)))
239
+ else:
240
+ images.append(Image.open(fp).convert("RGB"))
241
+ src_positions = np.load(out_dir / "src_patch_position.npy")
242
+ fps = float(meta.get("fps") or 30.0)
243
+ return {"images": images, "src_positions": src_positions, "fps": fps,
244
+ "out_dir": str(out_dir), "meta": meta}
245
+
246
+
247
+ def _run_cv_preinfer(video_url: str, out_dir: Path, cfg: CodecConfig) -> dict:
248
+ tmp_dir = Path(tempfile.mkdtemp(dir=str(cfg.cache_root), prefix=f".tmp_{out_dir.name[:48]}_"))
249
+ num_sampled = min(cfg.num_sampled_frames(), _get_video_total_frames(video_url))
250
+ cmd = [
251
+ "cv-preinfer", "--video", video_url, "--out_dir", str(tmp_dir),
252
+ "--num_sampled_frames", str(num_sampled),
253
+ "--grouping_mode", "readiness",
254
+ "--group_size", str(cfg.group_size),
255
+ "--images_per_group", str(cfg.images_per_group),
256
+ "--patch", str(cfg.patch),
257
+ "--max_pixels", str(cfg.max_pixels),
258
+ "--readiness_sum_threshold", "0",
259
+ "--min_group_frames", str(cfg.min_group_frames),
260
+ "--max_group_frames", str(cfg.max_group_frames),
261
+ "--avoid_keyframes",
262
+ "--canvas_format", "jpg",
263
+ ]
264
+ try:
265
+ result = subprocess.run(cmd, text=True, capture_output=True, timeout=cfg.timeout_seconds)
266
+ if result.returncode != 0:
267
+ detail = (result.stderr or result.stdout)[-2000:]
268
+ raise RuntimeError(f"online codec failed rc={result.returncode}: {detail}")
269
+ if out_dir.exists():
270
+ shutil.rmtree(out_dir)
271
+ tmp_dir.rename(out_dir)
272
+ except Exception:
273
+ shutil.rmtree(tmp_dir, ignore_errors=True)
274
+ raise
275
+ return _load_codec_result(out_dir)
276
+
277
+
278
+ def process_codec_video(video_url: str, cfg: CodecConfig) -> dict:
279
+ """Public entrypoint: video URL + config -> dict(images, src_positions, fps, ...).
280
+
281
+ Result is cached on disk under ``cfg.cache_root``; concurrent workers
282
+ coordinate via a flock-protected sentinel.
283
+
284
+ Soft-warning behaviour (B-mode):
285
+ - If the video has fewer frames than needed to fill ``target_canvas``,
286
+ we emit a one-time UserWarning describing the shortfall but proceed
287
+ normally (cv-preinfer will produce fewer canvases than requested).
288
+ - If the video is so short that cv-preinfer cannot form a single
289
+ group (``< min_group_frames``), we emit a clearer warning and let
290
+ cv-preinfer's own error propagate.
291
+ """
292
+ cfg.validate()
293
+ out_dir = _cache_dir_for(video_url, cfg)
294
+ if (out_dir / "meta.json").exists() and (out_dir / "src_patch_position.npy").exists():
295
+ return _load_codec_result(out_dir)
296
+
297
+ _maybe_warn_short_video(video_url, cfg)
298
+
299
+ cfg.cache_root.mkdir(parents=True, exist_ok=True)
300
+ lock_path = cfg.cache_root / f".{out_dir.name}.lock"
301
+ lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644)
302
+ try:
303
+ if fcntl is not None:
304
+ fcntl.flock(lock_fd, fcntl.LOCK_EX)
305
+ if (out_dir / "meta.json").exists() and (out_dir / "src_patch_position.npy").exists():
306
+ return _load_codec_result(out_dir)
307
+ return _run_cv_preinfer(video_url, out_dir, cfg)
308
+ finally:
309
+ try:
310
+ if fcntl is not None:
311
+ fcntl.flock(lock_fd, fcntl.LOCK_UN)
312
+ finally:
313
+ os.close(lock_fd)
314
+
315
+
316
+ def _maybe_warn_short_video(video_url: str, cfg: CodecConfig) -> None:
317
+ """Soft-warn (B-mode) when a video is too short to fill target_canvas.
318
+
319
+ Logic:
320
+ * needed_frames = num_sampled_frames() = (target_canvas/ipg)*group_size
321
+ * usable_frames = min(needed_frames, total_frames)
322
+ * expected_canv = (usable_frames // group_size) * images_per_group
323
+ If ``expected_canv < target_canvas`` we warn. If
324
+ ``total_frames < min_group_frames`` we warn more loudly (cv-preinfer
325
+ will fail downstream and that error is allowed to propagate).
326
+ """
327
+ try:
328
+ total_frames = _get_video_total_frames(video_url)
329
+ except Exception:
330
+ return # don't fail on probe errors; cv-preinfer will report its own
331
+
332
+ needed = cfg.num_sampled_frames()
333
+ usable = min(needed, total_frames)
334
+ expected_canv = (usable // cfg.group_size) * cfg.images_per_group
335
+
336
+ if total_frames < cfg.min_group_frames:
337
+ warnings.warn(
338
+ f"[codec] video {video_url!r} has only {total_frames} frames "
339
+ f"(< min_group_frames={cfg.min_group_frames}); cv-preinfer cannot "
340
+ f"form even a single group and will error out. Consider lowering "
341
+ f"min_group_frames or using video_backend='frames' for this clip.",
342
+ UserWarning,
343
+ stacklevel=2,
344
+ )
345
+ return
346
+
347
+ if expected_canv < cfg.target_canvas:
348
+ warnings.warn(
349
+ f"[codec] video {video_url!r} has {total_frames} frames; with "
350
+ f"group_size={cfg.group_size}, images_per_group={cfg.images_per_group} "
351
+ f"this yields ~{expected_canv} canvas(es) instead of the requested "
352
+ f"target_canvas={cfg.target_canvas}. Inference will proceed with the "
353
+ f"smaller canvas count.",
354
+ UserWarning,
355
+ stacklevel=2,
356
+ )
357
+
358
+
359
+ # ----------------------------------------------------- processor wiring
360
+
361
+ def codec_image_processor_outputs(
362
+ image_processor, images: list[Image.Image], max_pixels: int,
363
+ ) -> dict:
364
+ """Run ``Qwen2VLImageProcessor`` on codec canvases without smart_resize-ing.
365
+
366
+ The codec emits canvases already aligned to the patch grid. To keep
367
+ ``image_grid_thw`` consistent with ``src_patch_position``:
368
+ - ``max_pixels`` is clamped up to the largest canvas (never shrinks)
369
+ - ``min_pixels`` is clamped down to the smallest canvas (never upscales)
370
+
371
+ Without the ``min_pixels`` clamp, ``Qwen2VLImageProcessor``'s default
372
+ ``min_pixels=200704`` would grow any canvas below that threshold,
373
+ producing extra patches and a chunk/index mismatch downstream.
374
+ """
375
+ canvas_pixels = [im.width * im.height for im in images]
376
+ proc_max = max(int(max_pixels), max(canvas_pixels, default=int(max_pixels)))
377
+ proc_min = min(canvas_pixels) if canvas_pixels else 1
378
+ return image_processor(
379
+ images=images, min_pixels=proc_min, max_pixels=proc_max, return_tensors="pt",
380
+ )
381
+
382
+
383
+ __all__ = [
384
+ "CodecConfig",
385
+ "process_codec_video",
386
+ "drop_padding_canvases",
387
+ "codec_positions_for_processor",
388
+ "rewrite_text_with_codec_positions",
389
+ "codec_image_processor_outputs",
390
+ "VISION_START", "VISION_END", "IMAGE_PAD",
391
+ ]