File size: 9,555 Bytes
3d1c0e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# Copyright (c) 2025 FoundationVision
# SPDX-License-Identifier: MIT
from abc import ABC, abstractmethod
import io
import math
import numpy as np
from typing import Optional, TypeVar, Union
import collections
try:
import decord
except ImportError:
_HAS_DECORD = False
else:
_HAS_DECORD = True
if _HAS_DECORD:
decord.bridge.set_bridge('native')
DecordDevice = TypeVar("DecordDevice")
# https://github.com/dmlc/decord/issues/208#issuecomment-1157632702
class VideoReaderWrapper(decord.VideoReader):
    """decord.VideoReader that rewinds to frame 0 after construction and
    after every indexed read.

    Works around a decord frame-seeking inconsistency; see
    https://github.com/dmlc/decord/issues/208#issuecomment-1157632702
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.seek(0)

    def __getitem__(self, key):
        batch = super().__getitem__(key)
        # Rewind so subsequent reads are not affected by this one.
        self.seek(0)
        return batch
class Video(ABC):
    """Abstract interface for reading clips out of a video container."""

    @abstractmethod
    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        decode_audio: bool = True,
    ) -> None:
        """
        Args:
            file: a path or file-like object (e.g. io.BytesIO) that contains
                the encoded video.
            video_name: optional human-readable name for the video.
            decode_audio: if disabled, audio is not decoded.
        """

    @property
    @abstractmethod
    def duration(self) -> float:
        """Duration of the video in seconds."""

    @abstractmethod
    def get_clip(self, start_sec: float, end_sec: float, num_samples: int):
        """Retrieve frames between the given start and end times in seconds
        (the video always starts at 0 seconds).

        Args:
            start_sec: the clip start time in seconds.
            end_sec: the clip end time in seconds.
            num_samples: number of frames to sample from the window.

        Returns:
            A dictionary mapping strings to tensors of the clip's
            underlying data.
        """

    def close(self):
        """Release any resources held by the reader; default is a no-op."""
class EncodedVideoDecord(Video):
    """
    Accessing clips from an encoded video using Decord video reading API
    as the decoding backend. For more details, please refer to -
    `Decord <https://github.com/dmlc/decord>`
    """

    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        width: int = -1,
        height: int = -1,
        num_threads: int = 0,
        fault_tol: int = -1,
    ) -> None:
        """
        Args:
            file: file path (or file-like object) of the encoded video.
            video_name (str): An optional name assigned to the video.
            width : int, default is -1
                Desired output width of the video, unchanged if `-1` is specified.
            height : int, default is -1
                Desired output height of the video, unchanged if `-1` is specified.
            num_threads : int, default is 0
                Number of decoding threads, auto if `0` is specified.
            fault_tol : int, default is -1
                The threshold of corrupted and recovered frames. This is to prevent
                silent fault tolerance when for example 50% frames of a video cannot
                be decoded and duplicate frames are returned. Say
                `N = # recovered frames`:
                If `fault_tol` < 0, nothing will happen.
                If 0 < `fault_tol` < 1.0, raise `DECORDLimitReachedError` when
                N > `fault_tol * len(video)`.
                If 1 < `fault_tol`, raise `DECORDLimitReachedError` when N > `fault_tol`.

        Raises:
            ImportError: if decord is not installed.
            RuntimeError: if decord fails to open the video.
        """
        self._video_name = video_name
        if not _HAS_DECORD:
            raise ImportError(
                "decord is required to use EncodedVideoDecord decoder. Please "
                "install with 'pip install decord' for CPU-only version and refer to"
                "'https://github.com/dmlc/decord' for GPU-supported version"
            )
        try:
            self._av_reader = VideoReaderWrapper(
                uri=file,
                ctx=decord.cpu(0),
                width=width,
                height=height,
                num_threads=num_threads,
                fault_tol=fault_tol,
            )
        except Exception as e:
            # Chain the original decord error so the root cause is not lost.
            raise RuntimeError(
                f"Failed to open video {video_name} with Decord. {e}"
            ) from e
        self._fps = self._av_reader.get_avg_fps()
        self._duration = float(len(self._av_reader)) / float(self._fps)

    @property
    def name(self) -> Optional[str]:
        """
        Returns:
            name: the name of the stored video if set.
        """
        return self._video_name

    @property
    def duration(self) -> float:
        """
        Returns:
            duration: the video's duration/end-time in seconds.
        """
        return self._duration

    def close(self):
        """Release the underlying decord reader; safe to call more than once."""
        if self._av_reader is not None:
            del self._av_reader
            self._av_reader = None

    def get_clip(
        self, start_sec: float, end_sec: float, num_samples: int
    ):
        """
        Retrieves `num_samples` frames evenly spaced between the specified start
        and end times in seconds (the video always starts at 0 seconds).

        Args:
            start_sec (float): the clip start time in seconds
            end_sec (float): the clip end time in seconds
            num_samples (int): number of frames to sample from the window

        Returns:
            A tuple ``(frames, rel_idxs)`` where ``frames`` is a numpy array of
            shape (num_samples, height, width, channel) as decoded by decord
            and ``rel_idxs`` are the sampled frame indices relative to the
            first sampled frame.

        Raises:
            RuntimeError: if the requested time window is invalid.
        """
        if start_sec > end_sec or start_sec > self._duration:
            raise RuntimeError(
                f"Incorrect time window for Decord decoding for video: {self._video_name}."
            )
        num_frames = len(self._av_reader)
        # Clamp both endpoints so the sampled indices always stay inside the
        # video: without the clamp, a window touching the end of the video
        # (e.g. start_sec == duration) makes start_idx == num_frames and
        # linspace then produces out-of-range / decreasing indices.
        start_idx = min(math.ceil(self._fps * start_sec), num_frames - 1)
        end_idx = min(math.ceil(self._fps * end_sec), num_frames)
        end_idx = max(end_idx, start_idx + 1)
        frame_idxs = np.linspace(start_idx, end_idx - 1, num_samples, dtype=int)
        try:
            outputs = self._av_reader.get_batch(frame_idxs)
            return outputs.asnumpy(), frame_idxs - frame_idxs[0]
        except Exception as e:
            print(f"Failed to decode video with Decord: {self._video_name}. {e}")
            raise e
try:
import cv2
except ImportError:
print(f"ERR: import cv2 failed, install cv2 by 'pip install opencv-python'")
class EncodedVideoOpencv():
    """
    Accessing clips from an encoded video using OpenCV (`cv2.VideoCapture`)
    as the decoding backend. Frames are returned in BGR channel order.
    """

    def __init__(
        self,
        file: Union[str, io.IOBase],
        video_name: Optional[str] = None,
        width: int = -1,
        height: int = -1,
        num_threads: int = 0,
        fault_tol: int = -1,
    ) -> None:
        """
        Args:
            file str: file path.
            video_name (str): An optional name assigned to the video.
            width : Not supported yet.
            height : Not supported yet.
            num_threads : Not supported yet.
            fault_tol : Not supported yet.
        """
        self._video_name = video_name
        self.cap = cv2.VideoCapture(file)
        self._fps = self.cap.get(cv2.CAP_PROP_FPS)
        self._vlen = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self._duration = float(self._vlen) / float(self._fps)

    @property
    def name(self) -> Optional[str]:
        """
        Returns:
            name: the name of the stored video if set.
        """
        return self._video_name

    @property
    def duration(self) -> float:
        """
        Returns:
            duration: the video's duration/end-time in seconds.
        """
        return self._duration

    def __del__(self):
        self.close()

    def close(self):
        """Release the underlying cv2 capture."""
        self.cap.release()

    def get_clip(
        self, start_sec: float, end_sec: float, num_samples: int
    ):
        """
        Retrieves `num_samples` frames evenly spaced between the specified
        start and end times in seconds (the video always starts at 0 seconds).

        Args:
            start_sec (float): the clip start time in seconds
            end_sec (float): the clip end time in seconds
            num_samples (int): number of frames to sample from the window

        Returns:
            A tuple ``(frames, rel_idxs)`` where ``frames`` is a uint8 numpy
            array of shape (num_samples, height, width, channel) in BGR order
            and ``rel_idxs`` are the sampled frame indices relative to the
            first sampled frame.

        Raises:
            RuntimeError: if the requested time window is invalid or a frame
                cannot be read.
        """
        if start_sec > end_sec or start_sec > self._duration:
            raise RuntimeError(
                f"Incorrect time window for OpenCV decoding for video: {self._video_name}."
            )
        # Clamp both endpoints so the sampled indices always stay inside the
        # video, even when the window touches or exceeds the last frame.
        start_idx = min(math.ceil(self._fps * start_sec), self._vlen - 1)
        end_idx = min(math.ceil(self._fps * end_sec), self._vlen)
        end_idx = max(end_idx, start_idx + 1)
        frame_idxs = np.linspace(start_idx, end_idx - 1, num_samples, dtype=int)
        # A frame index may be sampled multiple times when the window holds
        # fewer frames than num_samples; count the multiplicity of each index.
        frame_idx2freq = collections.defaultdict(int)
        for frame_idx in frame_idxs:
            frame_idx2freq[frame_idx] += 1
        try:
            # Rewind so repeated get_clip() calls read from frame 0 instead of
            # wherever a previous sequential read left the capture.
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            frames = []
            for i in range(self._vlen):
                if i > frame_idxs[-1]:
                    break
                ret, frame = self.cap.read()
                if not ret:
                    # A silently failed read would append None frames and
                    # crash later with an opaque error; fail loudly instead.
                    raise RuntimeError(
                        f"cv2 failed to read frame {i} of video: {self._video_name}."
                    )
                if i in frame_idx2freq:
                    frames.extend([frame] * frame_idx2freq[i])
            frames = np.array(frames).astype(np.uint8)  # BGR type
            assert len(frames) == num_samples
            return frames, frame_idxs - frame_idxs[0]
        except Exception as e:
            print(f"Failed to decode video with opencv: {self._video_name}. {e}")
            raise e
|