import string
import cv2
# from imutils.video import FileVideoStream
from math import floor
from ..common.Rects import *
from ..common.Common import *
from ..common.Log import *
from ..media.VideoInfo import *


def CreateVideoReader(video_path: str, window_rect: PixelRect = None):
    """Create the default video reader for *video_path*.

    Args:
        video_path: Path of the video file to open.
        window_rect: Crop window, in pixels, applied to every frame that is
            read. Pass None to disable cropping.

    Returns:
        An ``OpenCV_VideoReader`` built from the two arguments.
    """
    return OpenCV_VideoReader(video_path, window_rect)


class VideoReader:
    """
    A flexible and more optimized video reader on top of the OpenCV video reader.

    This base class only does bookkeeping: it remembers the video path, the
    optional crop window and the index of the current frame. It reads
    ``self.frame_width``, ``self.frame_height`` and ``self.frame_count`` but
    does not define them, so a subclass has to provide those attributes.
    """

    def __init__(self, video_path, window_rect: PixelRect = None):
        """Store the path and the optional crop window; start at frame 0."""
        self._video_path = video_path
        self._window_rect = window_rect
        self._current_frame = 0
        if self._window_rect is not None:
            log.debug(f"Cropping into {self._window_rect.String()} for {self._video_path}")

    def seek(self, frame_number):
        """Record *frame_number* as the current frame and log the move."""
        self._current_frame = frame_number
        log.debug(f"Seeking to frame {self._current_frame} for file {self._video_path}")

    def read(self):
        """Advance the frame counter by one.

        Logs an error when the counter moves past ``self.frame_count``. The
        base implementation never produces a frame and always returns None;
        subclasses call it for the bookkeeping and return the frame themselves.
        """
        self._current_frame = self._current_frame + 1
        frame_count = self.frame_count
        if self._current_frame > frame_count:
            log.error(f"Reading past to frame {self._current_frame} for {self._video_path} ({frame_count} frames)")
        return None

    def release(self):
        """Log that the file is being released; subclasses free the real resources."""
        log.debug(f"Releasing file {self._video_path}")

    def windowed_frame(self, frame):
        """Return *frame* cropped to the configured window.

        The first axis is sliced with the window's y range and the second with
        its x range; the third axis is kept whole. When no window is configured
        the frame is returned unchanged instead of failing on ``None``.
        """
        rect = self._window_rect
        if rect is None:
            return frame
        # NOTE(review): width/height apply floor() to the window extents, which
        # suggests the bounds may be non-integers - confirm they are valid
        # slice indices for the frame type.
        return frame[rect.min_y:rect.max_y, rect.min_x:rect.max_x, :]

    @property
    def current_frame(self):
        """Index of the current frame as tracked by seek() and read()."""
        return self._current_frame

    @property
    def width(self):
        """Width of the returned frames: the window width if cropping, else ``frame_width``."""
        if self._window_rect is not None:
            return floor(self._window_rect.max_x - self._window_rect.min_x)
        return self.frame_width

    @property
    def height(self):
        """Height of the returned frames: the window height if cropping, else ``frame_height``."""
        if self._window_rect is not None:
            return floor(self._window_rect.max_y - self._window_rect.min_y)
        return self.frame_height


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# OpenCV_VideoReader - OpenCV Video Reader
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class OpenCV_VideoReader(VideoReader):
    """Video reader backed by ``cv2.VideoCapture``."""

    def __init__(self, video_path, window_rect: PixelRect = None):
        """Open *video_path* with OpenCV.

        Args:
            video_path: Path of the video file to open.
            window_rect: Optional crop window applied to every frame that is
                read. Pass None to disable cropping.
        """
        super().__init__(video_path, window_rect)
        self._reader = cv2.VideoCapture(self._video_path)
        if not self._reader.isOpened():
            log.fatal(f"Could not construct file reader for {self._video_path}")
        log.info(f"Creating OpenCV_VideoReader for file {self._video_path}")

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def read(self):
        """Read the next frame.

        Returns:
            The decoded frame, cropped to the window when one is configured,
            or None when OpenCV could not deliver a frame.

        NOTE(review): the original comment promised an RGB frame, but no colour
        conversion is performed here; the frame is returned exactly as
        ``cv2.VideoCapture.read()`` produced it (BGR by default according to
        the OpenCV documentation) - confirm what callers expect.
        """
        # The base class advances the frame counter and reports reads past the end.
        super().read()
        ok, frame = self._reader.read()
        # ok is True only when a frame was read correctly.
        if not ok:
            log.info("Can't receive frame (stream end?). Exiting ...")
            return None
        if self._window_rect is not None:
            frame = self.windowed_frame(frame)
        return frame

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def release(self):
        """Release the underlying OpenCV capture."""
        super().release()
        self._reader.release()

    def seek(self, frame_number):
        """Move the capture to *frame_number*.

        Nothing happens when *frame_number* is None or already the current
        frame: skipping the seek greatly speeds up reading subsequent frames.
        """
        if frame_number is None or frame_number == self.current_frame:
            return
        super().seek(frame_number)
        self._reader.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

    @property
    def frame_rate(self):
        """Frame rate reported by OpenCV (``CAP_PROP_FPS``)."""
        return self._reader.get(cv2.CAP_PROP_FPS)

    @property
    def frame_width(self):
        """Width of the uncropped frames (``CAP_PROP_FRAME_WIDTH``) as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_WIDTH))

    @property
    def frame_height(self):
        """Height of the uncropped frames (``CAP_PROP_FRAME_HEIGHT``) as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    @property
    def frame_count(self):
        """Number of frames reported by OpenCV (``CAP_PROP_FRAME_COUNT``) as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_COUNT))

    @property
    def aspect_ratio(self):
        """Uncropped width divided by uncropped height.

        Raises:
            ZeroDivisionError: if the reported frame height is 0.
        """
        return self.frame_width / self.frame_height

    def _has_aspect(self, horizontal, vertical):
        """Return True when the uncropped frame is exactly horizontal:vertical.

        Integer cross-multiplication replaces the float comparison of two
        quotients, so a reported height of 0 yields False instead of raising
        ZeroDivisionError.
        """
        frame_w, frame_h = self.frame_width, self.frame_height
        return frame_h > 0 and frame_w * vertical == frame_h * horizontal

    def is_16by9(self):
        """Return True when the uncropped frame has an exact 16:9 aspect ratio."""
        return self._has_aspect(16, 9)

    def is_4by3(self):
        """Return True when the uncropped frame has an exact 4:3 aspect ratio."""
        return self._has_aspect(4, 3)
# class Nvidia_VideoReader(VideoReader):
#     def __init__(self, video_path, window_rect: PixelRect = None):
#         super().__init__(video_path, window_rect)
#         import PyNvCodec as nvc
#         gpuID = 0
#         self._nvc_Decoder = nvc.PyNvDecoder(self._video_path, gpuID)
#         if self._nvc_Decoder == None:
#             log.fatal("Failed to create _nvc_Decoder")
#         self._nvc_ColorConversion = nvc.ColorspaceConversionContext(color_space=nvc.ColorSpace.BT_601, color_range=nvc.ColorRange.MPEG)
#         if self._nvc_ColorConversion == None:
#             log.fatal("Failed to create _nvc_ColorConversion")
#         width, height = self._nvc_Decoder.Width(), self._nvc_Decoder.Height()
#         log.info("Creating Nvidia_VideoReader for file " + self._video_path + " (" + str(width) + "x" + str(height) + ")")
#         self._nvc_SurfaceDownloader = nvc.PySurfaceDownloader(width, height, nvc.PixelFormat.RGB, gpuID)
#         if self._nvc_SurfaceDownloader == None:
#             log.fatal("Failed to create PySurfaceDownloader")
#         self._nvc_Nv12ToYuv = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, gpuID)
#         if self._nvc_Nv12ToYuv == None:
#             log.fatal("Failed to create _nvc_Nv12ToYuv")
#         self._nvc_YuvToRgb = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB, gpuID)
#         if self._nvc_YuvToRgb == None:
#             log.fatal("Failed to create _nvc_YuvToRgb")
#         self._raw_frame = np.ndarray(shape=(self.frame_width * self.frame_height * 3), dtype=np.uint8)

#     def seek(self, frame_number):
#         # no need to seek if we are at the right position - greatly speeds up reading subsequent frames
#         if frame_number is not None and frame_number != self.current_frame:
#             while self._current_frame != frame_number:
#                 super().read()
#                 nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
#                 # Decoder will return zero surface if input file is over;
#                 if nv12_surface.Empty():
#                     log.fatal("Failed to decode video frame")
#                     return None
#             log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path)

#     # Returns a frame in RGB format
#     def read(self):
#         super().read()
#         nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
#         # Decoder will return zero surface if input file is over;
#         if nv12_surface.Empty():
#             log.fatal("Failed to decode video frame")
#             return None
#         yuv_surface = self._nvc_Nv12ToYuv.Execute(nv12_surface, self._nvc_ColorConversion)
#         if (yuv_surface.Empty()):
#             log.fatal("Failed to do color conversion (nv12 -> yuv)")
#         rgb_surface = self._nvc_YuvToRgb.Execute(yuv_surface, self._nvc_ColorConversion)
#         if (rgb_surface.Empty()):
#             log.fatal("Failed to do color conversion (yuv -> rgb)")
#         success = self._nvc_SurfaceDownloader.DownloadSingleSurface(rgb_surface, self._raw_frame)
#         if not (success):
#             log.fatal("Failed to download surface")
#         frame = np.resize(self._raw_frame,(self.frame_height, self.frame_width, 3))
#         if self._window_rect is not None:
#             frame = self.windowed_frame(frame)
#         return frame

#     def release(self):
#         super().release()
#         self._nvc_Decoder = None
#         self._nvc_ColorConversion = None
#         self._nvc_SurfaceDownloader = None
#         self._nvc_Nv12ToYuv = None
#         self._nvc_YuvToRgb = None

# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# # FileVideoStream_VideoReader - FileVideoStream Video Reader
# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# class FileVideoStream_VideoReader(FileVideoStream):
#     def __init__(self, video_path, window_rect: PixelRect = None):
#         super(FileVideoStream_VideoReader, self).__init__(video_path, transform=None, queue_size=128)
#         self._video_path = video_path
#         self._window_rect = window_rect
#         log.info("Creating FileVideoStream_VideoReader for file " + video_path)
#         if self._window_rect is not None:
#             log.debug(video_path + ": cropping into " + self._window_rect.String())

#     def seek(self, frame_number):
#         self.stream.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

#     def release(self):
#         self.stop()

#     def read(self, frame_number = None):
#         # no need to seek if we are at the right position - greatly speeds up reading subsequent frames
#         if frame_number is not None and frame_number != self.current_frame:
#             self.seek(frame_number)
#         frame = super().read()
#         if self._window_rect is not None:
#             frame = frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :]
#         return frame

#     @property
#     def frame_rate(self):
#         return round(self.stream.get(cv2.CAP_PROP_FPS), 2)

#     @property
#     def frame_width(self):
#         return int(self.stream.get(cv2.CAP_PROP_FRAME_WIDTH))

#     @property
#     def frame_height(self):
#         return int(self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT))

#     @property
#     def frame_count(self):
#         return int(self.stream.get(cv2.CAP_PROP_FRAME_COUNT))

#     @property
#     def current_frame(self):
#         return self.stream.get(cv2.CAP_PROP_POS_FRAMES)

#     @property
#     def width(self):
#         width = self.frame_width
#         if self._window_rect is not None:
#             width = floor(self._window_rect.max_x - self._window_rect.min_x)
#         return width

#     @property
#     def height(self):
#         height = self.frame_height
#         if self._window_rect is not None:
#             height = floor(self._window_rect.max_y - self._window_rect.min_y)
#         return height