|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import print_function |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
import numpy as np |
|
|
from collections import OrderedDict |
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import random |
|
|
|
|
|
|
|
|
|
|
|
# Default RNG seed for reproducible experiments.
SEED1 = 1337


# Line separator used when parsing the dataset index files.
NEW_LINE = "\n"




# Run on the GPU when one is available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_seed(seed):
    """Seed every RNG source (Python, NumPy, PyTorch) for reproducible runs.

    :param seed: integer seed applied to all random sources
    """
    # BUG FIX: the original body only configured cuDNN and never used
    # `seed`, so runs were not actually reproducible.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        # Seed all CUDA devices as well (no-op on CPU-only machines).
        torch.cuda.manual_seed_all(seed)

    # Force deterministic cuDNN kernels (slower but reproducible).
    torch.backends.cudnn.deterministic = True

    torch.backends.cudnn.benchmark = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def angle_incidence_calculation(b, c, alpha, last_ray=False):
    """Compute a beam's angle of incidence from two adjacent range readings.

    Applies the law of cosines to the triangle formed by the two rays
    ``b`` and ``c`` separated by angle ``alpha``, then measures how far
    the interior angle at the chosen vertex deviates from a perpendicular
    hit (pi/2).

    :param b: first range reading(s) (scalar or array)
    :param c: adjacent range reading(s)
    :param alpha: angular separation between the two rays
    :param last_ray: if True, use the interior angle opposite ``b``
                     (used for the final ray of the scan)
    :return: |pi/2 - interior angle|, with one extra leading axis added
             by the list wrapper around ``np.arccos`` (callers index [0])
    """
    # Third triangle side: chord between the two beam endpoints.
    opposite = np.sqrt(b * b + c * c - 2 * b * c * np.cos(alpha))

    if last_ray:
        # Interior angle at the vertex adjacent to side c (opposite side b).
        interior = np.arccos([(opposite * opposite + c * c - b * b) / (2 * opposite * c)])
    else:
        # Interior angle at the vertex adjacent to side b (opposite side c).
        interior = np.arccos([(opposite * opposite + b * b - c * c) / (2 * opposite * b)])

    # Deviation from a perpendicular (head-on) hit.
    return np.abs(np.pi / 2 - interior)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
POINTS = 1081  # range readings per scan (270 deg FoV per the alpha computation below)
|
|
class VaeTestDataset(torch.utils.data.Dataset):
    """Dataset of 1D LiDAR scans, intensities and per-point semantic labels.

    Reads ``dataset.txt`` under ``img_path`` for the list of recording
    folders, then reads ``<file_name>.txt`` inside each folder for the
    per-sample .npy file names.  Each item yields normalized scan /
    intensity / angle-of-incidence tensors plus the label tensor.
    """

    def __init__(self, img_path, file_name):
        """Collect the sample file paths for every folder listed in dataset.txt.

        :param img_path: root directory containing dataset.txt and the folders
        :param file_name: basename of the per-folder index file (without .txt)
        """
        self.scan_file_names = []
        self.intensity_file_names = []
        self.label_file_names = []

        # Dataset-wide normalization statistics (precomputed offline).
        self.s_mu = 4.518406      # scan (range) mean
        self.s_std = 8.2914915    # scan (range) std
        self.i_mu = 3081.8167     # intensity mean
        self.i_std = 1529.4413    # intensity std
        self.a_mu = 0.5959513     # angle-of-incidence mean
        self.a_std = 0.4783924    # angle-of-incidence std

        # FIX: use `with` so the handles are closed even if parsing raises
        # (the original relied on explicit close() calls that an exception
        # would skip).
        with open(img_path + 'dataset.txt', 'r') as fp_folder:
            for folder_line in fp_folder.read().split(NEW_LINE):
                # Folder names are recognized by containing a '-'.
                if '-' in folder_line:
                    folder_path = folder_line
                    with open(img_path + folder_path + '/' + file_name + '.txt', 'r') as fp_file:
                        for line in fp_file.read().split(NEW_LINE):
                            if '.npy' in line:
                                self.scan_file_names.append(img_path + folder_path + '/scans_lidar/' + line)
                                self.intensity_file_names.append(img_path + folder_path + '/intensities_lidar/' + line)
                                self.label_file_names.append(img_path + folder_path + '/semantic_label/' + line)

        self.length = len(self.scan_file_names)
        print("dataset length: ", self.length)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        """Load and normalize a single sample.

        :param idx: sample index
        :return: dict with 'scan', 'intensity', 'angle_incidence' and
                 'label' FloatTensors
        """
        # NOTE: the original pre-allocated zero arrays here; they were dead
        # code, immediately overwritten by np.load, so they are removed.
        intensity = np.load(self.intensity_file_names[idx])
        scan = np.load(self.scan_file_names[idx])
        label = np.load(self.label_file_names[idx])

        # Incidence angle for each consecutive ray pair: the sensor spans
        # 270 degrees over POINTS rays, so adjacent rays are separated by
        # 270 deg / (POINTS - 1).
        b = scan[:-1]
        c = scan[1:]
        alpha = np.ones(POINTS - 1) * ((270 * np.pi / 180) / (POINTS - 1))
        theta = angle_incidence_calculation(b, c, alpha)

        # The final ray has no successor, so reuse the last pair with the
        # interior angle taken at the other vertex (last_ray=True).
        b_last = scan[-2]
        c_last = scan[-1]
        alpha_last = (270 * np.pi / 180) / (POINTS - 1)
        theta_last = angle_incidence_calculation(b_last, c_last, alpha_last, last_ray=True)
        angle_incidence = np.concatenate((theta[0], theta_last), axis=0)

        # Replace invalid readings (NaN/inf) before normalization.
        scan[np.isnan(scan)] = 0.
        scan[np.isinf(scan)] = 0.

        intensity[np.isnan(intensity)] = 0.
        intensity[np.isinf(intensity)] = 0.

        angle_incidence[np.isnan(angle_incidence)] = 0.
        angle_incidence[np.isinf(angle_incidence)] = 0.

        label[np.isnan(label)] = 0.
        label[np.isinf(label)] = 0.

        # Standardize with the precomputed dataset statistics.
        scan = (scan - self.s_mu) / self.s_std
        intensity = (intensity - self.i_mu) / self.i_std
        angle_incidence = (angle_incidence - self.a_mu) / self.a_std

        return {
            'scan': torch.FloatTensor(scan),
            'intensity': torch.FloatTensor(intensity),
            'angle_incidence': torch.FloatTensor(angle_incidence),
            'label': torch.FloatTensor(label),
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Residual(nn.Module):
    """Pre-activation 1D residual block: ReLU-Conv3-BN-ReLU-Conv1-BN plus skip."""

    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super(Residual, self).__init__()
        # Bottleneck: a 3-wide conv into the residual width, then a 1-wide
        # conv back up to num_hiddens.  Both convolutions are bias-free
        # because each is immediately followed by a BatchNorm layer.
        self._block = nn.Sequential(
            nn.ReLU(True),
            nn.Conv1d(in_channels, num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm1d(num_residual_hiddens),
            nn.ReLU(True),
            nn.Conv1d(num_residual_hiddens, num_hiddens,
                      kernel_size=1, stride=1, bias=False),
            nn.BatchNorm1d(num_hiddens),
        )

    def forward(self, x):
        # Identity shortcut around the bottleneck path.
        shortcut = self._block(x)
        return x + shortcut
|
|
|
|
|
class ResidualStack(nn.Module):
    """A chain of Residual blocks followed by one final ReLU."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(ResidualStack, self).__init__()
        self._num_residual_layers = num_residual_layers
        self._layers = nn.ModuleList(
            Residual(in_channels, num_hiddens, num_residual_hiddens)
            for _ in range(num_residual_layers)
        )

    def forward(self, x):
        # Apply every residual block in order, then a closing activation
        # (each block itself ends on a BatchNorm, not a ReLU).
        for layer in self._layers:
            x = layer(x)
        return F.relu(x)
|
|
|
|
|
|
|
|
|
|
|
class Encoder(nn.Module):
    """Two stride-2 1D convolutions (4x downsample) followed by a residual stack."""

    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Encoder, self).__init__()
        # First downsampling stage: halves the sequence length.
        self._conv_1 = nn.Sequential(
            nn.Conv1d(in_channels, num_hiddens // 2,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens // 2),
            nn.ReLU(True),
        )
        # Second downsampling stage: halves the length again.  No ReLU here
        # because the residual stack starts with its own activation.
        self._conv_2 = nn.Sequential(
            nn.Conv1d(num_hiddens // 2, num_hiddens,
                      kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens),
        )
        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)

    def forward(self, inputs):
        hidden = self._conv_2(self._conv_1(inputs))
        return self._residual_stack(hidden)
|
|
|
|
|
|
|
|
class Decoder(nn.Module):
    """Residual stack followed by two stride-2 transposed convs (4x upsample)."""

    def __init__(self, out_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super(Decoder, self).__init__()
        self._residual_stack = ResidualStack(in_channels=num_hiddens,
                                             num_hiddens=num_hiddens,
                                             num_residual_layers=num_residual_layers,
                                             num_residual_hiddens=num_residual_hiddens)
        # First upsampling stage: doubles the sequence length.
        self._conv_trans_2 = nn.Sequential(
            nn.ReLU(True),
            nn.ConvTranspose1d(num_hiddens, num_hiddens // 2,
                               kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(num_hiddens // 2),
            nn.ReLU(True),
        )
        # Second upsampling stage; output_padding=1 adds one sample so an
        # odd original length can be recovered (e.g. 540 -> 1081), then a
        # plain conv projects to the requested output channels with no
        # activation, producing raw logits.
        self._conv_trans_1 = nn.Sequential(
            nn.ConvTranspose1d(num_hiddens // 2, num_hiddens // 2,
                               kernel_size=4, stride=2, padding=1,
                               output_padding=1),
            nn.BatchNorm1d(num_hiddens // 2),
            nn.ReLU(True),
            nn.Conv1d(num_hiddens // 2, out_channels,
                      kernel_size=3, stride=1, padding=1),
        )

    def forward(self, inputs):
        features = self._residual_stack(inputs)
        upsampled = self._conv_trans_2(features)
        return self._conv_trans_1(upsampled)
|
|
|
|
|
class VAE_Encoder(nn.Module):
    """Convolutional encoder that outputs per-position Gaussian parameters.

    Wraps an Encoder and maps its features to an embedding-dim mean and
    log-standard-deviation through two 1x1 convolution heads.
    """

    def __init__(self, input_channel, num_hiddens, num_residual_layers, num_residual_hiddens, embedding_dim):
        super(VAE_Encoder, self).__init__()
        self.input_channels = input_channel

        self._encoder = Encoder(input_channel,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # 1x1 convolutions acting as per-position linear heads for the
        # latent mean and log standard deviation.
        self._encoder_z_mu = nn.Conv1d(num_hiddens, embedding_dim,
                                       kernel_size=1, stride=1)
        self._encoder_z_log_sd = nn.Conv1d(num_hiddens, embedding_dim,
                                           kernel_size=1, stride=1)

    def forward(self, x):
        # Ensure the expected (batch, channels, POINTS) layout.
        x = x.reshape(-1, self.input_channels, POINTS)
        features = self._encoder(x)
        return self._encoder_z_mu(features), self._encoder_z_log_sd(features)
|
|
|
|
|
|
|
|
class S3Net(nn.Module):
    """Semantic segmentation VAE over 1D LiDAR scans.

    Encodes scan, intensity and angle-of-incidence channels into a latent
    Gaussian, samples it with the reparameterization trick, and decodes
    the sample into per-class channels over the scan.
    """

    def __init__(self, input_channels, output_channels):
        super(S3Net, self).__init__()

        self.input_channels = input_channels
        # POINTS // 4 after the encoder's two stride-2 convolutions
        # (1081 -> 540 -> 270).
        self.latent_dim = 270
        self.output_channels = output_channels

        # Network size constants.
        num_hiddens = 64
        num_residual_hiddens = 32
        num_residual_layers = 2
        embedding_dim = 1

        self._encoder = VAE_Encoder(self.input_channels,
                                    num_hiddens,
                                    num_residual_layers,
                                    num_residual_hiddens,
                                    embedding_dim)

        # 1x1 transposed conv lifting the latent back up to num_hiddens
        # channels before decoding.
        self._decoder_z_mu = nn.ConvTranspose1d(embedding_dim, num_hiddens,
                                                kernel_size=1, stride=1)
        self._decoder = Decoder(self.output_channels,
                                num_hiddens,
                                num_residual_layers,
                                num_residual_hiddens)

        # Per-position class probabilities across the channel axis.
        self.softmax = nn.Softmax(dim=1)

    def vae_reparameterize(self, z_mu, z_log_sd):
        """Sample z ~ q(z|x) via the reparameterization trick.

        :param z_mu: mean from the encoder's latent space
        :param z_log_sd: log standard deviation from the encoder's latent space
        :return: (reparameterized latent z, Monte-Carlo KL divergence loss)
        """
        z_mu = z_mu.reshape(-1, self.latent_dim, 1)
        z_log_sd = z_log_sd.reshape(-1, self.latent_dim, 1)

        # Prior p(z) = N(0, 1) and approximate posterior q(z|x).
        pz = torch.distributions.Normal(loc=torch.zeros_like(z_mu),
                                        scale=torch.ones_like(z_log_sd))
        qz_x = torch.distributions.Normal(loc=z_mu, scale=torch.exp(z_log_sd))

        # rsample() keeps the sampling step differentiable.
        z = qz_x.rsample()

        # Single-sample Monte-Carlo estimate of KL(q(z|x) || p(z)).
        kl_divergence = (pz.log_prob(z) - qz_x.log_prob(z)).sum(dim=1)
        return z, -kl_divergence.mean()

    def forward(self, x_s, x_i, x_a):
        """Forward pass over the three input modalities.

        :param x_s: normalized scan ranges
        :param x_i: normalized intensities
        :param x_a: normalized angles of incidence
        :return: (softmax probabilities, raw class logits, KL loss)
        """
        # Stack the three modalities as channels: (batch, 3, POINTS).
        x_s = x_s.reshape(-1, 1, POINTS)
        x_i = x_i.reshape(-1, 1, POINTS)
        x_a = x_a.reshape(-1, 1, POINTS)
        x = torch.cat([x_s, x_i, x_a], dim=1)

        z_mu, z_log_sd = self._encoder(x)
        z, kl_loss = self.vae_reparameterize(z_mu, z_log_sd)

        # Decode the latent sample back into per-class channels.
        z = z.reshape(-1, 1, 270)
        semantic_channels = self._decoder(self._decoder_z_mu(z))
        semantic_scan = self.softmax(semantic_channels)

        return semantic_scan, semantic_channels, kl_loss
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|