| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import TensorDataset, DataLoader | |
| from sklearn.model_selection import train_test_split | |
class MogiCoulombLayer(nn.Module):
    """Mogi-Coulomb yield-criterion head.

    Two linear probes on the shared latent features produce the criterion
    coefficients: ``C1`` (cohesion-like intercept, kept non-negative via
    ReLU) and ``C2`` (friction-like slope, squashed into (0, 1) via
    sigmoid). The forward pass evaluates the octahedral shear stress and
    the learned yield surface ``C1 + C2 * (sigma1 + sigma3) / 2``.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.C1_net = nn.Linear(hidden_dim, 1)
        self.C2_net = nn.Linear(hidden_dim, 1)

    def forward(self, features, sigma1, sigma2, sigma3):
        """Return (tau_oct, yield_stress, C1, C2) for the given stress state."""
        c1 = torch.relu(self.C1_net(features))
        c2 = torch.sigmoid(self.C2_net(features))
        # Octahedral shear stress from the three principal stresses.
        sq_diff_sum = ((sigma1 - sigma2) ** 2
                       + (sigma2 - sigma3) ** 2
                       + (sigma1 - sigma3) ** 2)
        tau_oct = (1.0 / 3.0) * torch.sqrt(sq_diff_sum)
        # Mean of the major and minor principal stresses.
        mean_13 = (sigma1 + sigma3) / 2.0
        yield_stress = c1 + c2 * mean_13
        return tau_oct, yield_stress, c1, c2
| class WeibullStrengthLayer(nn.Module): | |
| def __init__(self, hidden_dim): | |
| super(WeibullStrengthLayer, self).__init__() | |
| self.m_net = nn.Sequential( | |
| nn.Linear(hidden_dim, 32), | |
| nn.Tanh(), | |
| nn.Linear(32, 1), | |
| nn.Softplus() | |
| ) | |
| self.F0_net = nn.Sequential( | |
| nn.Linear(hidden_dim, 32), | |
| nn.Tanh(), | |
| nn.Linear(32, 1), | |
| nn.Softplus() | |
| ) | |
| def forward(self, features, F): | |
| m = self.m_net(features) + 1.0 | |
| F0 = self.F0_net(features) + 0.1 | |
| D_q = 1.0 - torch.exp(-torch.pow(F / F0, m)) | |
| return D_q, m, F0 | |
| class EnergyDamageLayer(nn.Module): | |
| def __init__(self, hidden_dim): | |
| super(EnergyDamageLayer, self).__init__() | |
| self.a_net = nn.Sequential( | |
| nn.Linear(hidden_dim, 32), | |
| nn.Tanh(), | |
| nn.Linear(32, 1), | |
| nn.Softplus() | |
| ) | |
| self.b_net = nn.Sequential( | |
| nn.Linear(hidden_dim, 32), | |
| nn.Tanh(), | |
| nn.Linear(32, 1), | |
| nn.Softplus() | |
| ) | |
| def forward(self, features, delta_sigma, D0): | |
| a = self.a_net(features) + 0.1 | |
| b = self.b_net(features) + 0.01 | |
| effective_stress = delta_sigma / (1.0 - D0 + 1e-8) | |
| U_p = a * torch.exp(b * effective_stress) | |
| D_n = (2.0 / np.pi) * torch.atan(b * U_p) | |
| return D_n, U_p, a, b | |
class CrackTransformerPINN(nn.Module):
    """Transformer-based physics-informed network for crack prediction.

    Input features per sample (in column order): pH, normal force FN,
    tangential force FT, temperature T, and phase. The network outputs a
    non-negative crack-angle distribution over ``output_dim`` bins and a
    scalar total crack count; with ``return_physics=True`` the
    intermediate physics quantities (Mogi-Coulomb, Weibull, energy-damage
    heads) are returned as well for physics-informed loss terms.
    """

    def __init__(self, input_dim=5, output_dim=72, hidden_dims=(128, 256, 256, 128), dropout=0.2):
        """Build the network.

        Args:
            input_dim: number of input features (expects 5: pH, FN, FT, T, phase).
            output_dim: number of angle bins in the output distribution.
            hidden_dims: widths of the embedding layer and decoder layers.
                The default is a tuple rather than a list to avoid the
                mutable-default-argument pitfall; callers may still pass a list.
            dropout: dropout probability for embedding/transformer/decoder.
        """
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # Dense embedding of the raw features feeding the transformer trunk.
        self.input_embedding = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.LayerNorm(hidden_dims[0]),
            nn.GELU(),
            nn.Dropout(dropout * 0.5)
        )
        # Separate shallow encoder whose features are concatenated with the
        # transformer output before the angle decoder.
        self.damage_encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.Tanh(),
            nn.Linear(hidden_dims[0], hidden_dims[0])
        )
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dims[0],
            nhead=8,
            dim_feedforward=hidden_dims[0] * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=4,
            norm=nn.LayerNorm(hidden_dims[0])
        )
        # Physics heads operating on the transformer features.
        self.mogi_coulomb = MogiCoulombLayer(hidden_dims[0])
        self.weibull_strength = WeibullStrengthLayer(hidden_dims[0])
        self.energy_damage = EnergyDamageLayer(hidden_dims[0])
        # MLP decoder over [transformer features; damage features].
        self.angle_decoder = nn.ModuleList()
        prev_dim = hidden_dims[0] * 2
        for hidden_dim in hidden_dims[1:]:
            self.angle_decoder.append(nn.Linear(prev_dim, hidden_dim))
            self.angle_decoder.append(nn.LayerNorm(hidden_dim))
            self.angle_decoder.append(nn.Tanh())
            self.angle_decoder.append(nn.Dropout(dropout))
            prev_dim = hidden_dim
        # Final ReLU keeps the per-bin angle counts non-negative.
        self.angle_output = nn.Sequential(
            nn.Linear(prev_dim, output_dim),
            nn.ReLU()
        )
        self.total_count_head = nn.Sequential(
            nn.Linear(hidden_dims[0] * 2, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
            nn.ReLU()
        )
        # Learns to predict the initial damage D0 from encoded features;
        # trained against the analytic compute_initial_damage() value.
        self.damage_factor_head = nn.Sequential(
            nn.Linear(hidden_dims[0], 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def compute_initial_damage(self, pH, FN, FT, T):
        """Analytic initial damage D0 from chemistry, loading and temperature.

        Combines fatigue (D_ft), chemical (D_ch) and thermal (D_th)
        contributions multiplicatively: D = 1 - prod(1 - D_i), clamped to
        [0, 0.99] so that (1 - D) never vanishes downstream.

        Args:
            pH, FN, FT, T: (batch, 1) tensors (pH value, normal force,
                tangential force, temperature).
        Returns:
            (batch, 1) tensor of initial damage values in [0, 0.99].
        """
        D_ft = 0.002 * FN * torch.exp(0.02 * FT)
        D_ch = 0.01 * torch.abs(pH - 7.0) ** 1.5
        # BUGFIX: clamp(min=0) instead of torch.where. torch.where evaluates
        # BOTH branches, and (T - 100)**1.2 is NaN for T < 100, which poisons
        # gradients through the unselected branch during backward. clamp gives
        # identical forward values (0 for T <= 100) with finite gradients.
        D_th = 0.0003 * torch.clamp(T - 100.0, min=0.0) ** 1.2
        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
        D_total = torch.clamp(D_total, 0.0, 0.99)
        return D_total

    def forward(self, x, return_physics=False):
        """Run the network.

        Args:
            x: (batch, input_dim) tensor with columns pH, FN, FT, T, phase.
            return_physics: if True, also return a dict of intermediate
                physics quantities evaluated at fixed reference stresses.

        Returns:
            (angle_dist, total_count) or
            (angle_dist, total_count, physics_outputs).
        """
        batch_size = x.shape[0]
        pH = x[:, 0:1]
        FN = x[:, 1:2]
        FT = x[:, 2:3]
        T = x[:, 3:4]
        phase = x[:, 4:5]  # currently unused; reserved for future physics terms
        D0 = self.compute_initial_damage(pH, FN, FT, T)
        lambda_coef = 1.0 - D0  # remaining load-bearing capacity factor
        x_embedded = self.input_embedding(x)
        damage_features = self.damage_encoder(x)
        # Treat each sample as a length-1 sequence for the transformer.
        x_seq = x_embedded.unsqueeze(1)
        encoded = self.transformer_encoder(x_seq)
        encoded = encoded.squeeze(1)
        combined = torch.cat([encoded, damage_features], dim=-1)
        h = combined
        for layer in self.angle_decoder:
            h = layer(h)
        angle_dist = self.angle_output(h)
        total_count = self.total_count_head(combined)
        predicted_D0 = self.damage_factor_head(encoded)
        if return_physics:
            # Fixed nominal stress state (units follow the training data);
            # the physics heads are evaluated at these reference values.
            sigma1 = 100.0 * torch.ones(batch_size, 1, device=x.device)
            sigma2 = 50.0 * torch.ones(batch_size, 1, device=x.device)
            sigma3 = 30.0 * torch.ones(batch_size, 1, device=x.device)
            delta_sigma = 20.0 * torch.ones(batch_size, 1, device=x.device)
            F_contact = 10.0 * torch.ones(batch_size, 1, device=x.device)
            tau_oct, yield_stress, C1, C2 = self.mogi_coulomb(encoded, sigma1, sigma2, sigma3)
            D_q, m, F0 = self.weibull_strength(encoded, F_contact)
            D_n, U_p, a, b = self.energy_damage(encoded, delta_sigma, D0)
            physics_outputs = {
                'D0': D0,
                'lambda': lambda_coef,
                'predicted_D0': predicted_D0,
                'tau_oct': tau_oct,
                'yield_stress': yield_stress,
                'C1': C1,
                'C2': C2,
                'D_q': D_q,
                'm': m,
                'F0': F0,
                'D_n': D_n,
                'U_p': U_p,
                'a': a,
                'b': b
            }
            return angle_dist, total_count, physics_outputs
        return angle_dist, total_count
class CrackPINNLoss(nn.Module):
    """Composite physics-informed loss.

    Weighted sum of: data MSE, physics consistency (distribution sum vs.
    count head vs. observed counts), angular smoothness, damage-head
    consistency, Mogi-Coulomb yield penalty, and L2 regularization.
    """

    def __init__(self, lambda_data=1.0, lambda_physics=0.5, lambda_smooth=0.1,
                 lambda_damage=0.3, lambda_mogi=0.2, lambda_reg=1e-4):
        super().__init__()
        self.lambda_data = lambda_data
        self.lambda_physics = lambda_physics
        self.lambda_smooth = lambda_smooth
        self.lambda_damage = lambda_damage
        self.lambda_mogi = lambda_mogi
        self.lambda_reg = lambda_reg
        self.mse_loss = nn.MSELoss()

    def data_loss(self, pred_dist, true_dist):
        """MSE between predicted and observed angle distributions."""
        return self.mse_loss(pred_dist, true_dist)

    def physics_loss(self, pred_dist, pred_total, true_dist):
        """Tie the count head to the distribution sum and the observed total."""
        dist_total = pred_dist.sum(dim=1, keepdim=True)
        observed_total = true_dist.sum(dim=1, keepdim=True)
        consistency = self.mse_loss(dist_total, pred_total)
        fit = self.mse_loss(pred_total, observed_total)
        return consistency + fit

    def smoothness_loss(self, pred_dist):
        """Penalize jumps between adjacent angle bins."""
        return torch.mean((pred_dist[:, 1:] - pred_dist[:, :-1]) ** 2)

    def damage_consistency_loss(self, physics_outputs):
        """Match the learned D0 head to the analytic D0; keep lambda in [0, 1]."""
        loss_D0 = self.mse_loss(physics_outputs['predicted_D0'],
                                physics_outputs['D0'])
        lam = physics_outputs['lambda']
        # Hinge penalties for lambda below 0 or above 1.
        out_of_range = torch.relu(-lam) + torch.relu(lam - 1.0)
        return loss_D0 + out_of_range.mean()

    def mogi_coulomb_loss(self, physics_outputs):
        """Penalize octahedral stress above the learned yield surface and
        criterion coefficients outside their physical ranges."""
        exceedance = physics_outputs['tau_oct'] - physics_outputs['yield_stress']
        loss_yield = torch.relu(exceedance).mean()
        bad_params = (torch.relu(-physics_outputs['C1']).mean()
                      + torch.relu(physics_outputs['C2'] - 1.0).mean())
        return loss_yield + 0.1 * bad_params

    def regularization_loss(self, model):
        """Squared L2 norm of all model parameters."""
        device = next(model.parameters()).device
        total = torch.tensor(0.0, device=device)
        for p in model.parameters():
            total = total + torch.norm(p, p=2) ** 2
        return total

    def forward(self, pred_dist, pred_total, true_dist, model, physics_outputs=None):
        """Return (total_loss tensor, dict of per-component floats)."""
        loss_data = self.data_loss(pred_dist, true_dist)
        loss_physics = self.physics_loss(pred_dist, pred_total, true_dist)
        loss_smooth = self.smoothness_loss(pred_dist)
        loss_reg = self.regularization_loss(model)
        # Physics terms default to zero when physics outputs are absent
        # (e.g. during validation without return_physics).
        loss_damage = torch.tensor(0.0, device=pred_dist.device)
        loss_mogi = torch.tensor(0.0, device=pred_dist.device)
        if physics_outputs is not None:
            loss_damage = self.damage_consistency_loss(physics_outputs)
            loss_mogi = self.mogi_coulomb_loss(physics_outputs)
        total = (self.lambda_data * loss_data
                 + self.lambda_physics * loss_physics
                 + self.lambda_smooth * loss_smooth
                 + self.lambda_damage * loss_damage
                 + self.lambda_mogi * loss_mogi
                 + self.lambda_reg * loss_reg)
        components = {
            'total': total.item(),
            'data': loss_data.item(),
            'physics': loss_physics.item(),
            'smooth': loss_smooth.item(),
            'damage': loss_damage.item(),
            'mogi': loss_mogi.item(),
            'reg': loss_reg.item()
        }
        return total, components
class CrackPINNTrainer:
    """Training harness for CrackTransformerPINN.

    Wraps optimizer/scheduler setup, the physics-informed loss, the
    train/validate loops, early stopping with best-weight restore, and
    numpy-friendly prediction helpers.
    """

    def __init__(self, model, device='cpu', lr=1e-3, weight_decay=1e-4):
        """Set up model, AdamW optimizer, plateau scheduler and loss.

        Args:
            model: a CrackTransformerPINN (or compatible) module.
            device: torch device string for training/inference.
            lr: initial learning rate.
            weight_decay: AdamW decoupled weight decay.
        """
        self.model = model.to(device)
        self.device = device
        self.optimizer = optim.AdamW(
            self.model.parameters(),
            lr=lr,
            weight_decay=weight_decay,
            betas=(0.9, 0.999)
        )
        # NOTE: the deprecated `verbose` flag was dropped (removed in recent
        # PyTorch releases); LR reductions are no longer printed.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=10
        )
        self.criterion = CrackPINNLoss(
            lambda_data=1.0,
            lambda_physics=0.5,
            lambda_smooth=0.1,
            lambda_damage=0.3,
            lambda_mogi=0.2,
            lambda_reg=1e-4
        )
        self.train_losses = []
        self.val_losses = []

    def train_epoch(self, train_loader):
        """Run one training pass over train_loader.

        Returns:
            (avg_loss, dict of averaged per-component losses).
        """
        self.model.train()
        epoch_losses = []
        loss_components = {
            'data': [], 'physics': [], 'smooth': [],
            'damage': [], 'mogi': [], 'reg': []
        }
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            self.optimizer.zero_grad()
            pred_dist, pred_total, physics_outputs = self.model(X_batch, return_physics=True)
            loss, loss_dict = self.criterion(
                pred_dist, pred_total, y_batch, self.model, physics_outputs
            )
            loss.backward()
            # Clip to stabilize training when the physics terms spike.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            epoch_losses.append(loss_dict['total'])
            for key in loss_components:
                loss_components[key].append(loss_dict[key])
        avg_loss = np.mean(epoch_losses)
        avg_components = {key: np.mean(values) for key, values in loss_components.items()}
        return avg_loss, avg_components

    def validate(self, val_loader):
        """Evaluate on val_loader without gradients.

        Returns:
            (avg_loss, metrics dict with 'r2', 'rmse', 'total_count_mae').
        """
        self.model.eval()
        val_losses = []
        all_preds = []
        all_trues = []
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)
                pred_dist, pred_total = self.model(X_batch, return_physics=False)
                loss, _ = self.criterion(pred_dist, pred_total, y_batch, self.model)
                val_losses.append(loss.item())
                all_preds.append(pred_dist.cpu().numpy())
                all_trues.append(y_batch.cpu().numpy())
        avg_loss = np.mean(val_losses)
        all_preds = np.concatenate(all_preds, axis=0)
        all_trues = np.concatenate(all_trues, axis=0)
        # R^2 against the grand mean of all true bin values.
        ss_res = np.sum((all_trues - all_preds) ** 2)
        ss_tot = np.sum((all_trues - np.mean(all_trues)) ** 2)
        r2 = 1 - (ss_res / (ss_tot + 1e-8))
        rmse = np.sqrt(np.mean((all_trues - all_preds) ** 2))
        # MAE between per-sample total counts (sum over angle bins).
        pred_total_counts = all_preds.sum(axis=1)
        true_total_counts = all_trues.sum(axis=1)
        total_count_error = np.mean(np.abs(pred_total_counts - true_total_counts))
        metrics = {
            'r2': r2,
            'rmse': rmse,
            'total_count_mae': total_count_error
        }
        return avg_loss, metrics

    def fit(self, X_train, y_train, X_val, y_val, epochs=200, batch_size=16, patience=30):
        """Train with early stopping; restores the best validation weights.

        Args:
            X_train, y_train, X_val, y_val: array-likes convertible to
                float tensors (features and target angle distributions).
            epochs: maximum number of epochs.
            batch_size: mini-batch size for both loaders.
            patience: epochs without val-loss improvement before stopping.
        """
        train_dataset = TensorDataset(
            torch.FloatTensor(X_train),
            torch.FloatTensor(y_train)
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataset = TensorDataset(
            torch.FloatTensor(X_val),
            torch.FloatTensor(y_val)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        print("\nStarting training...")
        print("=" * 80)
        for epoch in range(epochs):
            train_loss, train_components = self.train_epoch(train_loader)
            val_loss, val_metrics = self.validate(val_loader)
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            self.scheduler.step(val_loss)
            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}")
                print(f"  Train Loss: {train_loss:.4f} "
                      f"(data: {train_components['data']:.4f}, "
                      f"phys: {train_components['physics']:.4f}, "
                      f"damage: {train_components['damage']:.4f})")
                print(f"  Val Loss: {val_loss:.4f} | "
                      f"R2: {val_metrics['r2']:.4f} | "
                      f"RMSE: {val_metrics['rmse']:.2f}")
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # BUGFIX: state_dict() returns references to the live
                # parameter tensors, so a shallow .copy() of the dict would
                # be silently overwritten by subsequent optimizer steps —
                # the final load_state_dict would restore the LAST weights,
                # not the best. Clone each tensor to take a real snapshot.
                best_model_state = {
                    k: v.detach().clone()
                    for k, v in self.model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"\nEarly stopping. Best val loss: {best_val_loss:.4f}")
                    break
        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)
        print("=" * 80)
        print(f"Training complete. Best val loss: {best_val_loss:.4f}")

    def predict(self, X):
        """Predict angle distributions and total counts for feature array X.

        Returns:
            (pred_dist ndarray of shape (n, output_dim),
             pred_total ndarray of shape (n,)).
        """
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total = self.model(X_tensor, return_physics=False)
            pred_dist = pred_dist.cpu().numpy()
            pred_total = pred_total.cpu().numpy().flatten()
        return pred_dist, pred_total

    def predict_with_physics(self, X):
        """Predict and also expose selected physics quantities.

        Returns:
            dict with 'angle_distribution', 'total_count', 'D0', 'lambda',
            'D_n' as numpy arrays.
        """
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total, physics = self.model(X_tensor, return_physics=True)
            result = {
                'angle_distribution': pred_dist.cpu().numpy(),
                'total_count': pred_total.cpu().numpy().flatten(),
                'D0': physics['D0'].cpu().numpy().flatten(),
                'lambda': physics['lambda'].cpu().numpy().flatten(),
                'D_n': physics['D_n'].cpu().numpy().flatten()
            }
        return result