import os from dataclasses import dataclass, field from io import BytesIO from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, Union import numpy as np import pyarrow as pa from .. import config from ..download.download_config import DownloadConfig from ..table import array_cast from ..utils.file_utils import is_local_path, xopen from ..utils.py_utils import string_to_dict if TYPE_CHECKING: from decord import VideoReader from .features import FeatureType @dataclass class Video: """ **Experimental.** Video [`Feature`] to read video data from a video file. Input: The Video feature accepts as input: - A `str`: Absolute path to the video file (i.e. random access is allowed). - A `dict` with the keys: - `path`: String with relative path of the video file in a dataset repository. - `bytes`: Bytes of the video file. This is useful for archived files with sequential access. - A `decord.VideoReader`: decord video reader object. Args: mode (`str`, *optional*): The mode to convert the video to. If `None`, the native mode of the video is used. decode (`bool`, defaults to `True`): Whether to decode the video data. If `False`, returns the underlying dictionary in the format `{"path": video_path, "bytes": video_bytes}`. Examples: ```py >>> from datasets import Dataset, Video >>> ds = Dataset.from_dict({"video":["path/to/Screen Recording.mov"]}).cast_column("video", Video()) >>> ds.features["video"] Video(decode=True, id=None) >>> ds[0]["video"] >>> ds = ds.cast_column('video', Video(decode=False)) {'bytes': None, 'path': 'path/to/Screen Recording.mov'} ``` """ decode: bool = True id: Optional[str] = None # Automatically constructed dtype: ClassVar[str] = "decord.VideoReader" pa_type: ClassVar[Any] = pa.struct({"bytes": pa.binary(), "path": pa.string()}) _type: str = field(default="Video", init=False, repr=False) def __post_init__(self): if config.DECORD_AVAILABLE: patch_decord() def __call__(self): return self.pa_type def encode_example(self, value: Union[str, bytes, dict, np.ndarray, "VideoReader"]) -> dict: """Encode example into a format for Arrow. Args: value (`str`, `np.ndarray`, `VideoReader` or `dict`): Data passed as input to Video feature. Returns: `dict` with "path" and "bytes" fields """ if config.DECORD_AVAILABLE: from decord import VideoReader else: VideoReader = None if isinstance(value, list): value = np.array(value) if isinstance(value, str): return {"path": value, "bytes": None} elif isinstance(value, bytes): return {"path": None, "bytes": value} elif isinstance(value, np.ndarray): # convert the video array to bytes return encode_np_array(value) elif VideoReader and isinstance(value, VideoReader): # convert the decord video reader to bytes return encode_decord_video(value) elif value.get("path") is not None and os.path.isfile(value["path"]): # we set "bytes": None to not duplicate the data if they're already available locally return {"bytes": None, "path": value.get("path")} elif value.get("bytes") is not None or value.get("path") is not None: # store the video bytes, and path is used to infer the video format using the file extension return {"bytes": value.get("bytes"), "path": value.get("path")} else: raise ValueError( f"A video sample should have one of 'path' or 'bytes' but they are missing or None in {value}." ) def decode_example(self, value: dict, token_per_repo_id=None) -> "VideoReader": """Decode example video file into video data. Args: value (`str` or `dict`): A string with the absolute video file path, a dictionary with keys: - `path`: String with absolute or relative video file path. - `bytes`: The bytes of the video file. token_per_repo_id (`dict`, *optional*): To access and decode video files from private repositories on the Hub, you can pass a dictionary repo_id (`str`) -> token (`bool` or `str`). Returns: `decord.VideoReader` """ if not self.decode: raise RuntimeError("Decoding is disabled for this feature. Please use Video(decode=True) instead.") if config.DECORD_AVAILABLE: from decord import VideoReader else: raise ImportError("To support decoding videos, please install 'decord'.") if token_per_repo_id is None: token_per_repo_id = {} path, bytes_ = value["path"], value["bytes"] if bytes_ is None: if path is None: raise ValueError(f"A video should have one of 'path' or 'bytes' but both are None in {value}.") else: if is_local_path(path): video = VideoReader(path) else: source_url = path.split("::")[-1] pattern = ( config.HUB_DATASETS_URL if source_url.startswith(config.HF_ENDPOINT) else config.HUB_DATASETS_HFFS_URL ) try: repo_id = string_to_dict(source_url, pattern)["repo_id"] token = token_per_repo_id.get(repo_id) except ValueError: token = None download_config = DownloadConfig(token=token) with xopen(path, "rb", download_config=download_config) as f: bytes_ = BytesIO(f.read()) video = VideoReader(bytes_) else: video = VideoReader(BytesIO(bytes_)) return video def flatten(self) -> Union["FeatureType", Dict[str, "FeatureType"]]: """If in the decodable state, return the feature itself, otherwise flatten the feature into a dictionary.""" from .features import Value return ( self if self.decode else { "bytes": Value("binary"), "path": Value("string"), } ) def cast_storage(self, storage: Union[pa.StringArray, pa.StructArray, pa.ListArray]) -> pa.StructArray: """Cast an Arrow array to the Video arrow storage type. The Arrow types that can be converted to the Video pyarrow storage type are: - `pa.string()` - it must contain the "path" data - `pa.binary()` - it must contain the video bytes - `pa.struct({"bytes": pa.binary()})` - `pa.struct({"path": pa.string()})` - `pa.struct({"bytes": pa.binary(), "path": pa.string()})` - order doesn't matter - `pa.list(*)` - it must contain the video array data Args: storage (`Union[pa.StringArray, pa.StructArray, pa.ListArray]`): PyArrow array to cast. Returns: `pa.StructArray`: Array in the Video arrow storage type, that is `pa.struct({"bytes": pa.binary(), "path": pa.string()})`. """ if pa.types.is_string(storage.type): bytes_array = pa.array([None] * len(storage), type=pa.binary()) storage = pa.StructArray.from_arrays([bytes_array, storage], ["bytes", "path"], mask=storage.is_null()) elif pa.types.is_binary(storage.type): path_array = pa.array([None] * len(storage), type=pa.string()) storage = pa.StructArray.from_arrays([storage, path_array], ["bytes", "path"], mask=storage.is_null()) elif pa.types.is_struct(storage.type): if storage.type.get_field_index("bytes") >= 0: bytes_array = storage.field("bytes") else: bytes_array = pa.array([None] * len(storage), type=pa.binary()) if storage.type.get_field_index("path") >= 0: path_array = storage.field("path") else: path_array = pa.array([None] * len(storage), type=pa.string()) storage = pa.StructArray.from_arrays([bytes_array, path_array], ["bytes", "path"], mask=storage.is_null()) elif pa.types.is_list(storage.type): bytes_array = pa.array( [encode_np_array(np.array(arr))["bytes"] if arr is not None else None for arr in storage.to_pylist()], type=pa.binary(), ) path_array = pa.array([None] * len(storage), type=pa.string()) storage = pa.StructArray.from_arrays( [bytes_array, path_array], ["bytes", "path"], mask=bytes_array.is_null() ) return array_cast(storage, self.pa_type) def video_to_bytes(video: "VideoReader") -> bytes: """Convert a decord Video object to bytes using native compression if possible""" raise NotImplementedError() def encode_decord_video(video: "VideoReader") -> dict: if hasattr(video, "_hf_encoded"): return video._hf_encoded else: raise NotImplementedError( "Encoding a decord video is not implemented. " "Please call `datasets.features.video.patch_decord()` before loading videos to enable this." ) def encode_np_array(array: np.ndarray) -> dict: raise NotImplementedError() # Patching decord a little bit to: # 1. store the encoded video data {"path": ..., "bytes": ...} in `video._hf_encoded`` # 2. set the decord bridge to numpy/torch/tf/jax using `video._hf_bridge_out` (per video instance) instead of decord.bridge.bridge_out (global) # This doesn't affect the normal usage of decord. def _patched_init(self: "VideoReader", uri: Union[str, BytesIO], *args, **kwargs) -> None: from decord.bridge import bridge_out if hasattr(uri, "read"): self._hf_encoded = {"bytes": uri.read(), "path": None} uri.seek(0) elif isinstance(uri, str): self._hf_encoded = {"bytes": None, "path": uri} self._hf_bridge_out = bridge_out self._original_init(uri, *args, **kwargs) def _patched_next(self: "VideoReader", *args, **kwargs): return self._hf_bridge_out(self._original_next(*args, **kwargs)) def _patched_get_batch(self: "VideoReader", *args, **kwargs): return self._hf_bridge_out(self._original_get_batch(*args, **kwargs)) def patch_decord(): # We need to import torch first, otherwise later it can cause issues # e.g. "RuntimeError: random_device could not be read" # when running `torch.tensor(value).share_memory_()` # Same for duckdb which crashes on import if config.TORCH_AVAILABLE: import torch # noqa if config.DUCKDB_AVAILABLE: import duckdb # noqa import decord.video_reader from decord import VideoReader if not hasattr(VideoReader, "_hf_patched"): decord.video_reader.bridge_out = lambda x: x VideoReader._original_init = VideoReader.__init__ VideoReader.__init__ = _patched_init VideoReader._original_next = VideoReader.next VideoReader.next = _patched_next VideoReader._original_get_batch = VideoReader.get_batch VideoReader.get_batch = _patched_get_batch VideoReader._hf_patched = True