# Page-scrape residue kept as comments (not part of the module):
# "BryanW's picture" / "Upload folder using huggingface_hub" / "3d1c0e1 verified"
# Copyright (c) 2025 FoundationVision
# SPDX-License-Identifier: MIT
from abc import ABC, abstractmethod
import io
import math
import numpy as np
from typing import Optional, TypeVar, Union
import collections
# decord is an optional dependency: record whether it is importable so that
# the rest of the module can still be imported without it.
try:
    import decord
except ImportError:
    _HAS_DECORD = False
else:
    _HAS_DECORD = True

DecordDevice = TypeVar("DecordDevice")

if _HAS_DECORD:
    decord.bridge.set_bridge('native')

    # The wrapper subclasses decord.VideoReader, so it can only be defined
    # when decord imported successfully; keeping it under this guard lets the
    # module import cleanly (and EncodedVideoDecord raise its own ImportError)
    # when decord is missing.
    # https://github.com/dmlc/decord/issues/208#issuecomment-1157632702
    class VideoReaderWrapper(decord.VideoReader):
        """
        ``decord.VideoReader`` that seeks back to frame 0 after construction
        and after every ``__getitem__`` access (workaround taken from the
        decord issue linked above).
        """

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.seek(0)

        def __getitem__(self, key):
            frames = super().__getitem__(key)
            # Rewind after each indexed read, as recommended in the linked issue.
            self.seek(0)
            return frames
class Video(ABC):
    """
    Abstract interface for reading clips out of a video container.

    Concrete decoders must implement ``__init__``, ``duration`` and
    ``get_clip``; ``close`` is an optional hook that does nothing by default.
    """

    @abstractmethod
    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        decode_audio: bool = True,
    ) -> None:
        """
        Args:
            file (str or io.IOBase): a path or a file-like object
                (e.g. io.BytesIO) that contains the encoded video.
            video_name (str): an optional name assigned to the video.
            decode_audio (bool): audio-decoding flag.
                NOTE(review): the decoders visible in this file do not accept
                this argument - confirm whether it is still needed.
        """

    @property
    @abstractmethod
    def duration(self) -> float:
        """
        Returns:
            duration of the video in seconds
        """

    @abstractmethod
    def get_clip(
        self, start_sec: float, end_sec: float, num_samples: int
    ):
        """
        Retrieves frames from the internal video between the given start and
        end times in seconds (the video always starts at 0 seconds).

        Args:
            start_sec (float): the clip start time in seconds
            end_sec (float): the clip end time in seconds
            num_samples (int): number of frames to sample from the window

        Returns:
            The clip's underlying data; the exact container is defined by the
            implementing decoder.
        """

    def close(self):
        """Release decoder resources; the base implementation is a no-op."""
        return None
class EncodedVideoDecord(Video):
    """
    Accessing clips from an encoded video using Decord video reading API
    as the decoding backend. For more details, please refer to -
    `Decord <https://github.com/dmlc/decord>`
    """

    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        width: int = -1,
        height: int = -1,
        num_threads: int = 0,
        fault_tol: int = -1,
    ) -> None:
        """
        Args:
            file (str): file path (forwarded to the Decord reader as ``uri``).
            video_name (str): An optional name assigned to the video.
            width : int, default is -1
                Desired output width of the video, unchanged if `-1` is specified.
            height : int, default is -1
                Desired output height of the video, unchanged if `-1` is specified.
            num_threads : int, default is 0
                Number of decoding thread, auto if `0` is specified.
            fault_tol : int, default is -1
                The threshold of corrupted and recovered frames. This is to prevent silent
                fault tolerance when for example 50% frames of a video cannot be decoded and
                duplicate frames are returned. You may find the fault tolerant feature sweet
                in many cases, but not for training models. Say `N = # recovered frames`
                If `fault_tol` < 0, nothing will happen.
                If 0 < `fault_tol` < 1.0, if N > `fault_tol * len(video)`,
                    raise `DECORDLimitReachedError`.
                If 1 < `fault_tol`, if N > `fault_tol`, raise `DECORDLimitReachedError`.

        Raises:
            ImportError: if decord is not installed.
            RuntimeError: if Decord fails to open the video.
        """
        self._video_name = video_name
        # Defined up front so close() stays safe if construction fails below.
        self._av_reader = None
        if not _HAS_DECORD:
            raise ImportError(
                "decord is required to use EncodedVideoDecord decoder. Please "
                "install with 'pip install decord' for CPU-only version and refer to "
                "'https://github.com/dmlc/decord' for GPU-supported version"
            )
        try:
            self._av_reader = VideoReaderWrapper(
                uri=file,
                ctx=decord.cpu(0),
                width=width,
                height=height,
                num_threads=num_threads,
                fault_tol=fault_tol,
            )
        except Exception as e:
            # Chain the cause so the underlying Decord traceback is preserved.
            raise RuntimeError(
                f"Failed to open video {video_name} with Decord. {e}"
            ) from e

        self._fps = self._av_reader.get_avg_fps()
        self._duration = float(len(self._av_reader)) / float(self._fps)

    @property
    def name(self) -> Optional[str]:
        """
        Returns:
            name: the name of the stored video if set.
        """
        return self._video_name

    @property
    def duration(self) -> float:
        """
        Returns:
            duration: the video's duration/end-time in seconds.
        """
        return self._duration

    def close(self):
        """Drop the reference to the Decord reader; safe to call repeatedly."""
        self._av_reader = None

    @staticmethod
    def _sample_frame_indices(fps, start_sec, end_sec, num_frames, num_samples):
        """Return ``num_samples`` evenly spaced, in-range frame indices for the window."""
        # Clamp the first index into [0, num_frames - 1] so a window starting
        # exactly at the end of the video maps to the last frame.
        first = min(max(math.ceil(fps * start_sec), 0), num_frames - 1)
        # The end bound is exclusive; never let the last index fall before the
        # first one, otherwise linspace would walk backwards (possibly to -1).
        last = max(min(math.ceil(fps * end_sec), num_frames) - 1, first)
        return np.linspace(first, last, num_samples, dtype=int)

    def get_clip(
        self, start_sec: float, end_sec: float, num_samples: int
    ):
        """
        Retrieves frames from the encoded video at the specified start and end times
        in seconds (the video always starts at 0 seconds).

        Args:
            start_sec (float): the clip start time in seconds
            end_sec (float): the clip end time in seconds
            num_samples (int): number of frames to sample, evenly spaced over
                the window (frames repeat when the window holds fewer frames).

        Returns:
            A tuple ``(frames, rel_idxs)``:
                frames: numpy array from ``get_batch(...).asnumpy()``
                    (Decord documents this as shaped (N, H, W, 3) - confirm
                    against the installed version).
                rel_idxs: the sampled frame indices relative to the first
                    sampled frame.

        Raises:
            RuntimeError: if the time window is invalid or the video was closed.
            ValueError: if ``num_samples`` is smaller than 1.
        """
        if start_sec > end_sec or start_sec > self._duration:
            raise RuntimeError(
                f"Incorrect time window for Decord decoding for video: {self._video_name}."
            )
        if num_samples < 1:
            raise ValueError(f"num_samples must be >= 1, got {num_samples}.")
        if self._av_reader is None:
            raise RuntimeError(f"Video {self._video_name} has been closed.")

        # frame_idxs = list(range(start_idx, end_idx))
        frame_idxs = self._sample_frame_indices(
            self._fps, start_sec, end_sec, len(self._av_reader), num_samples
        )
        try:
            outputs = self._av_reader.get_batch(frame_idxs)
            return outputs.asnumpy(), frame_idxs - frame_idxs[0]
        except Exception as e:
            print(f"Failed to decode video with Decord: {self._video_name}. {e}")
            raise
# OpenCV is an optional dependency: keep the original import-time message but
# also record availability so the decoder can fail with a clear ImportError.
try:
    import cv2
except ImportError:
    _HAS_CV2 = False
    print("ERR: import cv2 failed, install cv2 by 'pip install opencv-python'")
else:
    _HAS_CV2 = True


class EncodedVideoOpencv():
    """
    Accessing clips from an encoded video using OpenCV's ``VideoCapture`` as
    the decoding backend. Frames are returned in OpenCV's BGR channel order.
    """

    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        width: int = -1,
        height: int = -1,
        num_threads: int = 0,
        fault_tol: int = -1,
    ) -> None:
        """
        Args:
            file str: file path.
            video_name (str): An optional name assigned to the video.
            width : Not support yet.
            height : Not support yet.
            num_threads : Not support yet.
            fault_tol : Not support yet.

        Raises:
            ImportError: if OpenCV (cv2) is not installed.
            RuntimeError: if the video cannot be opened or reports no valid FPS.
        """
        self._video_name = video_name
        # Defined up front so close()/__del__ stay safe if construction fails below.
        self.cap = None
        if not _HAS_CV2:
            raise ImportError(
                "opencv-python is required to use EncodedVideoOpencv decoder. "
                "Please install with 'pip install opencv-python'"
            )
        self.cap = cv2.VideoCapture(file)
        if not self.cap.isOpened():
            self.close()
            raise RuntimeError(f"Failed to open video {video_name} with OpenCV.")
        self._fps = self.cap.get(cv2.CAP_PROP_FPS)
        self._vlen = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if self._fps <= 0:
            # Avoid an opaque ZeroDivisionError when the container has no FPS info.
            self.close()
            raise RuntimeError(
                f"Failed to read a valid FPS for video {video_name} with OpenCV."
            )
        self._duration = float(self._vlen) / float(self._fps)

    @property
    def name(self) -> Optional[str]:
        """
        Returns:
            name: the name of the stored video if set.
        """
        return self._video_name

    @property
    def duration(self) -> float:
        """
        Returns:
            duration: the video's duration/end-time in seconds.
        """
        return self._duration

    def __del__(self):
        self.close()

    def close(self):
        """Release the underlying capture; safe on partially built instances."""
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()

    @staticmethod
    def _sample_frame_indices(fps, start_sec, end_sec, num_frames, num_samples):
        """Return ``num_samples`` evenly spaced, in-range frame indices for the window."""
        # Clamp the first index into [0, num_frames - 1] so a window starting
        # exactly at the end of the video maps to the last frame.
        first = min(max(math.ceil(fps * start_sec), 0), num_frames - 1)
        # The end bound is exclusive; never let the last index fall before the
        # first one, otherwise linspace would walk backwards (possibly to -1).
        last = max(min(math.ceil(fps * end_sec), num_frames) - 1, first)
        return np.linspace(first, last, num_samples, dtype=int)

    def get_clip(
        self, start_sec: float, end_sec: float, num_samples: int
    ):
        """
        Retrieves frames from the encoded video at the specified start and end times
        in seconds (the video always starts at 0 seconds).

        Args:
            start_sec (float): the clip start time in seconds
            end_sec (float): the clip end time in seconds
            num_samples (int): number of frames to sample, evenly spaced over
                the window (frames repeat when the window holds fewer frames).

        Returns:
            A tuple ``(frames, rel_idxs)``:
                frames: uint8 numpy array stacking the sampled frames, in BGR order.
                rel_idxs: the sampled frame indices relative to the first
                    sampled frame.

        Raises:
            RuntimeError: if the time window is invalid, the video reports no
                frames, or a requested frame cannot be read.
            ValueError: if ``num_samples`` is smaller than 1.
        """
        if start_sec > end_sec or start_sec > self._duration:
            raise RuntimeError(
                f"Incorrect time window for OpenCV decoding for video: {self._video_name}."
            )
        if num_samples < 1:
            raise ValueError(f"num_samples must be >= 1, got {num_samples}.")
        if self._vlen < 1:
            raise RuntimeError(f"Video {self._video_name} reports no frames to decode.")

        frame_idxs = self._sample_frame_indices(
            self._fps, start_sec, end_sec, self._vlen, num_samples
        )
        # How many times each source frame must appear in the output.
        repeats = collections.Counter(frame_idxs.tolist())
        last_needed = int(frame_idxs[-1])
        try:
            # Frames are read sequentially, so rewind first; otherwise a second
            # get_clip call would continue from where the previous one stopped.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            frames = []
            for i in range(last_needed + 1):
                ok, frame = self.cap.read()
                count = repeats.get(i, 0)
                if not count:
                    continue
                if not ok:
                    raise RuntimeError(f"could not read frame {i}")
                frames.extend([frame] * count)
            frames = np.array(frames).astype(np.uint8)  # BGR type
            if len(frames) != num_samples:
                raise RuntimeError(
                    f"expected {num_samples} frames, decoded {len(frames)}"
                )
            return frames, frame_idxs - frame_idxs[0]
        except Exception as e:
            print(f"Failed to decode video with opencv: {self._video_name}. {e}")
            raise