import numpy as np import torch import torch.nn as nn from loguru import logger # GPU/CPU 設定 if torch.cuda.is_available(): device = torch.device("cuda") logger.info("使用 GPU") elif torch.mps.is_available(): device = torch.device("mps") logger.info("使用 Apple MPS") else: device = torch.device("cpu") logger.info("使用 CPU") class LambdaLayer(nn.Module): def __init__(self, lambd, eps=1e-4): super(LambdaLayer, self).__init__() self.lambd = lambd self.eps = eps def forward(self, x): return self.lambd(x) + self.eps class MLP(nn.Module): def __init__( self, input_shape, dims=(500, 300, 200, 150), activation=nn.ReLU(), last_activation=None, ): super(MLP, self).__init__() if last_activation is None: last_activation = activation self.dims = dims self.first_fc = nn.Linear(input_shape[0], dims[0]) self.first_activation = activation more_hidden = [] if len(self.dims) > 2: for i in range(1, len(self.dims) - 1): more_hidden.append(nn.Linear(self.dims[i - 1], self.dims[i])) more_hidden.append(nn.ReLU()) self.more_hidden = nn.ModuleList(more_hidden) self.last_fc = nn.Linear(dims[-2], dims[-1]) self.last_activation = last_activation def forward(self, x): output = self.first_fc(x) output = self.first_activation(output) if self.more_hidden: for layer in self.more_hidden: output = layer(output) output = self.last_fc(output) output = self.last_activation(output) return output class CNN(nn.Module): def __init__( self, input_shape=(-1, 6000, 3), activation=nn.ReLU(), downsample=1, mlp_input=11665, mlp_dims=(500, 300, 200, 150), eps=1e-8, ): super(CNN, self).__init__() self.input_shape = input_shape self.activation = activation self.downsample = downsample self.mlp_input = mlp_input self.mlp_dims = mlp_dims self.eps = eps self.lambda_layer_1 = LambdaLayer( lambda t: t / ( torch.max( torch.max(torch.abs(t), dim=1, keepdim=True).values, dim=2, keepdim=True, ).values + self.eps ) ) self.unsqueeze_layer1 = LambdaLayer(lambda t: torch.unsqueeze(t, dim=1)) self.lambda_layer_2 = LambdaLayer( lambda t: torch.log( torch.max(torch.max(torch.abs(t), dim=1).values, dim=1).values + self.eps ) / 100 ) self.unsqueeze_layer2 = LambdaLayer(lambda t: torch.unsqueeze(t, dim=1)) self.conv2d1 = nn.Sequential( nn.Conv2d(1, 8, kernel_size=(1, downsample), stride=(1, downsample)), nn.ReLU(), ) self.conv2d2 = nn.Sequential( nn.Conv2d(8, 32, kernel_size=(16, 3), stride=(1, 3)), nn.ReLU() ) self.conv1d1 = nn.Sequential(nn.Conv1d(32, 64, kernel_size=16), nn.ReLU()) self.maxpooling = nn.MaxPool1d(2) self.conv1d2 = nn.Sequential(nn.Conv1d(64, 128, kernel_size=16), nn.ReLU()) self.conv1d3 = nn.Sequential(nn.Conv1d(128, 32, kernel_size=8), nn.ReLU()) self.conv1d4 = nn.Sequential(nn.Conv1d(32, 32, kernel_size=8), nn.ReLU()) self.conv1d5 = nn.Sequential(nn.Conv1d(32, 16, kernel_size=4), nn.ReLU()) self.mlp = MLP((self.mlp_input,), dims=self.mlp_dims) def forward(self, x): output = self.lambda_layer_1(x) output = self.unsqueeze_layer1(output) scale = self.lambda_layer_2(x) scale = self.unsqueeze_layer2(scale) output = self.conv2d1(output) output = self.conv2d2(output) output = torch.squeeze(output, dim=-1) output = self.conv1d1(output) output = self.maxpooling(output) output = self.conv1d2(output) output = self.maxpooling(output) output = self.conv1d3(output) output = self.maxpooling(output) output = self.conv1d4(output) output = self.conv1d5(output) output = torch.flatten(output, start_dim=1) output = torch.cat((output, scale), dim=1) output = self.mlp(output) return output class PositionEmbeddingVs30(nn.Module): def __init__( self, wavelengths=((5, 30), (110, 123), (0.01, 5000), (100, 1600)), emb_dim=500 ): super(PositionEmbeddingVs30, self).__init__() self.wavelengths = wavelengths self.emb_dim = emb_dim min_lat, max_lat = wavelengths[0] min_lon, max_lon = wavelengths[1] min_depth, max_depth = wavelengths[2] min_vs30, max_vs30 = wavelengths[3] assert emb_dim % 10 == 0 lat_dim = emb_dim // 5 lon_dim = emb_dim // 5 depth_dim = emb_dim // 10 vs30_dim = emb_dim // 10 self.lat_coeff = ( 2 * np.pi * 1.0 / min_lat * ((min_lat / max_lat) ** (np.arange(lat_dim) / lat_dim)) ) self.lon_coeff = ( 2 * np.pi * 1.0 / min_lon * ((min_lon / max_lon) ** (np.arange(lon_dim) / lon_dim)) ) self.depth_coeff = ( 2 * np.pi * 1.0 / min_depth * ((min_depth / max_depth) ** (np.arange(depth_dim) / depth_dim)) ) self.vs30_coeff = ( 2 * np.pi * 1.0 / min_vs30 * ((min_vs30 / max_vs30) ** (np.arange(vs30_dim) / vs30_dim)) ) lat_sin_mask = np.arange(emb_dim) % 5 == 0 lat_cos_mask = np.arange(emb_dim) % 5 == 1 lon_sin_mask = np.arange(emb_dim) % 5 == 2 lon_cos_mask = np.arange(emb_dim) % 5 == 3 depth_sin_mask = np.arange(emb_dim) % 10 == 4 depth_cos_mask = np.arange(emb_dim) % 10 == 9 vs30_sin_mask = np.arange(emb_dim) % 10 == 5 vs30_cos_mask = np.arange(emb_dim) % 10 == 8 self.mask = np.zeros(emb_dim) self.mask[lat_sin_mask] = np.arange(lat_dim) self.mask[lat_cos_mask] = lat_dim + np.arange(lat_dim) self.mask[lon_sin_mask] = 2 * lat_dim + np.arange(lon_dim) self.mask[lon_cos_mask] = 2 * lat_dim + lon_dim + np.arange(lon_dim) self.mask[depth_sin_mask] = 2 * lat_dim + 2 * lon_dim + np.arange(depth_dim) self.mask[depth_cos_mask] = ( 2 * lat_dim + 2 * lon_dim + depth_dim + np.arange(depth_dim) ) self.mask[vs30_sin_mask] = ( 2 * lat_dim + 2 * lon_dim + 2 * depth_dim + np.arange(vs30_dim) ) self.mask[vs30_cos_mask] = ( 2 * lat_dim + 2 * lon_dim + 2 * depth_dim + vs30_dim + np.arange(vs30_dim) ) self.mask = self.mask.astype("int32") def forward(self, x): lat_base = x[:, :, 0:1].to(device) * torch.Tensor(self.lat_coeff).to(device) lon_base = x[:, :, 1:2].to(device) * torch.Tensor(self.lon_coeff).to(device) depth_base = x[:, :, 2:3].to(device) * torch.Tensor(self.depth_coeff).to(device) vs30_base = x[:, :, 3:4] * torch.Tensor(self.vs30_coeff).to(device) output = torch.cat( [ torch.sin(lat_base), torch.cos(lat_base), torch.sin(lon_base), torch.cos(lon_base), torch.sin(depth_base), torch.cos(depth_base), torch.sin(vs30_base), torch.cos(vs30_base), ], dim=-1, ) maskk = torch.from_numpy(np.array(self.mask)).long() index = ( (maskk.unsqueeze(0).unsqueeze(0)) .expand(x.shape[0], 1, self.emb_dim) .to(device) ) output = torch.gather(output, -1, index).to(device) return output class TransformerEncoder(nn.Module): def __init__( self, d_model=150, nhead=10, batch_first=True, activation="gelu", dropout=0.0, dim_feedforward=1000, ): super(TransformerEncoder, self).__init__() self.encoder_layer = nn.TransformerEncoderLayer( d_model=d_model, nhead=nhead, batch_first=batch_first, activation=activation, dropout=dropout, dim_feedforward=dim_feedforward, ).to(device) self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, 6).to( device ) def forward(self, x, src_key_padding_mask=None): return self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask) class MDN(nn.Module): def __init__(self, input_shape=(150,), n_hidden=20, n_gaussians=5): super(MDN, self).__init__() self.z_h = nn.Sequential(nn.Linear(input_shape[0], n_hidden), nn.Tanh()) self.z_weight = nn.Linear(n_hidden, n_gaussians) self.z_sigma = nn.Linear(n_hidden, n_gaussians) self.z_mu = nn.Linear(n_hidden, n_gaussians) def forward(self, x): z_h = self.z_h(x) weight = nn.functional.softmax(self.z_weight(z_h), -1) sigma = torch.exp(self.z_sigma(z_h)) mu = self.z_mu(z_h) return weight, sigma, mu class FullModel(nn.Module): def __init__( self, model_cnn, model_position, model_transformer, model_mlp, model_mdn, max_station=25, pga_targets=15, emb_dim=150, data_length=6000, ): super(FullModel, self).__init__() self.data_length = data_length self.model_CNN = model_cnn self.model_Position = model_position self.model_Transformer = model_transformer self.model_mlp = model_mlp self.model_MDN = model_mdn self.max_station = max_station self.pga_targets = pga_targets self.emb_dim = emb_dim def forward(self, data): cnn_output = self.model_CNN( torch.DoubleTensor(data["waveform"].reshape(-1, self.data_length, 3)) .float() .to(device) ) cnn_output_reshape = torch.reshape( cnn_output, (-1, self.max_station, self.emb_dim) ) emb_output = self.model_Position( torch.DoubleTensor(data["station"].reshape(-1, 1, data["station"].shape[2])) .float() .to(device) ) emb_output = emb_output.reshape(-1, self.max_station, self.emb_dim) station_pad_mask = data["station"] == 0 station_pad_mask = torch.all(station_pad_mask, 2) pga_pos_emb_output = self.model_Position( torch.DoubleTensor(data["target"].reshape(-1, 1, data["target"].shape[2])) .float() .to(device) ) pga_pos_emb_output = pga_pos_emb_output.reshape( -1, self.pga_targets, self.emb_dim ) target_pad_mask = torch.ones_like(data["target"], dtype=torch.bool) target_pad_mask = torch.all(target_pad_mask, 2) pad_mask = torch.cat((station_pad_mask, target_pad_mask), dim=1).to(device) add_pe_cnn_output = torch.add(cnn_output_reshape, emb_output) transformer_input = torch.cat((add_pe_cnn_output, pga_pos_emb_output), dim=1) transformer_output = self.model_Transformer(transformer_input, pad_mask) mlp_input = transformer_output[:, -self.pga_targets :, :].to(device) mlp_output = self.model_mlp(mlp_input) weight, sigma, mu = self.model_MDN(mlp_output) return weight, sigma, mu def get_full_model(model_path): emb_dim = 150 mlp_dims = (150, 100, 50, 30, 10) cnn_model = CNN(mlp_input=5665).to(device) pos_emb_model = PositionEmbeddingVs30(emb_dim=emb_dim).to(device) transformer_model = TransformerEncoder() mlp_model = MLP(input_shape=(emb_dim,), dims=mlp_dims).to(device) mdn_model = MDN(input_shape=(mlp_dims[-1],)).to(device) full_model = FullModel( cnn_model, pos_emb_model, transformer_model, mlp_model, mdn_model, pga_targets=25, data_length=3000, ).to(device) full_model.load_state_dict( torch.load(model_path, weights_only=True, map_location=device) ) return full_model