# Transformer-PINN / model.py
# guanwencan's picture
# Upload 5 files
# 5e4dee3 verified
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
class MogiCoulombLayer(nn.Module):
    """Strength head producing an octahedral shear stress and a linear yield limit.

    Two independent linear heads turn a feature vector into the coefficients
    of the yield line ``C1 + C2 * sigma_m2``:

    * ``C1`` goes through a ReLU, so it is never negative.
    * ``C2`` goes through a sigmoid, so it lies strictly between 0 and 1.
    """

    def __init__(self, hidden_dim):
        """Create the two coefficient heads for ``hidden_dim``-wide features."""
        super().__init__()
        self.C1_net = nn.Linear(hidden_dim, 1)
        self.C2_net = nn.Linear(hidden_dim, 1)

    def forward(self, features, sigma1, sigma2, sigma3):
        """Evaluate the shear stress and the feature-dependent yield stress.

        Args:
            features: Tensor whose last dimension equals ``hidden_dim``.
            sigma1, sigma2, sigma3: Stress tensors that broadcast against the
                ``(..., 1)`` head outputs.

        Returns:
            Tuple ``(tau_oct, yield_stress, C1, C2)`` where ``tau_oct`` is one
            third of the root of the summed squared stress differences and
            ``yield_stress = C1 + C2 * (sigma1 + sigma3) / 2``.
        """
        C1 = self.C1_net(features).relu()
        C2 = self.C2_net(features).sigmoid()

        # Pairwise stress differences entering the octahedral shear stress.
        diff_12 = sigma1 - sigma2
        diff_23 = sigma2 - sigma3
        diff_13 = sigma1 - sigma3
        squared_sum = diff_12 ** 2 + diff_23 ** 2 + diff_13 ** 2
        tau_oct = (1.0 / 3.0) * squared_sum.sqrt()

        # Mean of the first and third stress (the second one is not involved).
        sigma_m2 = (sigma1 + sigma3) / 2.0
        yield_stress = C1 + C2 * sigma_m2

        return tau_oct, yield_stress, C1, C2
class WeibullStrengthLayer(nn.Module):
    """Weibull-form damage head with feature-dependent shape and scale.

    Two small MLPs ending in Softplus predict the exponent ``m`` and the scale
    ``F0``. Fixed offsets keep ``m`` above 1.0 and ``F0`` above 0.1.
    """

    def __init__(self, hidden_dim):
        """Build the ``m`` and ``F0`` sub-networks for ``hidden_dim`` features."""
        super().__init__()
        self.m_net = self._positive_head(hidden_dim)
        self.F0_net = self._positive_head(hidden_dim)

    @staticmethod
    def _positive_head(hidden_dim):
        """Return a Linear-Tanh-Linear-Softplus stack with a positive scalar output."""
        return nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )

    def forward(self, features, F):
        """Compute ``D_q = 1 - exp(-(F / F0) ** m)``.

        Args:
            features: Tensor whose last dimension equals ``hidden_dim``.
            F: Load tensor that broadcasts against the ``(..., 1)`` outputs.

        Returns:
            Tuple ``(D_q, m, F0)``.
        """
        m = self.m_net(features) + 1.0
        F0 = self.F0_net(features) + 0.1

        load_ratio = F / F0
        survival = torch.exp(-(load_ratio ** m))
        D_q = 1.0 - survival

        return D_q, m, F0
class EnergyDamageLayer(nn.Module):
    """Exponential energy term mapped to a bounded damage value.

    Two Softplus-terminated MLPs predict the coefficients ``a`` and ``b``.
    Fixed offsets keep ``a`` above 0.1 and ``b`` above 0.01.
    """

    def __init__(self, hidden_dim):
        """Build the ``a`` and ``b`` sub-networks for ``hidden_dim`` features."""
        super().__init__()
        self.a_net = self._positive_head(hidden_dim)
        self.b_net = self._positive_head(hidden_dim)

    @staticmethod
    def _positive_head(hidden_dim):
        """Return a Linear-Tanh-Linear-Softplus stack with a positive scalar output."""
        return nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )

    def forward(self, features, delta_sigma, D0):
        """Compute the energy term ``U_p`` and the damage ``D_n``.

        Args:
            features: Tensor whose last dimension equals ``hidden_dim``.
            delta_sigma: Stress increment, broadcastable to ``(..., 1)``.
            D0: Initial damage used to scale ``delta_sigma`` up.

        Returns:
            Tuple ``(D_n, U_p, a, b)`` with
            ``U_p = a * exp(b * delta_sigma / (1 - D0 + 1e-8))`` and
            ``D_n = (2 / pi) * atan(b * U_p)``.
        """
        a = self.a_net(features) + 0.1
        b = self.b_net(features) + 0.01

        # The 1e-8 term guards the division when D0 reaches 1.
        undamaged_fraction = 1.0 - D0 + 1e-8
        effective_stress = delta_sigma / undamaged_fraction

        U_p = a * (b * effective_stress).exp()
        # atan maps the positive product into (0, pi/2); 2/pi rescales to (0, 1).
        D_n = (2.0 / np.pi) * (b * U_p).atan()

        return D_n, U_p, a, b
class CrackTransformerPINN(nn.Module):
    """Transformer-based network predicting a crack-angle distribution.

    The forward pass embeds each input row, runs it through a Transformer
    encoder as a sequence of length one, concatenates the result with a
    separate "damage" encoding of the raw inputs and decodes:

    * ``angle_dist``  -- non-negative tensor of shape ``(batch, output_dim)``
    * ``total_count`` -- non-negative tensor of shape ``(batch, 1)``

    With ``return_physics=True`` three auxiliary physics layers
    (``MogiCoulombLayer``, ``WeibullStrengthLayer``, ``EnergyDamageLayer``)
    are also evaluated on the encoded features and returned in a dict.

    The first four input columns are read as ``pH``, ``FN``, ``FT`` and ``T``
    by :meth:`compute_initial_damage`; the remaining column(s) are only seen
    by the learned layers.
    """

    def __init__(self, input_dim=5, output_dim=72,
                 hidden_dims=(128, 256, 256, 128), dropout=0.2):
        """Build all sub-networks.

        Args:
            input_dim: Number of input columns per sample.
            output_dim: Number of bins in the predicted angle distribution.
            hidden_dims: Sequence of widths. ``hidden_dims[0]`` is the
                embedding / Transformer width (it must be divisible by the
                8 attention heads); ``hidden_dims[1:]`` are the decoder
                widths. A tuple default replaces the previous mutable list
                default; lists are still accepted.
            dropout: Dropout rate for the Transformer and decoder; the input
                embedding uses half of it.
        """
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        embed_dim = hidden_dims[0]

        self.input_embedding = nn.Sequential(
            nn.Linear(input_dim, embed_dim),
            nn.LayerNorm(embed_dim),
            nn.GELU(),
            nn.Dropout(dropout * 0.5),
        )
        self.damage_encoder = nn.Sequential(
            nn.Linear(input_dim, embed_dim),
            nn.Tanh(),
            nn.Linear(embed_dim, embed_dim),
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=8,
            dim_feedforward=embed_dim * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
            norm_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=4,
            norm=nn.LayerNorm(embed_dim),
        )

        self.mogi_coulomb = MogiCoulombLayer(embed_dim)
        self.weibull_strength = WeibullStrengthLayer(embed_dim)
        self.energy_damage = EnergyDamageLayer(embed_dim)

        # Decoder input is the concatenation [encoded, damage_features].
        self.angle_decoder = nn.ModuleList()
        prev_dim = embed_dim * 2
        for hidden_dim in hidden_dims[1:]:
            self.angle_decoder.append(nn.Linear(prev_dim, hidden_dim))
            self.angle_decoder.append(nn.LayerNorm(hidden_dim))
            self.angle_decoder.append(nn.Tanh())
            self.angle_decoder.append(nn.Dropout(dropout))
            prev_dim = hidden_dim
        self.angle_output = nn.Sequential(
            nn.Linear(prev_dim, output_dim),
            nn.ReLU(),
        )

        self.total_count_head = nn.Sequential(
            nn.Linear(embed_dim * 2, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
            nn.ReLU(),
        )
        self.damage_factor_head = nn.Sequential(
            nn.Linear(embed_dim, 32),
            nn.Tanh(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )

    def compute_initial_damage(self, pH, FN, FT, T):
        """Return the closed-form initial damage, clamped to ``[0, 0.99]``.

        Three components are combined as
        ``1 - (1 - D_ft) * (1 - D_ch) * (1 - D_th)``:

        * ``D_ft = 0.002 * FN * exp(0.02 * FT)``
        * ``D_ch = 0.01 * |pH - 7| ** 1.5``
        * ``D_th = 0.0003 * max(T - 100, 0) ** 1.2``

        The thermal term is written with a clamp instead of
        ``torch.where(T > 100, (T - 100) ** 1.2, 0)``. The forward values are
        identical, but the ``where`` form raises a negative base to a
        fractional power in the masked branch, which yields NaN gradients
        with respect to ``T`` whenever ``T <= 100``.

        Args:
            pH, FN, FT, T: Tensors of matching (broadcastable) shape.

        Returns:
            Tensor with the same broadcast shape as the inputs.
        """
        D_ft = 0.002 * FN * torch.exp(0.02 * FT)
        D_ch = 0.01 * torch.abs(pH - 7.0) ** 1.5
        excess_T = torch.clamp(T - 100.0, min=0.0)
        D_th = 0.0003 * excess_T ** 1.2
        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
        return torch.clamp(D_total, 0.0, 0.99)

    def forward(self, x, return_physics=False):
        """Predict the angle distribution and total count for a batch.

        Args:
            x: Tensor of shape ``(batch, input_dim)``.
            return_physics: When true, also evaluate the physics layers and
                return their outputs.

        Returns:
            ``(angle_dist, total_count)``, or
            ``(angle_dist, total_count, physics_outputs)`` when
            ``return_physics`` is true. ``physics_outputs`` is a dict with
            the keys ``D0``, ``lambda``, ``predicted_D0``, ``tau_oct``,
            ``yield_stress``, ``C1``, ``C2``, ``D_q``, ``m``, ``F0``,
            ``D_n``, ``U_p``, ``a`` and ``b``.
        """
        x_embedded = self.input_embedding(x)
        damage_features = self.damage_encoder(x)

        # Each sample is treated as a sequence of length one.
        encoded = self.transformer_encoder(x_embedded.unsqueeze(1)).squeeze(1)
        combined = torch.cat([encoded, damage_features], dim=-1)

        h = combined
        for layer in self.angle_decoder:
            h = layer(h)
        angle_dist = self.angle_output(h)
        total_count = self.total_count_head(combined)

        if not return_physics:
            return angle_dist, total_count

        # Everything below is only needed for the physics-informed losses,
        # so it is skipped on the plain inference path.
        batch_size = x.shape[0]
        D0 = self.compute_initial_damage(
            x[:, 0:1], x[:, 1:2], x[:, 2:3], x[:, 3:4]
        )
        predicted_D0 = self.damage_factor_head(encoded)

        # Fixed loading state fed to the physics layers (same values for
        # every sample); created in the dtype/device of the input.
        ones = torch.ones(batch_size, 1, dtype=x.dtype, device=x.device)
        sigma1 = 100.0 * ones
        sigma2 = 50.0 * ones
        sigma3 = 30.0 * ones
        delta_sigma = 20.0 * ones
        F_contact = 10.0 * ones

        tau_oct, yield_stress, C1, C2 = self.mogi_coulomb(
            encoded, sigma1, sigma2, sigma3
        )
        D_q, m, F0 = self.weibull_strength(encoded, F_contact)
        D_n, U_p, a, b = self.energy_damage(encoded, delta_sigma, D0)

        physics_outputs = {
            'D0': D0,
            'lambda': 1.0 - D0,
            'predicted_D0': predicted_D0,
            'tau_oct': tau_oct,
            'yield_stress': yield_stress,
            'C1': C1,
            'C2': C2,
            'D_q': D_q,
            'm': m,
            'F0': F0,
            'D_n': D_n,
            'U_p': U_p,
            'a': a,
            'b': b,
        }
        return angle_dist, total_count, physics_outputs
class CrackPINNLoss(nn.Module):
    """Weighted sum of data, consistency, smoothness and physics penalties.

    ``forward`` combines six terms, each scaled by its ``lambda_*`` weight:
    data MSE, total-count consistency, neighbouring-bin smoothness,
    damage consistency, Mogi-Coulomb yield violation and an L2 penalty on
    all model parameters.
    """

    def __init__(self, lambda_data=1.0, lambda_physics=0.5, lambda_smooth=0.1,
                 lambda_damage=0.3, lambda_mogi=0.2, lambda_reg=1e-4):
        """Store the weight of every loss term."""
        super().__init__()
        self.lambda_data = lambda_data
        self.lambda_physics = lambda_physics
        self.lambda_smooth = lambda_smooth
        self.lambda_damage = lambda_damage
        self.lambda_mogi = lambda_mogi
        self.lambda_reg = lambda_reg
        self.mse_loss = nn.MSELoss()

    def data_loss(self, pred_dist, true_dist):
        """Mean squared error between predicted and target distributions."""
        return self.mse_loss(pred_dist, true_dist)

    def physics_loss(self, pred_dist, pred_total, true_dist):
        """Tie the total-count output to the distribution sums.

        Adds the MSE between the predicted distribution's row sum and
        ``pred_total`` to the MSE between ``pred_total`` and the target
        distribution's row sum.
        """
        pred_sum = pred_dist.sum(dim=1, keepdim=True)
        true_sum = true_dist.sum(dim=1, keepdim=True)
        loss_consistency = self.mse_loss(pred_sum, pred_total)
        loss_total = self.mse_loss(pred_total, true_sum)
        return loss_consistency + loss_total

    def smoothness_loss(self, pred_dist):
        """Mean squared difference between adjacent bins along dim 1.

        Returns zero when there are fewer than two bins; averaging the empty
        difference tensor would otherwise produce NaN and poison the total.
        """
        diff = pred_dist[:, 1:] - pred_dist[:, :-1]
        if diff.numel() == 0:
            return pred_dist.new_zeros(())
        return torch.mean(diff ** 2)

    def damage_consistency_loss(self, physics_outputs):
        """Penalise mismatch between learned and analytic initial damage.

        Reads ``D0``, ``predicted_D0`` and ``lambda`` from
        ``physics_outputs``; the second term penalises ``lambda`` values
        outside ``[0, 1]``.
        """
        D0 = physics_outputs['D0']
        predicted_D0 = physics_outputs['predicted_D0']
        loss_D0 = self.mse_loss(predicted_D0, D0)
        lambda_coef = physics_outputs['lambda']
        loss_lambda = torch.mean(
            torch.relu(-lambda_coef) + torch.relu(lambda_coef - 1.0)
        )
        return loss_D0 + loss_lambda

    def mogi_coulomb_loss(self, physics_outputs):
        """Penalise shear stress above the yield stress and out-of-range coefficients.

        Reads ``tau_oct``, ``yield_stress``, ``C1`` and ``C2`` from
        ``physics_outputs``. The coefficient term (negative ``C1`` or ``C2``
        above 1) is weighted by 0.1.
        """
        tau_oct = physics_outputs['tau_oct']
        yield_stress = physics_outputs['yield_stress']
        loss_yield = torch.mean(torch.relu(tau_oct - yield_stress))
        C1 = physics_outputs['C1']
        C2 = physics_outputs['C2']
        loss_params = (torch.mean(torch.relu(-C1))
                       + torch.mean(torch.relu(C2 - 1.0)))
        return loss_yield + 0.1 * loss_params

    def regularization_loss(self, model):
        """Return the sum of squared entries over all parameters of ``model``.

        The squares are summed directly instead of squaring ``torch.norm``
        (a deprecated call and a needless sqrt/square round trip). A model
        without parameters yields a zero scalar instead of raising
        ``StopIteration``.
        """
        params = list(model.parameters())
        if not params:
            return torch.zeros(())
        l2_reg = torch.zeros((), dtype=params[0].dtype, device=params[0].device)
        for param in params:
            l2_reg = l2_reg + param.pow(2).sum()
        return l2_reg

    def forward(self, pred_dist, pred_total, true_dist, model, physics_outputs=None):
        """Compute the weighted total loss and a dict of scalar components.

        Args:
            pred_dist: Predicted distribution, ``(batch, bins)``.
            pred_total: Predicted total count, ``(batch, 1)``.
            true_dist: Target distribution, same shape as ``pred_dist``.
            model: Module whose parameters enter the L2 penalty.
            physics_outputs: Optional dict from the model's physics path;
                when ``None`` the damage and Mogi-Coulomb terms are zero.

        Returns:
            ``(total_loss, loss_dict)`` where ``loss_dict`` maps ``total``,
            ``data``, ``physics``, ``smooth``, ``damage``, ``mogi`` and
            ``reg`` to Python floats (unweighted, except ``total``).
        """
        loss_data = self.data_loss(pred_dist, true_dist)
        loss_physics = self.physics_loss(pred_dist, pred_total, true_dist)
        loss_smooth = self.smoothness_loss(pred_dist)
        loss_reg = self.regularization_loss(model)

        if physics_outputs is not None:
            loss_damage = self.damage_consistency_loss(physics_outputs)
            loss_mogi = self.mogi_coulomb_loss(physics_outputs)
        else:
            loss_damage = torch.tensor(0.0, device=pred_dist.device)
            loss_mogi = torch.tensor(0.0, device=pred_dist.device)

        total_loss = (self.lambda_data * loss_data +
                      self.lambda_physics * loss_physics +
                      self.lambda_smooth * loss_smooth +
                      self.lambda_damage * loss_damage +
                      self.lambda_mogi * loss_mogi +
                      self.lambda_reg * loss_reg)

        loss_dict = {
            'total': total_loss.item(),
            'data': loss_data.item(),
            'physics': loss_physics.item(),
            'smooth': loss_smooth.item(),
            'damage': loss_damage.item(),
            'mogi': loss_mogi.item(),
            'reg': loss_reg.item(),
        }
        return total_loss, loss_dict
class CrackPINNTrainer:
    """Training, validation and inference loop around a crack-PINN model.

    The model is expected to be callable as ``model(x, return_physics=...)``
    and to return ``(pred_dist, pred_total)`` or, with
    ``return_physics=True``, ``(pred_dist, pred_total, physics_outputs)``.
    """

    def __init__(self, model, device='cpu', lr=1e-3, weight_decay=1e-4):
        """Move ``model`` to ``device`` and create optimizer, scheduler and loss.

        Args:
            model: The network to train.
            device: Device (string or ``torch.device``) used for model and data.
            lr: Initial AdamW learning rate.
            weight_decay: AdamW weight decay.
        """
        self.model = model.to(device)
        self.device = device
        self.optimizer = optim.AdamW(
            self.model.parameters(),
            lr=lr,
            weight_decay=weight_decay,
            betas=(0.9, 0.999),
        )
        # The deprecated ``verbose`` flag is no longer passed; ``fit`` prints
        # learning-rate reductions itself.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=10,
        )
        self.criterion = CrackPINNLoss(
            lambda_data=1.0,
            lambda_physics=0.5,
            lambda_smooth=0.1,
            lambda_damage=0.3,
            lambda_mogi=0.2,
            lambda_reg=1e-4,
        )
        self.train_losses = []
        self.val_losses = []

    def train_epoch(self, train_loader):
        """Run one optimisation pass over ``train_loader``.

        Gradients are clipped to a global norm of 1.0 before each step.

        Returns:
            ``(avg_loss, avg_components)``: the mean total loss over batches
            and a dict with the mean of the ``data``, ``physics``, ``smooth``,
            ``damage``, ``mogi`` and ``reg`` components.
        """
        self.model.train()
        epoch_losses = []
        loss_components = {
            'data': [], 'physics': [], 'smooth': [],
            'damage': [], 'mogi': [], 'reg': [],
        }
        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            pred_dist, pred_total, physics_outputs = self.model(
                X_batch, return_physics=True
            )
            loss, loss_dict = self.criterion(
                pred_dist, pred_total, y_batch, self.model, physics_outputs
            )

            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            epoch_losses.append(loss_dict['total'])
            for key, values in loss_components.items():
                values.append(loss_dict[key])

        avg_loss = np.mean(epoch_losses)
        avg_components = {
            key: np.mean(values) for key, values in loss_components.items()
        }
        return avg_loss, avg_components

    def validate(self, val_loader):
        """Evaluate the model on ``val_loader`` without gradient tracking.

        The criterion is called without physics outputs, so only the terms
        that do not need them contribute to the validation loss.

        Returns:
            ``(avg_loss, metrics)`` where ``metrics`` holds:

            * ``r2`` -- 1 - SS_res / SS_tot over all entries, using the
              global mean of the targets.
            * ``rmse`` -- root mean squared error over all entries.
            * ``total_count_mae`` -- mean absolute difference between the
              row sums of the predicted and target distributions (the
              model's separate total-count output is not used here).
        """
        self.model.eval()
        val_losses = []
        all_preds = []
        all_trues = []
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device)
                pred_dist, pred_total = self.model(X_batch, return_physics=False)
                loss, _ = self.criterion(pred_dist, pred_total, y_batch, self.model)
                val_losses.append(loss.item())
                all_preds.append(pred_dist.cpu().numpy())
                all_trues.append(y_batch.cpu().numpy())

        avg_loss = np.mean(val_losses)
        all_preds = np.concatenate(all_preds, axis=0)
        all_trues = np.concatenate(all_trues, axis=0)

        residual = all_trues - all_preds
        ss_res = np.sum(residual ** 2)
        ss_tot = np.sum((all_trues - np.mean(all_trues)) ** 2)
        r2 = 1 - (ss_res / (ss_tot + 1e-8))  # epsilon guards constant targets
        rmse = np.sqrt(np.mean(residual ** 2))

        pred_total_counts = all_preds.sum(axis=1)
        true_total_counts = all_trues.sum(axis=1)
        total_count_error = np.mean(np.abs(pred_total_counts - true_total_counts))

        metrics = {
            'r2': r2,
            'rmse': rmse,
            'total_count_mae': total_count_error,
        }
        return avg_loss, metrics

    def fit(self, X_train, y_train, X_val, y_val, epochs=200, batch_size=16, patience=30):
        """Train with early stopping and restore the best validation weights.

        Args:
            X_train, y_train: Training inputs and targets (array-likes
                accepted by ``torch.FloatTensor``).
            X_val, y_val: Validation inputs and targets.
            epochs: Maximum number of epochs.
            batch_size: Mini-batch size for both loaders.
            patience: Number of consecutive epochs without a new best
                validation loss after which training stops.

        Side effects:
            Appends to ``self.train_losses`` / ``self.val_losses``, prints
            progress, and leaves ``self.model`` loaded with the weights of
            the epoch that had the lowest validation loss.
        """
        train_dataset = TensorDataset(
            torch.FloatTensor(X_train),
            torch.FloatTensor(y_train),
        )
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_dataset = TensorDataset(
            torch.FloatTensor(X_val),
            torch.FloatTensor(y_val),
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None

        print("\nStarting training...")
        print("=" * 80)
        for epoch in range(epochs):
            train_loss, train_components = self.train_epoch(train_loader)
            val_loss, val_metrics = self.validate(val_loader)
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)

            # Step the plateau scheduler and report any reduction ourselves.
            lrs_before = [group['lr'] for group in self.optimizer.param_groups]
            self.scheduler.step(val_loss)
            for idx, group in enumerate(self.optimizer.param_groups):
                if group['lr'] != lrs_before[idx]:
                    print(f"Epoch {epoch+1}: reducing learning rate "
                          f"of group {idx} to {group['lr']:.4e}.")

            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}")
                print(f" Train Loss: {train_loss:.4f} "
                      f"(data: {train_components['data']:.4f}, "
                      f"phys: {train_components['physics']:.4f}, "
                      f"damage: {train_components['damage']:.4f})")
                print(f" Val Loss: {val_loss:.4f} | "
                      f"R2: {val_metrics['r2']:.4f} | "
                      f"RMSE: {val_metrics['rmse']:.2f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # state_dict() tensors share storage with the live
                # parameters, so a shallow dict copy would keep changing as
                # training continues. Clone every tensor to freeze the
                # snapshot.
                best_model_state = {
                    name: tensor.detach().clone()
                    for name, tensor in self.model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"\nEarly stopping. Best val loss: {best_val_loss:.4f}")
                    break

        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)
        print("=" * 80)
        print(f"Training complete. Best val loss: {best_val_loss:.4f}")

    def predict(self, X):
        """Return ``(pred_dist, pred_total)`` as NumPy arrays for inputs ``X``.

        ``pred_total`` is flattened to one dimension.
        """
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total = self.model(X_tensor, return_physics=False)
        pred_dist = pred_dist.cpu().numpy()
        pred_total = pred_total.cpu().numpy().flatten()
        return pred_dist, pred_total

    def predict_with_physics(self, X):
        """Predict and also return selected physics quantities.

        Returns:
            Dict with ``angle_distribution`` (2-D array) and the flattened
            arrays ``total_count``, ``D0``, ``lambda`` and ``D_n``.
        """
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            pred_dist, pred_total, physics = self.model(X_tensor, return_physics=True)
        result = {
            'angle_distribution': pred_dist.cpu().numpy(),
            'total_count': pred_total.cpu().numpy().flatten(),
            'D0': physics['D0'].cpu().numpy().flatten(),
            'lambda': physics['lambda'].cpu().numpy().flatten(),
            'D_n': physics['D_n'].cpu().numpy().flatten(),
        }
        return result