|
|
|
|
|
import pdb
|
|
|
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
import ffmpeg
|
|
|
import os
|
|
|
import os.path as osp
|
|
|
|
|
|
|
|
|
def video_has_audio(video_file):
    """Tell whether ``video_file`` carries at least one audio stream.

    The file is probed with ``ffmpeg.probe`` restricted to audio streams
    (``select_streams='a'``).

    :param video_file: path handed to ``ffmpeg.probe``.
    :return: True when the probe lists one or more streams, False when it
        lists none or when the probe itself fails with ``ffmpeg.Error``.
    """
    try:
        probe_result = ffmpeg.probe(video_file, select_streams='a')
    except ffmpeg.Error:
        # A failed probe is reported as "no audio" rather than propagated.
        return False
    else:
        audio_streams = probe_result["streams"]
        return len(audio_streams) > 0
|
|
|
|
|
|
|
|
|
def _parse_frame_rate(rate):
    """Turn an ffprobe rational string such as ``'30000/1001'`` into a float.

    Returns None when the value is missing, malformed, or has a zero
    denominator (ffprobe reports ``'0/0'`` when a rate is unknown).
    """
    if not rate:
        return None
    numerator, _, denominator = str(rate).partition('/')
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError:
        return None
    if den == 0:
        return None
    return num / den


def get_video_info(video_path):
    """Probe a video file and return its duration and frame rate.

    :param video_path: path handed to ``ffmpeg.probe``.
    :return: tuple ``(duration, fps)``; ``duration`` is the container
        duration reported by the probe as a float, ``fps`` is the frame rate
        of the first video stream as a float.
    :raises ValueError: if the file has no video stream, or if neither
        ``r_frame_rate`` nor ``avg_frame_rate`` of the first video stream
        yields a usable rate.
    """
    probe = ffmpeg.probe(video_path)
    video_streams = [stream for stream in probe['streams'] if stream.get('codec_type') == 'video']

    if not video_streams:
        raise ValueError("No video stream found")
    first_video = video_streams[0]

    # Container-level duration, reported by the probe as a string.
    duration = float(probe['format']['duration'])

    # Prefer r_frame_rate (the original behaviour); fall back to
    # avg_frame_rate instead of dividing by zero when the former is "0/0".
    fps = None
    for key in ('r_frame_rate', 'avg_frame_rate'):
        fps = _parse_frame_rate(first_video.get(key))
        if fps is not None:
            break
    if fps is None:
        raise ValueError("Could not determine the frame rate of the video stream")

    return duration, fps
|
|
|
|
|
|
|
|
|
def resize_to_limit(img: np.ndarray, max_dim=1280, division=2):
    """
    Adjust the size of the image so that the maximum dimension does not exceed max_dim, and the width and the height of the image are multiples of division.
    :param img: the image to be processed; its first two axes are read as (height, width).
    :param max_dim: the maximum dimension constraint; a value <= 0 disables the downscaling step.
    :param division: the number that the width and the height need to be multiples of; values below 1 are treated as 1.
    :return: the adjusted image. If cropping to a multiple of division would leave an empty image, the (possibly downscaled) image is returned uncropped.
    """
    h, w = img.shape[:2]

    # Downscale, keeping the aspect ratio, only when the longer side exceeds the limit.
    if max_dim > 0 and max(h, w) > max_dim:
        if h > w:
            new_h = max_dim
            # Clamp to 1 pixel: for extreme aspect ratios the truncated value
            # can reach 0, which cv2.resize rejects.
            new_w = max(int(w * (max_dim / h)), 1)
        else:
            new_w = max_dim
            new_h = max(int(h * (max_dim / w)), 1)
        img = cv2.resize(img, (new_w, new_h))

    # Trim the bottom/right edges so both sides become multiples of `division`.
    division = max(division, 1)
    new_h = img.shape[0] - (img.shape[0] % division)
    new_w = img.shape[1] - (img.shape[1] % division)

    if new_h == 0 or new_w == 0:
        # The image is smaller than `division` along some side; cropping
        # would produce an empty array, so keep it as is.
        return img

    if new_h != img.shape[0] or new_w != img.shape[1]:
        img = img[:new_h, :new_w]

    return img
|
|
|
|
|
|
|
|
|
def _degrees_to_radian_column(angle_deg):
    """Convert an angle input in degrees to a NumPy array of radians.

    Scalars become shape (1, 1), 1-D inputs of length bs become (bs, 1);
    inputs with two or more dimensions keep their shape.
    """
    # Objects exposing detach()/cpu() (e.g. torch tensors) are moved to host
    # memory first; plain NumPy arrays and Python numbers skip this step.
    if hasattr(angle_deg, "detach"):
        angle_deg = angle_deg.detach().cpu().numpy()
    radians = np.atleast_1d(np.asarray(angle_deg) / 180 * np.pi)
    if radians.ndim == 1:
        radians = radians[:, None]
    return radians


def get_rotation_matrix(pitch_, yaw_, roll_):
    """ the input is in degree

    Build a batch of 3x3 rotation matrices from pitch (rotation about x),
    yaw (about y) and roll (about z).

    :param pitch_: angles in degrees; a scalar, a 1-D batch of length bs, or
        a (bs, 1) column. NumPy arrays, Python numbers and tensor-like
        objects providing ``detach().cpu().numpy()`` are accepted.
    :param yaw_: same convention as ``pitch_``.
    :param roll_: same convention as ``pitch_``.
    :return: NumPy array of shape (bs, 3, 3) holding the transpose of
        ``rot_z @ rot_y @ rot_x`` for every batch entry.
    """
    x = _degrees_to_radian_column(pitch_)
    y = _degrees_to_radian_column(yaw_)
    z = _degrees_to_radian_column(roll_)

    bs = x.shape[0]
    ones = np.ones([bs, 1])
    zeros = np.zeros([bs, 1])

    # Each matrix is assembled row-major as nine (bs, 1) columns and then
    # folded into (bs, 3, 3).
    rot_x = np.concatenate([
        ones, zeros, zeros,
        zeros, np.cos(x), -np.sin(x),
        zeros, np.sin(x), np.cos(x)
    ], axis=1).reshape([bs, 3, 3])

    rot_y = np.concatenate([
        np.cos(y), zeros, np.sin(y),
        zeros, ones, zeros,
        -np.sin(y), zeros, np.cos(y)
    ], axis=1).reshape([bs, 3, 3])

    rot_z = np.concatenate([
        np.cos(z), -np.sin(z), zeros,
        np.sin(z), np.cos(z), zeros,
        zeros, zeros, ones
    ], axis=1).reshape([bs, 3, 3])

    rot = np.matmul(rot_z, np.matmul(rot_y, rot_x))
    # The transposed matrix is returned; transform_keypoint in this file
    # right-multiplies keypoints by it (kp @ rot).
    return np.transpose(rot, (0, 2, 1))
|
|
|
|
|
|
|
|
|
def calculate_distance_ratio(lmk: np.ndarray, idx1: int, idx2: int, idx3: int, idx4: int,
                             eps: float = 1e-6) -> np.ndarray:
    """Ratio between two point-to-point distances, computed per batch entry.

    The numerator is the Euclidean distance between points ``idx1`` and
    ``idx2``; the denominator is the distance between ``idx3`` and ``idx4``
    plus ``eps``, which keeps the division finite when those two coincide.

    :param lmk: landmark array indexed as ``lmk[:, idx]``; assumed to be
        laid out as (batch, num_points, coords) — TODO confirm with callers.
    :param eps: small constant added to the denominator.
    :return: array with one ratio per batch entry, trailing axis of size 1.
    """
    span_a = np.linalg.norm(lmk[:, idx1] - lmk[:, idx2], axis=1, keepdims=True)
    span_b = np.linalg.norm(lmk[:, idx3] - lmk[:, idx4], axis=1, keepdims=True)
    return span_a / (span_b + eps)
|
|
|
|
|
|
|
|
|
def calc_eye_close_ratio(lmk: np.ndarray, target_eye_ratio: np.ndarray = None) -> np.ndarray:
    """Compute the left/right eye distance ratios and stack them column-wise.

    The left eye uses landmark indices (6, 18) over (0, 12), the right eye
    (30, 42) over (24, 36); presumably eyelid opening over eye width —
    confirm against the landmark layout in use.

    :param lmk: landmark array passed through to ``calculate_distance_ratio``.
    :param target_eye_ratio: optional array appended as extra column(s)
        after the two ratios.
    :return: the ratios (and ``target_eye_ratio`` when given) concatenated
        along axis 1.
    """
    columns = [
        calculate_distance_ratio(lmk, 6, 18, 0, 12),    # left eye
        calculate_distance_ratio(lmk, 30, 42, 24, 36),  # right eye
    ]
    if target_eye_ratio is not None:
        columns.append(target_eye_ratio)
    return np.concatenate(columns, axis=1)
|
|
|
|
|
|
|
|
|
def calc_lip_close_ratio(lmk: np.ndarray) -> np.ndarray:
    """Compute the lip distance ratio from landmarks (90, 102) over (48, 66).

    :param lmk: landmark array passed through to ``calculate_distance_ratio``.
    :return: whatever ``calculate_distance_ratio`` returns for those indices.
    """
    lip_ratio = calculate_distance_ratio(lmk, idx1=90, idx2=102, idx3=48, idx4=66)
    return lip_ratio
|
|
|
|
|
|
|
|
|
def _transform_img(img, M, dsize, flags=cv2.INTER_LINEAR, borderMode=None):
    """ warp the image with a similarity or affine transformation
    img: image handed to cv2.warpAffine
    M: 2x3 matrix or 3x3 matrix; only the first two rows are used
    dsize: target shape (width, height); a single number means a square output
    flags: interpolation flags forwarded to cv2.warpAffine
    borderMode: forwarded (together with a black border value) only when it
        is not None; otherwise cv2.warpAffine's own border defaults apply
    """
    _dsize = tuple(dsize) if isinstance(dsize, (tuple, list)) else (dsize, dsize)

    warp_options = {"dsize": _dsize, "flags": flags}
    if borderMode is not None:
        warp_options["borderMode"] = borderMode
        warp_options["borderValue"] = (0, 0, 0)

    return cv2.warpAffine(img, M[:2, :], **warp_options)
|
|
|
|
|
|
|
|
|
def prepare_paste_back(mask_crop, crop_M_c2o, dsize):
    """prepare mask for later image paste back

    Warps ``mask_crop`` with ``crop_M_c2o`` to the size ``dsize`` through
    ``_transform_img`` and returns the result as float32 divided by 255.
    """
    warped_mask = _transform_img(mask_crop, crop_M_c2o, dsize)
    return warped_mask.astype(np.float32) / 255.
|
|
|
|
|
|
|
|
|
def transform_keypoint(pitch, yaw, roll, t, exp, scale, kp):
    """
    transform the implicit keypoints with the pose, shift, and expression deformation
    kp: BxNx3, or flattened as Bx(3N)

    Steps: rotate the keypoints with the matrix from get_rotation_matrix,
    add the expression offsets, multiply by scale, then add the x/y
    components of t (the third coordinate is not translated).
    """
    batch = kp.shape[0]
    # A 2-D kp is the flattened layout, so the point count is a third of axis 1.
    n_points = kp.shape[1] // 3 if kp.ndim == 2 else kp.shape[1]

    rotation = get_rotation_matrix(pitch, yaw, roll)

    out = kp.reshape(batch, n_points, 3) @ rotation + exp.reshape(batch, n_points, 3)
    out *= scale[..., None]
    out[:, :, 0:2] += t[:, None, 0:2]

    return out
|
|
|
|
|
|
|
|
|
def concat_feat(x, y):
    """Flatten ``x`` and ``y`` per batch entry and join them along axis 1.

    The batch size is taken from ``x.shape[0]``; both inputs are reshaped to
    (batch, -1) before concatenation.
    """
    batch = x.shape[0]
    flat_x = x.reshape(batch, -1)
    flat_y = y.reshape(batch, -1)
    return np.concatenate((flat_x, flat_y), axis=1)
|
|
|
|
|
|
|
|
|
def is_image(file_path):
    """Return True when ``file_path`` ends with a known image extension.

    The comparison is case-insensitive; only the name is inspected, the file
    itself is never opened.
    """
    lowered = file_path.lower()
    return lowered.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'))
|
|
|
|
|
|
|
|
|
def is_video(file_path):
    """Return True when ``file_path`` looks like a video source.

    A path qualifies if its name ends (case-insensitively) with one of the
    listed video extensions, or if it is an existing directory.
    """
    video_suffixes = (".mp4", ".mov", ".avi", ".webm")
    return file_path.lower().endswith(video_suffixes) or os.path.isdir(file_path)
|
|
|
|
|
|
|
|
|
def make_abs_path(fn):
    """Join ``fn`` onto the directory two levels above this source file.

    The file location is resolved with ``realpath`` (symlinks followed), its
    containing directory's parent is taken, and ``fn`` is appended to it.
    """
    this_file = osp.realpath(__file__)
    base_dir = os.path.dirname(osp.dirname(this_file))
    return osp.join(base_dir, fn)
|
|
|
|
|
|
|
|
|
class LowPassFilter:
    """Exponential smoothing filter that remembers the previous sample.

    ``prev_raw_value`` holds the last input passed to ``process`` and
    ``prev_filtered_value`` the last output; both are None until the first
    call.
    """

    def __init__(self):
        self.prev_raw_value = None
        self.prev_filtered_value = None

    def process(self, value, alpha):
        """Blend ``value`` with the previous output and return the result.

        The first sample passes through unchanged; afterwards the output is
        ``alpha * value + (1 - alpha) * previous_output``.
        """
        is_first_sample = self.prev_raw_value is None
        smoothed = value if is_first_sample else (
            alpha * value + (1.0 - alpha) * self.prev_filtered_value
        )
        self.prev_raw_value, self.prev_filtered_value = value, smoothed
        return smoothed
|
|
|
|
|
|
|
|
|
class OneEuroFilter:
    """Adaptive smoothing filter built from two ``LowPassFilter`` stages.

    One stage smooths the signal, the other smooths its derivative; the
    signal stage's cutoff grows with the magnitude of the smoothed
    derivative (``mincutoff + beta * |derivative|``).
    """

    def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
        self.freq = freq            # sampling frequency used for the derivative and alpha
        self.mincutoff = mincutoff  # cutoff applied when the derivative is zero
        self.beta = beta            # weight of the derivative magnitude in the cutoff
        self.dcutoff = dcutoff      # cutoff of the derivative filter
        self.x_filter = LowPassFilter()
        self.dx_filter = LowPassFilter()

    def compute_alpha(self, cutoff):
        """Return the smoothing factor for ``cutoff`` at the configured ``freq``."""
        sample_period = 1.0 / self.freq
        time_constant = 1.0 / (2 * np.pi * cutoff)
        return 1.0 / (1.0 + time_constant / sample_period)

    def get_pre_x(self):
        """Return the previous filtered signal value (None before any sample)."""
        return self.x_filter.prev_filtered_value

    def process(self, x):
        """Filter one sample ``x`` and return the smoothed value."""
        last_raw = self.x_filter.prev_raw_value
        if last_raw is None:
            # No history yet, so the derivative starts at zero.
            derivative = 0.0
        else:
            derivative = (x - last_raw) * self.freq
        smoothed_derivative = self.dx_filter.process(derivative, self.compute_alpha(self.dcutoff))
        adaptive_cutoff = self.mincutoff + self.beta * np.abs(smoothed_derivative)
        return self.x_filter.process(x, self.compute_alpha(adaptive_cutoff))
|
|
|
|