# vfx-2 / python_core / media / VideoReader.py
# TaqiRaza512's picture
# Initial commit
# a103028
import string
import cv2
# from imutils.video import FileVideoStream
from math import floor
from ..common.Rects import *
from ..common.Common import *
from ..common.Log import *
from ..media.VideoInfo import *
def CreateVideoReader(video_path: str, window_rect: "PixelRect | None" = None):
    """
    Build the default video reader for ``video_path``.

    :param video_path: path of the video file to open.
    :param window_rect: crop window (a PixelRect) applied to the frames that are read;
        pass None to disable cropping.
    :return: an OpenCV_VideoReader for the file.
    """
    return OpenCV_VideoReader(video_path, window_rect)
class VideoReader:
    """
    A flexible and more optimized video reader than the plain OpenCV video reader.

    This base class only keeps the bookkeeping shared by all readers: the file path,
    an optional crop window and the current frame counter. Decoding is left to
    subclasses, which must also provide the ``frame_width``, ``frame_height`` and
    ``frame_count`` attributes that this class reads.
    """

    def __init__(self, video_path, window_rect: "PixelRect | None" = None):
        """
        :param video_path: path of the video file.
        :param window_rect: crop window (a PixelRect), or None to disable cropping.
        """
        self._video_path = video_path
        self._window_rect = window_rect
        self._current_frame = 0
        if self._window_rect is not None:
            log.debug(f"Cropping into {self._window_rect.String()} for {self._video_path}")

    def seek(self, frame_number):
        """Record ``frame_number`` as the current frame (bookkeeping only)."""
        self._current_frame = frame_number
        log.debug(f"Seeking to frame {self._current_frame} for file {self._video_path}")

    def read(self):
        """
        Advance the frame counter by one and log an error once it passes ``frame_count``.

        The base implementation does no decoding and always returns None.
        """
        self._current_frame += 1
        total_frames = self.frame_count
        if self._current_frame > total_frames:
            log.error(f"Reading past to frame {self._current_frame} for {self._video_path} ({total_frames} frames)")
        return None

    def release(self):
        """Log that the file is being released; the base class holds no resource itself."""
        log.debug(f"Releasing file {self._video_path}")

    def windowed_frame(self, frame):
        """
        Return the part of ``frame`` that lies inside the crop window.

        :param frame: array indexed as [row, column, channel].
        :return: the cropped view, or ``frame`` itself when no crop window is set.
        """
        rect = self._window_rect
        if rect is None:
            # No window configured: pass the frame through instead of failing on None.
            return frame
        return frame[rect.min_y:rect.max_y, rect.min_x:rect.max_x, :]

    @property
    def current_frame(self):
        """Value of the internal frame counter."""
        return self._current_frame

    @property
    def width(self):
        """Width of the frames handed out: the window width when cropping, else ``frame_width``."""
        rect = self._window_rect
        if rect is None:
            return self.frame_width
        return floor(rect.max_x - rect.min_x)

    @property
    def height(self):
        """Height of the frames handed out: the window height when cropping, else ``frame_height``."""
        rect = self._window_rect
        if rect is None:
            return self.frame_height
        return floor(rect.max_y - rect.min_y)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# OpenCV_VideoReader - OpenCV Video Reader
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class OpenCV_VideoReader(VideoReader):
    """
    Video reader backed by ``cv2.VideoCapture``.
    """

    def __init__(self, video_path, window_rect: "PixelRect | None" = None):
        """
        Open ``video_path`` with OpenCV.

        :param video_path: path of the video file.
        :param window_rect: crop window (a PixelRect), or None to disable cropping.
        """
        super().__init__(video_path, window_rect)
        self._reader = cv2.VideoCapture(self._video_path)
        if not self._reader.isOpened():
            log.fatal(f"Could not construct file reader for {self._video_path}")
        log.info(f"Creating OpenCV_VideoReader for file {self._video_path}")

    def read(self):
        """
        Decode the next frame.

        :return: the frame (cropped when a window is set), or None when no frame could be read.

        NOTE(review): this method used to be documented as returning RGB, but
        cv2.VideoCapture.read() yields BGR channel order by default and no colour
        conversion happens here - confirm what callers expect.
        """
        # Base class call is bookkeeping only: it bumps the frame counter and logs
        # reads past the reported frame count. Its return value is not used.
        super().read()
        ret, frame = self._reader.read()
        # ret is True only when a frame was read correctly
        if not ret:
            log.info("Can't receive frame (stream end?). Exiting ...")
            return None
        if self._window_rect is not None:
            frame = self.windowed_frame(frame)
        return frame

    def release(self):
        """Release the underlying OpenCV capture."""
        super().release()
        self._reader.release()

    def seek(self, frame_number):
        """
        Position the reader so that the next read returns ``frame_number``.

        :param frame_number: target frame index; None leaves the position untouched.
        """
        # no need to seek if we are at the right position - greatly speeds up reading subsequent frames
        if frame_number is None or frame_number == self.current_frame:
            return
        super().seek(frame_number)
        self._reader.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

    @property
    def frame_rate(self):
        """Frame rate as reported by OpenCV (CAP_PROP_FPS)."""
        return self._reader.get(cv2.CAP_PROP_FPS)

    @property
    def frame_width(self):
        """Width of the uncropped frames (CAP_PROP_FRAME_WIDTH), as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_WIDTH))

    @property
    def frame_height(self):
        """Height of the uncropped frames (CAP_PROP_FRAME_HEIGHT), as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    @property
    def frame_count(self):
        """Number of frames as reported by OpenCV (CAP_PROP_FRAME_COUNT), as an int."""
        return int(self._reader.get(cv2.CAP_PROP_FRAME_COUNT))

    @property
    def aspect_ratio(self):
        """Uncropped frame width divided by uncropped frame height."""
        return self.frame_width / self.frame_height

    def is_16by9(self):
        """Return True when the uncropped frame is exactly 16:9."""
        return self.aspect_ratio == 16. / 9.

    def is_4by3(self):
        """Return True when the uncropped frame is exactly 4:3."""
        return self.aspect_ratio == 4. / 3.
# class Nvidia_VideoReader(VideoReader):
# def __init__(self, video_path, window_rect: PixelRect = None):
# super().__init__(video_path, window_rect)
# import PyNvCodec as nvc
# gpuID = 0
# self._nvc_Decoder = nvc.PyNvDecoder(self._video_path, gpuID)
# if self._nvc_Decoder == None:
# log.fatal("Failed to create _nvc_Decoder")
# self._nvc_ColorConversion = nvc.ColorspaceConversionContext(color_space=nvc.ColorSpace.BT_601, color_range=nvc.ColorRange.MPEG)
# if self._nvc_ColorConversion == None:
# log.fatal("Failed to create _nvc_ColorConversion")
# width, height = self._nvc_Decoder.Width(), self._nvc_Decoder.Height()
# log.info("Creating Nvidia_VideoReader for file " + self._video_path + " (" + str(width) + "x" + str(height) + ")")
# self._nvc_SurfaceDownloader = nvc.PySurfaceDownloader(width, height, nvc.PixelFormat.RGB, gpuID)
# if self._nvc_SurfaceDownloader == None:
# log.fatal("Failed to create PySurfaceDownloader")
# self._nvc_Nv12ToYuv = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, gpuID)
# if self._nvc_Nv12ToYuv == None:
# log.fatal("Failed to create _nvc_Nv12ToYuv")
# self._nvc_YuvToRgb = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.YUV420, nvc.PixelFormat.RGB, gpuID)
# if self._nvc_YuvToRgb == None:
# log.fatal("Failed to create _nvc_YuvToRgb")
# self._raw_frame = np.ndarray(shape=(self.frame_width * self.frame_height * 3), dtype=np.uint8)
# def seek(self, frame_number):
# # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames
# if frame_number is not None and frame_number != self.current_frame:
# while self._current_frame != frame_number:
# super().read()
# nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
# # Decoder will return zero surface if input file is over;
# if nv12_surface.Empty():
# log.fatal("Failed to decode video frame")
# return None
# log.debug("Seeking to frame " + str(self._current_frame) + " for file " + self._video_path)
# # Returns a frame in RGB format
# def read(self):
# super().read()
# nv12_surface = self._nvc_Decoder.DecodeSingleSurface()
# # Decoder will return zero surface if input file is over;
# if nv12_surface.Empty():
# log.fatal("Failed to decode video frame")
# return None
# yuv_surface = self._nvc_Nv12ToYuv.Execute(nv12_surface, self._nvc_ColorConversion)
# if (yuv_surface.Empty()):
# log.fatal("Failed to do color conversion (nv12 -> yuv)")
# rgb_surface = self._nvc_YuvToRgb.Execute(yuv_surface, self._nvc_ColorConversion)
# if (rgb_surface.Empty()):
# log.fatal("Failed to do color conversion (yuv -> rgb)")
# success = self._nvc_SurfaceDownloader.DownloadSingleSurface(rgb_surface, self._raw_frame)
# if not (success):
# log.fatal("Failed to download surface")
# frame = np.resize(self._raw_frame,(self.frame_height, self.frame_width, 3))
# if self._window_rect is not None:
# frame = self.windowed_frame(frame)
# return frame
# def release(self):
# super().release()
# self._nvc_Decoder = None
# self._nvc_ColorConversion = None
# self._nvc_SurfaceDownloader = None
# self._nvc_Nv12ToYuv = None
# self._nvc_YuvToRgb = None
# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# # FileVideoStream_VideoReader - FileVideoStream Video Reader
# # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# class FileVideoStream_VideoReader(FileVideoStream):
# def __init__(self, video_path, window_rect: PixelRect = None):
# super(FileVideoStream_VideoReader, self).__init__(video_path, transform=None, queue_size=128)
# self._video_path = video_path
# self._window_rect = window_rect
# log.info("Creating FileVideoStream_VideoReader for file " + video_path)
# if self._window_rect is not None:
# log.debug(video_path + ": cropping into " + self._window_rect.String())
# def seek(self, frame_number):
# self.stream.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
# def release(self):
# self.stop()
# def read(self, frame_number = None):
# # no need to seek if we are at the right position - greatly speeds up reading sunbsequent frames
# if frame_number is not None and frame_number != self.current_frame:
# self.seek(frame_number)
# frame = super().read()
# if self._window_rect is not None:
# frame = frame[self._window_rect.min_y:self._window_rect.max_y, self._window_rect.min_x:self._window_rect.max_x, :]
# return frame
# @property
# def frame_rate(self):
# return round(self.stream.get(cv2.CAP_PROP_FPS), 2)
# @property
# def frame_width(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_WIDTH))
# @property
# def frame_height(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT))
# @property
# def frame_count(self):
# return int(self.stream.get(cv2.CAP_PROP_FRAME_COUNT))
# @property
# def current_frame(self):
# return self.stream.get(cv2.CAP_PROP_POS_FRAMES)
# @property
# def width(self):
# width = self.frame_width
# if self._window_rect is not None:
# width = floor(self._window_rect.max_x - self._window_rect.min_x)
# return width
# @property
# def height(self):
# height = self.frame_height
# if self._window_rect is not None:
# height = floor(self._window_rect.max_y - self._window_rect.min_y)
# return height