| import string | |
| import cv2 | |
| # from imutils.video import FileVideoStream | |
| from math import floor | |
| from ..common.Rects import * | |
| from ..common.Common import * | |
| from ..common.Log import * | |
| from ..media.VideoInfo import * | |
| # window_rect is NormalizedRect which can reads video and returns a crop window. Pass None to disable cropping | |
| def CreateVideoReader(video_path: string, window_rect: PixelRect = None): | |
| return OpenCV_VideoReader(video_path, window_rect) | |
| class VideoReader: | |
| """ | |
| A flexible and more optimzed video reader against opencv video reader. | |
| """ | |
| def __init__(self, video_path, window_rect: PixelRect = None): | |
| self._video_path = video_path | |
| self._window_rect = window_rect | |
| self._current_frame = 0 | |
| if self._window_rect is not None: | |
| log.debug("Cropping into " + self._window_rect.String() + " for " + self._video_path) | |
| def seek(self, frame_number): | |
| self._current_frame = frame_number | |
| log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path) | |
| # Returns a frame in RGB format | |
| def read(self): | |
| self._current_frame = self._current_frame + 1 | |
| if self._current_frame > self.frame_count: | |
| log.error("Reading past to frame " + str(self._current_frame) + " for " + self._video_path + " (" + str(self.frame_count) + " frames)") | |
| return None | |
| def release(self): | |
| log.debug("Releasing file " + self._video_path) | |
| def windowed_frame(self, frame): | |
| return frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :] | |
| def current_frame(self): | |
| return self._current_frame | |
| def width(self): | |
| width = self.frame_width | |
| if self._window_rect is not None: | |
| width = floor(self._window_rect.max_x - self._window_rect.min_x) | |
| return width | |
| def height(self): | |
| height = self.frame_height | |
| if self._window_rect is not None: | |
| height = floor(self._window_rect.max_y - self._window_rect.min_y) | |
| return height | |
| # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| # OpenCV_VideoReader - OpenCV Video Reader | |
| # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| class OpenCV_VideoReader(VideoReader): | |
| def __init__(self, video_path, window_rect: PixelRect = None): | |
| super().__init__(video_path, window_rect) | |
| self._reader = cv2.VideoCapture(self._video_path) | |
| if self._reader.isOpened() == False: | |
| log.fatal("Could not construct file reader for " + self._video_path) | |
| log.info("Creating OpenCV_VideoReader for file " + self._video_path) | |
| # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| # Returns a frame in RGB format | |
| def read(self): | |
| super().read() | |
| ret, frame = self._reader.read() | |
| # if frame is read correctly ret is True | |
| if not ret: | |
| log.info("Can't receive frame (stream end?). Exiting ...") | |
| return None | |
| if self._window_rect is not None: | |
| frame = self.windowed_frame(frame) | |
| return frame | |
| # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| def release(self): | |
| super().release() | |
| self._reader.release() | |
| def seek(self, frame_number): | |
| # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames | |
| if frame_number is not None and frame_number != self.current_frame: | |
| super().seek(frame_number) | |
| self._reader.set(cv2.CAP_PROP_POS_FRAMES, frame_number) | |
| def frame_rate(self): | |
| return self._reader.get(cv2.CAP_PROP_FPS) | |
| def frame_width(self): | |
| return int(self._reader.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| def frame_height(self): | |
| return int(self._reader.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| def frame_count(self): | |
| return int(self._reader.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| def aspect_ratio(self): | |
| return self.frame_width/self.frame_height | |
| def is_16by9(self): | |
| return self.frame_width/self.frame_height == 16./9. | |
| def is_4by3(self): | |
| return self.frame_width/self.frame_height == 4./3. | |
| # class Nvidia_VideoReader(VideoReader): | |
| # def __init__(self, video_path, window_rect: PixelRect = None): | |
| # super().__init__(video_path, window_rect) | |
| # import PyNvCodec as nvc | |
| # gpuID = 0 | |
| # self._nvc_Decoder = nvc.PyNvDecoder(self._video_path, gpuID) | |
| # if self._nvc_Decoder == None: | |
| # log.fatal("Failed to create _nvc_Decoder") | |
| # self._nvc_ColorConversion = nvc.ColorspaceConversionContext(color_space=nvc.ColorSpace.BT_601, color_range=nvc.ColorRange.MPEG) | |
| # if self._nvc_ColorConversion == None: | |
| # log.fatal("Failed to create _nvc_ColorConversion") | |
| # width, height = self._nvc_Decoder.Width(), self._nvc_Decoder.Height() | |
| # log.info("Creating Nvidia_VideoReader for file " + self._video_path + " (" + str(width) + "x" + str(height) + ")") | |
| # self._nvc_SurfaceDownloader = nvc.PySurfaceDownloader(width, height, nvc.PixelFormat.RGB, gpuID) | |
| # if self._nvc_SurfaceDownloader == None: | |
| # log.fatal("Failed to create PySurfaceDownloader") | |
| # self._nvc_Nv12ToYuv = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, gpuID) | |
| # if self._nvc_Nv12ToYuv == None: | |
| # log.fatal("Failed to create _nvc_Nv12ToYuv") | |
| # self._nvc_YuvToRgb = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB, gpuID) | |
| # if self._nvc_YuvToRgb == None: | |
| # log.fatal("Failed to create _nvc_YuvToRgb") | |
| # self._raw_frame = np.ndarray(shape=(self.frame_width * self.frame_height * 3), dtype=np.uint8) | |
| # def seek(self, frame_number): | |
| # # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames | |
| # if frame_number is not None and frame_number != self.current_frame: | |
| # while self._current_frame != frame_number: | |
| # super().read() | |
| # nv12_surface = self._nvc_Decoder.DecodeSingleSurface() | |
| # # Decoder will return zero surface if input file is over; | |
| # if nv12_surface.Empty(): | |
| # log.fatal("Failed to decode video frame") | |
| # return None | |
| # log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path) | |
| # # Returns a frame in RGB format | |
| # def read(self): | |
| # super().read() | |
| # nv12_surface = self._nvc_Decoder.DecodeSingleSurface() | |
| # # Decoder will return zero surface if input file is over; | |
| # if nv12_surface.Empty(): | |
| # log.fatal("Failed to decode video frame") | |
| # return None | |
| # yuv_surface = self._nvc_Nv12ToYuv.Execute(nv12_surface, self._nvc_ColorConversion) | |
| # if (yuv_surface.Empty()): | |
| # log.fatal("Failed to do color conversion (nv12 -> yuv)") | |
| # rgb_surface = self._nvc_YuvToRgb.Execute(yuv_surface, self._nvc_ColorConversion) | |
| # if (rgb_surface.Empty()): | |
| # log.fatal("Failed to do color conversion (yuv -> rgb)") | |
| # success = self._nvc_SurfaceDownloader.DownloadSingleSurface(rgb_surface, self._raw_frame) | |
| # if not (success): | |
| # log.fatal("Failed to download surface") | |
| # frame = np.resize(self._raw_frame,(self.frame_height, self.frame_width, 3)) | |
| # if self._window_rect is not None: | |
| # frame = self.windowed_frame(frame) | |
| # return frame | |
| # def release(self): | |
| # super().release() | |
| # self._nvc_Decoder = None | |
| # self._nvc_ColorConversion = None | |
| # self._nvc_SurfaceDownloader = None | |
| # self._nvc_Nv12ToYuv = None | |
| # self._nvc_YuvToRgb = None | |
| # # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| # # FileVideoStream_VideoReader - FileVideoStream Video Reader | |
| # # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| # class FileVideoStream_VideoReader(FileVideoStream): | |
| # def __init__(self, video_path, window_rect: PixelRect = None): | |
| # super(FileVideoStream_VideoReader, self).__init__(video_path, transform=None, queue_size=128) | |
| # self._video_path = video_path | |
| # self._window_rect = window_rect | |
| # log.info("Creating FileVideoStream_VideoReader for file " + video_path) | |
| # if self._window_rect is not None: | |
| # log.debug(video_path + ": cropping into " + self._window_rect.String()) | |
| # def seek(self, frame_number): | |
| # self.stream.set(cv2.CAP_PROP_POS_FRAMES, frame_number) | |
| # def release(self): | |
| # self.stop() | |
| # def read(self, frame_number = None): | |
| # # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames | |
| # if frame_number is not None and frame_number != self.current_frame: | |
| # self.seek(frame_number) | |
| # frame = super().read() | |
| # if self._window_rect is not None: | |
| # frame = frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :] | |
| # return frame | |
| # @property | |
| # def frame_rate(self): | |
| # return round(self.stream.get(cv2.CAP_PROP_FPS), 2) | |
| # @property | |
| # def frame_width(self): | |
| # return int(self.stream.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| # @property | |
| # def frame_height(self): | |
| # return int(self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # @property | |
| # def frame_count(self): | |
| # return int(self.stream.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # @property | |
| # def current_frame(self): | |
| # return self.stream.get(cv2.CAP_PROP_POS_FRAMES) | |
| # @property | |
| # def width(self): | |
| # width = self.frame_width | |
| # if self._window_rect is not None: | |
| # width = floor(self._window_rect.max_x - self._window_rect.min_x) | |
| # return width | |
| # @property | |
| # def height(self): | |
| # height = self.frame_height | |
| # if self._window_rect is not None: | |
| # height = floor(self._window_rect.max_y - self._window_rect.min_y) | |
| # return height |