| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from threading import RLock |
| from typing import Any |
|
|
| _VIRTUAL_MEDIA_SOURCES: dict[str, dict[str, dict[str, Any]]] = {} |
| _VIRTUAL_MEDIA_LOCK = RLock() |
|
|
|
|
| @dataclass(frozen=True) |
| class VirtualMediaSpec: |
| source_path: str |
| start_frame: int = 0 |
| end_frame: int | None = None |
| audio_track_no: int | None = None |
| extras: tuple[tuple[str, str], ...] = () |
|
|
| def as_suffix_items(self) -> list[tuple[str, str]]: |
| items: list[tuple[str, str]] = [] |
| if self.start_frame != 0: |
| items.append(("start_frame", str(int(self.start_frame)))) |
| if self.end_frame is not None: |
| items.append(("end_frame", str(int(self.end_frame)))) |
| if self.audio_track_no is not None: |
| items.append(("audio_track_no", str(int(self.audio_track_no)))) |
| items.extend(list(self.extras)) |
| return items |
|
|
|
|
| def parse_virtual_media_path(value: Any) -> VirtualMediaSpec | None: |
| if not isinstance(value, str): |
| return None |
| text = value.strip() |
| if "|" not in text: |
| return None |
| source_path, suffix = text.split("|", 1) |
| source_path = source_path.strip() |
| if len(source_path) == 0: |
| return None |
| values: dict[str, str] = {} |
| extras: list[tuple[str, str]] = [] |
| for raw_item in suffix.split(","): |
| item = raw_item.strip() |
| if len(item) == 0: |
| continue |
| key, sep, raw_value = item.partition("=") |
| key = key.strip().lower() |
| if sep == "": |
| extras.append((item, "")) |
| continue |
| value_text = raw_value.strip() |
| if key in ("start_frame", "end_frame", "audio_track_no"): |
| values[key] = value_text |
| else: |
| extras.append((key, value_text)) |
| return VirtualMediaSpec( |
| source_path=source_path, |
| start_frame=_parse_int(values.get("start_frame"), 0), |
| end_frame=_parse_optional_int(values.get("end_frame")), |
| audio_track_no=_parse_optional_int(values.get("audio_track_no")), |
| extras=tuple(extras), |
| ) |
|
|
|
|
| def strip_virtual_media_suffix(value: Any) -> Any: |
| spec = parse_virtual_media_path(value) |
| return spec.source_path if spec is not None else value |
|
|
|
|
| def build_virtual_media_path( |
| source_path: str, |
| *, |
| start_frame: int | None = None, |
| end_frame: int | None = None, |
| audio_track_no: int | None = None, |
| extras: dict[str, Any] | None = None, |
| ) -> str: |
| base_path = str(source_path or "").strip() |
| if len(base_path) == 0: |
| return base_path |
| parts: list[str] = [] |
| if start_frame is not None: |
| parts.append(f"start_frame={int(start_frame)}") |
| if end_frame is not None: |
| parts.append(f"end_frame={int(end_frame)}") |
| if audio_track_no is not None: |
| parts.append(f"audio_track_no={int(audio_track_no)}") |
| for key, value in (extras or {}).items(): |
| key_text = str(key or "").strip() |
| value_text = str(value or "").strip() |
| if len(key_text) == 0 or len(value_text) == 0: |
| continue |
| parts.append(f"{key_text}={value_text}") |
| return base_path if len(parts) == 0 else f"{base_path}|{','.join(parts)}" |
|
|
|
|
| def replace_virtual_media_source(value: Any, source_path: str) -> Any: |
| spec = parse_virtual_media_path(value) |
| if spec is None: |
| return source_path |
| extras = dict(spec.extras) |
| return build_virtual_media_path( |
| source_path, |
| start_frame=spec.start_frame if spec.start_frame != 0 else None, |
| end_frame=spec.end_frame, |
| audio_track_no=spec.audio_track_no, |
| extras=extras, |
| ) |
|
|
|
|
| def clamp_virtual_frame_range(spec: VirtualMediaSpec | None, total_frames: int) -> tuple[int, int | None]: |
| if spec is None: |
| return 0, None |
| explicit_frame_count = _parse_int(_extras_dict(spec).get("frame_count"), 0) |
| total_frames = explicit_frame_count if explicit_frame_count > 0 else int(total_frames or 0) |
| if total_frames <= 0: |
| return 0, None |
| start_frame = _resolve_relative_frame_index(spec.start_frame, total_frames, default_to_end=False) |
| end_frame = _resolve_relative_frame_index(spec.end_frame, total_frames, default_to_end=True) |
| if end_frame is None: |
| return start_frame, total_frames - 1 |
| end_frame = max(start_frame, end_frame) |
| return start_frame, end_frame |
|
|
|
|
| def get_virtual_media_vsource(value: Any) -> str | None: |
| spec = value if isinstance(value, VirtualMediaSpec) else parse_virtual_media_path(value) |
| if spec is None: |
| return None |
| vsource = _extras_dict(spec).get("vsource", "").strip() |
| return vsource or None |
|
|
|
|
| def get_virtual_media_entry(value: Any) -> dict[str, Any] | None: |
| spec = value if isinstance(value, VirtualMediaSpec) else parse_virtual_media_path(value) |
| if spec is None: |
| return None |
| vsource = get_virtual_media_vsource(spec) |
| if vsource is None: |
| return None |
| with _VIRTUAL_MEDIA_LOCK: |
| entry = _VIRTUAL_MEDIA_SOURCES.get(vsource, {}).get(str(spec.source_path or "").strip()) |
| return None if entry is None else dict(entry) |
|
|
|
|
| def store_virtual_video(vsource: str, name: str, tensor: Any, fps: float, *, hdr: bool = False) -> None: |
| import torch |
|
|
| tensor = tensor.detach().cpu().to(dtype=torch.float32).contiguous().clone() |
| with _VIRTUAL_MEDIA_LOCK: |
| _VIRTUAL_MEDIA_SOURCES.setdefault(str(vsource).strip(), {})[str(name).strip()] = { |
| "kind": "video", |
| "tensor": tensor, |
| "fps": max(float(fps or 0.0), 1.0), |
| "hdr": bool(hdr), |
| } |
| _clear_virtual_media_caches() |
|
|
|
|
| def get_virtual_video(value: Any, name: str | None = None) -> Any: |
| entry = _get_virtual_media_entry_from_key(value, name) |
| if entry is None or entry.get("kind") != "video": |
| return None |
| tensor = entry.get("tensor") |
| return None if tensor is None else tensor.clone() |
|
|
|
|
| def store_virtual_image(vsource: str, name: str, image: Any) -> None: |
| with _VIRTUAL_MEDIA_LOCK: |
| _VIRTUAL_MEDIA_SOURCES.setdefault(str(vsource).strip(), {})[str(name).strip()] = {"kind": "image", "image": image.copy()} |
| _clear_virtual_media_caches() |
|
|
|
|
| def get_virtual_image(value: Any, name: str | None = None) -> Any: |
| entry = _get_virtual_media_entry_from_key(value, name) |
| if entry is None or entry.get("kind") != "image": |
| return None |
| image = entry.get("image") |
| return None if image is None else image.copy() |
|
|
|
|
| def clear_virtual_media_source(vsource: str) -> None: |
| with _VIRTUAL_MEDIA_LOCK: |
| _VIRTUAL_MEDIA_SOURCES.pop(str(vsource).strip(), None) |
| _clear_virtual_media_caches() |
|
|
|
|
| def _parse_int(value: Any, default: int) -> int: |
| try: |
| return int(str(value or "").strip()) |
| except (TypeError, ValueError): |
| return default |
|
|
|
|
| def _parse_optional_int(value: Any) -> int | None: |
| text = str(value or "").strip() |
| if len(text) == 0: |
| return None |
| try: |
| return int(text) |
| except (TypeError, ValueError): |
| return None |
|
|
|
|
| def _resolve_relative_frame_index(value: int | None, total_frames: int, *, default_to_end: bool) -> int | None: |
| if total_frames <= 0: |
| return None if value is None else 0 |
| if value is None: |
| return total_frames - 1 if default_to_end else 0 |
| index = int(value) |
| if index < 0: |
| index = total_frames + index |
| return max(0, min(index, total_frames - 1)) |
|
|
|
|
| def _extras_dict(spec: VirtualMediaSpec) -> dict[str, str]: |
| return {str(key or "").strip().lower(): str(value or "").strip() for key, value in spec.extras} |
|
|
|
|
| def _get_virtual_media_entry_from_key(value: Any, name: str | None = None) -> dict[str, Any] | None: |
| if name is None: |
| return get_virtual_media_entry(value) |
| with _VIRTUAL_MEDIA_LOCK: |
| entry = _VIRTUAL_MEDIA_SOURCES.get(str(value).strip(), {}).get(str(name).strip()) |
| return None if entry is None else dict(entry) |
|
|
|
|
| def _clear_virtual_media_caches() -> None: |
| try: |
| from . import video_decode as _video_decode |
|
|
| _video_decode.probe_video_stream_metadata.cache_clear() |
| except Exception: |
| pass |
| try: |
| from . import utils as _utils |
|
|
| _utils._get_video_info_cached.cache_clear() |
| _utils._get_video_info_details_cached.cache_clear() |
| except Exception: |
| pass |
|
|