import torch
import numpy as np
import matplotlib.pyplot as plt

from Dataset import Dataset
from model import NeuralNetwork

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Set global plotting parameters
plt.rcParams.update({'font.size': 14,
                     'figure.figsize': (10, 8),
                     'lines.linewidth': 2,
                     'lines.markersize': 6,
                     'axes.grid': True,
                     'axes.labelsize': 16,
                     'legend.fontsize': 14,
                     'xtick.labelsize': 14,
                     'ytick.labelsize': 14,
                     'figure.autolayout': True
                     })


def set_seed(seed=42):
    """Seed NumPy and PyTorch (CPU and all CUDA devices) for reproducibility."""
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def train_neural_network(model, inputs, outputs, optimizer, epochs=1000, lr_scheduler=None):
    """Full-batch MSE training loop (simple helper; `main` uses its own mini-batch loop).

    Args:
        model: the network to train (switched to train mode).
        inputs: input tensor fed to the model every epoch.
        outputs: target tensor for the MSE loss.
        optimizer: torch optimizer over `model.parameters()`.
        epochs: number of full-batch gradient steps.
        lr_scheduler: optional scheduler stepped once per epoch.
    """
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        predictions = model(inputs)
        loss = torch.mean(torch.square(predictions - outputs))
        loss.backward()
        optimizer.step()
        if lr_scheduler:
            lr_scheduler.step()
        if epoch % 100 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, Learning Rate: {optimizer.param_groups[0]["lr"]}')


def main():
    """End-to-end pipeline: split, normalize, baseline, train with early stopping,
    evaluate with MC dropout, plot, and save a checkpoint."""
    set_seed(42)
    dataset = Dataset(mat_name='FRP')
    # Load raw data; normalize using train-only statistics to avoid leakage.
    inputs = dataset.get_input(normalize=False)
    outputs = dataset.get_output(normalize=False)

    # Train/val/test split for early stopping and unbiased test.
    n = len(inputs)
    perm = np.random.permutation(n)
    n_train = int(0.8 * n)
    n_val = int(0.1 * n)
    idx_train = perm[:n_train]
    idx_val = perm[n_train:n_train + n_val]
    idx_test = perm[n_train + n_val:]

    # Fit normalization on train split only (epsilon guards zero-variance features).
    input_mean = inputs[idx_train].mean(axis=0)
    input_std = inputs[idx_train].std(axis=0) + 1e-8
    output_mean = outputs[idx_train].mean(axis=0)
    output_std = outputs[idx_train].std(axis=0) + 1e-8
    inputs_norm = (inputs - input_mean) / input_std
    outputs_norm = (outputs - output_mean) / output_std

    inputs_train = torch.tensor(inputs_norm[idx_train], dtype=torch.float32).to(DEVICE)
    outputs_train = torch.tensor(outputs_norm[idx_train], dtype=torch.float32).to(DEVICE)
    inputs_val = torch.tensor(inputs_norm[idx_val], dtype=torch.float32).to(DEVICE)
    outputs_val = torch.tensor(outputs_norm[idx_val], dtype=torch.float32).to(DEVICE)
    inputs_test = torch.tensor(inputs_norm[idx_test], dtype=torch.float32).to(DEVICE)
    outputs_test = torch.tensor(outputs_norm[idx_test], dtype=torch.float32).to(DEVICE)

    # Linear regression baseline on normalized data (least squares with bias column).
    X_train = np.concatenate([inputs_norm[idx_train],
                              np.ones((len(idx_train), 1), dtype=np.float32)], axis=1)
    Y_train = outputs_norm[idx_train]
    coef, _, _, _ = np.linalg.lstsq(X_train, Y_train, rcond=None)

    def linear_predict(x_norm):
        # Append the bias column before applying the fitted coefficients.
        X = np.concatenate([x_norm, np.ones((len(x_norm), 1), dtype=np.float32)], axis=1)
        return X @ coef

    val_pred_lr = linear_predict(inputs_norm[idx_val])
    test_pred_lr = linear_predict(inputs_norm[idx_test])
    val_mse_lr = np.mean((val_pred_lr - outputs_norm[idx_val]) ** 2)
    test_mse_lr = np.mean((test_pred_lr - outputs_norm[idx_test]) ** 2)
    print(f'Linear baseline - Val Loss: {val_mse_lr:.6f}, Test Loss: {test_mse_lr:.6f}')

    # Smaller model to reduce overfitting on small data.
    layer_sizes = [inputs.shape[1]] + [32] * 2 + [outputs.shape[1]]
    dropout_rate = 0.2
    model = NeuralNetwork(layer_sizes, dropout_rate=dropout_rate,
                          activation=torch.nn.ReLU).to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5000, gamma=0.9)

    # Create a proper dataset that keeps input-output pairs together
    train_dataset = torch.utils.data.TensorDataset(inputs_train, outputs_train)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

    # Train the model
    epochs = 10000
    best_val = float('inf')
    best_state = None
    patience = 800
    patience_left = patience
    for epoch in range(epochs):
        model.train()
        for inputs_batch, outputs_batch in train_loader:
            inputs_batch = inputs_batch.to(DEVICE)
            outputs_batch = outputs_batch.to(DEVICE)
            optimizer.zero_grad()
            predictions = model(inputs_batch)
            loss = torch.mean(torch.square(predictions - outputs_batch))
            loss.backward()
            optimizer.step()
        # Scheduler advances once per epoch (StepLR step_size is in epochs).
        if lr_scheduler:
            lr_scheduler.step()

        if epoch % 500 == 0:
            model.eval()
            with torch.no_grad():
                train_pred = model(inputs_train)
                train_loss = torch.mean(torch.square(train_pred - outputs_train))
                val_pred = model(inputs_val)
                val_loss = torch.mean(torch.square(val_pred - outputs_val))
            print(f'Epoch {epoch}, Train Loss: {train_loss.item():.6f}, Val Loss: {val_loss.item():.6f}')

        # Early stopping on validation loss (checked every epoch).
        model.eval()
        with torch.no_grad():
            val_pred = model(inputs_val)
            val_loss = torch.mean(torch.square(val_pred - outputs_val))
        if val_loss.item() < best_val - 1e-5:
            best_val = val_loss.item()
            # Clone so the snapshot is not aliased to live (still-updating) parameters.
            best_state = {k: v.clone() for k, v in model.state_dict().items()}
            patience_left = patience
        else:
            patience_left -= 1
            if patience_left <= 0:
                print(f'Early stopping at epoch {epoch}. Best val loss: {best_val:.6f}')
                break

    if best_state is not None:
        model.load_state_dict(best_state)

    # MC Dropout inference for predictive mean/uncertainty.
    def mc_dropout_predict(model, x, n_samples=50):
        """Sample `n_samples` stochastic forward passes with dropout active and
        return (mean, std) over the samples.

        NOTE: `model.train()` enables training behavior for *all* layers, not just
        dropout; acceptable here since the network presumably has no batch-norm —
        confirm against the NeuralNetwork definition.
        """
        model.train()  # keep dropout active
        preds = []
        with torch.no_grad():
            for _ in range(n_samples):
                preds.append(model(x).unsqueeze(0))
        preds = torch.cat(preds, dim=0)
        # Fix: restore eval mode so later inference/saving is not left in train mode.
        model.eval()
        return preds.mean(dim=0), preds.std(dim=0)

    predictions, pred_std = mc_dropout_predict(model, inputs_test, n_samples=50)
    test_loss = torch.mean(torch.square(predictions - outputs_test))
    print(f'Test Loss: {test_loss.item()}. Samples: {idx_test}')

    x = np.arange(0, len(idx_test))
    # De-normalize back to the original output scale (degrees).
    outputs_test = outputs_test.cpu().numpy() * output_std + output_mean
    predictions = predictions.cpu().numpy() * output_std + output_mean
    pred_std = pred_std.cpu().numpy() * output_std
    print(f'Predictive STD (A, B, C): {pred_std.mean(axis=0)}')

    plt.figure(figsize=(10, 6))
    plt.plot(x, outputs_test[:, 0], color='b', linestyle='--', label='True A')
    plt.plot(x, predictions[:, 0], color='b', linestyle='-', label='Predicted A')
    plt.plot(x, outputs_test[:, 1], color='r', linestyle='--', label='True B')
    plt.plot(x, predictions[:, 1], color='r', linestyle='-', label='Predicted B')
    plt.plot(x, outputs_test[:, 2], color='g', linestyle='--', label='True C')
    plt.plot(x, predictions[:, 2], color='g', linestyle='-', label='Predicted C')
    plt.gca().xaxis.set_major_locator(plt.MaxNLocator(integer=True))
    plt.xlabel('Sample Index')
    plt.xticks(ticks=range(len(idx_test)), labels=idx_test + 1)
    plt.ylabel('Angle (Degrees)')
    plt.title('Angle Prediction')
    plt.legend(loc='upper right')
    plt.savefig('angle_prediction.png')

    # MSE
    mse = np.mean((predictions - outputs_test) ** 2, axis=0)
    print(f'Mean Squared Error for A: {mse[0]:.6f}, B: {mse[1]:.6f}, C: {mse[2]:.6f}')

    # R 2 score
    ss_ress = np.sum((outputs_test - predictions) ** 2, axis=0)
    ss_tots = np.sum((outputs_test - np.mean(outputs_test, axis=0)) ** 2, axis=0)
    r2_scores = 1 - ss_ress / ss_tots
    print(f'R² Score for A: {r2_scores[0]:.6f}, B: {r2_scores[1]:.6f}, C: {r2_scores[2]:.6f}')

    # Save the model together with the config needed to rebuild it.
    model_save_path = './model_checkpoint.pth'
    model_config = {'layer_sizes': layer_sizes,
                    'dropout_rate': dropout_rate
                    }
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'model_config': model_config
    }
    torch.save(checkpoint, model_save_path)


def load_model(model_path):
    """Rebuild a NeuralNetwork from a checkpoint saved by `main` and load its weights.

    Fix: `map_location=DEVICE` so a checkpoint saved on CUDA loads on a
    CPU-only machine instead of raising a deserialization error.
    """
    checkpoint = torch.load(model_path, map_location=DEVICE)
    model_config = checkpoint['model_config']
    model = NeuralNetwork(model_config['layer_sizes'],
                          dropout_rate=model_config['dropout_rate'],
                          activation=torch.nn.ReLU).to(DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    print(f"Model loaded from {model_path}")
    return model


if __name__ == "__main__":
    main()
    # model = load_model('./model_checkpoint.pth').to(torch.device('cpu'))