"""Physics-informed transformer (PINN) for crack angle-distribution prediction.

A Transformer encoder maps 5 input features (pH, FN, FT, T, phase) to a
72-bin crack-angle distribution and a total crack count, regularized by
three learnable physics layers:

* Mogi-Coulomb yield criterion (octahedral shear vs. predicted yield stress),
* Weibull strength statistics (probabilistic damage from contact force),
* energy-based damage accumulation (arctan damage law on plastic energy).

A composite loss ties the data fit to these physics constraints, and a
trainer wraps the optimization loop with early stopping.
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# sklearn is not used in this module; keep the import optional so the file
# loads in environments without scikit-learn installed.
try:
    from sklearn.model_selection import train_test_split  # noqa: F401
except ImportError:  # pragma: no cover - sklearn is an optional convenience
    train_test_split = None


class MogiCoulombLayer(nn.Module):
    """Learnable Mogi-Coulomb yield criterion.

    Predicts the criterion coefficients from encoder features:
    C1 (cohesion-like intercept, constrained >= 0 via ReLU) and
    C2 (friction-like slope, constrained to (0, 1) via sigmoid), then
    evaluates the octahedral shear stress and the predicted yield stress.
    """

    def __init__(self, hidden_dim):
        super(MogiCoulombLayer, self).__init__()
        self.C1_net = nn.Linear(hidden_dim, 1)
        self.C2_net = nn.Linear(hidden_dim, 1)

    def forward(self, features, sigma1, sigma2, sigma3):
        """Compute (tau_oct, yield_stress, C1, C2) for principal stresses.

        Args:
            features: (batch, hidden_dim) encoder features.
            sigma1, sigma2, sigma3: (batch, 1) principal stresses
                (sigma1 >= sigma2 >= sigma3 assumed by the criterion —
                not enforced here; confirm at the caller).
        """
        C1 = torch.relu(self.C1_net(features))      # cohesion term, >= 0
        C2 = torch.sigmoid(self.C2_net(features))   # friction term, in (0, 1)
        # Octahedral shear stress: (1/3) * sqrt of the sum of squared
        # principal-stress differences.
        tau_oct = (1.0 / 3.0) * torch.sqrt(
            (sigma1 - sigma2) ** 2
            + (sigma2 - sigma3) ** 2
            + (sigma1 - sigma3) ** 2
        )
        # Mean of the extreme principal stresses (sigma_m,2 in Mogi's form).
        sigma_m2 = (sigma1 + sigma3) / 2.0
        yield_stress = C1 + C2 * sigma_m2
        return tau_oct, yield_stress, C1, C2


class WeibullStrengthLayer(nn.Module):
    """Weibull statistical-strength damage.

    Predicts the Weibull shape modulus m (> 1) and scale F0 (> 0.1) from
    encoder features and evaluates the Weibull CDF as a damage variable.
    """

    def __init__(self, hidden_dim):
        super(WeibullStrengthLayer, self).__init__()
        self.m_net = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )
        self.F0_net = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )

    def forward(self, features, F):
        """Return (D_q, m, F0) where D_q = 1 - exp(-(F / F0)^m)."""
        m = self.m_net(features) + 1.0    # shape modulus, offset keeps m > 1
        F0 = self.F0_net(features) + 0.1  # scale, offset keeps F0 > 0.1
        D_q = 1.0 - torch.exp(-torch.pow(F / F0, m))
        return D_q, m, F0


class EnergyDamageLayer(nn.Module):
    """Energy-based damage accumulation.

    Predicts coefficients a (> 0.1) and b (> 0.01) from encoder features,
    computes an exponential plastic-energy proxy U_p from the effective
    stress (amplified by prior damage D0), and maps it to a bounded
    damage increment via (2/pi) * atan.
    """

    def __init__(self, hidden_dim):
        super(EnergyDamageLayer, self).__init__()
        self.a_net = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )
        self.b_net = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )

    def forward(self, features, delta_sigma, D0):
        """Return (D_n, U_p, a, b) for stress increment and prior damage."""
        a = self.a_net(features) + 0.1
        b = self.b_net(features) + 0.01
        # Effective-stress concept: prior damage reduces load-bearing area.
        # Epsilon guards against division by zero when D0 -> 1.
        effective_stress = delta_sigma / (1.0 - D0 + 1e-8)
        U_p = a * torch.exp(b * effective_stress)
        # atan keeps the damage increment bounded in [0, 1).
        D_n = (2.0 / np.pi) * torch.atan(b * U_p)
        return D_n, U_p, a, b


class CrackTransformerPINN(nn.Module):
    """Transformer backbone with physics heads.

    Inputs are 5 features per sample: pH, FN, FT, T, phase (columns 0-4).
    Outputs a 72-bin angle distribution (non-negative via ReLU) and a
    scalar total crack count; optionally returns physics quantities for
    the PINN loss terms.
    """

    def __init__(self, input_dim=5, output_dim=72,
                 hidden_dims=[128, 256, 256, 128], dropout=0.2):
        super(CrackTransformerPINN, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

        # Embed raw features into the transformer width.
        self.input_embedding = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.LayerNorm(hidden_dims[0]),
            nn.GELU(),
            nn.Dropout(dropout * 0.5),
        )
        # Parallel branch encoding damage-relevant features directly.
        self.damage_encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.Tanh(),
            nn.Linear(hidden_dims[0], hidden_dims[0]),
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dims[0],
            nhead=8,
            dim_feedforward=hidden_dims[0] * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=4,
            norm=nn.LayerNorm(hidden_dims[0]),
        )

        # Physics layers operating on the encoded representation.
        self.mogi_coulomb = MogiCoulombLayer(hidden_dims[0])
        self.weibull_strength = WeibullStrengthLayer(hidden_dims[0])
        self.energy_damage = EnergyDamageLayer(hidden_dims[0])

        # MLP decoder over [encoded | damage_features] (hence * 2 width).
        self.angle_decoder = nn.ModuleList()
        prev_dim = hidden_dims[0] * 2
        for hidden_dim in hidden_dims[1:]:
            self.angle_decoder.append(nn.Linear(prev_dim, hidden_dim))
            self.angle_decoder.append(nn.LayerNorm(hidden_dim))
            self.angle_decoder.append(nn.Tanh())
            self.angle_decoder.append(nn.Dropout(dropout))
            prev_dim = hidden_dim

        # ReLU keeps per-bin crack counts non-negative.
        self.angle_output = nn.Sequential(
            nn.Linear(prev_dim, output_dim),
            nn.ReLU(),
        )
        self.total_count_head = nn.Sequential(
            nn.Linear(hidden_dims[0] * 2, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
            nn.ReLU(),
        )
        # Sigmoid keeps the predicted initial-damage factor in (0, 1).
        self.damage_factor_head = nn.Sequential(
            nn.Linear(hidden_dims[0], 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )

    def compute_initial_damage(self, pH, FN, FT, T):
        """Analytic initial damage D0 from fatigue, chemical, thermal terms.

        Each term is an independent damage mechanism; they are combined
        multiplicatively as 1 - prod(1 - D_i) and clamped to [0, 0.99].
        """
        D_ft = 0.002 * FN * torch.exp(0.02 * FT)        # fatigue term
        D_ch = 0.01 * torch.abs(pH - 7.0) ** 1.5        # chemical (pH) term
        # Thermal term only above 100. Clamp before the fractional power:
        # (negative)**1.2 is NaN in torch, and even the discarded branch of
        # torch.where propagates NaN gradients.
        D_th = torch.where(
            T > 100.0,
            0.0003 * torch.clamp(T - 100.0, min=0.0) ** 1.2,
            torch.zeros_like(T),
        )
        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
        D_total = torch.clamp(D_total, 0.0, 0.99)
        return D_total

    def forward(self, x, return_physics=False):
        """Forward pass.

        Args:
            x: (batch, 5) tensor; columns are pH, FN, FT, T, phase.
            return_physics: if True, also return a dict of physics
                quantities for the PINN loss.

        Returns:
            (angle_dist, total_count) or
            (angle_dist, total_count, physics_outputs).
        """
        batch_size = x.shape[0]
        pH = x[:, 0:1]
        FN = x[:, 1:2]
        FT = x[:, 2:3]
        T = x[:, 3:4]
        phase = x[:, 4:5]  # currently unused downstream; kept for clarity

        D0 = self.compute_initial_damage(pH, FN, FT, T)
        lambda_coef = 1.0 - D0  # remaining-integrity coefficient

        x_embedded = self.input_embedding(x)
        damage_features = self.damage_encoder(x)

        # Treat each sample as a length-1 sequence for the transformer.
        x_seq = x_embedded.unsqueeze(1)
        encoded = self.transformer_encoder(x_seq)
        encoded = encoded.squeeze(1)

        combined = torch.cat([encoded, damage_features], dim=-1)
        h = combined
        for layer in self.angle_decoder:
            h = layer(h)
        angle_dist = self.angle_output(h)
        total_count = self.total_count_head(combined)
        predicted_D0 = self.damage_factor_head(encoded)

        if return_physics:
            # Placeholder stress state (constants) used to evaluate the
            # physics layers; presumably stand-ins for measured stresses —
            # TODO(review): confirm intended source of these values.
            sigma1 = 100.0 * torch.ones(batch_size, 1, device=x.device)
            sigma2 = 50.0 * torch.ones(batch_size, 1, device=x.device)
            sigma3 = 30.0 * torch.ones(batch_size, 1, device=x.device)
            delta_sigma = 20.0 * torch.ones(batch_size, 1, device=x.device)
            F_contact = 10.0 * torch.ones(batch_size, 1, device=x.device)

            tau_oct, yield_stress, C1, C2 = self.mogi_coulomb(
                encoded, sigma1, sigma2, sigma3)
            D_q, m, F0 = self.weibull_strength(encoded, F_contact)
            D_n, U_p, a, b = self.energy_damage(encoded, delta_sigma, D0)

            physics_outputs = {
                'D0': D0,
                'lambda': lambda_coef,
                'predicted_D0': predicted_D0,
                'tau_oct': tau_oct,
                'yield_stress': yield_stress,
                'C1': C1,
                'C2': C2,
                'D_q': D_q,
                'm': m,
                'F0': F0,
                'D_n': D_n,
                'U_p': U_p,
                'a': a,
                'b': b,
            }
            return angle_dist, total_count, physics_outputs

        return angle_dist, total_count


class CrackPINNLoss(nn.Module):
    """Composite PINN loss: data fit + physics consistency + regularizers.

    total = lambda_data * MSE(dist)
          + lambda_physics * (sum-consistency + total-count MSE)
          + lambda_smooth * neighboring-bin smoothness
          + lambda_damage * damage-head consistency
          + lambda_mogi * yield-criterion violation
          + lambda_reg * L2(model parameters)
    """

    def __init__(self, lambda_data=1.0, lambda_physics=0.5, lambda_smooth=0.1,
                 lambda_damage=0.3, lambda_mogi=0.2, lambda_reg=1e-4):
        super(CrackPINNLoss, self).__init__()
        self.lambda_data = lambda_data
        self.lambda_physics = lambda_physics
        self.lambda_smooth = lambda_smooth
        self.lambda_damage = lambda_damage
        self.lambda_mogi = lambda_mogi
        self.lambda_reg = lambda_reg
        self.mse_loss = nn.MSELoss()

    def data_loss(self, pred_dist, true_dist):
        """MSE between predicted and true angle distributions."""
        return self.mse_loss(pred_dist, true_dist)

    def physics_loss(self, pred_dist, pred_total, true_dist):
        """Tie the distribution's sum to the predicted and true totals."""
        pred_sum = pred_dist.sum(dim=1, keepdim=True)
        true_sum = true_dist.sum(dim=1, keepdim=True)
        loss_consistency = self.mse_loss(pred_sum, pred_total)
        loss_total = self.mse_loss(pred_total, true_sum)
        return loss_consistency + loss_total

    def smoothness_loss(self, pred_dist):
        """Penalize large jumps between adjacent angle bins."""
        diff = pred_dist[:, 1:] - pred_dist[:, :-1]
        return torch.mean(diff ** 2)

    def damage_consistency_loss(self, physics_outputs):
        """Match the learned damage head to the analytic D0; keep lambda in [0, 1]."""
        D0 = physics_outputs['D0']
        predicted_D0 = physics_outputs['predicted_D0']
        loss_D0 = self.mse_loss(predicted_D0, D0)
        lambda_coef = physics_outputs['lambda']
        loss_lambda = torch.mean(
            torch.relu(-lambda_coef) + torch.relu(lambda_coef - 1.0))
        return loss_D0 + loss_lambda

    def mogi_coulomb_loss(self, physics_outputs):
        """Penalize octahedral shear exceeding yield, plus coefficient bounds."""
        tau_oct = physics_outputs['tau_oct']
        yield_stress = physics_outputs['yield_stress']
        loss_yield = torch.mean(torch.relu(tau_oct - yield_stress))
        C1 = physics_outputs['C1']
        C2 = physics_outputs['C2']
        loss_params = (torch.mean(torch.relu(-C1))
                       + torch.mean(torch.relu(C2 - 1.0)))
        return loss_yield + 0.1 * loss_params

    def regularization_loss(self, model):
        """Explicit L2 over all parameters.

        NOTE(review): the trainer also uses AdamW weight_decay, so
        parameters are L2-regularized twice; kept for compatibility.
        """
        l2_reg = torch.tensor(0.0, device=next(model.parameters()).device)
        for param in model.parameters():
            l2_reg += torch.norm(param, p=2) ** 2
        return l2_reg

    def forward(self, pred_dist, pred_total, true_dist, model,
                physics_outputs=None):
        """Return (total_loss, loss_dict of float components)."""
        loss_data = self.data_loss(pred_dist, true_dist)
        loss_physics = self.physics_loss(pred_dist, pred_total, true_dist)
        loss_smooth = self.smoothness_loss(pred_dist)
        loss_reg = self.regularization_loss(model)

        loss_damage = torch.tensor(0.0, device=pred_dist.device)
        loss_mogi = torch.tensor(0.0, device=pred_dist.device)
        if physics_outputs is not None:
            loss_damage = self.damage_consistency_loss(physics_outputs)
            loss_mogi = self.mogi_coulomb_loss(physics_outputs)

        total_loss = (self.lambda_data * loss_data
                      + self.lambda_physics * loss_physics
                      + self.lambda_smooth * loss_smooth
                      + self.lambda_damage * loss_damage
                      + self.lambda_mogi * loss_mogi
                      + self.lambda_reg * loss_reg)

        loss_dict = {
            'total': total_loss.item(),
            'data': loss_data.item(),
            'physics': loss_physics.item(),
            'smooth': loss_smooth.item(),
            'damage': loss_damage.item(),
            'mogi': loss_mogi.item(),
            'reg': loss_reg.item(),
        }
        return total_loss, loss_dict


class CrackPINNTrainer:
    """Training loop with gradient clipping, LR scheduling, early stopping."""

    def __init__(self, model, device='cpu', lr=1e-3, weight_decay=1e-4):
        self.model = model.to(device)
        self.device = device
        self.optimizer = optim.AdamW(
            model.parameters(),
            lr=lr,
            weight_decay=weight_decay,
            betas=(0.9, 0.999),
        )
        # `verbose=` was removed from ReduceLROnPlateau in recent PyTorch;
        # passing it raises TypeError, so it is omitted here.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=10,
        )
        self.criterion = CrackPINNLoss(
            lambda_data=1.0,
            lambda_physics=0.5,
            lambda_smooth=0.1,
            lambda_damage=0.3,
            lambda_mogi=0.2,
            lambda_reg=1e-4,
        )
        self.train_losses = []
        self.val_losses = []

    def train_epoch(self, train_loader):
        """Run one training epoch; return (avg_loss, avg component dict)."""
        self.model.train()
        epoch_losses = []
        loss_components = {
            'data': [], 'physics': [], 'smooth': [],
            'damage': [], 'mogi': [], 'reg': [],
        }

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            pred_dist, pred_total, physics_outputs = self.model(
                X_batch, return_physics=True)
            loss, loss_dict = self.criterion(
                pred_dist, pred_total, y_batch, self.model, physics_outputs)

            self.optimizer.zero_grad()
            loss.backward()
            # Clip to stabilize training against exploding gradients.
            torch.nn.utils.clip_grad_norm_(
                self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            epoch_losses.append(loss_dict['total'])
            for key in loss_components.keys():
                loss_components[key].append(loss_dict[key])

        avg_loss = np.mean(epoch_losses)
        avg_components = {key: np.mean(values)
                          for key, values in loss_components.items()}
        return avg_loss, avg_components

    def validate(self, val_loader):
        """Evaluate on the validation set; return (avg_loss, metrics dict)."""
        self.model.eval()
        val_losses = []
        all_preds = []
        all_trues = []

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)
                pred_dist, pred_total = self.model(
                    X_batch, return_physics=False)
                loss, _ = self.criterion(
                    pred_dist, pred_total, y_batch, self.model)
                val_losses.append(loss.item())
                all_preds.append(pred_dist.cpu().numpy())
                all_trues.append(y_batch.cpu().numpy())

        avg_loss = np.mean(val_losses)
        all_preds = np.concatenate(all_preds, axis=0)
        all_trues = np.concatenate(all_trues, axis=0)

        # Global (element-wise) R^2 and RMSE across all bins and samples.
        ss_res = np.sum((all_trues - all_preds) ** 2)
        ss_tot = np.sum((all_trues - np.mean(all_trues)) ** 2)
        r2 = 1 - (ss_res / (ss_tot + 1e-8))
        rmse = np.sqrt(np.mean((all_trues - all_preds) ** 2))

        pred_total_counts = all_preds.sum(axis=1)
        true_total_counts = all_trues.sum(axis=1)
        total_count_error = np.mean(
            np.abs(pred_total_counts - true_total_counts))

        metrics = {
            'r2': r2,
            'rmse': rmse,
            'total_count_mae': total_count_error,
        }
        return avg_loss, metrics

    def fit(self, X_train, y_train, X_val, y_val,
            epochs=200, batch_size=16, patience=30):
        """Train with early stopping; restores the best weights at the end."""
        train_dataset = TensorDataset(
            torch.FloatTensor(X_train),
            torch.FloatTensor(y_train),
        )
        train_loader = DataLoader(
            train_dataset, batch_size=batch_size, shuffle=True)
        val_dataset = TensorDataset(
            torch.FloatTensor(X_val),
            torch.FloatTensor(y_val),
        )
        val_loader = DataLoader(
            val_dataset, batch_size=batch_size, shuffle=False)

        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None

        print("\nStarting training...")
        print("=" * 80)

        for epoch in range(epochs):
            train_loss, train_components = self.train_epoch(train_loader)
            val_loss, val_metrics = self.validate(val_loader)

            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            self.scheduler.step(val_loss)

            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}")
                print(f"  Train Loss: {train_loss:.4f} "
                      f"(data: {train_components['data']:.4f}, "
                      f"phys: {train_components['physics']:.4f}, "
                      f"damage: {train_components['damage']:.4f})")
                print(f"  Val Loss: {val_loss:.4f} | "
                      f"R2: {val_metrics['r2']:.4f} | "
                      f"RMSE: {val_metrics['rmse']:.2f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Deep-copy the tensors: state_dict() values alias the live
                # parameters, so a shallow dict .copy() would be silently
                # overwritten by subsequent optimizer steps.
                best_model_state = {
                    k: v.detach().clone()
                    for k, v in self.model.state_dict().items()
                }
            else:
                patience_counter += 1

            if patience_counter >= patience:
                print(f"\nEarly stopping. Best val loss: {best_val_loss:.4f}")
                break

        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)

        print("=" * 80)
        print(f"Training complete. Best val loss: {best_val_loss:.4f}")

    def predict(self, X):
        """Predict (angle_distribution, total_count) as numpy arrays."""
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total = self.model(
                X_tensor, return_physics=False)
            pred_dist = pred_dist.cpu().numpy()
            pred_total = pred_total.cpu().numpy().flatten()
        return pred_dist, pred_total

    def predict_with_physics(self, X):
        """Predict and return selected physics quantities as numpy arrays."""
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total, physics = self.model(
                X_tensor, return_physics=True)
            result = {
                'angle_distribution': pred_dist.cpu().numpy(),
                'total_count': pred_total.cpu().numpy().flatten(),
                'D0': physics['D0'].cpu().numpy().flatten(),
                'lambda': physics['lambda'].cpu().numpy().flatten(),
                'D_n': physics['D_n'].cpu().numpy().flatten(),
            }
        return result