Injection_Molding_Design / main_injection.py
Rui Wan
upload model
6977ea8
import torch
import numpy as np
import matplotlib.pyplot as plt
from Dataset import Dataset
from model import NeuralNetwork
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Set global plotting parameters
plt.rcParams.update({'font.size': 14,
'figure.figsize': (10, 8),
'lines.linewidth': 2,
'lines.markersize': 6,
'axes.grid': True,
'axes.labelsize': 16,
'legend.fontsize': 14,
'xtick.labelsize': 14,
'ytick.labelsize': 14,
'figure.autolayout': True
})
def set_seed(seed=42):
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
def train_neural_network(model, inputs, outputs, optimizer, epochs=1000, lr_scheduler=None):
model.train()
for epoch in range(epochs):
optimizer.zero_grad()
predictions = model(inputs)
loss = torch.mean(torch.square(predictions - outputs))
loss.backward()
optimizer.step()
if lr_scheduler:
lr_scheduler.step()
if epoch % 100 == 0:
print(f'Epoch {epoch}, Loss: {loss.item()}, Learning Rate: {optimizer.param_groups[0]["lr"]}')
def main():
set_seed(42)
dataset = Dataset(mat_name='FRP')
# Load raw data; normalize using train-only statistics to avoid leakage.
inputs = dataset.get_input(normalize=False)
outputs = dataset.get_output(normalize=False)
# Train/val/test split for early stopping and unbiased test.
n = len(inputs)
perm = np.random.permutation(n)
n_train = int(0.8 * n)
n_val = int(0.1 * n)
idx_train = perm[:n_train]
idx_val = perm[n_train:n_train + n_val]
idx_test = perm[n_train + n_val:]
# Fit normalization on train split only.
input_mean = inputs[idx_train].mean(axis=0)
input_std = inputs[idx_train].std(axis=0) + 1e-8
output_mean = outputs[idx_train].mean(axis=0)
output_std = outputs[idx_train].std(axis=0) + 1e-8
inputs_norm = (inputs - input_mean) / input_std
outputs_norm = (outputs - output_mean) / output_std
inputs_train = torch.tensor(inputs_norm[idx_train], dtype=torch.float32).to(DEVICE)
outputs_train = torch.tensor(outputs_norm[idx_train], dtype=torch.float32).to(DEVICE)
inputs_val = torch.tensor(inputs_norm[idx_val], dtype=torch.float32).to(DEVICE)
outputs_val = torch.tensor(outputs_norm[idx_val], dtype=torch.float32).to(DEVICE)
inputs_test = torch.tensor(inputs_norm[idx_test], dtype=torch.float32).to(DEVICE)
outputs_test = torch.tensor(outputs_norm[idx_test], dtype=torch.float32).to(DEVICE)
# Linear regression baseline on normalized data.
X_train = np.concatenate([inputs_norm[idx_train], np.ones((len(idx_train), 1), dtype=np.float32)], axis=1)
Y_train = outputs_norm[idx_train]
coef, _, _, _ = np.linalg.lstsq(X_train, Y_train, rcond=None)
def linear_predict(x_norm):
X = np.concatenate([x_norm, np.ones((len(x_norm), 1), dtype=np.float32)], axis=1)
return X @ coef
val_pred_lr = linear_predict(inputs_norm[idx_val])
test_pred_lr = linear_predict(inputs_norm[idx_test])
val_mse_lr = np.mean((val_pred_lr - outputs_norm[idx_val]) ** 2)
test_mse_lr = np.mean((test_pred_lr - outputs_norm[idx_test]) ** 2)
print(f'Linear baseline - Val Loss: {val_mse_lr:.6f}, Test Loss: {test_mse_lr:.6f}')
# Smaller model to reduce overfitting on small data.
layer_sizes = [inputs.shape[1]] + [32] * 2 + [outputs.shape[1]]
dropout_rate = 0.2
model = NeuralNetwork(layer_sizes, dropout_rate=dropout_rate, activation=torch.nn.ReLU).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5000, gamma=0.9)
# Create a proper dataset that keeps input-output pairs together
train_dataset = torch.utils.data.TensorDataset(inputs_train, outputs_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
# Train the model
epochs = 10000
best_val = float('inf')
best_state = None
patience = 800
patience_left = patience
for epoch in range(epochs):
model.train()
for inputs_batch, outputs_batch in train_loader:
inputs_batch = inputs_batch.to(DEVICE)
outputs_batch = outputs_batch.to(DEVICE)
optimizer.zero_grad()
predictions = model(inputs_batch)
loss = torch.mean(torch.square(predictions - outputs_batch))
loss.backward()
optimizer.step()
if lr_scheduler:
lr_scheduler.step()
if epoch % 500 == 0:
model.eval()
with torch.no_grad():
train_pred = model(inputs_train)
train_loss = torch.mean(torch.square(train_pred - outputs_train))
val_pred = model(inputs_val)
val_loss = torch.mean(torch.square(val_pred - outputs_val))
print(f'Epoch {epoch}, Train Loss: {train_loss.item():.6f}, Val Loss: {val_loss.item():.6f}')
# Early stopping on validation loss (checked every epoch).
model.eval()
with torch.no_grad():
val_pred = model(inputs_val)
val_loss = torch.mean(torch.square(val_pred - outputs_val))
if val_loss.item() < best_val - 1e-5:
best_val = val_loss.item()
best_state = {k: v.clone() for k, v in model.state_dict().items()}
patience_left = patience
else:
patience_left -= 1
if patience_left <= 0:
print(f'Early stopping at epoch {epoch}. Best val loss: {best_val:.6f}')
break
if best_state is not None:
model.load_state_dict(best_state)
# MC Dropout inference for predictive mean/uncertainty.
def mc_dropout_predict(model, x, n_samples=50):
model.train() # keep dropout active
preds = []
with torch.no_grad():
for _ in range(n_samples):
preds.append(model(x).unsqueeze(0))
preds = torch.cat(preds, dim=0)
return preds.mean(dim=0), preds.std(dim=0)
predictions, pred_std = mc_dropout_predict(model, inputs_test, n_samples=50)
test_loss = torch.mean(torch.square(predictions - outputs_test))
print(f'Test Loss: {test_loss.item()}. Samples: {idx_test}')
x = np.arange(0, len(idx_test))
outputs_test = outputs_test.cpu().numpy() * output_std + output_mean
predictions = predictions.cpu().numpy() * output_std + output_mean
pred_std = pred_std.cpu().numpy() * output_std
print(f'Predictive STD (A, B, C): {pred_std.mean(axis=0)}')
plt.figure(figsize=(10, 6))
plt.plot(x, outputs_test[:, 0], color='b', linestyle='--', label='True A')
plt.plot(x, predictions[:, 0], color='b', linestyle='-', label='Predicted A')
plt.plot(x, outputs_test[:, 1], color='r', linestyle='--', label='True B')
plt.plot(x, predictions[:, 1], color='r', linestyle='-', label='Predicted B')
plt.plot(x, outputs_test[:, 2], color='g', linestyle='--', label='True C')
plt.plot(x, predictions[:, 2], color='g', linestyle='-', label='Predicted C')
plt.gca().xaxis.set_major_locator(plt.MaxNLocator(integer=True))
plt.xlabel('Sample Index')
plt.xticks(ticks=range(len(idx_test)),labels=idx_test + 1)
plt.ylabel('Angle (Degrees)')
plt.title('Angle Prediction')
plt.legend(loc='upper right')
plt.savefig('angle_prediction.png')
# MSE
mse = np.mean((predictions - outputs_test) ** 2, axis=0)
print(f'Mean Squared Error for A: {mse[0]:.6f}, B: {mse[1]:.6f}, C: {mse[2]:.6f}')
# R 2 score
ss_ress = np.sum((outputs_test - predictions) ** 2, axis=0)
ss_tots = np.sum((outputs_test - np.mean(outputs_test, axis=0)) ** 2, axis=0)
r2_scores = 1 - ss_ress / ss_tots
print(f'R² Score for A: {r2_scores[0]:.6f}, B: {r2_scores[1]:.6f}, C: {r2_scores[2]:.6f}')
# Error
# Save the model
model_save_path = './model_checkpoint.pth'
model_config = {'layer_sizes': layer_sizes,
'dropout_rate': dropout_rate
}
checkpoint = {
'model_state_dict': model.state_dict(),
'model_config': model_config
}
torch.save(checkpoint, model_save_path)
def load_model(model_path):
checkpoint = torch.load(model_path)
model_config = checkpoint['model_config']
model = NeuralNetwork(model_config['layer_sizes'], dropout_rate=model_config['dropout_rate'], activation=torch.nn.ReLU).to(DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"Model loaded from {model_path}")
return model
if __name__ == "__main__":
main()
# model = load_model('./model_checkpoint.pth').to(torch.device('cpu'))