video-animator / Practical-RIFE /inference_video_enhance.py
root
fixing python and stuff
136be26
raw
history blame
7.71 kB
import os
import cv2
import torch
import argparse
import numpy as np
from tqdm import tqdm
from torch.nn import functional as F
import warnings
import _thread
import skvideo.io
from queue import Queue, Empty
from model.pytorch_msssim import ssim_matlab
warnings.filterwarnings("ignore")
def transferAudio(sourceVideo, targetVideo):
    """Mux the audio track of ``sourceVideo`` into ``targetVideo`` with ffmpeg.

    The processed video at ``targetVideo`` is first renamed to
    ``<name>_noaudio<ext>``.  ffmpeg then combines it with the audio taken
    from ``sourceVideo`` and writes the result back to ``targetVideo``.  A
    lossless stream copy is tried first; if that leaves no usable output the
    audio is transcoded to AAC and the mux is retried.  When both attempts
    fail, the audio-less video is moved back to ``targetVideo`` so the
    processed frames are never lost.

    A scratch directory named ``temp`` in the current working directory is
    wiped, recreated and finally removed by this function.

    Args:
        sourceVideo: Path of the original video that carries the audio.
        targetVideo: Path of the processed, silent video; it must exist.
    """
    import shutil
    import subprocess

    def run_ffmpeg(*ffmpeg_args):
        # An argument list (no shell) keeps paths containing quotes, spaces
        # or shell metacharacters intact.
        try:
            subprocess.run(["ffmpeg", "-y", *ffmpeg_args])
        except OSError as err:
            # ffmpeg is missing or not executable; the caller notices the
            # failure because no output file is produced.
            print("Could not run ffmpeg: {}".format(err))

    def has_content(path):
        # A failed mux leaves either no file at all or an empty one.
        return os.path.isfile(path) and os.path.getsize(path) > 0

    tempAudioFileName = "./temp/audio.mkv"

    # Start from an empty scratch directory.
    if os.path.isdir("temp"):
        shutil.rmtree("temp")
    os.makedirs("temp")

    # Extract the audio stream from the original video without re-encoding.
    run_ffmpeg("-i", sourceVideo, "-c:a", "copy", "-vn", tempAudioFileName)

    base, extension = os.path.splitext(targetVideo)
    targetNoAudio = base + "_noaudio" + extension
    os.replace(targetVideo, targetNoAudio)

    # Combine the extracted audio with the processed video.
    run_ffmpeg("-i", targetNoAudio, "-i", tempAudioFileName, "-c", "copy", targetVideo)

    if has_content(targetVideo):
        os.remove(targetNoAudio)
    else:
        # The lossless mux failed: retry with the audio transcoded to AAC.
        tempAudioFileName = "./temp/audio.m4a"
        run_ffmpeg("-i", sourceVideo, "-c:a", "aac", "-b:a", "160k", "-vn", tempAudioFileName)
        run_ffmpeg("-i", targetNoAudio, "-i", tempAudioFileName, "-c", "copy", targetVideo)
        if has_content(targetVideo):
            print("Lossless audio transfer failed. Audio was transcoded to AAC (M4A) instead.")
            os.remove(targetNoAudio)
        else:
            # Neither variant worked: restore the silent video under its
            # final name (os.replace also overwrites an empty leftover file).
            os.replace(targetNoAudio, targetVideo)
            print("Audio transfer failed. Interpolated video will have no audio")

    # Remove the scratch directory.
    shutil.rmtree("temp")
# Command-line interface. Frames come either from a video file (--video) or
# from a directory of numbered PNG images (--img).
parser = argparse.ArgumentParser(description='Video SR')
parser.add_argument('--video', dest='video', type=str, default=None)
parser.add_argument('--output', dest='output', type=str, default=None)
parser.add_argument('--img', dest='img', type=str, default=None)
parser.add_argument('--model', dest='modelDir', type=str, default='train_log_SAFA', help='directory with trained model files')
parser.add_argument('--fp16', dest='fp16', action='store_true', help='fp16 mode for faster and more lightweight inference on cards with Tensor Cores')
parser.add_argument('--png', dest='png', action='store_true', help='whether to output png format outputs')
parser.add_argument('--ext', dest='ext', type=str, default='mp4', help='output video extension')
args = parser.parse_args()

# Validate with parser.error rather than assert: asserts vanish under
# `python -O`, and parser.error prints the usage text with a clear message.
if args.video is None and args.img is None:
    parser.error('one of --video or --img is required')
if args.video is not None and args.img is not None:
    # The reader would decode the video but then treat each decoded frame as
    # a file name inside --img, which cannot work.
    parser.error('--video and --img cannot be used together')

# An image sequence as input always produces an image sequence as output.
if args.img is not None:
    args.png = True
# Inference only: pick the device and disable autograd globally.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.set_grad_enabled(False)
if torch.cuda.is_available():
    torch.backends.cudnn.enabled = True
    torch.backends.cudnn.benchmark = True
    if args.fp16:
        print('set fp16')
        # Make newly created tensors half-precision CUDA tensors by default.
        torch.set_default_tensor_type(torch.cuda.HalfTensor)

try:
    from train_log_SAFA.model import Model
except ImportError:
    # Print the hint, then re-raise so the run stops here with the real
    # cause instead of failing later with a NameError on `Model`.
    print("Please download our model from model list")
    raise

model = Model()
model.device()
model.load_model(args.modelDir)
print("Loaded SAFA model.")
model.eval()
# Open the input and read the first frame, which fixes the frame size.
if args.video is not None:
    # OpenCV is used only to query the metadata; frames are decoded by skvideo.
    videoCapture = cv2.VideoCapture(args.video)
    fps = videoCapture.get(cv2.CAP_PROP_FPS)
    # CAP_PROP_FRAME_COUNT is returned as a float; the progress bar wants an int.
    tot_frame = int(videoCapture.get(cv2.CAP_PROP_FRAME_COUNT))
    videoCapture.release()
    fpsNotAssigned = True
    videogen = skvideo.io.vreader(args.video)
    lastframe = next(videogen, None)
    if lastframe is None:
        raise SystemExit('No frames could be read from {}'.format(args.video))
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
    video_path_wo_ext, ext = os.path.splitext(args.video)
    if args.png == False and fpsNotAssigned == True:
        print("The audio will be merged after interpolation process")
    else:
        print("Will not merge audio because using png or fps flag!")
else:
    # Match on the extension: a plain substring test would also pick up
    # unrelated files whose names merely contain "png".
    videogen = [f for f in os.listdir(args.img) if f.lower().endswith('.png')]
    if not videogen:
        raise SystemExit('No .png frames found in {}'.format(args.img))
    tot_frame = len(videogen)
    # Frames are named by their number, e.g. "12.png"; sort numerically.
    videogen.sort(key=lambda x: int(x[:-4]))
    # The [:, :, ::-1] slice reverses the channel order of what OpenCV loaded.
    lastframe = cv2.imread(os.path.join(args.img, videogen[0]), cv2.IMREAD_UNCHANGED)[:, :, ::-1].copy()
    videogen = videogen[1:]
h, w, _ = lastframe.shape
# Choose the output sink: PNG frames in the 'vid_out' directory, or a single
# video file written through cv2.VideoWriter.
vid_out = None
vid_out_name = None
if not args.png:
    # Use --output when given, otherwise derive "<input>_2X<ext>".
    vid_out_name = args.output if args.output is not None else '{}_2X{}'.format(video_path_wo_ext, ext)
    vid_out = cv2.VideoWriter(vid_out_name, fourcc, fps, (w, h))
elif not os.path.exists('vid_out'):
    os.mkdir('vid_out')
def clear_write_buffer(user_args, write_buffer):
    """Drain ``write_buffer`` and write each frame until a ``None`` arrives.

    Every frame has its last axis reversed before it is written.  With
    ``user_args.png`` set, frames go to ``vid_out/<7-digit index>.png``;
    otherwise they are appended to the module-level ``vid_out`` writer.
    """
    frame_index = 0
    frame = write_buffer.get()
    while frame is not None:
        flipped = frame[:, :, ::-1]
        if not user_args.png:
            vid_out.write(flipped)
        else:
            cv2.imwrite('vid_out/{:0>7d}.png'.format(frame_index), flipped)
            frame_index += 1
        frame = write_buffer.get()
def build_read_buffer(user_args, read_buffer, videogen):
    """Feed frames from ``videogen`` into ``read_buffer``, ending with ``None``.

    When ``user_args.img`` is set, each item of ``videogen`` is a file name
    inside that directory; the image is loaded with OpenCV and its channel
    axis reversed.  Otherwise the items are queued unchanged.

    The ``None`` end marker is queued even if reading fails part-way, so a
    consumer blocked on ``read_buffer.get()`` always wakes up.  The error
    itself still propagates to the caller.
    """
    try:
        for frame in videogen:
            if user_args.img is not None:
                frame = cv2.imread(os.path.join(user_args.img, frame), cv2.IMREAD_UNCHANGED)[:, :, ::-1].copy()
            read_buffer.put(frame)
    finally:
        read_buffer.put(None)
def pad_image(img):
    """Reflect-pad ``img`` by the module-level ``padding`` amounts.

    The padded tensor is converted to half precision when ``args.fp16`` is
    set, and returned unchanged otherwise.
    """
    padded = F.pad(img, padding, mode='reflect')
    return padded.half() if args.fp16 else padded
# Frames are padded on the right and bottom so that height and width become
# multiples of `tmp`; results are cropped back to (h, w) before being written.
tmp = 64
ph = ((h - 1) // tmp + 1) * tmp
pw = ((w - 1) // tmp + 1) * tmp
padding = (0, pw - w, 0, ph - h)
pbar = tqdm(total=tot_frame)
write_buffer = Queue(maxsize=500)
read_buffer = Queue(maxsize=500)

import threading
import time  # unused here; kept because the original script imported it at module scope

# threading.Thread (rather than _thread) gives a handle that can be joined at
# shutdown. daemon=True means a crash in the main thread still ends the process.
reader_thread = threading.Thread(target=build_read_buffer, args=(args, read_buffer, videogen), daemon=True)
writer_thread = threading.Thread(target=clear_write_buffer, args=(args, write_buffer), daemon=True)
reader_thread.start()
writer_thread.start()


def _frame_to_tensor(frame):
    """Turn an HxWxC uint8 frame into a padded 1xCxHxW float tensor in [0, 1]."""
    chw = torch.from_numpy(np.transpose(frame, (2, 0, 1)))
    return pad_image(chw.to(device, non_blocking=True).unsqueeze(0).float() / 255.)


def _enqueue_result(result):
    """Queue the first image of a model output batch, cropped to (h, w), for writing."""
    write_buffer.put((result[0] * 255).byte().cpu().numpy().transpose(1, 2, 0)[:h, :w])


# Frames are processed in pairs (lastframe, frame).
while True:
    frame = read_buffer.get()
    if frame is None:
        # Odd number of frames: the pending frame has no partner. Process it
        # on its own instead of silently dropping it.
        if lastframe is not None:
            I0 = _frame_to_tensor(lastframe)
            _enqueue_result(model.inference(I0, I0, [0])[0])
            pbar.update(1)
        break
    # lastframe_2x = cv2.resize(lastframe, (0, 0), fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
    # frame_2x = cv2.resize(frame, (0, 0), fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
    I0 = _frame_to_tensor(lastframe)
    I1 = _frame_to_tensor(frame)
    # Compare 32x32 thumbnails of the two frames to decide how to run the model.
    I0_small = F.interpolate(I0, (32, 32), mode='bilinear', align_corners=False)
    I1_small = F.interpolate(I1, (32, 32), mode='bilinear', align_corners=False)
    ssim = ssim_matlab(I0_small[:, :3], I1_small[:, :3])
    if ssim < 0.2:
        # The frames are too dissimilar: run the model on each frame by itself.
        out = [model.inference(I0, I0, [0])[0], model.inference(I1, I1, [0])[0]]
    else:
        out = model.inference(I0, I1, [0, 1])
    assert(len(out) == 2)
    _enqueue_result(out[0])
    _enqueue_result(out[1])
    pbar.update(2)
    lastframe = read_buffer.get()
    if lastframe is None:
        break

# Send the writer its end marker and wait for it to finish the last frame.
# Polling Queue.empty() is not enough: the final item may already be dequeued
# but not yet written when the video writer is released.
write_buffer.put(None)
writer_thread.join()
pbar.close()
if vid_out is not None:
    vid_out.release()

# move audio to new video file if appropriate
if not args.png and fpsNotAssigned and args.video is not None:
    try:
        transferAudio(args.video, vid_out_name)
    except Exception:
        print("Audio transfer failed. Interpolated video will have no audio")
        targetNoAudio = os.path.splitext(vid_out_name)[0] + "_noaudio" + os.path.splitext(vid_out_name)[1]
        # The "_noaudio" file only exists if transferAudio got as far as
        # renaming the output; otherwise the video is still at vid_out_name.
        if os.path.exists(targetNoAudio):
            os.replace(targetNoAudio, vid_out_name)