TTSAM / model.py
jimmy60504's picture
docs: add full model implementation with CNN, MLP, and Transformer components
7781d84
import numpy as np
import torch
import torch.nn as nn
from loguru import logger
# GPU/CPU 設定
if torch.cuda.is_available():
device = torch.device("cuda")
logger.info("使用 GPU")
elif torch.mps.is_available():
device = torch.device("mps")
logger.info("使用 Apple MPS")
else:
device = torch.device("cpu")
logger.info("使用 CPU")
class LambdaLayer(nn.Module):
def __init__(self, lambd, eps=1e-4):
super(LambdaLayer, self).__init__()
self.lambd = lambd
self.eps = eps
def forward(self, x):
return self.lambd(x) + self.eps
class MLP(nn.Module):
def __init__(
self,
input_shape,
dims=(500, 300, 200, 150),
activation=nn.ReLU(),
last_activation=None,
):
super(MLP, self).__init__()
if last_activation is None:
last_activation = activation
self.dims = dims
self.first_fc = nn.Linear(input_shape[0], dims[0])
self.first_activation = activation
more_hidden = []
if len(self.dims) > 2:
for i in range(1, len(self.dims) - 1):
more_hidden.append(nn.Linear(self.dims[i - 1], self.dims[i]))
more_hidden.append(nn.ReLU())
self.more_hidden = nn.ModuleList(more_hidden)
self.last_fc = nn.Linear(dims[-2], dims[-1])
self.last_activation = last_activation
def forward(self, x):
output = self.first_fc(x)
output = self.first_activation(output)
if self.more_hidden:
for layer in self.more_hidden:
output = layer(output)
output = self.last_fc(output)
output = self.last_activation(output)
return output
class CNN(nn.Module):
def __init__(
self,
input_shape=(-1, 6000, 3),
activation=nn.ReLU(),
downsample=1,
mlp_input=11665,
mlp_dims=(500, 300, 200, 150),
eps=1e-8,
):
super(CNN, self).__init__()
self.input_shape = input_shape
self.activation = activation
self.downsample = downsample
self.mlp_input = mlp_input
self.mlp_dims = mlp_dims
self.eps = eps
self.lambda_layer_1 = LambdaLayer(
lambda t: t
/ (
torch.max(
torch.max(torch.abs(t), dim=1, keepdim=True).values,
dim=2,
keepdim=True,
).values
+ self.eps
)
)
self.unsqueeze_layer1 = LambdaLayer(lambda t: torch.unsqueeze(t, dim=1))
self.lambda_layer_2 = LambdaLayer(
lambda t: torch.log(
torch.max(torch.max(torch.abs(t), dim=1).values, dim=1).values
+ self.eps
)
/ 100
)
self.unsqueeze_layer2 = LambdaLayer(lambda t: torch.unsqueeze(t, dim=1))
self.conv2d1 = nn.Sequential(
nn.Conv2d(1, 8, kernel_size=(1, downsample), stride=(1, downsample)),
nn.ReLU(),
)
self.conv2d2 = nn.Sequential(
nn.Conv2d(8, 32, kernel_size=(16, 3), stride=(1, 3)), nn.ReLU()
)
self.conv1d1 = nn.Sequential(nn.Conv1d(32, 64, kernel_size=16), nn.ReLU())
self.maxpooling = nn.MaxPool1d(2)
self.conv1d2 = nn.Sequential(nn.Conv1d(64, 128, kernel_size=16), nn.ReLU())
self.conv1d3 = nn.Sequential(nn.Conv1d(128, 32, kernel_size=8), nn.ReLU())
self.conv1d4 = nn.Sequential(nn.Conv1d(32, 32, kernel_size=8), nn.ReLU())
self.conv1d5 = nn.Sequential(nn.Conv1d(32, 16, kernel_size=4), nn.ReLU())
self.mlp = MLP((self.mlp_input,), dims=self.mlp_dims)
def forward(self, x):
output = self.lambda_layer_1(x)
output = self.unsqueeze_layer1(output)
scale = self.lambda_layer_2(x)
scale = self.unsqueeze_layer2(scale)
output = self.conv2d1(output)
output = self.conv2d2(output)
output = torch.squeeze(output, dim=-1)
output = self.conv1d1(output)
output = self.maxpooling(output)
output = self.conv1d2(output)
output = self.maxpooling(output)
output = self.conv1d3(output)
output = self.maxpooling(output)
output = self.conv1d4(output)
output = self.conv1d5(output)
output = torch.flatten(output, start_dim=1)
output = torch.cat((output, scale), dim=1)
output = self.mlp(output)
return output
class PositionEmbeddingVs30(nn.Module):
def __init__(
self, wavelengths=((5, 30), (110, 123), (0.01, 5000), (100, 1600)), emb_dim=500
):
super(PositionEmbeddingVs30, self).__init__()
self.wavelengths = wavelengths
self.emb_dim = emb_dim
min_lat, max_lat = wavelengths[0]
min_lon, max_lon = wavelengths[1]
min_depth, max_depth = wavelengths[2]
min_vs30, max_vs30 = wavelengths[3]
assert emb_dim % 10 == 0
lat_dim = emb_dim // 5
lon_dim = emb_dim // 5
depth_dim = emb_dim // 10
vs30_dim = emb_dim // 10
self.lat_coeff = (
2
* np.pi
* 1.0
/ min_lat
* ((min_lat / max_lat) ** (np.arange(lat_dim) / lat_dim))
)
self.lon_coeff = (
2
* np.pi
* 1.0
/ min_lon
* ((min_lon / max_lon) ** (np.arange(lon_dim) / lon_dim))
)
self.depth_coeff = (
2
* np.pi
* 1.0
/ min_depth
* ((min_depth / max_depth) ** (np.arange(depth_dim) / depth_dim))
)
self.vs30_coeff = (
2
* np.pi
* 1.0
/ min_vs30
* ((min_vs30 / max_vs30) ** (np.arange(vs30_dim) / vs30_dim))
)
lat_sin_mask = np.arange(emb_dim) % 5 == 0
lat_cos_mask = np.arange(emb_dim) % 5 == 1
lon_sin_mask = np.arange(emb_dim) % 5 == 2
lon_cos_mask = np.arange(emb_dim) % 5 == 3
depth_sin_mask = np.arange(emb_dim) % 10 == 4
depth_cos_mask = np.arange(emb_dim) % 10 == 9
vs30_sin_mask = np.arange(emb_dim) % 10 == 5
vs30_cos_mask = np.arange(emb_dim) % 10 == 8
self.mask = np.zeros(emb_dim)
self.mask[lat_sin_mask] = np.arange(lat_dim)
self.mask[lat_cos_mask] = lat_dim + np.arange(lat_dim)
self.mask[lon_sin_mask] = 2 * lat_dim + np.arange(lon_dim)
self.mask[lon_cos_mask] = 2 * lat_dim + lon_dim + np.arange(lon_dim)
self.mask[depth_sin_mask] = 2 * lat_dim + 2 * lon_dim + np.arange(depth_dim)
self.mask[depth_cos_mask] = (
2 * lat_dim + 2 * lon_dim + depth_dim + np.arange(depth_dim)
)
self.mask[vs30_sin_mask] = (
2 * lat_dim + 2 * lon_dim + 2 * depth_dim + np.arange(vs30_dim)
)
self.mask[vs30_cos_mask] = (
2 * lat_dim + 2 * lon_dim + 2 * depth_dim + vs30_dim + np.arange(vs30_dim)
)
self.mask = self.mask.astype("int32")
def forward(self, x):
lat_base = x[:, :, 0:1].to(device) * torch.Tensor(self.lat_coeff).to(device)
lon_base = x[:, :, 1:2].to(device) * torch.Tensor(self.lon_coeff).to(device)
depth_base = x[:, :, 2:3].to(device) * torch.Tensor(self.depth_coeff).to(device)
vs30_base = x[:, :, 3:4] * torch.Tensor(self.vs30_coeff).to(device)
output = torch.cat(
[
torch.sin(lat_base),
torch.cos(lat_base),
torch.sin(lon_base),
torch.cos(lon_base),
torch.sin(depth_base),
torch.cos(depth_base),
torch.sin(vs30_base),
torch.cos(vs30_base),
],
dim=-1,
)
maskk = torch.from_numpy(np.array(self.mask)).long()
index = (
(maskk.unsqueeze(0).unsqueeze(0))
.expand(x.shape[0], 1, self.emb_dim)
.to(device)
)
output = torch.gather(output, -1, index).to(device)
return output
class TransformerEncoder(nn.Module):
def __init__(
self,
d_model=150,
nhead=10,
batch_first=True,
activation="gelu",
dropout=0.0,
dim_feedforward=1000,
):
super(TransformerEncoder, self).__init__()
self.encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
batch_first=batch_first,
activation=activation,
dropout=dropout,
dim_feedforward=dim_feedforward,
).to(device)
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, 6).to(
device
)
def forward(self, x, src_key_padding_mask=None):
return self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)
class MDN(nn.Module):
def __init__(self, input_shape=(150,), n_hidden=20, n_gaussians=5):
super(MDN, self).__init__()
self.z_h = nn.Sequential(nn.Linear(input_shape[0], n_hidden), nn.Tanh())
self.z_weight = nn.Linear(n_hidden, n_gaussians)
self.z_sigma = nn.Linear(n_hidden, n_gaussians)
self.z_mu = nn.Linear(n_hidden, n_gaussians)
def forward(self, x):
z_h = self.z_h(x)
weight = nn.functional.softmax(self.z_weight(z_h), -1)
sigma = torch.exp(self.z_sigma(z_h))
mu = self.z_mu(z_h)
return weight, sigma, mu
class FullModel(nn.Module):
def __init__(
self,
model_cnn,
model_position,
model_transformer,
model_mlp,
model_mdn,
max_station=25,
pga_targets=15,
emb_dim=150,
data_length=6000,
):
super(FullModel, self).__init__()
self.data_length = data_length
self.model_CNN = model_cnn
self.model_Position = model_position
self.model_Transformer = model_transformer
self.model_mlp = model_mlp
self.model_MDN = model_mdn
self.max_station = max_station
self.pga_targets = pga_targets
self.emb_dim = emb_dim
def forward(self, data):
cnn_output = self.model_CNN(
torch.DoubleTensor(data["waveform"].reshape(-1, self.data_length, 3))
.float()
.to(device)
)
cnn_output_reshape = torch.reshape(
cnn_output, (-1, self.max_station, self.emb_dim)
)
emb_output = self.model_Position(
torch.DoubleTensor(data["station"].reshape(-1, 1, data["station"].shape[2]))
.float()
.to(device)
)
emb_output = emb_output.reshape(-1, self.max_station, self.emb_dim)
station_pad_mask = data["station"] == 0
station_pad_mask = torch.all(station_pad_mask, 2)
pga_pos_emb_output = self.model_Position(
torch.DoubleTensor(data["target"].reshape(-1, 1, data["target"].shape[2]))
.float()
.to(device)
)
pga_pos_emb_output = pga_pos_emb_output.reshape(
-1, self.pga_targets, self.emb_dim
)
target_pad_mask = torch.ones_like(data["target"], dtype=torch.bool)
target_pad_mask = torch.all(target_pad_mask, 2)
pad_mask = torch.cat((station_pad_mask, target_pad_mask), dim=1).to(device)
add_pe_cnn_output = torch.add(cnn_output_reshape, emb_output)
transformer_input = torch.cat((add_pe_cnn_output, pga_pos_emb_output), dim=1)
transformer_output = self.model_Transformer(transformer_input, pad_mask)
mlp_input = transformer_output[:, -self.pga_targets :, :].to(device)
mlp_output = self.model_mlp(mlp_input)
weight, sigma, mu = self.model_MDN(mlp_output)
return weight, sigma, mu
def get_full_model(model_path):
emb_dim = 150
mlp_dims = (150, 100, 50, 30, 10)
cnn_model = CNN(mlp_input=5665).to(device)
pos_emb_model = PositionEmbeddingVs30(emb_dim=emb_dim).to(device)
transformer_model = TransformerEncoder()
mlp_model = MLP(input_shape=(emb_dim,), dims=mlp_dims).to(device)
mdn_model = MDN(input_shape=(mlp_dims[-1],)).to(device)
full_model = FullModel(
cnn_model,
pos_emb_model,
transformer_model,
mlp_model,
mdn_model,
pga_targets=25,
data_length=3000,
).to(device)
full_model.load_state_dict(
torch.load(model_path, weights_only=True, map_location=device)
)
return full_model