File size: 10,918 Bytes
a103028 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
import string
import cv2
# from imutils.video import FileVideoStream
from math import floor
from ..common.Rects import *
from ..common.Common import *
from ..common.Log import *
from ..media.VideoInfo import *
# window_rect is NormalizedRect which can reads video and returns a crop window. Pass None to disable cropping
def CreateVideoReader(video_path: string, window_rect: PixelRect = None):
return OpenCV_VideoReader(video_path, window_rect)
class VideoReader:
"""
A flexible and more optimzed video reader against opencv video reader.
"""
def __init__(self, video_path, window_rect: PixelRect = None):
self._video_path = video_path
self._window_rect = window_rect
self._current_frame = 0
if self._window_rect is not None:
log.debug("Cropping into " + self._window_rect.String() + " for " + self._video_path)
def seek(self, frame_number):
self._current_frame = frame_number
log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path)
# Returns a frame in RGB format
def read(self):
self._current_frame = self._current_frame + 1
if self._current_frame > self.frame_count:
log.error("Reading past to frame " + str(self._current_frame) + " for " + self._video_path + " (" + str(self.frame_count) + " frames)")
return None
def release(self):
log.debug("Releasing file " + self._video_path)
def windowed_frame(self, frame):
return frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :]
@property
def current_frame(self):
return self._current_frame
@property
def width(self):
width = self.frame_width
if self._window_rect is not None:
width = floor(self._window_rect.max_x - self._window_rect.min_x)
return width
@property
def height(self):
height = self.frame_height
if self._window_rect is not None:
height = floor(self._window_rect.max_y - self._window_rect.min_y)
return height
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# OpenCV_VideoReader - OpenCV Video Reader
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class OpenCV_VideoReader(VideoReader):
def __init__(self, video_path, window_rect: PixelRect = None):
super().__init__(video_path, window_rect)
self._reader = cv2.VideoCapture(self._video_path)
if self._reader.isOpened() == False:
log.fatal("Could not construct file reader for " + self._video_path)
log.info("Creating OpenCV_VideoReader for file " + self._video_path)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Returns a frame in RGB format
def read(self):
super().read()
ret, frame = self._reader.read()
# if frame is read correctly ret is True
if not ret:
log.info("Can't receive frame (stream end?). Exiting ...")
return None
if self._window_rect is not None:
frame = self.windowed_frame(frame)
return frame
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def release(self):
super().release()
self._reader.release()
def seek(self, frame_number):
# no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames
if frame_number is not None and frame_number != self.current_frame:
super().seek(frame_number)
self._reader.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
@property
def frame_rate(self):
return self._reader.get(cv2.CAP_PROP_FPS)
@property
def frame_width(self):
return int(self._reader.get(cv2.CAP_PROP_FRAME_WIDTH))
@property
def frame_height(self):
return int(self._reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
@property
def frame_count(self):
return int(self._reader.get(cv2.CAP_PROP_FRAME_COUNT))
@property
def aspect_ratio(self):
return self.frame_width/self.frame_height
def is_16by9(self):
return self.frame_width/self.frame_height == 16./9.
def is_4by3(self):
return self.frame_width/self.frame_height == 4./3.
# class Nvidia_VideoReader(VideoReader):
# def __init__(self, video_path, window_rect: PixelRect = None):
# super().__init__(video_path, window_rect)
# import PyNvCodec as nvc
# gpuID = 0
# self._nvc_Decoder = nvc.PyNvDecoder(self._video_path, gpuID)
# if self._nvc_Decoder == None:
# log.fatal("Failed to create _nvc_Decoder")
# self._nvc_ColorConversion = nvc.ColorspaceConversionContext(color_space=nvc.ColorSpace.BT_601, color_range=nvc.ColorRange.MPEG)
# if self._nvc_ColorConversion == None:
# log.fatal("Failed to create _nvc_ColorConversion")
# width, height = self._nvc_Decoder.Width(), self._nvc_Decoder.Height()
# log.info("Creating Nvidia_VideoReader for file " + self._video_path + " (" + str(width) + "x" + str(height) + ")")
# self._nvc_SurfaceDownloader = nvc.PySurfaceDownloader(width, height, nvc.PixelFormat.RGB, gpuID)
# if self._nvc_SurfaceDownloader == None:
# log.fatal("Failed to create PySurfaceDownloader")
# self._nvc_Nv12ToYuv = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, gpuID)
# if self._nvc_Nv12ToYuv == None:
# log.fatal("Failed to create _nvc_Nv12ToYuv")
# self._nvc_YuvToRgb = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB, gpuID)
# if self._nvc_YuvToRgb == None:
# log.fatal("Failed to create _nvc_YuvToRgb")
# self._raw_frame = np.ndarray(shape=(self.frame_width * self.frame_height * 3), dtype=np.uint8)
# def seek(self, frame_number):
# # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames
# if frame_number is not None and frame_number != self.current_frame:
# while self._current_frame != frame_number:
# super().read()
# nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
# # Decoder will return zero surface if input file is over;
# if nv12_surface.Empty():
# log.fatal("Failed to decode video frame")
# return None
# log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path)
# # Returns a frame in RGB format
# def read(self):
# super().read()
# nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
# # Decoder will return zero surface if input file is over;
# if nv12_surface.Empty():
# log.fatal("Failed to decode video frame")
# return None
# yuv_surface = self._nvc_Nv12ToYuv.Execute(nv12_surface, self._nvc_ColorConversion)
# if (yuv_surface.Empty()):
# log.fatal("Failed to do color conversion (nv12 -> yuv)")
# rgb_surface = self._nvc_YuvToRgb.Execute(yuv_surface, self._nvc_ColorConversion)
# if (rgb_surface.Empty()):
# log.fatal("Failed to do color conversion (yuv -> rgb)")
# success = self._nvc_SurfaceDownloader.DownloadSingleSurface(rgb_surface, self._raw_frame)
# if not (success):
# log.fatal("Failed to download surface")
# frame = np.resize(self._raw_frame,(self.frame_height, self.frame_width, 3))
# if self._window_rect is not None:
# frame = self.windowed_frame(frame)
# return frame
# def release(self):
# super().release()
# self._nvc_Decoder = None
# self._nvc_ColorConversion = None
# self._nvc_SurfaceDownloader = None
# self._nvc_Nv12ToYuv = None
# self._nvc_YuvToRgb = None
# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# # FileVideoStream_VideoReader - FileVideoStream Video Reader
# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# class FileVideoStream_VideoReader(FileVideoStream):
# def __init__(self, video_path, window_rect: PixelRect = None):
# super(FileVideoStream_VideoReader, self).__init__(video_path, transform=None, queue_size=128)
# self._video_path = video_path
# self._window_rect = window_rect
# log.info("Creating FileVideoStream_VideoReader for file " + video_path)
# if self._window_rect is not None:
# log.debug(video_path + ": cropping into " + self._window_rect.String())
# def seek(self, frame_number):
# self.stream.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
# def release(self):
# self.stop()
# def read(self, frame_number = None):
# # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames
# if frame_number is not None and frame_number != self.current_frame:
# self.seek(frame_number)
# frame = super().read()
# if self._window_rect is not None:
# frame = frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :]
# return frame
# @property
# def frame_rate(self):
# return round(self.stream.get(cv2.CAP_PROP_FPS), 2)
# @property
# def frame_width(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_WIDTH))
# @property
# def frame_height(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
# @property
# def frame_count(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_COUNT))
# @property
# def current_frame(self):
# return self.stream.get(cv2.CAP_PROP_POS_FRAMES)
# @property
# def width(self):
# width = self.frame_width
# if self._window_rect is not None:
# width = floor(self._window_rect.max_x - self._window_rect.min_x)
# return width
# @property
# def height(self):
# height = self.frame_height
# if self._window_rect is not None:
# height = floor(self._window_rect.max_y - self._window_rect.min_y)
# return height |