"""Pythonic wrapper around opencv's VideoWriter()."""

import cv2
import numpy as np
import importlib.util
from threading import Thread

from ..common.Common import *
from ..common.Execute import *
from ..common.Log import *

DefaultPreset = "medium"
DefaultkeyFrameInterval = 6


def CreateVideoWriter(video_path, frame_rate, width, height, monochrome_video = False, encoding_bframes = 0, encoding_lossless = False, cleanup = True, gpu_if_available = True):
    """Create the most capable video writer available for ``video_path``.

    An ``Nvidia_VideoWriter`` is returned when ``gpu_if_available`` is set,
    ``CudaGpuAvailable()`` reports a GPU and the ``_PyNvCodec`` module can be
    located; otherwise an ``OpenCV_VideoWriter`` is returned.

    Args:
        video_path: Path of the video file to produce. Its parent directory
            must already exist (reported through ``log.fatal`` otherwise).
        frame_rate: Frame rate of the output video.
        width: Frame width in pixels.
        height: Frame height in pixels.
        monochrome_video: Forwarded to the selected writer.
        encoding_bframes: Forwarded to ``Nvidia_VideoWriter`` only; the OpenCV
            fallback keeps its own default.
        encoding_lossless: Forwarded to ``Nvidia_VideoWriter`` only; the
            OpenCV fallback keeps its own default.
        cleanup: Whether the writer removes its intermediate file once the
            final video has been produced. Forwarded to both writers.
        gpu_if_available: Set to False to force the OpenCV writer.

    Returns:
        A ``VideoWriter`` subclass instance.
    """
    parent_dir = GetParentDir(video_path)
    if not DirExists(parent_dir):
        log.fatal("Directory " + parent_dir + " does not exist")

    if gpu_if_available and CudaGpuAvailable():
        if importlib.util.find_spec("_PyNvCodec") is not None:
            return Nvidia_VideoWriter(
                video_path=video_path,
                frame_rate=frame_rate,
                width=width,
                height=height,
                monochrome_video=monochrome_video,
                encoding_bframes=encoding_bframes,
                encoding_lossless=encoding_lossless,
                cleanup=cleanup
            )
        log.warning("Could not find PyNvCodec library - defaulting to use OpenCV_VideoWriter")
        if IsOsLinux():
            log.warning("Video file " + video_path + " will not be encoded in H.264 format")

    # Single fallback path. ``cleanup`` is forwarded so that a caller asking to
    # keep the intermediate file is honoured by the OpenCV writer as well.
    return OpenCV_VideoWriter(video_path, frame_rate, width, height, monochrome_video, cleanup=cleanup)


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# VideoWriter - Generic Video Writer
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class VideoWriter:
    """
    A flexible and more optimized video writer than the plain opencv video writer.

    This base class stores the output configuration and offers
    ``write_matte_frames``, which pushes a batch of frames through ``write``
    on a background thread. Subclasses override ``write``; the implementation
    here does nothing.
    """

    def __init__(self, video_path, frame_rate, width, height, monochrome_video):
        """Remember the output configuration; no file is opened here."""
        super().__init__()
        self._video_path = video_path
        self._frame_rate = frame_rate
        self._width = width
        self._height = height
        self._monochrome_video = monochrome_video
        # Background thread started by write_matte_frames (None until first use).
        self._thread = None

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def write(self, frame):
        "Writes a frame onto video - assumes frame to be in BGR format"
        pass

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def release(self):
        """Wait for a pending ``write_matte_frames`` batch, if any, to finish."""
        if self._thread is not None:
            self._thread.join()

    def video_file_path(self):
        """Return the output path this writer was created with."""
        return self._video_path

    def write_matte_frames(self, frames, input_matte_crop_rect = None):
        """Write a batch of frames on a background thread.

        Any batch still being written is joined first, so batches are written
        one after another in call order.

        Args:
            frames: Batch of frames, indexed along its first axis.
            input_matte_crop_rect: Optional rectangle exposing ``min_x``,
                ``max_x``, ``min_y`` and ``max_y``; when given, frames that do
                not match the output size are pasted into a zero frame at
                that location instead of being resized.
        """
        if self._thread is not None:
            self._thread.join()
        self._thread = Thread(target=self.__write_matte_frames, args=(frames, input_matte_crop_rect,))
        self._thread.start()

    def __write_matte_frames(self, frames, input_matte_crop_rect = None):
        """Thread body of ``write_matte_frames``: fit every frame, then write it.

        ``frames`` must expose ``.shape`` and support ``frames[i, :, :]``
        (presumably a numpy array shaped (N, H, W[, C]) - TODO confirm).
        """
        frames_count = frames.shape[0]
        for i in range(frames_count):
            frame = frames[i, :, :]
            # Axis 0 of a frame is rows (height) and axis 1 is columns (width).
            # The frame needs adapting as soon as EITHER dimension differs.
            size_mismatch = frame.shape[0] != self._height or frame.shape[1] != self._width
            if not size_mismatch:
                output_frame = frame
            elif input_matte_crop_rect is not None:
                # If input matte was cropped according to crop rect, pad zeros around it
                output_frame = np.zeros((self._height, self._width, 3), np.uint8)
                output_frame[input_matte_crop_rect.min_y:input_matte_crop_rect.max_y,
                             input_matte_crop_rect.min_x:input_matte_crop_rect.max_x] = frame
            else:
                # Resize input frame to be same size as what I am configured to write to
                output_frame = cv2.resize(frame, (self._width, self._height))
            self.write(output_frame)


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# OpenCV_VideoWriter - OpenCV Video Writer
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class OpenCV_VideoWriter(VideoWriter):
    """Video writer backed by ``cv2.VideoWriter`` plus an ffmpeg post-pass.

    Frames are first written by OpenCV to an intermediate ``*-CV.AVI`` /
    ``*-CV.MP4`` file next to the requested output. ``release`` then runs
    ffmpeg on that file to produce the final ``video_path``.
    """

    def __init__(self, video_path, frame_rate, width, height, monochrome_video, encoding_bframes=1, encoding_lossless=True, cleanup=True):
        """Open the intermediate OpenCV writer.

        Args:
            video_path: Final output path; its extension (AVI, MP4 or TS,
                case-insensitive) selects the intermediate codec.
            frame_rate: Frame rate of the output video.
            width: Frame width in pixels.
            height: Frame height in pixels.
            monochrome_video: Stored by the base class; used by
                ``get_lossy_bitrate``.
            encoding_bframes: Value passed to ffmpeg's ``-bf`` when
                re-encoding.
            encoding_lossless: Together with ``encoding_bframes`` decides
                whether ``final_encode`` re-encodes (see
                ``using_lossless_hevc``).
            cleanup: Remove the intermediate file after the final encode.

        Raises:
            ValueError: If the extension of ``video_path`` is not supported.
        """
        super().__init__(video_path, frame_rate, width, height, monochrome_video)
        import os  # local import: only needed to derive the intermediate file name

        self.encoding_bframes = encoding_bframes
        self.encoding_lossless = encoding_lossless
        self._cleanup = cleanup

        extension = GetFileExt(video_path).upper()
        # splitext strips exactly the extension, whatever its length
        # (a fixed [:-4] slice would eat one character of the name for ".ts").
        base_path = os.path.splitext(video_path)[0]
        if extension == "AVI":
            self._fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            self._video_path_tmp = base_path + "-CV.AVI"
        elif extension in ("MP4", "TS"):
            # The avc1 encoder (Apple HW) is deliberately not attempted first:
            # on linux that selection mechanism produces an error in the output, e.g.
            # [ERROR:0] global /app/pip-req-build-qm9qliph/opencv/modules/videoio/src/cap_ffmpeg_impl.hpp (2650)
            #   open Could not find encoder for codec_id=27, error: Encoder not found
            self._fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            self._video_path_tmp = base_path + "-CV.MP4"
        else:
            # Fail with a clear message instead of an AttributeError further down.
            raise ValueError("Unsupported output format for file " + video_path)

        self._writer = cv2.VideoWriter(self._video_path_tmp, self._fourcc, frame_rate, (width, height))

        fileinfo = str(width) + "x" + str(height) + " @ " + str(frame_rate) + " fps"
        log.info("Creating OpenCV_VideoWriter for file " + self._video_path_tmp + " (fourcc: " + hex(self._fourcc) + ") - " + fileinfo)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def write(self, frame):
        "Writes a frame onto video - assumes frame to be in BGR format (which is opencv default)"
        self._writer.write(frame)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def using_lossless_hevc(self):
        """Return True when B-frames are requested or lossless encoding is enabled."""
        return self.encoding_bframes > 0 or self.encoding_lossless == True

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def get_lossy_bitrate(self):
        """Return ``width * height * factor``; the factor is 4.2, reduced by 25% for monochrome video."""
        bitrateFactor = 4.2
        if self._monochrome_video:
            bitrateFactor = bitrateFactor * 0.75  # Bitrate of monochrome video can be reduced by 25%
        return self._width * self._height * bitrateFactor

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def final_encode(self):
        """Run ffmpeg on the intermediate file to produce the final video.

        When ``using_lossless_hevc()`` is true the stream is re-encoded with
        libx264 (CRF 17); otherwise the intermediate stream is copied. The
        intermediate file is removed afterwards when ``cleanup`` was set.
        """
        command = "ffmpeg -y -i " + self._video_path_tmp
        if self.using_lossless_hevc():
            # Reencode in H.264
            # (previously tried alternatives: "-c:v copy" / "-c:v h264_nvenc",
            #  "-b:v <get_lossy_bitrate()>", "-tag:v hvc1", and
            #  "-c:v libx265 -pix_fmt yuv444p -x265-params lossless=1 ... -tag:v hvc1")
            command += " -c:v libx264"
            command += " -profile:v high -preset medium"
            command += " -bf " + str(self.encoding_bframes)
            command += " -g " + str(DefaultkeyFrameInterval * self._frame_rate)
            command += " -crf 17"
            if self._frame_rate > 0:
                command += " -r " + str(self._frame_rate)
        else:
            # Reuse - i.e. copy original stream
            command += " -c:v copy"
        command += " " + "-color_primaries bt709 -color_trc bt709 -colorspace smpte170m"
        command += " " + self._video_path
        log.debug("command " + command)
        ExecuteCommand(command)
        if self._cleanup:
            _remove_file_if_present(self._video_path_tmp)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    def release(self):
        """Finish pending writes, close the OpenCV writer and run the final ffmpeg pass."""
        super().release()
        log.debug("Encoded " + self._video_path + " with opencv backend " + self._writer.getBackendName())
        self._writer.release()
        self.final_encode()
        log.debug("Re-Encoded " + self._video_path + " with ffmpeg h264.")


def _remove_file_if_present(path):
    """Best-effort removal of an intermediate file (portable stand-in for ``rm -f``)."""
    import os  # local import keeps this helper self-contained

    try:
        os.remove(path)
    except FileNotFoundError:
        # Same outcome as ``rm -f`` on a missing file: nothing to do.
        pass
    except OSError as err:
        # Cleanup must never abort an otherwise successful encode.
        log.warning("Could not remove " + path + ": " + str(err))


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Nvidia_VideoWriter - Nvidia Optimized Video Writer
# Assumes Nvidia CUDA GPU With HW Encoding
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class Nvidia_VideoWriter(VideoWriter):
    """Video writer that encodes through ``_PyNvCodec`` and muxes with ffmpeg.

    Frames are uploaded, converted RGB -> YUV420 -> NV12 and encoded into an
    elementary stream file (``.265`` or ``.264``) next to the output.
    ``release`` flushes the encoder and runs ffmpeg on that stream to produce
    the final ``video_path``.
    """

    def get_lossy_bitrate(self):
        """Return ``width * height * factor``; the factor is 4.2, reduced by 25% for monochrome video."""
        bitrateFactor = 4.2
        if self._monochrome_video:
            bitrateFactor = bitrateFactor * 0.75  # Bitrate of monochrome video can be reduced by 25%
        return self._width * self._height * bitrateFactor

    def using_lossless_hevc(self):
        """Return True when B-frames are requested or lossless encoding is enabled."""
        return self.encoding_bframes > 0 or self.encoding_lossless == True

    def __init__(self, video_path, frame_rate, width, height, monochrome_video, encoding_bframes = 0, encoding_lossless = False, cleanup = True ):
        """Create the encoder pipeline and open the elementary stream file.

        Args:
            video_path: Final output path; the elementary stream is written
                to ``GetParentDir(video_path) + GetFileBaseName(video_path)``
                plus ``.265`` or ``.264``.
            frame_rate: Frame rate of the output video.
            width: Frame width in pixels.
            height: Frame height in pixels.
            monochrome_video: Stored by the base class; used by
                ``get_lossy_bitrate``.
            encoding_bframes: Value passed to ffmpeg's ``-bf`` when
                re-encoding.
            encoding_lossless: Together with ``encoding_bframes`` selects the
                hevc/lossless preset (see ``using_lossless_hevc``).
            cleanup: Remove the elementary stream after the final encode.
        """
        super().__init__(video_path, frame_rate, width, height, monochrome_video)
        import _PyNvCodec as nvc

        gpuID = 0
        self.encoding_bframes = encoding_bframes
        self.encoding_lossless = encoding_lossless
        self._cleanup = cleanup

        stream_base = GetParentDir(video_path) + GetFileBaseName(video_path)
        config = {}
        if self.using_lossless_hevc():
            codec = "hevc"
            profile = "main"
            preset = "lossless"
            self._elementary_stream_file = stream_base + ".265"
        else:
            codec = "h264"
            profile = "high"
            preset = "bd"
            self._elementary_stream_file = stream_base + ".264"
            # NOTE(review): bitrate/gop are assumed to belong to the lossy H.264
            # path only (the source formatting was ambiguous) - confirm.
            config.update({
                'bitrate' : str(self.get_lossy_bitrate()),
                'gop' : str(int(DefaultkeyFrameInterval * frame_rate)),
            })

        config.update({
            'preset': preset,
            'codec': codec,
            'profile' : profile,
            's': str(width) + 'x' + str(height),
            'fps' : str(frame_rate),
        })

        self._nvc_Encoder = nvc.PyNvEncoder(config, gpuID, nvc.PixelFormat.NV12)
        if self._nvc_Encoder is None:
            log.fatal("Failed to create PyNvEncoder")

        self._nvc_ColorConversion = nvc.ColorspaceConversionContext(color_space=nvc.ColorSpace.BT_601,
                                                                    color_range=nvc.ColorRange.MPEG)
        if self._nvc_ColorConversion is None:
            log.fatal("Failed to create _nvc_ColorConversion")

        self._nvc_FrameUploader = nvc.PyFrameUploader(int(width), int(height), nvc.PixelFormat.RGB, gpuID)
        if self._nvc_FrameUploader is None:
            log.fatal("Failed to create _nvc_FrameUploader")

        self._nvc_RgbToYuv = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.RGB, nvc.PixelFormat.YUV420, gpuID)
        if self._nvc_RgbToYuv is None:
            log.fatal("Failed to create _nvc_RgbToYuv")

        self._nvc_YuvToNv12 = nvc.PySurfaceConverter(width, height, nvc.PixelFormat.YUV420, nvc.PixelFormat.NV12, gpuID)
        if self._nvc_YuvToNv12 is None:
            log.fatal("Failed to create _nvc_YuvToNv12")

        # Buffer that receives each encoded packet.
        self._encoded_frame = np.ndarray(shape=(0), dtype=np.uint8)
        # Opened last so that a failure while building the GPU pipeline above
        # does not leave an open file handle behind. Closed in release().
        self._encoded_file = open(self._elementary_stream_file, "wb")

        log.info("Creating Nvidia_VideoWriter for file " + video_path + " - frame rate: " + str(frame_rate))
        log.info("Encoding parameters: " + str(config))

    def write(self, frame):
        "Writes a frame onto video - assumes frame to be in BGR format"
        # There is a segmentation fault if I dont include this line
        # So for now I force a bgr to rgb conversion here
        # although it is not needed
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        raw_surface = self._nvc_FrameUploader.UploadSingleFrame(frame)
        if raw_surface.Empty():
            log.fatal("Failed to upload video frame to GPU")

        cvt_surface = self._nvc_RgbToYuv.Execute(raw_surface, self._nvc_ColorConversion)
        if cvt_surface.Empty():
            log.fatal("Failed to do color conversion")

        cvt_surface2 = self._nvc_YuvToNv12.Execute(cvt_surface, self._nvc_ColorConversion)
        if cvt_surface2.Empty():
            log.fatal("Failed to do color conversion")

        success = self._nvc_Encoder.EncodeSingleSurface(cvt_surface2, self._encoded_frame, sync = True)
        if success:
            self._encoded_file.write(bytearray(self._encoded_frame))
        else:
            log.fatal("Could not encode frame")

    def final_encode(self):
        """Run ffmpeg on the elementary stream to produce the final video.

        The elementary stream is removed afterwards when ``cleanup`` was set.
        """
        command = "ffmpeg -y -i " + self._elementary_stream_file
        if self.using_lossless_hevc():
            # Reencode in H.264
            if self.encoding_lossless:
                command += " -c:v copy"
            else:
                command += " -c:v h264_nvenc"
            # (alternative software encoder: " -c:v libx264")
            # NOTE(review): the options below are assumed to follow the
            # copy/h264_nvenc choice rather than sit inside its else branch
            # (the source formatting was ambiguous) - confirm.
            command += " -profile:v high -preset medium"
            command += " -bf " + str(self.encoding_bframes)
            command += " -g " + str(DefaultkeyFrameInterval * self._frame_rate)
            command += " -b:v " + str(self.get_lossy_bitrate())
            if self._frame_rate > 0:
                command += " -r " + str(self._frame_rate)
        else:
            # Reuse - i.e. copy original stream
            command += " -c:v copy"
        command += " " + "-color_primaries bt709 -color_trc bt709 -colorspace smpte170m"
        command += " " + self._video_path
        ExecuteCommand(command)
        if self._cleanup:
            _remove_file_if_present(self._elementary_stream_file)

    def release(self):
        """Flush the encoder, close the elementary stream and run the final ffmpeg pass.

        Calling ``release`` again after a completed release is a no-op.
        """
        super().release()
        if self._nvc_Encoder is None:
            # Already released: the encoder and file are gone, nothing to flush.
            return

        try:
            # Drain every packet still buffered inside the encoder.
            while self._nvc_Encoder.FlushSinglePacket(self._encoded_frame):
                self._encoded_file.write(bytearray(self._encoded_frame))
        finally:
            # Close the stream even if flushing fails.
            self._encoded_file.close()

        self._nvc_Encoder = None
        self._nvc_ColorConversion = None
        self._nvc_FrameUploader = None
        self._nvc_RgbToYuv = None
        self._nvc_YuvToNv12 = None
        self.final_encode()