# NOTE: removed text-extraction artifacts ("Spaces:" / "Configuration error")
# that preceded this script and were not valid Python.
| import timeit | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torch.optim as optim | |
| from sklearn.metrics import roc_auc_score | |
| import NIPS_GNN.preprocess as pp | |
| from datetime import datetime | |
| import pickle | |
| # import inference as predict | |
# Per-dataset hyperparameter settings for the GNN.
# 'Tg' and 'FFV' are the two supported property-prediction datasets.
nips_gnn = {
    'Tg': {
        'radius': 1,              # fingerprint radius used at preprocessing
        'dim': 50,                # embedding / hidden dimension
        'layer_hidden': 6,        # number of message-passing layers
        'layer_output': 6,        # number of MLP head layers
        'batch_train': 32,
        'batch_test': 32,
        'lr': 1e-4,
        'lr_decay': 0.99,
        'decay_interval': 10,     # decay lr every this many epochs
        'iteration': 14,          # training epochs
        'n_fingerprints': 651,    # vocabulary size from preprocessing
    },
    'FFV': {
        'radius': 1,
        'dim': 100,
        'layer_hidden': 12,
        'layer_output': 12,
        'batch_train': 16,
        'batch_test': 16,
        'lr': 1e-5,
        'lr_decay': 0.99,
        'decay_interval': 10,
        'iteration': 100,
        'n_fingerprints': 549,
    },
}
# Active experiment configuration.
task = 'regression'  # 'regression'
dataset = 'FFV'  # 'Tg' or 'FFV'

# Unpack the selected dataset's hyperparameters into module-level names;
# the model/trainer classes below read several of these as globals.
_cfg = nips_gnn[dataset]
radius = _cfg['radius']
dim = _cfg['dim']
layer_hidden = _cfg['layer_hidden']
layer_output = _cfg['layer_output']
batch_train = _cfg['batch_train']
batch_test = _cfg['batch_test']
lr = _cfg['lr']
lr_decay = _cfg['lr_decay']
decay_interval = _cfg['decay_interval']
iteration = _cfg['iteration']
# Pick the compute device once at import time.
if torch.cuda.is_available():
    device = torch.device('cuda')
    print('The code uses a GPU!')
else:
    device = torch.device('cpu')
    print('The code uses a CPU...')
class MolecularGraphNeuralNetwork(nn.Module):
    """GNN that embeds molecular fingerprints, propagates them over a
    block-diagonal batch adjacency matrix, pools per molecule, and maps
    the molecular vector to a property with an MLP head.

    NOTE(review): `gnn` and `mlp` read the module-level globals
    `layer_hidden`, `layer_output`, `task` and `device` at call time;
    the constructor arguments of the same names only size the layers.
    """

    def __init__(self, N_fingerprints, dim, layer_hidden, layer_output):
        super(MolecularGraphNeuralNetwork, self).__init__()
        # One learnable `dim`-vector per unique fingerprint id.
        self.embed_fingerprint = nn.Embedding(N_fingerprints, dim)
        # Message-passing (hidden) layers.
        self.W_fingerprint = nn.ModuleList([nn.Linear(dim, dim)
                                            for _ in range(layer_hidden)])
        # MLP layers applied to the pooled molecular vector.
        self.W_output = nn.ModuleList([nn.Linear(dim, dim)
                                       for _ in range(layer_output)])
        # Output head: 2 logits for classification, 1 value for regression.
        if task == 'classification':
            self.W_property = nn.Linear(dim, 2)
        if task == 'regression':
            self.W_property = nn.Linear(dim, 1)

    def pad(self, matrices, pad_value):
        """Pad the list of matrices
        with a pad_value (e.g., 0) for batch processing.
        For example, given a list of matrices [A, B, C],
        we obtain a new matrix [A00, 0B0, 00C],
        where 0 is the zero (i.e., pad value) matrix.
        """
        shapes = [m.shape for m in matrices]
        # Total rows and columns of the combined block-diagonal matrix.
        M, N = sum([s[0] for s in shapes]), sum([s[1] for s in shapes])
        zeros = torch.FloatTensor(np.zeros((M, N))).to(device)
        pad_matrices = pad_value + zeros
        i, j = 0, 0
        for k, matrix in enumerate(matrices):
            m, n = shapes[k]
            # Place each matrix on the diagonal; off-diagonal stays pad_value.
            pad_matrices[i:i+m, j:j+n] = matrix
            i += m
            j += n
        return pad_matrices

    def update(self, matrix, vectors, layer):
        """One message-passing step: transform, then add neighbor sums."""
        hidden_vectors = torch.relu(self.W_fingerprint[layer](vectors))
        # matmul with the (block-diagonal) adjacency aggregates neighbors.
        return hidden_vectors + torch.matmul(matrix, hidden_vectors)

    def sum(self, vectors, axis):
        """Sum-pool rows of `vectors` into per-molecule vectors.

        `axis` is the list of molecular sizes used to split the batch.
        """
        sum_vectors = [torch.sum(v, 0) for v in torch.split(vectors, axis)]
        return torch.stack(sum_vectors)

    def mean(self, vectors, axis):
        """Mean-pool variant of `sum` (unused by default; see `gnn`)."""
        mean_vectors = [torch.mean(v, 0) for v in torch.split(vectors, axis)]
        return torch.stack(mean_vectors)

    def gnn(self, inputs):
        """Cat or pad each input data for batch processing."""
        fingerprints, adjacencies, molecular_sizes = inputs
        fingerprints = torch.cat(fingerprints)
        adjacencies = self.pad(adjacencies, 0)
        """GNN layer (update the fingerprint vectors)."""
        fingerprint_vectors = self.embed_fingerprint(fingerprints)
        # NOTE(review): iterates the module-level global `layer_hidden`,
        # not the constructor argument — keep them consistent.
        for l in range(layer_hidden):
            hs = self.update(adjacencies, fingerprint_vectors, l)
            fingerprint_vectors = F.normalize(hs, 2, 1)  # normalize.
        """Molecular vector by sum or mean of the fingerprint vectors."""
        molecular_vectors = self.sum(fingerprint_vectors, molecular_sizes)
        # molecular_vectors = self.mean(fingerprint_vectors, molecular_sizes)
        return molecular_vectors

    def mlp(self, vectors):
        """Classifier or regressor based on multilayer perceptron."""
        # NOTE(review): uses the module-level global `layer_output`.
        for l in range(layer_output):
            vectors = torch.relu(self.W_output[l](vectors))
        outputs = self.W_property(vectors)
        return outputs

    def forward_classifier(self, data_batch, train):
        """Return cross-entropy loss (train) or (scores, labels) (eval).

        `data_batch` is a tuple whose last element is the list of label
        tensors; the remaining elements are the GNN inputs.
        """
        inputs = data_batch[:-1]
        correct_labels = torch.cat(data_batch[-1])
        if train:
            molecular_vectors = self.gnn(inputs)
            predicted_scores = self.mlp(molecular_vectors)
            loss = F.cross_entropy(predicted_scores, correct_labels)
            return loss
        else:
            with torch.no_grad():
                molecular_vectors = self.gnn(inputs)
                predicted_scores = self.mlp(molecular_vectors)
            predicted_scores = predicted_scores.to('cpu').data.numpy()
            # Keep only the positive-class score (column 1) for ROC-AUC.
            predicted_scores = [s[1] for s in predicted_scores]
            correct_labels = correct_labels.to('cpu').data.numpy()
            return predicted_scores, correct_labels

    def forward_regressor(self, data_batch, train):
        """Return MSE loss (train) or (predicted, correct) arrays (eval)."""
        inputs = data_batch[:-1]
        correct_values = torch.cat(data_batch[-1])
        if train:
            molecular_vectors = self.gnn(inputs)
            predicted_values = self.mlp(molecular_vectors)
            # NOTE(review): `predicted_values` has a trailing dim of 1
            # (W_property outputs 1 unit); confirm `correct_values` has the
            # same shape, otherwise mse_loss silently broadcasts.
            loss = F.mse_loss(predicted_values, correct_values)
            return loss
        else:
            with torch.no_grad():
                molecular_vectors = self.gnn(inputs)
                predicted_values = self.mlp(molecular_vectors)
            predicted_values = predicted_values.to('cpu').data.numpy()
            correct_values = correct_values.to('cpu').data.numpy()
            # Flatten the per-sample arrays into 1-D result vectors.
            predicted_values = np.concatenate(predicted_values)
            correct_values = np.concatenate(correct_values)
            return predicted_values, correct_values
class Trainer(object):
    """Runs one optimization epoch at a time over a dataset with Adam.

    NOTE(review): reads the module-level globals ``lr``, ``batch_train``
    and ``task``.
    """

    def __init__(self, model):
        self.model = model
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def train(self, dataset):
        """Shuffle *dataset*, train over mini-batches, return summed loss."""
        np.random.shuffle(dataset)
        n_samples = len(dataset)
        total_loss = 0
        for start in range(0, n_samples, batch_train):
            batch = list(zip(*dataset[start:start + batch_train]))
            if task == 'classification':
                loss = self.model.forward_classifier(batch, train=True)
            if task == 'regression':
                loss = self.model.forward_regressor(batch, train=True)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            total_loss += loss.item()
        return total_loss
class Tester(object):
    """Evaluation and prediction helpers for a trained model.

    NOTE(review): the test methods read the module-level global
    ``batch_test``.
    """

    def __init__(self, model):
        self.model = model

    def test_classifier(self, dataset):
        """Return ROC-AUC of the model over *dataset*."""
        scores, labels = [], []
        for start in range(0, len(dataset), batch_test):
            batch = list(zip(*dataset[start:start + batch_test]))
            batch_scores, batch_labels = self.model.forward_classifier(
                batch, train=False)
            scores.append(batch_scores)
            labels.append(batch_labels)
        return roc_auc_score(np.concatenate(labels), np.concatenate(scores))

    def test_regressor(self, dataset):
        """Return mean absolute error of the model over *dataset*."""
        abs_error_sum = 0  # sum absolute error.
        for start in range(0, len(dataset), batch_test):
            batch = list(zip(*dataset[start:start + batch_test]))
            predicted, actual = self.model.forward_regressor(
                batch, train=False)
            abs_error_sum += sum(np.abs(predicted - actual))
        return abs_error_sum / len(dataset)  # mean absolute error.

    def predict_regressor(self, dataset):
        """Run the model sample-by-sample and collect its raw outputs.

        Each collected element is exactly what ``forward_regressor``
        returns in eval mode (a (predicted, correct) pair).
        """
        outputs = []
        for start in range(len(dataset)):
            batch = list(zip(*dataset[start:start + 1]))
            outputs.append(self.model.forward_regressor(batch, train=False))
        return outputs

    def save_result(self, result, filename):
        """Append *result* as a single line to *filename*."""
        with open(filename, 'a') as f:
            f.write(result + '\n')
if __name__ == '__main__':
    # Select the compute device for this run.
    if torch.cuda.is_available():
        device = torch.device('cuda')
        print('The code uses a GPU!')
    else:
        device = torch.device('cpu')
        print('The code uses a CPU...')

    # ------------------------------------------------------------------
    # Preprocessing: build train/dev/test splits and the fingerprint
    # vocabulary from the raw dataset.
    # ------------------------------------------------------------------
    print('-'*100)
    print('Preprocessing the', dataset, 'dataset.')
    print('Just a moment......')
    (dataset_train, dataset_dev, dataset_test,
     N_fingerprints) = pp.create_datasets(task, dataset, radius, device)
    print('-'*100)
    print('The preprocess has finished!')
    print('# of training data samples:', len(dataset_train))
    print('# of development data samples:', len(dataset_dev))
    print('# of test data samples:', len(dataset_test))
    print('# of unique fingerprints:', N_fingerprints)
    print('-'*100)

    # ------------------------------------------------------------------
    # Model construction.
    # ------------------------------------------------------------------
    print('Creating a model.')
    torch.manual_seed(1234)  # fixed seed for reproducible weights
    model = MolecularGraphNeuralNetwork(
        N_fingerprints, dim, layer_hidden, layer_output).to(device)
    trainer = Trainer(model)
    tester = Tester(model)
    print('# of model parameters:',
          sum([np.prod(p.size()) for p in model.parameters()]))
    print('-'*100)

    # Result file: header row first, then one tab-separated row per epoch.
    file_result = '../output/result' + '.txt'
    if task == 'classification':
        # BUG FIX: the classification header was missing the wall-clock
        # "Time" column that every data row contains, so the columns in
        # the result file were misaligned by one.
        result = 'Epoch\tTime\tTime(sec)\tLoss_train\tAUC_dev\tAUC_test'
    if task == 'regression':
        result = 'Epoch\tTime\tTime(sec)\tLoss_train\tMAE_dev\tMAE_test'
    with open(file_result, 'w') as f:
        f.write(result + '\n')

    print('Start training.')
    print('The result is saved in the output directory every epoch!')

    np.random.seed(1234)
    start = timeit.default_timer()
    for epoch in range(iteration):
        epoch += 1  # 1-based epoch numbering in logs
        if epoch % decay_interval == 0:
            trainer.optimizer.param_groups[0]['lr'] *= lr_decay
        loss_train = trainer.train(dataset_train)
        if task == 'classification':
            prediction_dev = tester.test_classifier(dataset_dev)
            prediction_test = tester.test_classifier(dataset_test)
        if task == 'regression':
            prediction_dev = tester.test_regressor(dataset_dev)
            prediction_test = tester.test_regressor(dataset_test)
        time = timeit.default_timer() - start
        now = datetime.now()
        formatted_time = now.strftime("%H:%M:%S")
        if epoch == 1:
            # Rough ETA from the duration of the first epoch.
            minutes = time * iteration / 60
            hours = int(minutes / 60)
            minutes = int(minutes - 60 * hours)
            print('The training will finish in about',
                  hours, 'hours', minutes, 'minutes.')
            print('-'*100)
            print(result)
        result = '\t'.join(map(str, [epoch, formatted_time, time, loss_train,
                                     prediction_dev, prediction_test]))
        tester.save_result(result, file_result)

        if epoch % 2 == 0:
            # Periodic checkpoint every second epoch.
            model_path = f'./NIPS_GNN/model/{dataset.lower()}_model.pt'
            torch.save(model.state_dict(), model_path)
            print('The trained model state_dict is saved at:', model_path)
            # BUG FIX: the original unconditionally called
            # `predict.predict(...)` here, but the `import inference as
            # predict` line at the top of the file is commented out, so
            # every even epoch raised NameError and killed the run.  The
            # inference sanity check now runs only when the module is
            # actually available.
            if 'predict' in globals():
                dict_path = f'./NIPS_GNN/model/{dataset.lower()}_dictionaries.pkl'
                smiles_list = ["*c1cccc(OCCCCCCCCOc2cccc(N3C(=O)c4ccc(-c5cccc6c5C(=O)N(*)C6=O)cc4C3=O)c2)c1", "*Oc1ccc(C=NN=Cc2ccc(Oc3ccc(C(c4ccc(*)cc4)(C(F)(F)F)C(F)(F)F)cc3)cc2)cc1",]
                predictions = predict.predict(smiles_list, model_path, dict_path)
                for i, pred in enumerate(predictions):
                    if pred is not None:
                        print(f"{smiles_list[i]}: {pred[0][0][0]}")
                print('-'*100)

    # Save the final trained model's state_dict.
    model_path = f'./NIPS_GNN/model/{dataset.lower()}_model.pt'
    torch.save(model.state_dict(), model_path)
    print('The trained model state_dict is saved at:', model_path)