| import numpy as np |
| import cv2 |
| |
| import os |
| import glob |
| from tqdm import tqdm |
| from pathlib import Path |
| import torch |
| import torch.nn.functional as F |
| |
| def warp_flow(img, flow): |
| ''' |
| Applies to img the transformation described by flow. |
| ''' |
| |
| hf, wf = flow.shape[:2] |
| |
| flow[:, :, 0] += np.arange(wf) |
| flow[:, :, 1] += np.arange(hf)[:, np.newaxis] |
| res = cv2.remap(img, flow, None, cv2.INTER_LINEAR) |
| return res |
|
|
| def estimate_invflow(img0, img1, me_algo): |
| ''' |
| Estimates inverse optical flow by using the me_algo algorithm. |
| ''' |
|
|
| |
| if me_algo == "DeepFlow": |
| of_estim = cv2.optflow.createOptFlow_DeepFlow() |
| else: |
| raise Exception("Incorrect motion estimation algorithm") |
|
|
| |
| flow = of_estim.calc(img1, img0, None) |
| |
|
|
| return flow |
|
|
| def align_frames(img_to_align, img_source, mc_alg='DeepFlow'): |
| ''' |
| Applies to img_to_align a transformation which converts it into img_source. |
| Args: |
| img_to_align: HxWxC image |
| img_source: HxWxC image |
| mc_alg: selects between DeepFlow, SimpleFlow, and TVL1. DeepFlow runs by default. |
| Returns: |
| HxWxC aligned image |
| ''' |
| if img_to_align.ndim == 2: |
| img0 = img_to_align |
| img1 = img_source |
| else: |
| img0 = img_to_align[:, :, 1] |
| img1 = img_source[:, :, 1] |
| out_img = None |
|
|
| |
| flow = estimate_invflow(img0, img1, mc_alg) |
| |
|
|
| |
| out_img = warp_flow(img_to_align, flow.astype(np.float32)) |
|
|
| return out_img, flow |
|
|
|
|
|
|
| def SIFT(img1gray, img2gray): |
| |
| sift = cv2.xfeatures2d.SIFT_create() |
| |
| |
| kp1, des1 = sift.detectAndCompute(img1gray, None) |
| kp2, des2 = sift.detectAndCompute(img2gray, None) |
| |
| FLANN_INDEX_KDTREE = 1 |
| index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5) |
| search_params = dict(checks=10) |
| flann = cv2.FlannBasedMatcher(index_params, search_params) |
| matches = flann.knnMatch(des1, des2, k=2) |
| |
| |
| |
| matchesMask = [[0, 0] for i in range(len(matches))] |
|
|
| good = [] |
| |
| for i, (m, n) in enumerate(matches): |
| if m.distance < 0.65*n.distance: |
| good.append(m) |
| matchesMask[i] = [1, 0] |
|
|
|
|
| MIN_MATCH_COUNT = 9 |
|
|
| print(len(good)) |
|
|
| if len(good) > MIN_MATCH_COUNT: |
| src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2) |
| dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2) |
| M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 3) |
|
|
| else: |
| print('error!!!!!!!!!!!!!!!!!!!!!!!!!!!') |
| return |
|
|
| |
| return M |
|
|
|
|
|
|
|
|
| def match_colors(im_ref, im_q, im_test): |
|
|
| im_ref_mean_re = im_ref.view(*im_ref.shape[:2], -1) |
| im_q_mean_re = im_q.view(*im_q.shape[:2], -1) |
|
|
| |
| c_mat_all = [] |
| for ir, iq in zip(im_ref_mean_re, im_q_mean_re): |
| c = torch.linalg.lstsq(iq.t(), ir.t()) |
| c = c.solution[:im_ref_mean_re.size(1)] |
| c_mat_all.append(c) |
|
|
| c_mat = torch.stack(c_mat_all, dim=0) |
| |
| im_test_re = im_test.view(*im_test.shape[:2], -1) |
| im_t_conv = torch.matmul(im_test_re.permute(0, 2, 1), c_mat).permute(0, 2, 1) |
| im_t_conv = im_t_conv.view(im_test.shape) |
|
|
| return im_t_conv |
|
|
| def color_correction(gt, in_put, output, scale_factor=2): |
| |
| output_cor = match_channel_colors(gt, in_put, output) |
| return output_cor |
|
|
| def match_channel_colors(im_ref, im_q, im_test): |
|
|
| im_ref_reshape = im_ref.view(*im_ref.shape[:2], -1) |
| im_q_reshape = im_q.view(*im_q.shape[:2], -1) |
| im_test_reshape = im_test.view(*im_test.shape[:2], -1) |
| |
|
|
| im_t_conv_list = [] |
| for i in range(im_ref.size(1)): |
| c_mat_all = [] |
| for ir_batch, iq_batch in zip(im_ref_reshape[:, i:i+1, :], im_q_reshape[:, i:i+1, :]): |
| c = torch.linalg.lstsq(iq_batch.t(), ir_batch.t()) |
| c = c.solution[:1] |
| c_mat_all.append(c) |
|
|
| c_mat = torch.stack(c_mat_all, dim=0) |
| |
| im_t_conv = torch.matmul(im_test_reshape[:, i:i+1, :].permute(0, 2, 1), c_mat).permute(0, 2, 1) |
| im_t_conv = im_t_conv.view(*im_t_conv.shape[:2], *im_test.shape[-2:]) |
| im_t_conv_list.append(im_t_conv) |
|
|
| im_t_conv = torch.cat(im_t_conv_list, dim=1) |
|
|
| return im_t_conv |
|
|
|
|
|
|
|
|
| def img2tensor(imgs, bgr2rgb=True, float32=True): |
| """Numpy array to tensor. |
| |
| Args: |
| imgs (list[ndarray] | ndarray): Input images. |
| bgr2rgb (bool): Whether to change bgr to rgb. |
| float32 (bool): Whether to change to float32. |
| |
| Returns: |
| list[tensor] | tensor: Tensor images. If returned results only have |
| one element, just return tensor. |
| """ |
|
|
| def _totensor(img, bgr2rgb, float32): |
| if img.shape[2] == 3 and bgr2rgb: |
| if img.dtype == 'float64': |
| img = img.astype('float32') |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
| img = torch.from_numpy(img.transpose(2, 0, 1)) |
| if float32: |
| img = img.float() |
| return img |
|
|
| if isinstance(imgs, list): |
| return [_totensor(img, bgr2rgb, float32) for img in imgs] |
| else: |
| return _totensor(imgs, bgr2rgb, float32) |
|
|
|
|
| def tensor2img(tensor, rgb2bgr=True, out_type=np.uint8, min_max=(0, 1)): |
| """Convert torch Tensors into image numpy arrays. |
| |
| After clamping to [min, max], values will be normalized to [0, 1]. |
| |
| Args: |
| tensor (Tensor or list[Tensor]): Accept shapes: |
| 1) 4D mini-batch Tensor of shape (B x 3/1 x H x W); |
| 2) 3D Tensor of shape (3/1 x H x W); |
| 3) 2D Tensor of shape (H x W). |
| Tensor channel should be in RGB order. |
| rgb2bgr (bool): Whether to change rgb to bgr. |
| out_type (numpy type): output types. If ``np.uint8``, transform outputs |
| to uint8 type with range [0, 255]; otherwise, float type with |
| range [0, 1]. Default: ``np.uint8``. |
| min_max (tuple[int]): min and max values for clamp. |
| |
| Returns: |
| (Tensor or list): 3D ndarray of shape (H x W x C) OR 2D ndarray of |
| shape (H x W). The channel order is BGR. |
| """ |
| if not (torch.is_tensor(tensor) or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor))): |
| raise TypeError(f'tensor or list of tensors expected, got {type(tensor)}') |
|
|
| if torch.is_tensor(tensor): |
| tensor = [tensor] |
| result = [] |
| for _tensor in tensor: |
| _tensor = _tensor.squeeze(0).float().detach().cpu().clamp_(*min_max) |
| _tensor = (_tensor - min_max[0]) / (min_max[1] - min_max[0]) |
|
|
| n_dim = _tensor.dim() |
| if n_dim == 4: |
| img_np = make_grid(_tensor, nrow=int(math.sqrt(_tensor.size(0))), normalize=False).numpy() |
| img_np = img_np.transpose(1, 2, 0) |
| if rgb2bgr: |
| img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) |
| elif n_dim == 3: |
| img_np = _tensor.numpy() |
| img_np = img_np.transpose(1, 2, 0) |
| if img_np.shape[2] == 1: |
| img_np = np.squeeze(img_np, axis=2) |
| else: |
| if rgb2bgr: |
| img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) |
| elif n_dim == 2: |
| img_np = _tensor.numpy() |
| else: |
| raise TypeError(f'Only support 4D, 3D or 2D tensor. But received with dimension: {n_dim}') |
| if out_type == np.uint8: |
| |
| img_np = (img_np * 255.0).round() |
| img_np = img_np.astype(out_type) |
| result.append(img_np) |
| if len(result) == 1: |
| result = result[0] |
| return result |