LARRES / utilpack /e3dlstm_modules.py
Staty's picture
Upload 50 files
2b21abc verified
import torch
import torch.nn as nn
import torch.nn.functional as F
class tf_Conv3d(nn.Module):
def __init__(self, in_channels, out_channels, *vargs, **kwargs):
super(tf_Conv3d, self).__init__()
self.conv3d = nn.Conv3d(in_channels, out_channels, *vargs, **kwargs)
def forward(self, input):
return F.interpolate(self.conv3d(input), size=input.shape[-3:], mode="nearest")
class Eidetic3DLSTMCell(nn.Module):
def __init__(self, in_channel, num_hidden, window_length,
height, width, filter_size, stride, layer_norm):
super(Eidetic3DLSTMCell, self).__init__()
self._norm_c_t = nn.LayerNorm([num_hidden, window_length, height, width])
self.num_hidden = num_hidden
self.padding = (0, filter_size[1] // 2, filter_size[2] // 2)
self._forget_bias = 1.0
if layer_norm:
self.conv_x = nn.Sequential(
tf_Conv3d(in_channel, num_hidden * 7, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
nn.LayerNorm([num_hidden * 7, window_length, height, width])
)
self.conv_h = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden * 4, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
nn.LayerNorm([num_hidden * 4, window_length, height, width])
)
self.conv_gm = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden * 4, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
nn.LayerNorm([num_hidden * 4, window_length, height, width])
)
self.conv_new_cell = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
nn.LayerNorm([num_hidden, window_length, height, width])
)
self.conv_new_gm = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
nn.LayerNorm([num_hidden, window_length, height, width])
)
else:
self.conv_x = nn.Sequential(
tf_Conv3d(in_channel, num_hidden * 7, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
)
self.conv_h = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden * 4, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
)
self.conv_gm = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden * 4, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
)
self.conv_new_cell = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
)
self.conv_new_gm = nn.Sequential(
tf_Conv3d(num_hidden, num_hidden, kernel_size=filter_size,
stride=stride, padding=self.padding, bias=False),
)
self.conv_last = tf_Conv3d(num_hidden * 2, num_hidden, kernel_size=1,
stride=1, padding=0, bias=False)
def _attn(self, in_query, in_keys, in_values):
batch, num_channels, _, width, height = in_query.shape
query = in_query.reshape(batch, -1, num_channels)
keys = in_keys.reshape(batch, -1, num_channels)
values = in_values.reshape(batch, -1, num_channels)
attn = torch.einsum('bxc,byc->bxy', query, keys)
attn = torch.softmax(attn, dim=2)
attn = torch.einsum("bxy,byc->bxc", attn, values)
return attn.reshape(batch, num_channels, -1, width, height)
def forward(self, x_t, h_t, c_t, global_memory, eidetic_cell):
h_concat = self.conv_h(h_t)
i_h, g_h, r_h, o_h = torch.split(h_concat, self.num_hidden, dim=1)
x_concat = self.conv_x(x_t)
i_x, g_x, r_x, o_x, temp_i_x, temp_g_x, temp_f_x = \
torch.split(x_concat, self.num_hidden, dim=1)
i_t = torch.sigmoid(i_x + i_h)
r_t = torch.sigmoid(r_x + r_h)
g_t = torch.tanh(g_x + g_h)
new_cell = c_t + self._attn(r_t, eidetic_cell, eidetic_cell)
new_cell = self._norm_c_t(new_cell) + i_t * g_t
new_global_memory = self.conv_gm(global_memory)
i_m, f_m, g_m, m_m = torch.split(new_global_memory, self.num_hidden, dim=1)
temp_i_t = torch.sigmoid(temp_i_x + i_m)
temp_f_t = torch.sigmoid(temp_f_x + f_m + self._forget_bias)
temp_g_t = torch.tanh(temp_g_x + g_m)
new_global_memory = temp_f_t * torch.tanh(m_m) + temp_i_t * temp_g_t
o_c = self.conv_new_cell(new_cell)
o_m = self.conv_new_gm(new_global_memory)
output_gate = torch.tanh(o_x + o_h + o_c + o_m)
memory = torch.cat((new_cell, new_global_memory), 1)
memory = self.conv_last(memory)
output = torch.tanh(memory) * torch.sigmoid(output_gate)
return output, new_cell, global_memory