| import torch |
| from torch.nn import init |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import functools |
|
|
| import numpy as np |
| from .mesh_util import * |
| from .sample_util import * |
| from .geometry import index |
| import cv2 |
| from PIL import Image |
| from tqdm import tqdm |
|
|
|
|
def reshape_multiview_tensors(image_tensor, calib_tensor):
    """Merge the two leading dimensions of the image and calibration tensors.

    The image tensor is indexed up to ``shape[4]`` (5-D) and the calibration
    tensor up to ``shape[3]`` (4-D); in both cases dimensions 0 and 1 are
    collapsed into a single leading dimension with ``Tensor.view``.
    Presumably these are laid out as [batch, num_views, ...] -- TODO confirm
    against the data loader.

    Parameters:
        image_tensor (tensor) -- 5-D tensor; reshaped to 4-D
        calib_tensor (tensor) -- 4-D tensor; reshaped to 3-D

    Returns the reshaped ``(image_tensor, calib_tensor)`` pair.
    """
    img_shape = image_tensor.shape
    calib_shape = calib_tensor.shape

    # ``view`` (not ``reshape``) is kept on purpose: it never copies and
    # raises if the input memory layout does not allow the merge.
    merged_images = image_tensor.view(
        img_shape[0] * img_shape[1], img_shape[2], img_shape[3], img_shape[4])
    merged_calibs = calib_tensor.view(
        calib_shape[0] * calib_shape[1], calib_shape[2], calib_shape[3])

    return merged_images, merged_calibs
|
|
|
|
def reshape_sample_tensor(sample_tensor, num_views):
    """Repeat every batch entry of ``sample_tensor`` ``num_views`` times.

    With ``num_views == 1`` the input object is returned untouched.
    Otherwise a new axis is inserted at position 1, tiled ``num_views``
    times, and folded back into the leading dimension, so copies of the same
    batch entry end up adjacent to each other.

    Parameters:
        sample_tensor (tensor) -- 3-D tensor (the code indexes up to
                                  ``shape[3]`` after the unsqueeze)
        num_views (int)        -- number of repetitions per batch entry

    Returns a tensor whose first dimension is ``shape[0] * num_views``.
    """
    if num_views == 1:
        return sample_tensor

    tiled = sample_tensor.unsqueeze(dim=1).repeat(1, num_views, 1, 1)
    merged_batch = tiled.shape[0] * tiled.shape[1]
    return tiled.view(merged_batch, tiled.shape[2], tiled.shape[3])
|
|
|
|
def gen_mesh(opt, net, cuda, data, save_path, use_octree=True):
    """Reconstruct a mesh with ``net`` and save it with image-sampled colors.

    Steps performed:
      1. Move ``data['img']`` / ``data['calib']`` to ``cuda`` and run
         ``net.filter`` on the images.
      2. Save all input images, concatenated along array axis 1, as a PNG
         whose path is ``save_path`` with its last four characters replaced
         by ``.png``.
      3. Run ``reconstruction`` inside the ``data['b_min']`` /
         ``data['b_max']`` bounds at ``opt.resolution``.
      4. Project the vertices with ``net.projection`` using the first
         calibration entry, sample the first image at the projected xy
         coordinates via ``index`` and write everything with
         ``save_obj_mesh_with_color``.

    Any exception raised in steps 2-4 is printed and swallowed (best-effort
    export); nothing is returned.

    Parameters:
        opt          -- options object; only ``opt.resolution`` is read
        net          -- network exposing ``filter`` and ``projection``
        cuda         -- device the tensors are moved to
        data (dict)  -- must contain 'img', 'calib', 'b_min', 'b_max'
        save_path (str) -- output mesh path
        use_octree (bool) -- forwarded to ``reconstruction``
    """
    image_tensor = data['img'].to(device=cuda)
    calib_tensor = data['calib'].to(device=cuda)

    net.filter(image_tensor)

    b_min, b_max = data['b_min'], data['b_max']
    try:
        save_img_path = save_path[:-4] + '.png'

        # Each view: permute axes (1, 2, 0), map x -> x * 0.5 + 0.5, scale to
        # 0..255.  The channel axis is reversed here and reversed back right
        # before saving, so the stored channel order equals the input order.
        view_images = [
            (np.transpose(view.detach().cpu().numpy(), (1, 2, 0)) * 0.5 + 0.5)[:, :, ::-1] * 255.0
            for view in image_tensor
        ]
        panel = np.concatenate(view_images, axis=1)
        Image.fromarray(np.uint8(panel[:, :, ::-1])).save(save_img_path)

        verts, faces, _, _ = reconstruction(
            net, cuda, calib_tensor, opt.resolution, b_min, b_max, use_octree=use_octree)

        # Color lookup uses only the first image / first calibration entry.
        verts_tensor = torch.from_numpy(verts.T).unsqueeze(0).to(device=cuda).float()
        projected = net.projection(verts_tensor, calib_tensor[:1])
        uv = projected[:, :2, :]
        sampled = index(image_tensor[:1], uv).detach().cpu().numpy()[0].T
        color = sampled * 0.5 + 0.5
        save_obj_mesh_with_color(save_path, verts, faces, color)
    except Exception as e:
        print(e)
        print('Can not create marching cubes at this time.')
|
|
def gen_mesh_color(opt, netG, netC, cuda, data, save_path, use_octree=True):
    """Reconstruct a mesh with ``netG`` and color its vertices with ``netC``.

    Steps performed:
      1. Move ``data['img']`` / ``data['calib']`` to ``cuda``, run ``filter``
         on both networks and attach ``netG.get_im_feat()`` to ``netC``.
      2. Save all input images, concatenated along array axis 1, as a PNG
         whose path is ``save_path`` with its last four characters replaced
         by ``.png``.
      3. Run ``reconstruction`` with ``netG`` inside the ``data['b_min']`` /
         ``data['b_max']`` bounds at ``opt.resolution``.
      4. Query ``netC`` for the vertex colors in chunks of at most
         ``opt.num_sample_color`` vertices and write the result with
         ``save_obj_mesh_with_color``.

    Any exception raised in steps 2-4 is printed and swallowed (best-effort
    export); nothing is returned.

    Parameters:
        opt          -- options object; ``resolution``, ``num_views`` and
                        ``num_sample_color`` are read
        netG         -- network exposing ``filter`` and ``get_im_feat``
        netC         -- network exposing ``filter``, ``attach``, ``query``
                        and ``get_preds``
        cuda         -- device the tensors are moved to
        data (dict)  -- must contain 'img', 'calib', 'b_min', 'b_max'
        save_path (str) -- output mesh path
        use_octree (bool) -- forwarded to ``reconstruction``
    """
    image_tensor = data['img'].to(device=cuda)
    calib_tensor = data['calib'].to(device=cuda)

    netG.filter(image_tensor)
    netC.filter(image_tensor)
    netC.attach(netG.get_im_feat())

    b_min = data['b_min']
    b_max = data['b_max']
    try:
        save_img_path = save_path[:-4] + '.png'
        save_img_list = []
        for v in range(image_tensor.shape[0]):
            # Permute axes (1, 2, 0), map x -> x * 0.5 + 0.5, scale to 0..255.
            # The channel axis is reversed here and reversed back before
            # saving, so the stored channel order equals the input order.
            save_img = (np.transpose(image_tensor[v].detach().cpu().numpy(), (1, 2, 0)) * 0.5 + 0.5)[:, :, ::-1] * 255.0
            save_img_list.append(save_img)
        save_img = np.concatenate(save_img_list, axis=1)
        Image.fromarray(np.uint8(save_img[:, :, ::-1])).save(save_img_path)

        verts, faces, _, _ = reconstruction(
            netG, cuda, calib_tensor, opt.resolution, b_min, b_max, use_octree=use_octree)

        verts_tensor = torch.from_numpy(verts.T).unsqueeze(0).to(device=cuda).float()
        verts_tensor = reshape_sample_tensor(verts_tensor, opt.num_views)

        color = np.zeros(verts.shape)
        num_verts = len(color)
        interval = opt.num_sample_color
        # Walk the vertices in fixed-size chunks with an explicit, clamped
        # end index.  The previous version ended the final chunk at -1,
        # which left the last vertex black, and it skipped the loop entirely
        # (all-black mesh) when there were fewer than ``interval`` vertices.
        for left in range(0, num_verts, interval):
            right = min(left + interval, num_verts)
            netC.query(verts_tensor[:, :, left:right], calib_tensor)
            rgb = netC.get_preds()[0].detach().cpu().numpy() * 0.5 + 0.5
            color[left:right] = rgb.T

        save_obj_mesh_with_color(save_path, verts, faces, color)
    except Exception as e:
        print(e)
        print('Can not create marching cubes at this time.')
|
|
def adjust_learning_rate(optimizer, epoch, lr, schedule, gamma):
    """Decay the learning rate by ``gamma`` when ``epoch`` is in ``schedule``.

    Parameters:
        optimizer -- object exposing ``param_groups`` (iterable of dicts)
        epoch     -- current epoch, tested with ``epoch in schedule``
        lr        -- current learning rate
        schedule  -- container of epochs at which the rate is decayed
        gamma     -- multiplicative decay factor

    Returns the learning rate now in effect: ``lr * gamma`` (also written to
    every param group) on a scheduled epoch, otherwise ``lr`` unchanged and
    the optimizer is left untouched.
    """
    if epoch not in schedule:
        return lr

    decayed_lr = lr * gamma
    for group in optimizer.param_groups:
        group['lr'] = decayed_lr
    return decayed_lr
|
|
|
|
def compute_acc(pred, gt, thresh=0.5):
    '''
    Binarize ``pred`` and ``gt`` with ``> thresh`` and compare the two masks.

    return:
        IOU, precision, and recall
        (each is the intersection count divided by, respectively, the union
        count, the predicted-positive count and the ground-truth-positive
        count; a zero denominator is replaced by 1, so the ratio becomes 0)
    '''
    with torch.no_grad():
        pred_mask = pred > thresh
        gt_mask = gt > thresh

        true_pos = (pred_mask & gt_mask).sum().float()

        # The counts are non-negative whole numbers, so clamping at 1 only
        # ever changes an exact zero -- the divide-by-zero guard.
        union_count = (pred_mask | gt_mask).sum().float().clamp(min=1)
        pred_count = pred_mask.sum().float().clamp(min=1)
        gt_count = gt_mask.sum().float().clamp(min=1)

        return true_pos / union_count, true_pos / pred_count, true_pos / gt_count
|
|
|
|
def calc_error(opt, net, cuda, dataset, num_tests):
    """Evaluate ``net`` on evenly spaced items of ``dataset``.

    ``num_tests`` is capped at ``len(dataset)``; item ``idx`` of the run is
    ``dataset[idx * len(dataset) // num_tests]``.  For each item the network
    is run under ``torch.no_grad()`` and the returned prediction is scored
    against the labels with ``compute_acc``.

    Parameters:
        opt       -- options object; only ``opt.num_views`` is read
        net       -- network whose ``forward`` returns ``(prediction, error)``
        cuda      -- device the tensors are moved to
        dataset   -- indexable dataset yielding dicts with 'img', 'calib',
                     'samples' and 'labels'
        num_tests (int) -- maximum number of items to evaluate

    Returns the ``np.average`` of, in order: error, IOU, precision, recall.
    """
    num_tests = min(num_tests, len(dataset))

    errors, ious, precisions, recalls = [], [], [], []
    with torch.no_grad():
        for idx in tqdm(range(num_tests)):
            data = dataset[idx * len(dataset) // num_tests]

            image_tensor = data['img'].to(device=cuda)
            calib_tensor = data['calib'].to(device=cuda)
            # Samples and labels get a new leading axis of size 1.
            sample_tensor = data['samples'].to(device=cuda).unsqueeze(0)
            if opt.num_views > 1:
                sample_tensor = reshape_sample_tensor(sample_tensor, opt.num_views)
            label_tensor = data['labels'].to(device=cuda).unsqueeze(0)

            res, error = net.forward(image_tensor, sample_tensor, calib_tensor, labels=label_tensor)
            iou, precision, recall = compute_acc(res, label_tensor)

            errors.append(error.item())
            ious.append(iou.item())
            precisions.append(precision.item())
            recalls.append(recall.item())

    return np.average(errors), np.average(ious), np.average(precisions), np.average(recalls)
|
|
def calc_error_color(opt, netG, netC, cuda, dataset, num_tests):
    """Evaluate the color network on evenly spaced items of ``dataset``.

    ``num_tests`` is capped at ``len(dataset)``; item ``idx`` of the run is
    ``dataset[idx * len(dataset) // num_tests]``.  For each item ``netG``
    filters the images and ``netC.forward`` is called with the resulting
    ``netG.get_im_feat()`` features, all under ``torch.no_grad()``.

    Parameters:
        opt       -- options object; only ``opt.num_views`` is read
        netG      -- network exposing ``filter`` and ``get_im_feat``
        netC      -- network whose ``forward`` returns ``(_, error)``
        cuda      -- device the tensors are moved to
        dataset   -- indexable dataset yielding dicts with 'img', 'calib',
                     'color_samples' and 'rgbs'
        num_tests (int) -- maximum number of items to evaluate

    Returns the ``np.average`` of the per-item color errors.
    """
    num_tests = min(num_tests, len(dataset))

    color_errors = []
    with torch.no_grad():
        for idx in tqdm(range(num_tests)):
            data = dataset[idx * len(dataset) // num_tests]

            image_tensor = data['img'].to(device=cuda)
            calib_tensor = data['calib'].to(device=cuda)
            # Samples and target colors get a new leading axis of size 1.
            color_sample_tensor = data['color_samples'].to(device=cuda).unsqueeze(0)
            if opt.num_views > 1:
                color_sample_tensor = reshape_sample_tensor(color_sample_tensor, opt.num_views)
            rgb_tensor = data['rgbs'].to(device=cuda).unsqueeze(0)

            netG.filter(image_tensor)
            _, errorC = netC.forward(
                image_tensor, netG.get_im_feat(), color_sample_tensor, calib_tensor, labels=rgb_tensor)

            color_errors.append(errorC.item())

    return np.average(color_errors)
|
|
|
|
def conv3x3(in_planes, out_planes, strd=1, padding=1, bias=False):
    """Build an ``nn.Conv2d`` with a 3x3 kernel.

    Parameters:
        in_planes (int)  -- number of input channels
        out_planes (int) -- number of output channels
        strd (int)       -- convolution stride
        padding (int)    -- zero padding added on each side
        bias (bool)      -- whether the layer has a bias term

    Returns the freshly constructed layer.
    """
    conv_kwargs = dict(kernel_size=3, stride=strd, padding=padding, bias=bias)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)
|
|
def init_weights(net, init_type='normal', init_gain=0.02):
    """Initialize the weights of every matching submodule of ``net`` in place.

    Parameters:
        net (network)     -- network to be initialized
        init_type (str)   -- initialization method:
                             normal | xavier | kaiming | orthogonal
        init_gain (float) -- std for 'normal', gain for 'xavier' and
                             'orthogonal'; also the std used for
                             BatchNorm2d weights

    Modules whose class name contains 'Conv' or 'Linear' and that have a
    ``weight`` attribute get the selected initializer and a zero bias (when
    a bias exists).  Modules whose class name contains 'BatchNorm2d' get
    weights drawn from N(1.0, init_gain) and a zero bias.

    Raises NotImplementedError for an unknown ``init_type``; the check
    happens when the first Conv/Linear module is visited.
    """
    weight_initializers = {
        'normal': lambda weight: init.normal_(weight, 0.0, init_gain),
        'xavier': lambda weight: init.xavier_normal_(weight, gain=init_gain),
        'kaiming': lambda weight: init.kaiming_normal_(weight, a=0, mode='fan_in'),
        'orthogonal': lambda weight: init.orthogonal_(weight, gain=init_gain),
    }

    def init_func(m):
        classname = m.__class__.__name__
        is_conv_or_linear = 'Conv' in classname or 'Linear' in classname
        if hasattr(m, 'weight') and is_conv_or_linear:
            if init_type not in weight_initializers:
                raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
            weight_initializers[init_type](m.weight.data)
            if hasattr(m, 'bias') and m.bias is not None:
                init.constant_(m.bias.data, 0.0)
        elif 'BatchNorm2d' in classname:
            # BatchNorm weight is not a matrix; only a normal distribution
            # around 1.0 is applied.
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)

    print('initialize network with %s' % init_type)
    net.apply(init_func)
|
|
|
|
def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=[]):
    """Place a network on its device(s) and initialize its weights.

    Parameters:
        net (network)      -- the network to be initialized
        init_type (str)    -- initialization method:
                              normal | xavier | kaiming | orthogonal
        init_gain (float)  -- scaling factor forwarded to ``init_weights``
        gpu_ids (int list) -- GPUs the network runs on, e.g. [0, 1, 2];
                              empty means the network is left where it is

    When ``gpu_ids`` is non-empty, CUDA availability is asserted, the
    network is moved to ``gpu_ids[0]`` and wrapped in
    ``torch.nn.DataParallel`` before the weights are initialized.

    Returns the initialized (and possibly wrapped) network.
    """
    use_gpu = len(gpu_ids) > 0
    if use_gpu:
        assert torch.cuda.is_available()
        primary_device = gpu_ids[0]
        net.to(primary_device)
        net = torch.nn.DataParallel(net, gpu_ids)

    init_weights(net, init_type, init_gain=init_gain)
    return net
|
|
|
|
def imageSpaceRotation(xy, rot):
    '''
    Weight each coordinate row of ``xy`` by the sine of the matching angle
    in ``rot`` and sum the weighted rows over dim 1.

    args:
        xy: (B, 2, N) input
        rot: (B, 2) x,y axis rotation angles

    returns:
        (B, N) tensor ``sin(rot[:, 0]) * xy[:, 0] + sin(rot[:, 1]) * xy[:, 1]``

    NOTE(review): only the sine-weighted sum is computed here; whether this
    is the intended image-space rotation should be confirmed with callers.
    '''
    sin_rot = torch.sin(rot).unsqueeze(2)
    weights = sin_rot.expand_as(xy)
    return torch.sum(weights * xy, dim=1)
|
|
|
|
def cal_gradient_penalty(netD, real_data, fake_data, device, type='mixed', constant=1.0, lambda_gp=10.0):
    """Calculate the gradient penalty loss, used in WGAN-GP paper https://arxiv.org/abs/1704.00028

    Arguments:
        netD (network)          -- discriminator network
        real_data (tensor)      -- real images
        fake_data (tensor)      -- generated images from the generator
        device (str)            -- device the interpolation weights and the
                                   ``grad_outputs`` ones-tensor are moved to
        type (str)              -- which samples the gradient is taken at
                                   [real | fake | mixed]
        constant (float)        -- the constant used in formula
                                   (||gradient||_2 - constant)^2
        lambda_gp (float)       -- weight for this loss

    Returns ``(gradient_penalty, gradients)`` where ``gradients`` is
    flattened to (batch, -1), or ``(0.0, None)`` when ``lambda_gp`` is not
    positive.  Raises NotImplementedError for an unknown ``type``.

    Note: ``requires_grad_(True)`` is applied to the evaluated tensor, which
    for 'real' / 'fake' is the caller's own ``real_data`` / ``fake_data``.
    """
    if not lambda_gp > 0.0:
        return 0.0, None

    if type == 'real':
        interpolatesv = real_data
    elif type == 'fake':
        interpolatesv = fake_data
    elif type == 'mixed':
        batch_size = real_data.shape[0]
        # One random weight per sample, broadcast over all of its elements.
        alpha = torch.rand(batch_size, 1)
        elems_per_sample = real_data.nelement() // batch_size
        alpha = alpha.expand(batch_size, elems_per_sample).contiguous().view(*real_data.shape)
        alpha = alpha.to(device)
        interpolatesv = alpha * real_data + ((1 - alpha) * fake_data)
    else:
        raise NotImplementedError('{} not implemented'.format(type))

    interpolatesv.requires_grad_(True)
    disc_interpolates = netD(interpolatesv)
    grad_seed = torch.ones(disc_interpolates.size()).to(device)
    # create_graph=True keeps the penalty differentiable w.r.t. netD.
    grads = torch.autograd.grad(outputs=disc_interpolates, inputs=interpolatesv,
                                grad_outputs=grad_seed,
                                create_graph=True, retain_graph=True, only_inputs=True)
    gradients = grads[0].view(real_data.size(0), -1)
    # The 1e-16 offset is added before taking the per-sample L2 norm.
    grad_norms = (gradients + 1e-16).norm(2, dim=1)
    gradient_penalty = ((grad_norms - constant) ** 2).mean() * lambda_gp
    return gradient_penalty, gradients
|
|
def get_norm_layer(norm_type='instance'):
    """Return a normalization layer factory.

    Parameters:
        norm_type (str) -- the name of the normalization layer:
                           batch | instance | group | none

    'batch' yields BatchNorm2d with learnable affine parameters and tracked
    running statistics; 'instance' yields InstanceNorm2d with neither;
    'group' yields GroupNorm with 32 groups pre-bound; 'none' yields None.
    The non-None results are ``functools.partial`` objects that still need
    the channel count.

    Raises NotImplementedError for any other ``norm_type``.
    """
    layer_factories = {
        'batch': functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True),
        'instance': functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False),
        'group': functools.partial(nn.GroupNorm, 32),
        'none': None,
    }
    if norm_type not in layer_factories:
        raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
    return layer_factories[norm_type]
|
|
class Flatten(nn.Module):
    """Module that collapses every dimension after the first into one."""

    def forward(self, input):
        """Return ``input`` viewed as (input.size(0), -1)."""
        batch_size = input.size(0)
        return input.view(batch_size, -1)
|
|
class ConvBlock(nn.Module):
    """Pre-activation residual block built from three chained 3x3 convolutions.

    The convolutions produce ``out_planes / 2``, ``out_planes / 4`` and
    ``out_planes / 4`` channels; their outputs are concatenated along the
    channel dimension and added to the (optionally projected) input.

    Parameters:
        in_planes (int)  -- number of input channels
        out_planes (int) -- number of output channels of the block
        norm (str)       -- normalization used before each convolution:
                            'batch' (BatchNorm2d) or 'group' (GroupNorm with
                            32 groups)

    Raises NotImplementedError when ``norm`` is not one of the supported
    values.
    """

    def __init__(self, in_planes, out_planes, norm='batch'):
        super(ConvBlock, self).__init__()
        half_planes = int(out_planes / 2)
        quarter_planes = int(out_planes / 4)

        # Submodule names and creation order are kept as-is so existing
        # checkpoints (state_dict keys) keep loading.
        self.conv1 = conv3x3(in_planes, half_planes)
        self.conv2 = conv3x3(half_planes, quarter_planes)
        self.conv3 = conv3x3(quarter_planes, quarter_planes)

        if norm == 'batch':
            self.bn1 = nn.BatchNorm2d(in_planes)
            self.bn2 = nn.BatchNorm2d(half_planes)
            self.bn3 = nn.BatchNorm2d(quarter_planes)
            self.bn4 = nn.BatchNorm2d(in_planes)
        elif norm == 'group':
            self.bn1 = nn.GroupNorm(32, in_planes)
            self.bn2 = nn.GroupNorm(32, half_planes)
            self.bn3 = nn.GroupNorm(32, quarter_planes)
            self.bn4 = nn.GroupNorm(32, in_planes)
        else:
            # Fail fast: without this branch bn1..bn4 were simply never
            # created and the block crashed later with an AttributeError.
            raise NotImplementedError('normalization layer [%s] is not found' % norm)

        if in_planes != out_planes:
            # 1x1 projection so the shortcut matches the concatenated width.
            self.downsample = nn.Sequential(
                self.bn4,
                nn.ReLU(True),
                nn.Conv2d(in_planes, out_planes,
                          kernel_size=1, stride=1, bias=False),
            )
        else:
            self.downsample = None

    def forward(self, x):
        """Apply the block to ``x`` and return the result.

        Each stage is norm -> in-place ReLU -> 3x3 conv; the three stage
        outputs are concatenated on dim 1 and the shortcut is added in place.
        """
        residual = x

        out1 = self.bn1(x)
        out1 = F.relu(out1, True)
        out1 = self.conv1(out1)

        out2 = self.bn2(out1)
        out2 = F.relu(out2, True)
        out2 = self.conv2(out2)

        out3 = self.bn3(out2)
        out3 = F.relu(out3, True)
        out3 = self.conv3(out3)

        out3 = torch.cat((out1, out2, out3), 1)

        if self.downsample is not None:
            residual = self.downsample(residual)

        out3 += residual

        return out3
| |