Spaces:
Sleeping
Sleeping
| import sys | |
| import time | |
| import numpy as np | |
| from PIL import Image | |
| from skimage import color | |
| from skimage.transform import resize | |
| import src.data.functional as F | |
| import torch | |
| from torch import nn | |
| import torch.nn.functional as F_torch | |
| import torchvision.transforms.functional as F_torchvision | |
| from numba import cuda, jit | |
| import math | |
| import torchvision.utils as vutils | |
| from torch.autograd import Variable | |
| import cv2 | |
# XYZ -> linear-RGB conversion matrix, stored transposed so tensor_lab2rgb can
# right-multiply flattened XYZ rows with torch.mm (xyz_flat @ rgb_from_xyz).
rgb_from_xyz = np.array(
    [
        [3.24048134, -0.96925495, 0.05564664],
        [-1.53715152, 1.87599, -0.20404134],
        [-0.49853633, 0.04155593, 1.05731107],
    ]
)
# Lab normalization constants: L is centered at 50 with unit scale,
# ab channels are zero-centered with unit scale.
l_norm, ab_norm = 1.0, 1.0
l_mean, ab_mean = 50.0, 0
| import numpy as np | |
| from PIL import Image | |
| from skimage.transform import resize | |
| import numpy as np | |
| from PIL import Image | |
| from skimage.transform import resize | |
class SquaredPadding:
    """Resize an image so its longer side equals ``target_size``, then pad the
    shorter side symmetrically with ``fill_value`` to produce a square image."""

    def __init__(self, target_size=384, fill_value=0):
        self.target_size = target_size
        self.fill_value = fill_value

    def __call__(self, img, return_pil=True, return_paddings=False, dtype=np.uint8):
        """Resize-and-pad ``img`` to a square of side ``target_size``.

        Args:
            img: PIL image or ndarray, 2-D (grayscale) or 3-D (H, W, C).
            return_pil: wrap the result in a PIL Image when True.
            return_paddings: also return the per-axis padding tuples.
            dtype: dtype the resized image is cast to before padding.

        Returns:
            The padded image, optionally with the paddings list.

        Raises:
            ValueError: if the input is neither 2-D nor 3-D (the previous
            version crashed later with UnboundLocalError in that case).
        """
        if not isinstance(img, np.ndarray):
            img = np.array(img)
        ndim = img.ndim
        if ndim not in (2, 3):
            raise ValueError("expected a 2-D or 3-D image, got ndim=%d" % ndim)
        H, W = img.shape[:2]
        # Scale the longer side to target_size; the other side shrinks
        # proportionally and receives the padding.
        if H > W:
            H_new, W_new = self.target_size, int(W / H * self.target_size)
            pad_axis = 1  # pad columns (width)
        else:
            H_new, W_new = int(H / W * self.target_size), self.target_size
            pad_axis = 0  # pad rows (height)
        img = resize(img, (H_new, W_new), preserve_range=True).astype(dtype)
        # Split an odd gap as floor/ceil so the content stays centered.
        gap = abs(H_new - W_new)
        before, after = gap // 2, gap // 2 + gap % 2
        paddings = [(0, 0)] * ndim
        paddings[pad_axis] = (before, after)
        padded_img = np.pad(img, paddings, mode='constant', constant_values=self.fill_value)
        if return_pil:
            padded_img = Image.fromarray(padded_img)
        if return_paddings:
            return padded_img, paddings
        return padded_img
class UnpaddingSquare():
    """Undo SquaredPadding on an (H, W, C) array: crop the symmetric border
    described by ``paddings`` back off the image."""

    def __call__(self, img, paddings):
        if not isinstance(img, np.ndarray):
            img = np.array(img)
        (pad_top, pad_bottom), (pad_left, pad_right), _ = paddings
        # End indices expressed relative to the padded extent.
        bottom = img.shape[0] - pad_bottom
        right = img.shape[1] - pad_right
        return img[pad_top:bottom, pad_left:right, :]
class UnpaddingSquare_Tensor():
    """Undo SquaredPadding on a (C, H, W) tensor: slice off the symmetric
    border described by ``paddings``."""

    def __call__(self, img, paddings):
        (pad_top, pad_bottom), (pad_left, pad_right), _ = paddings
        bottom = img.shape[1] - pad_bottom
        right = img.shape[2] - pad_right
        return img[:, pad_top:bottom, pad_left:right]
class ResizeFlow(object):
    """Bilinearly resize a (2, H, W) flow tensor to ``target_size``."""

    def __init__(self, target_size=(224, 224)):
        self.target_size = target_size

    def __call__(self, flow):
        # interpolate works on batched input, so add and remove a batch dim.
        batched = flow.unsqueeze(0)
        resized = F_torch.interpolate(batched, self.target_size, mode='bilinear', align_corners=True)
        return resized.squeeze(0)
class SquaredPaddingFlow(object):
    """Pad a (C, H, W) flow tensor with ``fill_value`` on the shorter spatial
    side so the result is square."""

    def __init__(self, fill_value=0):
        self.fill_value = fill_value

    def __call__(self, flow):
        H, W = flow.size(1), flow.size(2)
        gap = abs(H - W)
        before, after = gap // 2, gap // 2 + gap % 2
        # F.pad takes (left, right, top, bottom) for the last two dims.
        if H > W:
            paddings = (before, after, 0, 0)
        else:
            paddings = (0, 0, before, after)
        return F_torch.pad(flow, paddings, value=self.fill_value)
def gray2rgb_batch(l):
    """Expand a normalized 1-channel L batch (B, 1, H, W) into a 3-channel
    gray RGB-like batch scaled into [0, 1]."""
    # Denormalize L back to [0, 100], then rescale to [0, 1].
    luminance = uncenter_l(l) / (2 * l_mean)
    return torch.cat((luminance, luminance, luminance), dim=1)
def batch_lab2rgb_transpose_mc(img_l_mc, img_ab_mc, nrow=8):
    """Denormalize batched L (B,1,H,W) and ab (B,2,H,W) channels, tile the
    batch into an image grid and convert it to an RGB uint8 array."""
    # Move legacy Variables / CUDA tensors onto the CPU first.
    if isinstance(img_l_mc, Variable):
        img_l_mc = img_l_mc.data.cpu()
    if isinstance(img_ab_mc, Variable):
        img_ab_mc = img_ab_mc.data.cpu()
    if img_l_mc.is_cuda:
        img_l_mc = img_l_mc.cpu()
    if img_ab_mc.is_cuda:
        img_ab_mc = img_ab_mc.cpu()
    assert img_l_mc.dim() == 4 and img_ab_mc.dim() == 4, "only for batch input"
    # Undo the Lab normalization and rebuild full Lab images.
    pred_lab = torch.cat((img_l_mc * l_norm + l_mean, img_ab_mc * ab_norm + ab_mean), dim=1)
    grid_lab = vutils.make_grid(pred_lab, nrow=nrow).numpy().astype("float64")
    rgb = color.lab2rgb(grid_lab.transpose((1, 2, 0)))
    return (np.clip(rgb, 0, 1) * 255).astype("uint8")
def vgg_preprocess(tensor):
    """Convert an RGB tensor in [0, 1] to a mean-subtracted BGR tensor in
    [0, 255] (caffe-VGG input convention)."""
    # Swap channels RGB -> BGR via advanced indexing.
    tensor_bgr = tensor[:, [2, 1, 0], :, :]
    mean = torch.Tensor([0.40760392, 0.45795686, 0.48501961]).type_as(tensor_bgr).view(1, 3, 1, 1)
    return (tensor_bgr - mean) * 255
def tensor_lab2rgb(input):
    """Convert a batch of CIE-Lab tensors (n, 3, h, w) to RGB (n, 3, h, w).

    Implements Lab -> XYZ -> linear RGB -> sRGB with the usual piecewise
    transfer functions; the output is clamped to [0, 1].
    """
    input_trans = input.transpose(1, 2).transpose(2, 3)  # n * h * w * 3
    L, a, b = (
        input_trans[:, :, :, 0:1],
        input_trans[:, :, :, 1:2],
        input_trans[:, :, :, 2:],
    )
    # Lab -> f(XYZ): y from L, x and z offset by a and b.
    y = (L + 16.0) / 116.0
    x = (a / 500.0) + y
    z = y - (b / 200.0)
    # Negative z is clamped before the piecewise inverse below.
    neg_mask = z.data < 0
    z[neg_mask] = 0
    xyz = torch.cat((x, y, z), dim=3)
    # Inverse of the Lab f-function: cube above the ~6/29 knee (0.2068966),
    # linear (divide by 7.787) below it.
    mask = xyz.data > 0.2068966
    mask_xyz = xyz.clone()
    mask_xyz[mask] = torch.pow(xyz[mask], 3.0)
    mask_xyz[~mask] = (xyz[~mask] - 16.0 / 116.0) / 7.787
    # Scale X and Z by the white-point coefficients.
    mask_xyz[:, :, :, 0] = mask_xyz[:, :, :, 0] * 0.95047
    mask_xyz[:, :, :, 2] = mask_xyz[:, :, :, 2] * 1.08883
    # XYZ -> linear RGB via the module-level rgb_from_xyz matrix.
    rgb_trans = torch.mm(mask_xyz.view(-1, 3), torch.from_numpy(rgb_from_xyz).type_as(xyz)).view(
        input.size(0), input.size(2), input.size(3), 3
    )
    rgb = rgb_trans.transpose(2, 3).transpose(1, 2)
    # Linear RGB -> sRGB gamma: power curve above 0.0031308, linear below.
    mask = rgb > 0.0031308
    mask_rgb = rgb.clone()
    mask_rgb[mask] = 1.055 * torch.pow(rgb[mask], 1 / 2.4) - 0.055
    mask_rgb[~mask] = rgb[~mask] * 12.92
    # Clamp out-of-gamut values into [0, 1].
    neg_mask = mask_rgb.data < 0
    large_mask = mask_rgb.data > 1
    mask_rgb[neg_mask] = 0
    mask_rgb[large_mask] = 1
    return mask_rgb
| ###### loss functions ###### | |
def feature_normalize(feature_in):
    """L2-normalize ``feature_in`` along dim 1; a machine-epsilon offset in
    the denominator guards against division by zero."""
    denom = torch.norm(feature_in, 2, 1, keepdim=True) + sys.float_info.epsilon
    return feature_in / denom
# denormalization for l
def uncenter_l(l):
    """Map a normalized L channel back to Lab range using the module-level
    scale (l_norm) and offset (l_mean)."""
    return l_mean + l * l_norm
def get_grid(x):
    """Build a (B, 2, H, W) base sampling grid whose x/y coordinates span
    [-1, 1], matching the shape of ``x`` (B, C, H, W)."""
    B, H, W = x.size(0), x.size(2), x.size(3)
    xs = torch.linspace(-1.0, 1.0, W).view(1, 1, 1, W).expand(B, 1, H, W)
    ys = torch.linspace(-1.0, 1.0, H).view(1, 1, H, 1).expand(B, 1, H, W)
    return torch.cat([xs, ys], 1)
class WarpingLayer(nn.Module):
    """Warp an image with a dense pixel-unit optical flow via grid_sample."""

    def __init__(self, device):
        super(WarpingLayer, self).__init__()
        self.device = device

    def forward(self, x, flow):
        """
        Warp ``x`` according to ``flow``.

        Args:
            x: input image batch (B, C, H, W)
            flow: flow tensor of shape (B, 2, H, W), in pixel units
        Returns:
            The warped image batch
        """
        # grid_sample expects coordinates normalized to [-1, 1]; the flow is
        # kept unnormalized for EPE comparison with FlowNet2, so rescale each
        # component by half the (size - 1) extent here.
        norm_flow = torch.zeros_like(flow).to(self.device)
        half_w = (flow.size(3) - 1.0) / 2.0
        half_h = (flow.size(2) - 1.0) / 2.0
        norm_flow[:, 0, :, :] = flow[:, 0, :, :] / half_w
        norm_flow[:, 1, :, :] = flow[:, 1, :, :] / half_h
        grid = (get_grid(x).to(self.device) + norm_flow).permute(0, 2, 3, 1)
        return F_torch.grid_sample(x, grid, align_corners=True)
class CenterPad_threshold(object):
    """Resize/crop an image to a fixed (height, width) keeping the center.

    Like CenterPad, but when the input is tall (height/width above
    ``threshold``) the height is first center-cropped to width * threshold
    before resizing to the target size.
    """

    def __init__(self, image_size, threshold=3 / 4):
        # image_size is (height, width)
        self.height = image_size[0]
        self.width = image_size[1]
        self.threshold = threshold

    def __call__(self, image):
        # pad the image to 16:9
        # pad height
        I = np.array(image)
        # for padded input
        height_old = np.size(I, 0)
        width_old = np.size(I, 1)
        old_size = [height_old, width_old]
        height = self.height
        width = self.width
        # NOTE(review): assumes a 3-channel image; np.size(I, 2) raises for
        # grayscale input — confirm callers always pass RGB.
        I_pad = np.zeros((height, width, np.size(I, 2)))
        ratio = height / width
        if height_old / width_old == ratio:
            # Aspect ratio already matches: return unchanged or plainly resized.
            if height_old == height:
                return Image.fromarray(I.astype(np.uint8))
            new_size = [int(x * height / height_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            return Image.fromarray(I_resize.astype(np.uint8))
        if height_old / width_old > self.threshold:
            # Too tall: center-crop the height down to width * threshold first.
            width_new, height_new = width_old, int(width_old * self.threshold)
            height_margin = height_old - height_new
            height_crop_start = height_margin // 2
            I_crop = I[height_crop_start : (height_crop_start + height_new), :, :]
            I_resize = resize(I_crop, [height, width], mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            return Image.fromarray(I_resize.astype(np.uint8))
        if height_old / width_old > ratio:  # pad the width and crop
            # Scale so the width matches, then center-crop the excess height.
            new_size = [int(x * width / width_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_height = (height_resize - height) // 2
            I_pad[:, :, :] = I_resize[start_height : (start_height + height), :, :]
        else:  # pad the height and crop
            # Scale so the height matches, then center-crop the excess width.
            new_size = [int(x * height / height_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_width = (width_resize - width) // 2
            I_pad[:, :, :] = I_resize[:, start_width : (start_width + width), :]
        return Image.fromarray(I_pad.astype(np.uint8))
class Normalize(object):
    """Normalize a Lab tensor in place via the project's F.normalize:
    the L channel is shifted by 50 (Lab mid-point), ab are left centered."""

    def __init__(self):
        pass

    def __call__(self, inputs):
        # L channel: mean 50, unit scale.
        inputs[0:1, :, :] = F.normalize(inputs[0:1, :, :], 50, 1)
        # ab channels: zero mean, unit scale.
        # NOTE(review): presumably F.normalize follows torchvision
        # (mean, std) semantics — confirm in src.data.functional.
        inputs[1:3, :, :] = F.normalize(inputs[1:3, :, :], (0, 0), (1, 1))
        return inputs
class RGB2Lab(object):
    """Convert an RGB image (PIL/ndarray) to a float32 CIE-Lab array via OpenCV."""

    def __init__(self):
        pass

    def __call__(self, inputs):
        # cv2's float path expects RGB scaled to [0, 1].
        scaled = np.float32(inputs) / 255.0
        return cv2.cvtColor(scaled, cv2.COLOR_RGB2LAB)
class ToTensor(object):
    """Wrap the project's ``to_mytensor`` conversion as a transform object."""

    def __init__(self):
        pass

    def __call__(self, inputs):
        return F.to_mytensor(inputs)
class CenterPad(object):
    """Resize an image to a fixed (height, width), keeping the center by
    scaling the limiting side and center-cropping the excess of the other."""

    def __init__(self, image_size):
        # image_size is (height, width)
        self.height = image_size[0]
        self.width = image_size[1]

    def __call__(self, image):
        # pad the image to 16:9
        # pad height
        I = np.array(image)
        # for padded input
        height_old = np.size(I, 0)
        width_old = np.size(I, 1)
        old_size = [height_old, width_old]
        height = self.height
        width = self.width
        # NOTE(review): assumes a 3-channel image; np.size(I, 2) raises for
        # grayscale input — confirm callers always pass RGB.
        I_pad = np.zeros((height, width, np.size(I, 2)))
        ratio = height / width
        if height_old / width_old == ratio:
            # Aspect ratio already matches: return unchanged or plainly resized.
            if height_old == height:
                return Image.fromarray(I.astype(np.uint8))
            new_size = [int(x * height / height_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            return Image.fromarray(I_resize.astype(np.uint8))
        if height_old / width_old > ratio:  # pad the width and crop
            # Scale so the width matches, then center-crop the excess height.
            new_size = [int(x * width / width_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_height = (height_resize - height) // 2
            I_pad[:, :, :] = I_resize[start_height : (start_height + height), :, :]
        else:  # pad the height and crop
            # Scale so the height matches, then center-crop the excess width.
            new_size = [int(x * height / height_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_width = (width_resize - width) // 2
            I_pad[:, :, :] = I_resize[:, start_width : (start_width + width), :]
        return Image.fromarray(I_pad.astype(np.uint8))
class CenterPadCrop_numpy(object):
    """
    pad the image according to the height

    Scales the limiting side toward (height, width), centers the result in a
    width x width square buffer, then center-crops to (height, width).
    Returns a numpy array (unlike CenterPad, which returns a PIL image).
    """

    def __init__(self, image_size):
        # image_size is (height, width)
        self.height = image_size[0]
        self.width = image_size[1]

    def __call__(self, image, threshold=3 / 4):
        # pad the image to 16:9
        # pad height
        # NOTE(review): `threshold` is currently unused in this method.
        I = np.array(image)
        # for padded input
        height_old = np.size(I, 0)
        width_old = np.size(I, 1)
        old_size = [height_old, width_old]
        height = self.height
        width = self.width
        padding_size = width
        # Square working buffer; grayscale and color handled separately.
        if image.ndim == 2:
            I_pad = np.zeros((width, width))
        else:
            I_pad = np.zeros((width, width, I.shape[2]))
        ratio = height / width
        if height_old / width_old == ratio:
            # Aspect ratio already matches: return the raw array unchanged.
            return I
        if height_old / width_old > ratio:  # pad the width and crop
            # Scale so the width matches, center-crop the excess height, then
            # center the result vertically inside the square buffer.
            new_size = [int(x * width / width_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_height = (height_resize - height) // 2
            start_height_block = (padding_size - height) // 2
            if image.ndim == 2:
                I_pad[start_height_block : (start_height_block + height), :] = I_resize[
                    start_height : (start_height + height), :
                ]
            else:
                I_pad[start_height_block : (start_height_block + height), :, :] = I_resize[
                    start_height : (start_height + height), :, :
                ]
        else:  # pad the height and crop
            # Scale so the height matches, center-crop the excess width, then
            # center the result horizontally inside the square buffer.
            new_size = [int(x * height / height_old) for x in old_size]
            I_resize = resize(I, new_size, mode="reflect", preserve_range=True, clip=False, anti_aliasing=True)
            width_resize = np.size(I_resize, 1)
            height_resize = np.size(I_resize, 0)
            start_width = (width_resize - width) // 2
            start_width_block = (padding_size - width) // 2
            if image.ndim == 2:
                I_pad[:, start_width_block : (start_width_block + width)] = I_resize[:, start_width : (start_width + width)]
            else:
                I_pad[:, start_width_block : (start_width_block + width), :] = I_resize[
                    :, start_width : (start_width + width), :
                ]
        # Final center crop of the square buffer to the target size.
        crop_start_height = (I_pad.shape[0] - height) // 2
        crop_start_width = (I_pad.shape[1] - width) // 2
        if image.ndim == 2:
            return I_pad[crop_start_height : (crop_start_height + height), crop_start_width : (crop_start_width + width)]
        else:
            return I_pad[crop_start_height : (crop_start_height + height), crop_start_width : (crop_start_width + width), :]
def biInterpolation_cpu(distorted, i, j):
    """Bilinearly interpolate ``distorted`` at fractional coordinates (i, j).

    Args:
        distorted: 2-D single-channel array, padded by one extra row/column so
            int(i)+1 and int(j)+1 stay in range (see forward_mapping_cpu).
        i: fractional column coordinate.
        j: fractional row coordinate.
    Returns:
        The interpolated value cast to np.uint8.
    """
    # The previous version truncated i/j to integers *before* computing the
    # weights, so every fractional term was (i - i) == 0 and the expression
    # degenerated to the top-left sample Q11 (and np.int8 wrapped values
    # above 127). Keep the fractional parts for a true bilinear blend.
    i_int = int(i)
    j_int = int(j)
    di = i - i_int
    dj = j - j_int
    Q11 = distorted[j_int, i_int]
    Q12 = distorted[j_int, i_int + 1]
    Q21 = distorted[j_int + 1, i_int]
    Q22 = distorted[j_int + 1, i_int + 1]
    value = (
        Q11 * (1 - di) * (1 - dj)
        + Q12 * di * (1 - dj)
        + Q21 * (1 - di) * dj
        + Q22 * di * dj
    )
    return np.uint8(value)
def iterSearchShader_cpu(padu, padv, xr, yr, W, H, maxIter, precision):
    """Backward search: find the source location (i, j) in the distorted image
    that maps to result pixel (xr, yr) under the forward flow (padu, padv).

    Returns fractional coordinates; they may fall outside
    [0, W-1] x [0, H-1], in which case the caller mirrors them back inside.
    """
    # Flow is (numerically) zero here: the pixel maps to itself.
    if abs(padu[yr, xr]) < precision and abs(padv[yr, xr]) < precision:
        return xr, yr
    # Our initialize method in this paper, can see the overleaf for detail
    # (first-order estimate from the local flow gradient along each axis).
    if (xr + 1) <= (W - 1):
        dif = padu[yr, xr + 1] - padu[yr, xr]
    else:
        dif = padu[yr, xr] - padu[yr, xr - 1]
    u_next = padu[yr, xr] / (1 + dif)
    if (yr + 1) <= (H - 1):
        dif = padv[yr + 1, xr] - padv[yr, xr]
    else:
        dif = padv[yr, xr] - padv[yr - 1, xr]
    v_next = padv[yr, xr] / (1 + dif)
    i = xr - u_next
    j = yr - v_next
    i_int = int(i)
    j_int = int(j)
    # The same as traditional iterative search method
    # NOTE(review): i_int/j_int are computed once before the loop and never
    # refreshed per iteration — confirm this matches the intended algorithm.
    for _ in range(maxIter):
        if not 0 <= i <= (W - 1) or not 0 <= j <= (H - 1):
            return i, j
        # Bilinearly interpolate the flow at the current estimate (i, j).
        u11 = padu[j_int, i_int]
        v11 = padv[j_int, i_int]
        u12 = padu[j_int, i_int + 1]
        v12 = padv[j_int, i_int + 1]
        int1 = padu[j_int + 1, i_int]
        v21 = padv[j_int + 1, i_int]
        int2 = padu[j_int + 1, i_int + 1]
        v22 = padv[j_int + 1, i_int + 1]
        u = (
            u11 * (i_int + 1 - i) * (j_int + 1 - j)
            + u12 * (i - i_int) * (j_int + 1 - j)
            + int1 * (i_int + 1 - i) * (j - j_int)
            + int2 * (i - i_int) * (j - j_int)
        )
        v = (
            v11 * (i_int + 1 - i) * (j_int + 1 - j)
            + v12 * (i - i_int) * (j_int + 1 - j)
            + v21 * (i_int + 1 - i) * (j - j_int)
            + v22 * (i - i_int) * (j - j_int)
        )
        # Fixed-point update: next estimate is the target minus the local flow.
        i_next = xr - u
        j_next = yr - v
        if abs(i - i_next) < precision and abs(j - j_next) < precision:
            return i, j
        i = i_next
        j = j_next
    # if the search doesn't converge within max iter, it will return the last iter result
    return i_next, j_next
def iterSearch_cpu(distortImg, resultImg, padu, padv, W, H, maxIter=5, precision=1e-2):
    """Fill ``resultImg`` in place: for each target pixel run the iterative
    backward search, mirror out-of-range hits back inside, and sample
    ``distortImg`` bilinearly for each of the three channels."""
    for xr in range(W):
        for yr in range(H):
            # (xr, yr) is the point in the result image; (i, j) is the
            # matched location in the distorted image.
            i, j = iterSearchShader_cpu(padu, padv, xr, yr, W, H, maxIter, precision)
            # Reflect coordinates that fall outside the borders.
            if i > W - 1:
                i = 2 * W - 1 - i
            if i < 0:
                i = -i
            if j > H - 1:
                j = 2 * H - 1 - j
            if j < 0:
                j = -j
            # Bilinear interpolation at (i, j), channel by channel.
            for c in range(3):
                resultImg[yr, xr, c] = biInterpolation_cpu(distortImg[:, :, c], i, j)
    return None
def forward_mapping_cpu(source_image, u, v, maxIter=5, precision=1e-2):
    """
    warp the image according to the forward flow
    u: horizontal
    v: vertical
    """
    H = source_image.shape[0]
    W = source_image.shape[1]
    # Replicate the last row/column (edge padding) so bilinear lookups at the
    # image border stay in range — equivalent to the manual copies before.
    distortImg = np.pad(source_image, ((0, 1), (0, 1), (0, 0)), mode='edge').astype(np.uint8)
    padu = np.pad(u, ((0, 1), (0, 1)), mode='edge').astype(np.float32)
    padv = np.pad(v, ((0, 1), (0, 1)), mode='edge').astype(np.float32)
    resultImg = np.zeros((H, W, 3), dtype=np.uint8)
    iterSearch_cpu(distortImg, resultImg, padu, padv, W, H, maxIter, precision)
    return resultImg
class Distortion_with_flow_cpu(object):
    """Elastic distortion"""

    def __init__(self, maxIter=3, precision=1e-3):
        self.maxIter = maxIter
        self.precision = precision

    def __call__(self, inputs, dx, dy):
        # Note the axis swap: dy is passed as the horizontal component and dx
        # as the vertical one.
        arr = np.array(inputs)
        warped = forward_mapping_cpu(arr, dy, dx, maxIter=self.maxIter, precision=self.precision)
        return Image.fromarray(warped)
def biInterpolation_gpu(distorted, i, j):
    # GPU counterpart of biInterpolation_cpu: sample `distorted` at (i, j).
    # NOTE(review): this is called from the iterSearch_gpu kernel, which needs
    # a numba @cuda.jit(device=True) decoration — none is visible here;
    # confirm the decorator wasn't lost.
    # NOTE(review): i/j are truncated to ints first, so every fractional
    # weight below collapses ((i - i) == 0) and the expression degenerates to
    # the top-left sample Q11 — likely not the intended bilinear blend.
    i = int(i)
    j = int(j)
    Q11 = distorted[j, i]
    Q12 = distorted[j, i + 1]
    Q21 = distorted[j + 1, i]
    Q22 = distorted[j + 1, i + 1]
    return np.int8(
        Q11 * (i + 1 - i) * (j + 1 - j) + Q12 * (i - i) * (j + 1 - j) + Q21 * (i + 1 - i) * (j - j) + Q22 * (i - i) * (j - j)
    )
def iterSearchShader_gpu(padu, padv, xr, yr, W, H, maxIter, precision):
    # GPU counterpart of iterSearchShader_cpu: backward-search the source
    # location (i, j) mapping to result pixel (xr, yr) under the flow.
    # NOTE(review): called from the iterSearch_gpu kernel, which requires a
    # numba @cuda.jit(device=True) decoration — none is visible here; confirm
    # the decorator wasn't lost.
    # Flow is (numerically) zero here: the pixel maps to itself.
    if abs(padu[yr, xr]) < precision and abs(padv[yr, xr]) < precision:
        return xr, yr
    # Our initialize method in this paper, can see the overleaf for detail
    # (first-order estimate from the local flow gradient along each axis).
    if (xr + 1) <= (W - 1):
        dif = padu[yr, xr + 1] - padu[yr, xr]
    else:
        dif = padu[yr, xr] - padu[yr, xr - 1]
    u_next = padu[yr, xr] / (1 + dif)
    if (yr + 1) <= (H - 1):
        dif = padv[yr + 1, xr] - padv[yr, xr]
    else:
        dif = padv[yr, xr] - padv[yr - 1, xr]
    v_next = padv[yr, xr] / (1 + dif)
    i = xr - u_next
    j = yr - v_next
    i_int = int(i)
    j_int = int(j)
    # The same as traditional iterative search method
    # NOTE(review): i_int/j_int are not refreshed inside the loop — confirm
    # this matches the intended algorithm (same as the CPU version).
    for _ in range(maxIter):
        if not 0 <= i <= (W - 1) or not 0 <= j <= (H - 1):
            return i, j
        # Bilinearly interpolate the flow at the current estimate (i, j).
        u11 = padu[j_int, i_int]
        v11 = padv[j_int, i_int]
        u12 = padu[j_int, i_int + 1]
        v12 = padv[j_int, i_int + 1]
        int1 = padu[j_int + 1, i_int]
        v21 = padv[j_int + 1, i_int]
        int2 = padu[j_int + 1, i_int + 1]
        v22 = padv[j_int + 1, i_int + 1]
        u = (
            u11 * (i_int + 1 - i) * (j_int + 1 - j)
            + u12 * (i - i_int) * (j_int + 1 - j)
            + int1 * (i_int + 1 - i) * (j - j_int)
            + int2 * (i - i_int) * (j - j_int)
        )
        v = (
            v11 * (i_int + 1 - i) * (j_int + 1 - j)
            + v12 * (i - i_int) * (j_int + 1 - j)
            + v21 * (i_int + 1 - i) * (j - j_int)
            + v22 * (i - i_int) * (j - j_int)
        )
        # Fixed-point update: next estimate is the target minus the local flow.
        i_next = xr - u
        j_next = yr - v
        if abs(i - i_next) < precision and abs(j - j_next) < precision:
            return i, j
        i = i_next
        j = j_next
    # if the search doesn't converge within max iter, it will return the last iter result
    return i_next, j_next
def iterSearch_gpu(distortImg, resultImg, padu, padv, W, H, maxIter=5, precision=1e-2):
    # CUDA kernel body: each thread walks a grid-stride subset of result
    # pixels (cuda.grid/cuda.gridsize), searches the matching source location
    # and writes the sampled pixel into resultImg.
    # NOTE(review): forward_mapping_gpu launches this as
    # iterSearch_gpu[blockspergrid, threadsperblock](...), which only works on
    # a @cuda.jit-decorated kernel; no decorator is visible here — confirm it
    # wasn't lost.
    start_x, start_y = cuda.grid(2)
    stride_x, stride_y = cuda.gridsize(2)
    for xr in range(start_x, W, stride_x):
        for yr in range(start_y, H, stride_y):
            i,j = iterSearchShader_gpu(padu, padv, xr, yr, W, H, maxIter, precision)
            # Reflect coordinates that fall outside the image borders.
            if i > W - 1:
                i = 2 * W - 1 - i
            if i < 0:
                i = -i
            if j > H - 1:
                j = 2 * H - 1 - j
            if j < 0:
                j = -j
            # Sample each channel at (i, j).
            resultImg[yr, xr,0] = biInterpolation_gpu(distortImg[:,:,0], i, j)
            resultImg[yr, xr,1] = biInterpolation_gpu(distortImg[:,:,1], i, j)
            resultImg[yr, xr,2] = biInterpolation_gpu(distortImg[:,:,2], i, j)
    return None
def forward_mapping_gpu(source_image, u, v, maxIter=5, precision=1e-2):
    """
    warp the image according to the forward flow
    u: horizontal
    v: vertical
    """
    H = source_image.shape[0]
    W = source_image.shape[1]
    # Replicate the last row/column (edge padding) so border lookups inside
    # the kernel stay in range — equivalent to the manual copies before.
    distort_host = np.pad(source_image, ((0, 1), (0, 1), (0, 0)), mode='edge').astype(np.uint8)
    padu_host = np.pad(u, ((0, 1), (0, 1)), mode='edge').astype(np.float32)
    padv_host = np.pad(v, ((0, 1), (0, 1)), mode='edge').astype(np.float32)
    result_host = np.zeros((H, W, 3), dtype=np.uint8)
    # Copy inputs and the output buffer to the device.
    padu = cuda.to_device(padu_host)
    padv = cuda.to_device(padv_host)
    distortImg = cuda.to_device(distort_host)
    resultImg = cuda.to_device(result_host)
    # 16x16 thread blocks, enough blocks to cover the whole image.
    threadsperblock = (16, 16)
    blockspergrid = (math.ceil(W / threadsperblock[0]), math.ceil(H / threadsperblock[1]))
    iterSearch_gpu[blockspergrid, threadsperblock](distortImg, resultImg, padu, padv, W, H, maxIter, precision)
    return resultImg.copy_to_host()
class Distortion_with_flow_gpu(object):
    """Elastic distortion driven by a dense forward flow (GPU implementation)."""

    def __init__(self, maxIter=3, precision=1e-3):
        self.maxIter = maxIter
        self.precision = precision

    def __call__(self, inputs, dx, dy):
        # Note the axis swap: dy is passed as the horizontal component and dx
        # as the vertical one.
        arr = np.array(inputs)
        warped = forward_mapping_gpu(arr, dy, dx, maxIter=self.maxIter, precision=self.precision)
        return Image.fromarray(warped)
def read_flow(filename):
    """
    Read optical flow from a Middlebury .flo file.

    Supports the standard float32 format (magic 202021.25) and a compact
    float16 variant (magic 123.25).

    :param filename: name of the flow file
    :return: optical flow as a float32 array of shape (h, w, 2)
    :raises ValueError: if the magic number is not recognised (the previous
        version printed a message and then crashed with AttributeError on
        ``None.astype``; it also leaked the file handle on any error).
    """
    with open(filename, "rb") as f:  # `with` guarantees the handle is closed
        magic_arr = np.fromfile(f, np.float32, count=1)
        magic = magic_arr[0] if magic_arr.size else 0.0
        if magic == 202021.25:
            dtype = np.float32
        elif magic == 123.25:
            dtype = np.float16  # half-precision variant
        else:
            print("Magic number incorrect. Invalid .flo file")
            raise ValueError("Invalid .flo file: %s" % filename)
        w = np.fromfile(f, np.int32, count=1)[0]
        h = np.fromfile(f, np.int32, count=1)[0]
        data2d = np.fromfile(f, dtype, count=2 * w * h)
        # reshape data into 3D array (rows, columns, channels)
        data2d = np.resize(data2d, (h, w, 2))
    return data2d.astype(np.float32)
class LossHandler:
    """Accumulates named loss values and reports per-sample averages."""

    def __init__(self):
        self.loss_dict = {}       # key -> accumulated loss total
        self.count_sample = 0     # number of samples folded in so far

    def add_loss(self, key, loss):
        # Lazily create the accumulator for unseen keys.
        self.loss_dict.setdefault(key, 0)
        self.loss_dict[key] += loss

    def get_loss(self, key):
        # Average over the samples counted via count_one_sample().
        return self.loss_dict[key] / self.count_sample

    def count_one_sample(self):
        self.count_sample += 1

    def reset(self):
        self.loss_dict = {}
        self.count_sample = 0
class TimeHandler:
    """Named start/stop timer: the first compute_time(key) arms the timer and
    returns None; the second returns the elapsed seconds and clears the key."""

    def __init__(self):
        self.time_handler = {}

    def compute_time(self, key):
        started = self.time_handler.pop(key, None)
        if started is None:
            # First call for this key: record the start time.
            self.time_handler[key] = time.time()
            return None
        return time.time() - started
def print_num_params(model, is_trainable=False):
    """Print a table row with the model's parameter count and return it.

    With ``is_trainable=True`` only parameters with requires_grad are counted.
    """
    model_name = model.__class__.__name__.ljust(30)
    if is_trainable:
        label = "TRAINABLE"
        params = [p for p in model.parameters() if p.requires_grad]
    else:
        label = "GENERAL"
        params = list(model.parameters())
    num_params = sum(p.numel() for p in params)
    formatted = "{:,}".format(num_params).rjust(10)
    print(f"| {label} | {model_name} | {formatted} |")
    return num_params