#!/usr/bin/env python # # file: $ISIP_EXP/SOGMP/scripts/model.py # # revision history: xzt # 20220824 (TE): first version # # usage: # # This script hold the model architecture #------------------------------------------------------------------------------ # import pytorch modules # from __future__ import print_function import torch import torch.nn as nn import torch.nn.functional as F import numpy as np from collections import OrderedDict # import modules # import os import random # for reproducibility, we seed the rng # SEED1 = 1337 NEW_LINE = "\n" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #----------------------------------------------------------------------------- # # helper functions are listed here # #----------------------------------------------------------------------------- # function: set_seed # # arguments: seed - the seed for all the rng # # returns: none # # this method seeds all the random number generators and makes # the results deterministic # def set_seed(seed): #torch.manual_seed(seed) #torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False #random.seed(seed) #os.environ['PYTHONHASHSEED'] = str(seed) # # end of method # calculate the angle of incidence of the lidar ray: def angle_incidence_calculation(b, c, alpha, last_ray=False): ''' # remove invalid values: if(last_ray): # the last ray if(np.isnan(b) or np.isinf(b)): b = 60. if(np.isnan(c) or np.isinf(c)): c = 60. else: b[np.isnan(b)] = 60. b[np.isinf(b)] = 60. c[np.isnan(c)] = 60. c[np.isinf(c)] = 60. 
''' # the law of cosines: a = np.sqrt(b*b + c*c - 2*b*c*np.cos(alpha)) if(last_ray): # the last ray beta = np.arccos([(a*a + c*c - b*b)/(2*a*c)]) theta = np.abs(np.pi/2 - beta) else: gamma = np.arccos([(a*a + b*b - c*c)/(2*a*b)]) theta = np.abs(np.pi/2 - gamma) return theta # function: get_data # # arguments: fp - file pointer # num_feats - the number of features in a sample # # returns: data - the signals/features # labels - the correct labels for them # # this method takes in a fp and returns the data and labels POINTS = 1081 class VaeTestDataset(torch.utils.data.Dataset): def __init__(self, img_path, file_name): # initialize the data and labels # read the names of image data: self.scan_file_names = [] self.intensity_file_names = [] #self.vel_file_names = [] self.label_file_names = [] # parameters: data mean std: scan, intensity, angle of incidence: # [[4.518406, 8.2914915], [3081.8167, 1529.4413]] # [4.518406, 8.2914915], [3081.8167, 1529.4413], [0.5959513, 0.4783924]] self.s_mu = 4.518406 self.s_std = 8.2914915 self.i_mu = 3081.8167 self.i_std = 1529.4413 self.a_mu = 0.5959513 self.a_std = 0.4783924 # open train.txt or dev.txt: fp_folder = open(img_path+'dataset.txt','r') # for each line of the file: for folder_line in fp_folder.read().split(NEW_LINE): if('-' in folder_line): folder_path = folder_line fp_file = open(img_path+folder_path+'/'+file_name+'.txt', 'r') for line in fp_file.read().split(NEW_LINE): if('.npy' in line): self.scan_file_names.append(img_path+folder_path+'/scans_lidar/'+line) self.intensity_file_names.append(img_path+folder_path+'/intensities_lidar/'+line) #self.vel_file_names.append(img_path+folder_path+'/velocities/'+line) self.label_file_names.append(img_path+folder_path+'/semantic_label/'+line) # close txt file: fp_file.close() # close txt file: fp_folder.close() self.length = len(self.scan_file_names) print("dataset length: ", self.length) def __len__(self): return self.length def __getitem__(self, idx): # get the index of start point: 
scan = np.zeros((1, POINTS)) intensity = np.zeros((1, POINTS)) angle_incidence = np.zeros((1, POINTS)) label = np.zeros((1, POINTS)) # get the scan data: intensity_name = self.intensity_file_names[idx] intensity = np.load(intensity_name) # get the scan data: scan_name = self.scan_file_names[idx] scan = np.load(scan_name) # get the semantic label data: label_name = self.label_file_names[idx] label = np.load(label_name) # get the angle of incidence of the ray: b = scan[:-1] c = scan[1:] alpha = np.ones(POINTS - 1)*((270*np.pi / 180) / (POINTS - 1)) theta = angle_incidence_calculation(b, c, alpha) # last ray: b_last = scan[-2] c_last = scan[-1] alpha_last = (270*np.pi / 180) / (POINTS - 1) theta_last = angle_incidence_calculation(b_last, c_last, alpha_last, last_ray=True) angle_incidence = np.concatenate((theta[0], theta_last), axis=0) # initialize: scan[np.isnan(scan)] = 0. scan[np.isinf(scan)] = 0. intensity[np.isnan(intensity)] = 0. intensity[np.isinf(intensity)] = 0. angle_incidence[np.isnan(angle_incidence)] = 0. angle_incidence[np.isinf(angle_incidence)] = 0. label[np.isnan(label)] = 0. label[np.isinf(label)] = 0. 
# data normalization: # standardization: scan # mu: 4.518406, std: 8.2914915 scan = (scan - self.s_mu) / self.s_std # standardization: intensity # mu: 3081.8167, std: 1529.4413 intensity = (intensity - self.i_mu) / self.i_std # standardization: angle_incidence # mu: 0.5959513, std: 0.4783924 angle_incidence = (angle_incidence - self.a_mu) / self.a_std # transfer to pytorch tensor: scan_tensor = torch.FloatTensor(scan) intensity_tensor = torch.FloatTensor(intensity) angle_incidence_tensor = torch.FloatTensor(angle_incidence) label_tensor = torch.FloatTensor(label) data = { 'scan': scan_tensor, 'intensity': intensity_tensor, 'angle_incidence': angle_incidence_tensor, 'label': label_tensor, } return data # # end of function #------------------------------------------------------------------------------ # # the model is defined here # #------------------------------------------------------------------------------ # define the PyTorch VAE model # # define a VAE # Residual blocks: class Residual(nn.Module): def __init__(self, in_channels, num_hiddens, num_residual_hiddens): super(Residual, self).__init__() self._block = nn.Sequential( nn.ReLU(True), nn.Conv1d(in_channels=in_channels, out_channels=num_residual_hiddens, kernel_size=3, stride=1, padding=1, bias=False), nn.BatchNorm1d(num_residual_hiddens), nn.ReLU(True), nn.Conv1d(in_channels=num_residual_hiddens, out_channels=num_hiddens, kernel_size=1, stride=1, bias=False), nn.BatchNorm1d(num_hiddens) ) def forward(self, x): return x + self._block(x) class ResidualStack(nn.Module): def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens): super(ResidualStack, self).__init__() self._num_residual_layers = num_residual_layers self._layers = nn.ModuleList([Residual(in_channels, num_hiddens, num_residual_hiddens) for _ in range(self._num_residual_layers)]) def forward(self, x): for i in range(self._num_residual_layers): x = self._layers[i](x) return F.relu(x) # Encoder & Decoder Architecture: # 
# Encoder:
class Encoder(nn.Module):
    """Two stride-2 1-D conv stages followed by a residual stack.

    Each conv stage halves the sequence length; the channel count grows
    from in_channels to num_hiddens.
    """

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Encoder, self).__init__()
        half_hiddens = num_hiddens // 2
        # first downsampling stage: conv -> batch-norm -> relu
        self._conv_1 = nn.Sequential(
            nn.Conv1d(in_channels=in_channels, out_channels=half_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )
        # second downsampling stage; no trailing relu here because the
        # residual stack begins with one
        self._conv_2 = nn.Sequential(
            nn.Conv1d(in_channels=half_hiddens, out_channels=num_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens),
        )
        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

    def forward(self, inputs):
        hidden = self._conv_1(inputs)
        hidden = self._conv_2(hidden)
        return self._residual_stack(hidden)


# Decoder:
class Decoder(nn.Module):
    """Mirror of the Encoder: residual stack, then two upsampling stages.

    Each transposed-conv stage doubles the sequence length (output_padding=1
    on the last stage recovers the odd input length); the final 3-tap conv
    maps to out_channels and emits raw logits (no activation).
    """

    def __init__(self, out_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()
        half_hiddens = num_hiddens // 2
        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)
        # first upsampling stage
        self._conv_trans_2 = nn.Sequential(
            nn.ReLU(True),
            nn.ConvTranspose1d(in_channels=num_hiddens, out_channels=half_hiddens,
                               kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
        )
        # second upsampling stage plus the output projection
        self._conv_trans_1 = nn.Sequential(
            nn.ConvTranspose1d(in_channels=half_hiddens, out_channels=half_hiddens,
                               kernel_size=4, stride=2, padding=1, output_padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(True),
            nn.Conv1d(in_channels=half_hiddens, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1),
        )

    def forward(self, inputs):
        hidden = self._residual_stack(inputs)
        hidden = self._conv_trans_2(hidden)
        return self._conv_trans_1(hidden)


class VAE_Encoder(nn.Module):
    """Encoder backbone plus two 1x1-conv heads producing the latent
    Gaussian parameters (mean and log standard deviation)."""

    def __init__(self, input_channel, num_hiddens, num_residual_layers, num_residual_hiddens, embedding_dim):
        super(VAE_Encoder, self).__init__()
        self.input_channels = input_channel
        # encoder backbone:
        self._encoder = Encoder(input_channel, num_hiddens,
                                num_residual_layers, num_residual_hiddens)
        # 1x1 conv heads for the z latent variable:
        self._encoder_z_mu = nn.Conv1d(in_channels=num_hiddens, out_channels=embedding_dim,
                                       kernel_size=1, stride=1)
        self._encoder_z_log_sd = nn.Conv1d(in_channels=num_hiddens, out_channels=embedding_dim,
                                           kernel_size=1, stride=1)

    def forward(self, x):
        # normalize whatever batch layout arrives into (N, C, POINTS)
        features = self._encoder(x.reshape(-1, self.input_channels, POINTS))
        # project features to `mu` and `log_sd`:
        return self._encoder_z_mu(features), self._encoder_z_log_sd(features)


# our proposed model:
class S3Net(nn.Module):
    """VAE-style semantic segmentation network for lidar scans.

    Encodes (scan, intensity, angle-of-incidence) as three channels into a
    latent Gaussian, samples it with the reparameterization trick, and
    decodes to per-point class logits plus their softmax probabilities.
    """

    def __init__(self, input_channels, output_channels):
        super(S3Net, self).__init__()
        # interface / sizes:
        self.input_channels = input_channels
        self.latent_dim = 270  # POINTS after two stride-2 downsamplings
        self.output_channels = output_channels
        # architecture constants:
        num_hiddens = 64
        num_residual_hiddens = 32
        num_residual_layers = 2
        embedding_dim = 1
        # prediction encoder:
        self._encoder = VAE_Encoder(self.input_channels, num_hiddens,
                                    num_residual_layers, num_residual_hiddens,
                                    embedding_dim)
        # decoder:
        self._decoder_z_mu = nn.ConvTranspose1d(in_channels=embedding_dim,
                                                out_channels=num_hiddens,
                                                kernel_size=1, stride=1)
        self._decoder = Decoder(self.output_channels, num_hiddens,
                                num_residual_layers, num_residual_hiddens)
        self.softmax = nn.Softmax(dim=1)

    def vae_reparameterize(self, z_mu, z_log_sd):
        """Sample z via the reparameterization trick.

        :param z_mu: mean from the encoder's latent space
        :param z_log_sd: log standard deviation from the encoder's latent space
        :output: reparameterized latent variable z, Monte Carlo KL loss
        """
        z_mu = z_mu.reshape(-1, self.latent_dim, 1)
        z_log_sd = z_log_sd.reshape(-1, self.latent_dim, 1)
        # prior p(z) = N(0, I); posterior q(z|x, phi) = N(mu, sd^2)
        prior = torch.distributions.Normal(loc=torch.zeros_like(z_mu),
                                           scale=torch.ones_like(z_log_sd))
        posterior = torch.distributions.Normal(loc=z_mu, scale=torch.exp(z_log_sd))
        # reparameterization trick: z = mu + sd * eps, eps ~ N(0, I)
        z = posterior.rsample()
        # Monte Carlo KL: log p(z) - log q(z|x, phi), summed over the
        # latent dimension (batch dimension is kept, then averaged)
        kl_divergence = (prior.log_prob(z) - posterior.log_prob(z)).sum(dim=1)
        return z, -kl_divergence.mean()

    def forward(self, x_s, x_i, x_a):
        """Forward pass: returns (softmax probs, raw logits, KL loss)."""
        # stack scan / intensity / angle-of-incidence along the channel axis
        stacked = torch.cat([x_s.reshape(-1, 1, POINTS),
                             x_i.reshape(-1, 1, POINTS),
                             x_a.reshape(-1, 1, POINTS)], dim=1)
        # encode and sample the latent vector:
        z_mu, z_log_sd = self._encoder(stacked)
        z, kl_loss = self.vae_reparameterize(z_mu, z_log_sd)
        # decode back to per-point semantic logits:
        semantic_channels = self._decoder(self._decoder_z_mu(z.reshape(-1, 1, self.latent_dim)))
        # semantic probabilities over the class channels
        semantic_scan = self.softmax(semantic_channels)
        return semantic_scan, semantic_channels, kl_loss
#
# end of class

#
# end of file