| import json |
| import os |
| import shutil |
| import subprocess |
| import threading |
| from functools import lru_cache |
|
|
| import numpy as np |
| import torch |
|
|
| from .virtual_media import clamp_virtual_frame_range, get_virtual_media_entry, parse_virtual_media_path, strip_virtual_media_suffix |
|
|
| _ZSCALE_TRANSFER_MAP = {"smpte2084": "smpte2084", "arib-std-b67": "arib-std-b67", "bt709": "bt709", "bt2020-10": "2020_10", "bt2020-12": "2020_12"} |
| _ZSCALE_PRIMARIES_MAP = {"bt2020": "2020", "bt709": "709", "smpte170m": "170m", "bt470bg": "470bg"} |
| _ZSCALE_MATRIX_MAP = {"bt2020nc": "2020_ncl", "bt2020c": "2020_cl", "bt709": "709", "smpte170m": "170m", "bt470bg": "470bg"} |
| _ZSCALE_RANGE_MAP = {"tv": "limited", "limited": "limited", "pc": "full", "full": "full"} |
| _HDR_REFERENCE_WHITE_NITS = 203 |
| _VIRTUAL_MEDIA_PRESEEK_FRAMES = 64 |
| _VIRTUAL_MEDIA_LOCAL_SEARCH_FRAMES = 8 |
|
|
|
|
| def _parse_media_ratio(value, default=None): |
| if value in [None, "", "N/A", "0:1", "0/0"]: |
| return default |
| if isinstance(value, (int, float)): |
| return float(value) |
| text = str(value).strip() |
| if ":" in text: |
| num, den = text.split(":", 1) |
| elif "/" in text: |
| num, den = text.split("/", 1) |
| else: |
| try: |
| return float(text) |
| except (TypeError, ValueError): |
| return default |
| try: |
| num = float(num) |
| den = float(den) |
| except (TypeError, ValueError): |
| return default |
| return default if den == 0 else num / den |
|
|
|
|
| def _resample_frame_indices(video_fps, video_frames_count, max_target_frames_count, target_fps, start_target_frame): |
| import math |
|
|
| video_frame_duration = 1 / video_fps |
| target_frame_duration = 1 / target_fps |
| target_time = start_target_frame * target_frame_duration |
| frame_no = math.ceil(target_time / video_frame_duration) |
| cur_time = frame_no * video_frame_duration |
| frame_ids = [] |
| while True: |
| if max_target_frames_count != 0 and len(frame_ids) >= max_target_frames_count: |
| break |
| diff = round((target_time - cur_time) / video_frame_duration, 5) |
| add_frames_count = math.ceil(diff) |
| frame_no += add_frames_count |
| if frame_no >= video_frames_count: |
| break |
| frame_ids.append(frame_no) |
| cur_time += add_frames_count * video_frame_duration |
| target_time += target_frame_duration |
| return frame_ids[:max_target_frames_count] |
|
|
|
|
| def _resolve_media_binary(binary_name: str): |
| env_map = {"ffmpeg": "FFMPEG_BINARY", "ffprobe": "FFPROBE_BINARY", "ffplay": "FFPLAY_BINARY"} |
| binary_path = os.environ.get(env_map.get(binary_name, ""), "") |
| if len(binary_path) > 0 and os.path.isfile(binary_path): |
| return binary_path |
| repo_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| candidate = os.path.join(repo_root, "ffmpeg_bins", binary_name + (".exe" if os.name == "nt" else "")) |
| if os.path.isfile(candidate): |
| return candidate |
| return shutil.which(binary_name + (".exe" if os.name == "nt" else "")) or shutil.which(binary_name) |
|
|
|
|
| def resolve_media_binary(binary_name: str): |
| return _resolve_media_binary(binary_name) |
|
|
|
|
| def _augment_virtual_metadata(video_path, metadata): |
| spec = parse_virtual_media_path(video_path) |
| if spec is None or metadata is None: |
| return metadata |
| total_frames = int(metadata.get("frame_count") or 0) |
| start_frame, end_frame = clamp_virtual_frame_range(spec, total_frames) |
| virtual_metadata = dict(metadata) |
| virtual_metadata["source_path"] = spec.source_path |
| virtual_metadata["virtual_start_frame"] = start_frame |
| virtual_metadata["virtual_end_frame"] = end_frame |
| if end_frame is None: |
| return virtual_metadata |
| virtual_frame_count = max(0, end_frame - start_frame + 1) |
| virtual_metadata["frame_count"] = virtual_frame_count |
| fps_float = float(virtual_metadata.get("fps_float") or 0.0) |
| fps = int(virtual_metadata.get("fps") or 0) |
| effective_fps = fps_float if fps_float > 0 else float(fps or 0) |
| if effective_fps > 0: |
| virtual_metadata["duration"] = virtual_frame_count / effective_fps |
| return virtual_metadata |
|
|
|
|
| def _build_vsource_metadata(video_path, entry): |
| if not isinstance(entry, dict): |
| return None |
| if entry.get("kind") == "image": |
| image = entry.get("image") |
| if image is None: |
| return None |
| width, height = image.size |
| fps_float = 1.0 |
| frame_count = 1 |
| elif entry.get("kind") == "video": |
| tensor = entry.get("tensor") |
| if tensor is None or int(getattr(tensor, "ndim", 0)) != 4: |
| return None |
| width = int(tensor.shape[3]) |
| height = int(tensor.shape[2]) |
| frame_count = int(tensor.shape[1]) |
| fps_float = max(float(entry.get("fps") or 0.0), 1.0) |
| else: |
| return None |
| return _augment_virtual_metadata(video_path, { |
| "source_path": parse_virtual_media_path(video_path).source_path if parse_virtual_media_path(video_path) is not None else "", |
| "width": width, |
| "height": height, |
| "display_width": width, |
| "display_height": height, |
| "fps_float": fps_float, |
| "fps": int(round(fps_float)), |
| "frame_count": frame_count, |
| "duration": float(frame_count / fps_float) if fps_float > 0 else 0.0, |
| "start_time": 0.0, |
| "sample_aspect_ratio": "1:1", |
| "display_aspect_ratio": "", |
| "color_transfer": "", |
| "color_primaries": "", |
| "color_space": "", |
| "color_range": "", |
| "needs_sar_fix": False, |
| "needs_tonemap": False, |
| "hdr": bool(entry.get("hdr")), |
| }) |
|
|
|
|
| @lru_cache(maxsize=128) |
| def probe_video_stream_metadata(video_path): |
| video_path = os.fspath(video_path) |
| if (entry := get_virtual_media_entry(video_path)) is not None: |
| return _build_vsource_metadata(video_path, entry) |
| source_path = os.fspath(strip_virtual_media_suffix(video_path)) |
| ffprobe_path = _resolve_media_binary("ffprobe") |
| if ffprobe_path is None: |
| return None |
| probe_cmd = [ffprobe_path, "-v", "error", "-select_streams", "v:0", "-show_streams", "-show_format", "-of", "json", source_path] |
| probe = subprocess.run(probe_cmd, capture_output=True, text=True, encoding="utf-8", errors="ignore", check=False) |
| if probe.returncode != 0: |
| return None |
| try: |
| probe_data = json.loads(probe.stdout) |
| except json.JSONDecodeError: |
| return None |
| streams = probe_data.get("streams") or [] |
| if len(streams) == 0: |
| return None |
| stream = streams[0] |
| codec_name = str(stream.get("codec_name") or "") |
| codec_profile = str(stream.get("profile") or "") |
| width = int(stream.get("width") or 0) |
| height = int(stream.get("height") or 0) |
| if width <= 0 or height <= 0: |
| return None |
| sar = _parse_media_ratio(stream.get("sample_aspect_ratio"), 1.0) or 1.0 |
| dar = _parse_media_ratio(stream.get("display_aspect_ratio")) |
| display_width = width |
| if abs(sar - 1.0) > 1e-6: |
| display_width = max(2, (int(width * sar) // 2) * 2) |
| elif dar is not None and dar > 0: |
| display_width = max(2, (int(height * dar) // 2) * 2) |
| fps_float = _parse_media_ratio(stream.get("avg_frame_rate"), 0.0) or _parse_media_ratio(stream.get("r_frame_rate"), 0.0) or 0.0 |
| duration = stream.get("duration") or (probe_data.get("format") or {}).get("duration") or 0.0 |
| try: |
| duration = float(duration) |
| except (TypeError, ValueError): |
| duration = 0.0 |
| try: |
| start_time = float(stream.get("start_time") or 0.0) |
| except (TypeError, ValueError): |
| start_time = 0.0 |
| try: |
| frame_count = int(stream.get("nb_frames")) |
| except (TypeError, ValueError): |
| frame_count = int(round(duration * fps_float)) if duration > 0 and fps_float > 0 else 0 |
| side_data = stream.get("side_data_list") or [] |
| color_transfer = str(stream.get("color_transfer") or "").lower() |
| color_primaries = str(stream.get("color_primaries") or "").lower() |
| color_space = str(stream.get("color_space") or "").lower() |
| color_range = str(stream.get("color_range") or "").lower() |
| sample_aspect_ratio = str(stream.get("sample_aspect_ratio") or "1:1") |
| display_aspect_ratio = str(stream.get("display_aspect_ratio") or "") |
| is_hdr = color_transfer in {"smpte2084", "arib-std-b67"} or color_primaries == "bt2020" or any( |
| str(item.get("side_data_type") or "").lower() in {"mastering display metadata", "content light level metadata"} for item in side_data |
| ) |
| return _augment_virtual_metadata(video_path, { |
| "codec_name": codec_name, |
| "codec_profile": codec_profile, |
| "width": width, |
| "height": height, |
| "display_width": display_width, |
| "display_height": height, |
| "fps_float": fps_float, |
| "fps": int(round(fps_float)) if fps_float > 0 else 0, |
| "frame_count": frame_count, |
| "duration": duration, |
| "start_time": start_time, |
| "sample_aspect_ratio": sample_aspect_ratio, |
| "display_aspect_ratio": display_aspect_ratio, |
| "color_transfer": color_transfer, |
| "color_primaries": color_primaries, |
| "color_space": color_space, |
| "color_range": color_range, |
| "needs_sar_fix": display_width != width, |
| "needs_tonemap": is_hdr, |
| }) |
|
|
|
|
| def _decode_virtual_media_frames(video_path, metadata, entry, start_frame, max_frames, target_fps, bridge): |
| if entry.get("kind") == "image": |
| if int(start_frame) > 0 or int(max_frames) <= 0: |
| frames = torch.empty((0, metadata["display_height"], metadata["display_width"], 3), dtype=torch.uint8) |
| else: |
| image = np.asarray(entry["image"].convert("RGB"), dtype=np.uint8)[None] |
| frames = torch.from_numpy(image) |
| else: |
| tensor = entry["tensor"] |
| start_index = int(metadata.get("virtual_start_frame") or 0) |
| end_index = metadata.get("virtual_end_frame") |
| tensor = tensor[:, start_index:] if end_index is None else tensor[:, start_index:int(end_index) + 1] |
| frame_count = int(tensor.shape[1]) |
| if target_fps is None or float(target_fps) <= 0: |
| start_index = max(0, int(start_frame)) |
| frames = tensor[:, start_index:start_index + max(0, int(max_frames))].permute(1, 2, 3, 0) |
| else: |
| source_fps = metadata["fps"] if metadata["fps"] > 0 else max(1, int(round(metadata["fps_float"] or 0))) |
| frame_nos = _resample_frame_indices(source_fps, frame_count, int(max_frames), float(target_fps), int(start_frame)) |
| frames = tensor[:, frame_nos].permute(1, 2, 3, 0) if len(frame_nos) > 0 else tensor[:, :0].permute(1, 2, 3, 0) |
| if entry.get("hdr"): |
| frames = frames.to(torch.float32).contiguous() |
| else: |
| frames = frames.add(1.0).mul(127.5).clamp_(0, 255).to(torch.uint8).contiguous() |
| return frames if bridge == "torch" else frames.numpy() |
|
|
|
|
| def video_needs_corrected_decode(video_path): |
| metadata = probe_video_stream_metadata(video_path) |
| return metadata is not None and (metadata["needs_sar_fix"] or metadata["needs_tonemap"]) |
|
|
|
|
| def _build_hdr_tonemap_filter(metadata): |
| zscale_parts = ["t=linear", f"npl={_HDR_REFERENCE_WHITE_NITS}"] |
| if transfer := _ZSCALE_TRANSFER_MAP.get(metadata["color_transfer"]): |
| zscale_parts.append(f"tin={transfer}") |
| if primaries := _ZSCALE_PRIMARIES_MAP.get(metadata["color_primaries"]): |
| zscale_parts.append(f"pin={primaries}") |
| if matrix := _ZSCALE_MATRIX_MAP.get(metadata["color_space"]): |
| zscale_parts.append(f"min={matrix}") |
| if color_range := _ZSCALE_RANGE_MAP.get(metadata.get("color_range")): |
| zscale_parts.append(f"rin={color_range}") |
| return ["zscale=" + ":".join(zscale_parts), "format=gbrpf32le", "tonemap=reinhard", "zscale=t=bt709:p=bt709:m=bt709:r=limited"] |
|
|
|
|
| def _build_hdr_linear_filter(metadata): |
| zscale_parts = [f"npl={_HDR_REFERENCE_WHITE_NITS}", "t=linear", "p=709", "m=gbr", "r=full"] |
| if transfer := _ZSCALE_TRANSFER_MAP.get(metadata["color_transfer"]): |
| zscale_parts.append(f"tin={transfer}") |
| if primaries := _ZSCALE_PRIMARIES_MAP.get(metadata["color_primaries"]): |
| zscale_parts.append(f"pin={primaries}") |
| if matrix := _ZSCALE_MATRIX_MAP.get(metadata["color_space"]): |
| zscale_parts.append(f"min={matrix}") |
| if color_range := _ZSCALE_RANGE_MAP.get(metadata.get("color_range")): |
| zscale_parts.append(f"rin={color_range}") |
| return ["zscale=" + ":".join(zscale_parts), "format=gbrpf32le"] |
|
|
|
|
| def _build_corrected_video_filter(metadata, target_fps=None, start_frame=0, end_frame=None, hdr_linear=False): |
| filters = [] |
| if target_fps is not None and float(target_fps) > 0: |
| filters.append(f"fps={float(target_fps):.12g}") |
| if start_frame > 0 or end_frame is not None: |
| trim_parts = [f"start_frame={int(start_frame)}"] |
| if end_frame is not None: |
| trim_parts.append(f"end_frame={int(end_frame)}") |
| filters.append("trim=" + ":".join(trim_parts)) |
| filters.append("setpts=PTS-STARTPTS") |
| if metadata["needs_sar_fix"]: |
| filters += [f"scale={int(metadata['display_width'])}:{int(metadata['display_height'])}:flags=lanczos", "setsar=1"] |
| if hdr_linear: |
| filters += _build_hdr_linear_filter(metadata) |
| return ",".join(filters) |
| if metadata["needs_tonemap"]: |
| filters += _build_hdr_tonemap_filter(metadata) |
| return ",".join(filters) |
|
|
|
|
| def _read_exact(stream, size): |
| buf = bytearray(size) |
| view = memoryview(buf) |
| read_pos = 0 |
| while read_pos < size: |
| chunk = stream.read(size - read_pos) |
| if not chunk: |
| return None if read_pos == 0 else bytes(view[:read_pos]) |
| view[read_pos:read_pos + len(chunk)] = chunk |
| read_pos += len(chunk) |
| return buf |
|
|
|
|
| def _drain_stream(stream, chunks): |
| while True: |
| chunk = stream.read(65536) |
| if not chunk: |
| break |
| chunks.append(chunk) |
|
|
|
|
| def _parse_first_showinfo_pts_time(stderr_text): |
| for line in str(stderr_text or "").splitlines(): |
| pts_marker = " pts_time:" |
| pts_index = line.find(pts_marker) |
| if pts_index < 0: |
| continue |
| pts_text = line[pts_index + len(pts_marker):].split(None, 1)[0].strip() |
| try: |
| return float(pts_text) |
| except (TypeError, ValueError): |
| continue |
| return None |
|
|
|
|
| def _decode_contiguous_video_frames_ffmpeg(video_path, start_frame, max_frames, bridge="torch", hdr_linear=False): |
| metadata = probe_video_stream_metadata(video_path) |
| if metadata is None: |
| raise RuntimeError(f"Unable to probe video metadata for {video_path}") |
| virtual_spec = parse_virtual_media_path(video_path) |
| decode_path = os.fspath(metadata.get("source_path") or strip_virtual_media_suffix(video_path)) |
| ffmpeg_path = _resolve_media_binary("ffmpeg") |
| if ffmpeg_path is None: |
| raise RuntimeError("ffmpeg binary not found") |
| start_frame = int(start_frame) |
| max_frames = int(max_frames) |
| if metadata.get("virtual_end_frame") is not None: |
| available_frames = max(0, int(metadata["frame_count"]) - max(0, start_frame)) |
| max_frames = min(max_frames, available_frames) |
| if max_frames <= 0: |
| empty_dtype = np.float32 if hdr_linear else np.uint8 |
| empty = np.empty((0, metadata["display_height"], metadata["display_width"], 3), dtype=empty_dtype) |
| return torch.from_numpy(empty) if bridge == "torch" else empty |
| actual_start = start_frame + int(metadata.get("virtual_start_frame") or 0) |
| fps_float = float(metadata.get("fps_float") or metadata.get("fps") or 0.0) |
| actual_end_exclusive = actual_start + max_frames |
| filter_start_frame = 0 |
| filter_end_frame = None |
| decode_seek_frame = actual_start |
| local_search_enabled = virtual_spec is not None and fps_float > 0 and actual_start > 0 |
| requested_frames = max_frames |
| if virtual_spec is not None: |
| if local_search_enabled: |
| decode_seek_frame = max(0, actual_start - _VIRTUAL_MEDIA_PRESEEK_FRAMES - _VIRTUAL_MEDIA_LOCAL_SEARCH_FRAMES) |
| filter_start_frame = actual_start - decode_seek_frame |
| requested_frames = filter_start_frame + max_frames + _VIRTUAL_MEDIA_LOCAL_SEARCH_FRAMES |
| elif fps_float > 0 and actual_start > 0: |
| decode_seek_frame = max(0, actual_start - _VIRTUAL_MEDIA_PRESEEK_FRAMES) |
| filter_start_frame = actual_start - decode_seek_frame |
| filter_end_frame = filter_start_frame + max_frames |
| else: |
| filter_start_frame = actual_start |
| filter_end_frame = actual_end_exclusive |
| video_filter = _build_corrected_video_filter(metadata, start_frame=filter_start_frame if virtual_spec is not None and not local_search_enabled else 0, end_frame=filter_end_frame if virtual_spec is not None and not local_search_enabled else None, hdr_linear=hdr_linear) |
| if local_search_enabled: |
| video_filter = "showinfo" if len(video_filter) == 0 else video_filter + ",showinfo" |
| cmd = [ffmpeg_path, "-v", "info" if local_search_enabled else "error", "-nostdin", "-threads", "0"] |
| if local_search_enabled: |
| cmd += ["-copyts"] |
| if fps_float > 0 and decode_seek_frame > 0: |
| cmd += ["-ss", f"{float(metadata.get('start_time') or 0.0) + (decode_seek_frame / fps_float):.12g}"] |
| cmd += ["-i", decode_path, "-an", "-sn"] |
| if len(video_filter) > 0: |
| cmd += ["-vf", video_filter] |
| out_pix_fmt = "gbrpf32le" if hdr_linear else "rgb24" |
| cmd += ["-fps_mode", "passthrough", "-frames:v", str(requested_frames), "-f", "rawvideo", "-pix_fmt", out_pix_fmt, "pipe:1"] |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10**7) |
| frame_bytes = metadata["display_width"] * metadata["display_height"] * 3 * (4 if hdr_linear else 1) |
| frame_dtype = np.float32 if hdr_linear else np.uint8 |
| frames_shape = (requested_frames, 3, metadata["display_height"], metadata["display_width"]) if hdr_linear else (requested_frames, metadata["display_height"], metadata["display_width"], 3) |
| frames = np.empty(frames_shape, dtype=frame_dtype) |
| frame_count = 0 |
| stderr_chunks = [] |
| stderr_thread = None |
| try: |
| if process.stderr is not None: |
| stderr_thread = threading.Thread(target=_drain_stream, args=(process.stderr, stderr_chunks), daemon=True) |
| stderr_thread.start() |
| while frame_count < requested_frames: |
| raw_frame = _read_exact(process.stdout, frame_bytes) |
| if raw_frame is None or len(raw_frame) < frame_bytes: |
| break |
| if hdr_linear: |
| frames[frame_count] = np.frombuffer(raw_frame, dtype=np.float32).reshape(3, metadata["display_height"], metadata["display_width"]) |
| else: |
| frames[frame_count] = np.frombuffer(raw_frame, dtype=np.uint8).reshape(metadata["display_height"], metadata["display_width"], 3) |
| frame_count += 1 |
| return_code = process.wait() |
| if stderr_thread is not None: |
| stderr_thread.join() |
| stderr = b"".join(stderr_chunks).decode("utf-8", errors="ignore").strip() |
| finally: |
| if process.stdout is not None: |
| process.stdout.close() |
| if process.stderr is not None: |
| process.stderr.close() |
| if return_code != 0 and frame_count == 0: |
| raise RuntimeError(f"ffmpeg decode failed for {video_path}: {stderr}") |
| frames = frames[:frame_count] |
| if local_search_enabled and frame_count > 0: |
| first_pts_time = _parse_first_showinfo_pts_time(stderr) |
| target_pts_time = float(metadata.get("start_time") or 0.0) + (actual_start / fps_float) |
| local_start_frame = filter_start_frame if first_pts_time is None else max(0, int(round((target_pts_time - first_pts_time) * fps_float))) |
| frames = frames[local_start_frame:local_start_frame + max_frames] |
| if hdr_linear: |
| frames = np.ascontiguousarray(frames[:, [2, 0, 1]].transpose(0, 2, 3, 1)) |
| return torch.from_numpy(frames) if bridge == "torch" else frames |
|
|
|
|
| def decode_video_frame_indices_ffmpeg(video_path, frame_indices, bridge="torch", hdr_linear=False): |
| if torch.is_tensor(frame_indices): |
| frame_indices = frame_indices.detach().cpu().tolist() |
| frame_indices = [int(frame_index) for frame_index in frame_indices] |
| metadata = probe_video_stream_metadata(video_path) |
| if metadata is None: |
| raise RuntimeError(f"Unable to probe video metadata for {video_path}") |
| if len(frame_indices) == 0: |
| empty_dtype = np.float32 if hdr_linear else np.uint8 |
| empty = np.empty((0, metadata["display_height"], metadata["display_width"], 3), dtype=empty_dtype) |
| return torch.from_numpy(empty) if bridge == "torch" else empty |
| start_frame = min(frame_indices) |
| if (entry := get_virtual_media_entry(video_path)) is not None: |
| decoded = _decode_virtual_media_frames(video_path, metadata, entry, start_frame, max(frame_indices) - start_frame + 1, None, "torch") |
| frames = decoded[[frame_index - start_frame for frame_index in frame_indices]] |
| return frames if bridge == "torch" else frames.numpy() |
| unique_indices = sorted(set(frame_indices)) |
| span = max(unique_indices) - start_frame + 1 |
| if span <= len(unique_indices) * 3: |
| decoded = decode_video_frames_ffmpeg(video_path, start_frame, span, target_fps=None, bridge="torch", hdr_linear=hdr_linear) |
| frames = decoded[[frame_index - start_frame for frame_index in frame_indices]] |
| return frames if bridge == "torch" else frames.numpy() |
| ffmpeg_path = _resolve_media_binary("ffmpeg") |
| if ffmpeg_path is None: |
| raise RuntimeError("ffmpeg binary not found") |
| decode_path = os.fspath(metadata.get("source_path") or strip_virtual_media_suffix(video_path)) |
| fps_float = float(metadata.get("fps_float") or metadata.get("fps") or 0.0) |
| actual_start = start_frame + int(metadata.get("virtual_start_frame") or 0) |
| rel_indices = [frame_index - start_frame for frame_index in unique_indices] |
| select_expr = "+".join(f"eq(n\\,{frame_index})" for frame_index in rel_indices) |
| video_filter = f"select={select_expr},setpts=N/FRAME_RATE/TB" |
| corrected_filter = _build_corrected_video_filter(metadata, hdr_linear=hdr_linear) |
| if len(corrected_filter) > 0: |
| video_filter += "," + corrected_filter |
| cmd = [ffmpeg_path, "-v", "error", "-nostdin", "-threads", "0"] |
| if fps_float > 0 and actual_start > 0: |
| cmd += ["-ss", f"{float(metadata.get('start_time') or 0.0) + (actual_start / fps_float):.12g}"] |
| cmd += ["-i", decode_path, "-an", "-sn", "-vf", video_filter] |
| out_pix_fmt = "gbrpf32le" if hdr_linear else "rgb24" |
| cmd += ["-fps_mode", "passthrough", "-frames:v", str(len(unique_indices)), "-f", "rawvideo", "-pix_fmt", out_pix_fmt, "pipe:1"] |
| process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10**7) |
| frame_bytes = metadata["display_width"] * metadata["display_height"] * 3 * (4 if hdr_linear else 1) |
| frame_dtype = np.float32 if hdr_linear else np.uint8 |
| frames_shape = (len(unique_indices), 3, metadata["display_height"], metadata["display_width"]) if hdr_linear else (len(unique_indices), metadata["display_height"], metadata["display_width"], 3) |
| frames_np = np.empty(frames_shape, dtype=frame_dtype) |
| frame_count = 0 |
| stderr_chunks = [] |
| stderr_thread = None |
| try: |
| if process.stderr is not None: |
| stderr_thread = threading.Thread(target=_drain_stream, args=(process.stderr, stderr_chunks), daemon=True) |
| stderr_thread.start() |
| while frame_count < len(unique_indices): |
| raw_frame = _read_exact(process.stdout, frame_bytes) |
| if raw_frame is None or len(raw_frame) < frame_bytes: |
| break |
| if hdr_linear: |
| frames_np[frame_count] = np.frombuffer(raw_frame, dtype=np.float32).reshape(3, metadata["display_height"], metadata["display_width"]) |
| else: |
| frames_np[frame_count] = np.frombuffer(raw_frame, dtype=np.uint8).reshape(metadata["display_height"], metadata["display_width"], 3) |
| frame_count += 1 |
| return_code = process.wait() |
| if stderr_thread is not None: |
| stderr_thread.join() |
| stderr = b"".join(stderr_chunks).decode("utf-8", errors="ignore").strip() |
| finally: |
| if process.stdout is not None: |
| process.stdout.close() |
| if process.stderr is not None: |
| process.stderr.close() |
| if return_code != 0 or frame_count != len(unique_indices): |
| raise RuntimeError(f"ffmpeg indexed decode failed for {video_path}: {stderr}") |
| if hdr_linear: |
| frames_np = np.ascontiguousarray(frames_np[:, [2, 0, 1]].transpose(0, 2, 3, 1)) |
| frames = torch.from_numpy(frames_np) |
| positions = {frame_index: pos for pos, frame_index in enumerate(unique_indices)} |
| frames = frames[[positions[frame_index] for frame_index in frame_indices]] |
| return frames if bridge == "torch" else frames.numpy() |
|
|
|
|
| def decode_video_frames_ffmpeg(video_path, start_frame, max_frames, target_fps=None, bridge="torch", hdr_linear=False): |
| metadata = probe_video_stream_metadata(video_path) |
| if metadata is None: |
| raise RuntimeError(f"Unable to probe video metadata for {video_path}") |
| if (entry := get_virtual_media_entry(video_path)) is not None: |
| return _decode_virtual_media_frames(video_path, metadata, entry, start_frame, max_frames, target_fps, bridge) |
| start_frame = int(start_frame) |
| if metadata.get("virtual_end_frame") is not None and start_frame >= int(metadata["frame_count"]): |
| empty_dtype = np.float32 if hdr_linear else np.uint8 |
| empty = np.empty((0, metadata["display_height"], metadata["display_width"], 3), dtype=empty_dtype) |
| return torch.from_numpy(empty) if bridge == "torch" else empty |
| if target_fps is None or float(target_fps) <= 0: |
| return _decode_contiguous_video_frames_ffmpeg(video_path, start_frame, max_frames, bridge, hdr_linear=hdr_linear) |
| source_fps = metadata["fps"] if metadata["fps"] > 0 else max(1, int(round(metadata["fps_float"] or 0))) |
| frame_nos = _resample_frame_indices(source_fps, metadata["frame_count"], int(max_frames), float(target_fps), int(start_frame)) |
| if len(frame_nos) == 0: |
| empty_dtype = np.float32 if hdr_linear else np.uint8 |
| empty = np.empty((0, metadata["display_height"], metadata["display_width"], 3), dtype=empty_dtype) |
| return torch.from_numpy(empty) if bridge == "torch" else empty |
| decode_start = frame_nos[0] |
| decoded = _decode_contiguous_video_frames_ffmpeg(video_path, decode_start, frame_nos[-1] - decode_start + 1, bridge, hdr_linear=hdr_linear) |
| index_list = [frame_no - decode_start for frame_no in frame_nos if frame_no - decode_start < decoded.shape[0]] |
| if bridge == "torch": |
| return decoded[index_list] |
| return decoded[index_list] |
|
|
|
|
| def get_video_summary_extras(video_path): |
| metadata = probe_video_stream_metadata(video_path) |
| if metadata is None: |
| return [], [] |
| values, labels = [], [] |
| if metadata["needs_sar_fix"]: |
| values += [f"{metadata['width']}x{metadata['height']}", metadata["sample_aspect_ratio"]] |
| labels += ["Stored Raster", "Pixel Aspect Ratio"] |
| if len(metadata["display_aspect_ratio"]) > 0: |
| values += [f"{metadata['display_aspect_ratio']} (square-pixel {metadata['display_width']}x{metadata['display_height']})"] |
| labels += ["Display Aspect Ratio"] |
| if metadata["needs_tonemap"]: |
| hdr_parts = [] |
| if metadata["color_transfer"] == "smpte2084": |
| hdr_parts += ["HDR PQ"] |
| elif metadata["color_transfer"] == "arib-std-b67": |
| hdr_parts += ["HDR HLG"] |
| elif len(metadata["color_transfer"]) > 0: |
| hdr_parts += [metadata["color_transfer"].upper()] |
| if len(metadata["color_primaries"]) > 0: |
| hdr_parts += [metadata["color_primaries"].upper()] |
| if len(metadata["color_space"]) > 0 and metadata["color_space"] != metadata["color_primaries"]: |
| hdr_parts += [metadata["color_space"].upper()] |
| values += [" / ".join(hdr_parts) if len(hdr_parts) > 0 else "HDR source"] |
| labels += ["Color"] |
| return values, labels |
|
|