s3_net / scripts /model.py
zzuxzt's picture
Upload folder using huggingface_hub
d9c5371 verified
#!/usr/bin/env python
#
# file: $ISIP_EXP/SOGMP/scripts/model.py
#
# revision history: xzt
# 20220824 (TE): first version
#
# usage:
#
# This script holds the model architecture
#------------------------------------------------------------------------------
# import pytorch modules
#
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import OrderedDict
# import modules
#
import os
import random
# for reproducibility, we seed the rng
#
# default seed value; presumably passed to set_seed() by the calling
# scripts (it is not referenced in this part of the file)
SEED1 = 1337
# line separator for text files
NEW_LINE = "\n"
# use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#-----------------------------------------------------------------------------
#
# helper functions are listed here
#
#-----------------------------------------------------------------------------
# function: set_seed
#
# arguments: seed - the seed for all the rng
#
# returns: none
#
# this method configures cuDNN for deterministic execution; the calls
# that would seed the random number generators are currently commented
# out, so the seed argument is not used
#
def set_seed(seed):
    """Put cuDNN into deterministic mode.

    NOTE: the RNG seeding itself (torch.manual_seed,
    torch.cuda.manual_seed_all, random.seed, PYTHONHASHSEED) is disabled
    in this version of the script, so ``seed`` is accepted for interface
    compatibility but is not used.

    Args:
        seed: the seed for all the rng (currently unused).

    Returns:
        None
    """
    cudnn = torch.backends.cudnn
    # always pick deterministic cuDNN kernels ...
    cudnn.deterministic = True
    # ... and do not let the auto-tuner choose kernels at run time
    cudnn.benchmark = False
#
# end of method
# calculate the angle of incidence of the lidar ray:
def angle_incidence_calculation(b, c, alpha, last_ray=False):
    """Compute the angle of incidence from two neighbouring lidar ranges.

    ``b`` and ``c`` are two sides of a triangle that enclose the angle
    ``alpha``; the third side ``a`` follows from the law of cosines. The
    result is the absolute deviation from pi/2 of the interior angle
    between ``a`` and ``b`` (or between ``a`` and ``c`` for the last ray).

    Invalid inputs (nan/inf, or zero-length sides) are not filtered here
    and yield nan in the output.

    Args:
        b: range(s) of the first ray (scalar or numpy array).
        c: range(s) of the neighbouring ray (same shape as ``b``).
        alpha: angle(s) between the two rays, in radians.
        last_ray: when True, use the interior angle at the ``c`` side
            instead of the one at the ``b`` side.

    Returns:
        numpy array holding the angle(s) in radians. The cosine is wrapped
        in a list before ``np.arccos``, so the result carries one extra
        leading axis of length 1 (shape ``(1,)`` for scalar inputs,
        ``(1, N)`` for arrays of length N).
    """
    # third side of the triangle (law of cosines):
    a = np.sqrt(b*b + c*c - 2*b*c*np.cos(alpha))
    # pick the side adjacent to the interior angle we are interested in:
    if last_ray:
        adjacent, opposite = c, b
    else:
        adjacent, opposite = b, c
    # law of cosines again, solved for the interior angle:
    cos_interior = (a*a + adjacent*adjacent - opposite*opposite)/(2*a*adjacent)
    interior = np.arccos([cos_interior])
    return np.abs(np.pi/2 - interior)
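# class: VaeTestDataset
#
# arguments: img_path - root directory of the dataset
#            file_name - name (without the '.txt' extension) of the split list
#
# returns: (per item) a dict with the 'scan', 'intensity',
#          'angle_incidence' and 'label' tensors
#
# this dataset loads the lidar scans, intensities and semantic labels
# listed in the split files and normalizes the scan, intensity and
# angle of incidence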
# number of points in one lidar scan; the scan, intensity and angle of
# incidence inputs are reshaped to this length before entering the network
POINTS = 1081
class VaeTestDataset(torch.utils.data.Dataset):
    """Dataset of lidar scans, intensities and semantic labels.

    Expected layout below ``img_path``:

    * ``dataset.txt`` lists one folder name per line; only lines that
      contain a ``-`` are treated as folders.
    * ``<folder>/<file_name>.txt`` lists the sample files of that folder;
      only lines that contain ``.npy`` are used.
    * each sample name is looked up in ``<folder>/scans_lidar/``,
      ``<folder>/intensities_lidar/`` and ``<folder>/semantic_label/``.

    Args:
        img_path: root directory of the dataset (a trailing path separator
            is optional).
        file_name: name of the split list without the ``.txt`` extension.
    """

    def __init__(self, img_path, file_name):
        # index-aligned paths of the per-sample arrays:
        self.scan_file_names = []
        self.intensity_file_names = []
        self.label_file_names = []
        # normalization statistics (mean, std) of the scan, the intensity
        # and the angle of incidence:
        self.s_mu = 4.518406
        self.s_std = 8.2914915
        self.i_mu = 3081.8167
        self.i_std = 1529.4413
        self.a_mu = 0.5959513
        self.a_std = 0.4783924

        # read the folder list; `with` closes the file even on errors:
        with open(os.path.join(img_path, 'dataset.txt'), 'r') as fp_folder:
            folder_names = [line.strip() for line in fp_folder]

        for folder_name in folder_names:
            # only entries containing '-' are treated as folders:
            if '-' not in folder_name:
                continue
            folder_path = os.path.join(img_path, folder_name)
            list_path = os.path.join(folder_path, file_name + '.txt')
            with open(list_path, 'r') as fp_file:
                for line in fp_file:
                    sample_name = line.strip()
                    if '.npy' not in sample_name:
                        continue
                    self.scan_file_names.append(
                        os.path.join(folder_path, 'scans_lidar', sample_name))
                    self.intensity_file_names.append(
                        os.path.join(folder_path, 'intensities_lidar', sample_name))
                    self.label_file_names.append(
                        os.path.join(folder_path, 'semantic_label', sample_name))

        self.length = len(self.scan_file_names)
        print("dataset length: ", self.length)

    def __len__(self):
        """Return the number of samples found in the split lists."""
        return self.length

    @staticmethod
    def _zero_non_finite(values):
        """Replace nan/inf entries of ``values`` by 0 in place and return it."""
        values[~np.isfinite(values)] = 0.
        return values

    def __getitem__(self, idx):
        """Load, clean and normalize the sample at position ``idx``.

        Returns:
            dict with the float tensors 'scan', 'intensity',
            'angle_incidence' (all standardized with the stored mean/std)
            and 'label' (loaded as is, only nan/inf replaced by 0).
            Presumably every array holds POINTS values - TODO confirm
            against the data files.
        """
        scan = np.load(self.scan_file_names[idx])
        intensity = np.load(self.intensity_file_names[idx])
        label = np.load(self.label_file_names[idx])

        # angular step between neighbouring rays: 270 degrees spread over
        # POINTS - 1 intervals
        alpha_step = (270*np.pi / 180) / (POINTS - 1)
        # angle of incidence of every ray except the last one, computed
        # from each ray and its successor:
        theta = angle_incidence_calculation(scan[:-1], scan[1:],
                                            np.ones(POINTS - 1)*alpha_step)
        # the last ray has no successor, so its predecessor is used:
        theta_last = angle_incidence_calculation(scan[-2], scan[-1],
                                                 alpha_step, last_ray=True)
        # theta carries an extra leading axis of length 1, hence theta[0]:
        angle_incidence = np.concatenate((theta[0], theta_last), axis=0)

        # invalid readings are zeroed only after the angles were computed,
        # so that they show up as nan angles and are zeroed as well:
        scan = self._zero_non_finite(scan)
        intensity = self._zero_non_finite(intensity)
        angle_incidence = self._zero_non_finite(angle_incidence)
        label = self._zero_non_finite(label)

        # standardization:
        scan = (scan - self.s_mu) / self.s_std
        intensity = (intensity - self.i_mu) / self.i_std
        angle_incidence = (angle_incidence - self.a_mu) / self.a_std

        # transfer to pytorch tensors:
        return {
            'scan': torch.FloatTensor(scan),
            'intensity': torch.FloatTensor(intensity),
            'angle_incidence': torch.FloatTensor(angle_incidence),
            'label': torch.FloatTensor(label),
        }
#
# end of function
#------------------------------------------------------------------------------
#
# the model is defined here
#
#------------------------------------------------------------------------------
# define the PyTorch VAE model
#
# define a VAE
# Residual blocks:
class Residual(nn.Module):
    """Residual block: ReLU - Conv(k=3) - BN - ReLU - Conv(k=1) - BN plus skip.

    Args:
        in_channels: channels of the input.
        num_hiddens: channels of the output; must equal ``in_channels``
            for the skip addition in ``forward`` to be valid.
        num_residual_hiddens: channels between the two convolutions.
    """

    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super(Residual, self).__init__()
        # the order of the layers fixes their indices inside the
        # Sequential and therefore the parameter names in the state dict
        layers = [
            nn.ReLU(inplace=True),
            nn.Conv1d(in_channels, num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm1d(num_residual_hiddens),
            nn.ReLU(inplace=True),
            nn.Conv1d(num_residual_hiddens, num_hiddens,
                      kernel_size=1, stride=1, bias=False),
            nn.BatchNorm1d(num_hiddens),
        ]
        self._block = nn.Sequential(*layers)

    def forward(self, x):
        # the first ReLU works in place, so `x` itself is rectified by
        # this call and the skip connection adds the rectified input
        branch = self._block(x)
        return x + branch
class ResidualStack(nn.Module):
    """Chain of ``num_residual_layers`` Residual blocks followed by a ReLU.

    Args:
        in_channels: channels of the input.
        num_hiddens: channels of the output of every block.
        num_residual_layers: number of Residual blocks in the chain.
        num_residual_hiddens: inner channels of every Residual block.
    """

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(ResidualStack, self).__init__()
        self._num_residual_layers = num_residual_layers
        blocks = []
        for _ in range(num_residual_layers):
            blocks.append(Residual(in_channels, num_hiddens, num_residual_hiddens))
        self._layers = nn.ModuleList(blocks)

    def forward(self, x):
        out = x
        # apply the blocks one after the other:
        for layer in self._layers:
            out = layer(out)
        # final activation of the stack:
        return F.relu(out)
# Encoder & Decoder Architecture:
# Encoder:
class Encoder(nn.Module):
    """Convolutional encoder: two stride-2 convolutions and a residual stack.

    Each convolution uses kernel 4, stride 2 and padding 1, i.e. it maps a
    sequence of length L to floor(L / 2).

    Args:
        in_channels: channels of the input.
        num_hiddens: channels of the output (the first convolution
            produces ``num_hiddens // 2`` channels).
        num_residual_layers: number of blocks in the residual stack.
        num_residual_hiddens: inner channels of the residual blocks.
    """

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Encoder, self).__init__()
        half_hiddens = num_hiddens//2
        # first downsampling stage:
        self._conv_1 = nn.Sequential(
            nn.Conv1d(in_channels, half_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(inplace=True),
        )
        # second downsampling stage; there is no activation here, the
        # residual stack that follows starts with a ReLU
        self._conv_2 = nn.Sequential(
            nn.Conv1d(half_hiddens, num_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens),
        )
        self._residual_stack = ResidualStack(
            in_channels=num_hiddens,
            num_hiddens=num_hiddens,
            num_residual_layers=num_residual_layers,
            num_residual_hiddens=num_residual_hiddens,
        )

    def forward(self, inputs):
        downsampled = self._conv_2(self._conv_1(inputs))
        return self._residual_stack(downsampled)
# Decoder:
class Decoder(nn.Module):
    """Convolutional decoder: residual stack and two transposed convolutions.

    The first transposed convolution (kernel 4, stride 2, padding 1) maps
    a sequence of length L to 2 * L; the second one additionally uses
    output_padding 1 and maps L to 2 * L + 1. A final kernel-3 convolution
    produces ``out_channels`` channels; no activation is applied to it.

    Args:
        out_channels: channels of the output.
        num_hiddens: channels of the input.
        num_residual_layers: number of blocks in the residual stack.
        num_residual_hiddens: inner channels of the residual blocks.
    """

    def __init__(self, out_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()
        half_hiddens = num_hiddens//2
        self._residual_stack = ResidualStack(
            in_channels=num_hiddens,
            num_hiddens=num_hiddens,
            num_residual_layers=num_residual_layers,
            num_residual_hiddens=num_residual_hiddens,
        )
        # first upsampling stage:
        self._conv_trans_2 = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.ConvTranspose1d(num_hiddens, half_hiddens,
                               kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(inplace=True),
        )
        # second upsampling stage and projection to the output channels:
        self._conv_trans_1 = nn.Sequential(
            nn.ConvTranspose1d(half_hiddens, half_hiddens,
                               kernel_size=4, stride=2, padding=1,
                               output_padding=1),
            nn.BatchNorm1d(half_hiddens),
            nn.ReLU(inplace=True),
            nn.Conv1d(half_hiddens, out_channels,
                      kernel_size=3, stride=1, padding=1),
        )

    def forward(self, inputs):
        features = self._residual_stack(inputs)
        upsampled = self._conv_trans_2(features)
        return self._conv_trans_1(upsampled)
class VAE_Encoder(nn.Module):
    """Encoder with two 1x1 convolution heads for the latent mean and log sd.

    Args:
        input_channel: channels of the input.
        num_hiddens: channels of the encoder output.
        num_residual_layers: number of blocks in the residual stack.
        num_residual_hiddens: inner channels of the residual blocks.
        embedding_dim: channels of the latent mean / log sd maps.
    """

    def __init__(self, input_channel, num_hiddens, num_residual_layers, num_residual_hiddens, embedding_dim):
        super(VAE_Encoder, self).__init__()
        # parameters:
        self.input_channels = input_channel
        # shared feature extractor:
        self._encoder = Encoder(input_channel,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)
        # latent heads (kernel size 1, so the sequence length is kept):
        self._encoder_z_mu = nn.Conv1d(num_hiddens, embedding_dim,
                                       kernel_size=1, stride=1)
        self._encoder_z_log_sd = nn.Conv1d(num_hiddens, embedding_dim,
                                           kernel_size=1, stride=1)

    def forward(self, x):
        """Return ``(z_mu, z_log_sd)`` for the input ``x``.

        ``x`` is reshaped to ``(-1, input_channels, POINTS)`` first.
        """
        features = self._encoder(x.reshape(-1, self.input_channels, POINTS))
        # both heads read the same encoder features:
        z_mu = self._encoder_z_mu(features)
        z_log_sd = self._encoder_z_log_sd(features)
        return z_mu, z_log_sd
# our proposed model:
class S3Net(nn.Module):
    """VAE-style network that maps lidar inputs to per-point class scores.

    The scan, intensity and angle of incidence are stacked as channels,
    encoded to a latent distribution, sampled with the reparameterization
    trick and decoded to ``output_channels`` score channels.

    Args:
        input_channels: channels handed to the encoder. ``forward`` always
            stacks exactly three inputs, so this is presumably 3 - verify
            against the caller.
        output_channels: number of output (class) channels.
    """

    def __init__(self, input_channels, output_channels):
        super(S3Net, self).__init__()
        # parameters:
        self.input_channels = input_channels
        # number of latent positions: the encoder's two kernel-4 /
        # stride-2 / padding-1 convolutions reduce a 1081-point scan to 270
        self.latent_dim = 270
        self.output_channels = output_channels
        # Constants
        num_hiddens = 64
        num_residual_hiddens = 32
        num_residual_layers = 2
        embedding_dim = 1
        # prediction encoder:
        self._encoder = VAE_Encoder(self.input_channels,
                                    num_hiddens,
                                    num_residual_layers,
                                    num_residual_hiddens,
                                    embedding_dim)
        # decoder: lift the latent map back to num_hiddens channels first
        self._decoder_z_mu = nn.ConvTranspose1d(in_channels=embedding_dim,
                                                out_channels=num_hiddens,
                                                kernel_size=1,
                                                stride=1)
        self._decoder = Decoder(self.output_channels,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)
        # softmax over the channel axis:
        self.softmax = nn.Softmax(dim=1)

    def vae_reparameterize(self, z_mu, z_log_sd):
        """Sample the latent variable and estimate the KL term.

        :param z_mu: mean from the encoder's latent space
        :param z_log_sd: log standard deviation from the encoder's latent space
        :output: reparameterized latent variable z of shape
            (-1, latent_dim, 1) and the Monte Carlo KL loss (scalar)
        """
        # reshape:
        z_mu = z_mu.reshape(-1, self.latent_dim, 1)
        z_log_sd = z_log_sd.reshape(-1, self.latent_dim, 1)
        # prior p(z): N(z|0,I)
        pz = torch.distributions.Normal(loc=torch.zeros_like(z_mu),
                                        scale=torch.ones_like(z_log_sd))
        # posterior q(z|x,phi): N(z|mu, sd^2) with sd = exp(log_sd)
        qz_x = torch.distributions.Normal(loc=z_mu, scale=torch.exp(z_log_sd))
        # reparameterization trick: z = mu + sd * xi, xi~N(xi|0,I)
        z = qz_x.rsample()
        # single-sample Monte Carlo estimate of
        # KL(q(z|x,phi) || p(z)) = E_q[log q(z|x,phi) - log p(z)];
        # the difference below has the opposite sign, hence the minus
        # in the loss. sum over the latent dim, leaves the batch dim
        log_ratio = (pz.log_prob(z) - qz_x.log_prob(z)).sum(dim=1)
        kl_loss = -log_ratio.mean()
        return z, kl_loss

    def forward(self, x_s, x_i, x_a):
        """Run the network on a batch of lidar inputs.

        :param x_s: scan ranges, reshaped to (-1, 1, POINTS)
        :param x_i: intensities, reshaped to (-1, 1, POINTS)
        :param x_a: angles of incidence, reshaped to (-1, 1, POINTS)
        :output: softmax over the channel axis of the decoder output,
            the raw decoder output, and the KL loss
        """
        # input reshape:
        x_s = x_s.reshape(-1, 1, POINTS)
        x_i = x_i.reshape(-1, 1, POINTS)
        x_a = x_a.reshape(-1, 1, POINTS)
        # concatenate along channel axis
        x = torch.cat([x_s, x_i, x_a], dim=1)
        # encode:
        z_mu, z_log_sd = self._encoder(x)
        # get the latent vector through reparameterization:
        z, kl_loss = self.vae_reparameterize(z_mu, z_log_sd)
        # decode:
        # back to (batch, channel, length) layout; uses latent_dim instead
        # of repeating the literal 270
        z = z.reshape(-1, 1, self.latent_dim)
        x_d = self._decoder_z_mu(z)
        semantic_channels = self._decoder(x_d)
        # per-point class probabilities:
        semantic_scan = self.softmax(semantic_channels)
        return semantic_scan, semantic_channels, kl_loss
#
# end of class
#
# end of file