# FridayCode's picture
# Deploy polymer property prediction model with LFS
# c53d10d
import timeit
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import roc_auc_score
import NIPS_GNN.preprocess as pp
from datetime import datetime
import pickle
# import inference as predict
# Hyperparameter table, one entry per supported dataset name.
# 'n_fingerprints' is stored with each entry but is not unpacked below.
nips_gnn = {
    'Tg': dict(
        radius=1,
        dim=50,
        layer_hidden=6,
        layer_output=6,
        batch_train=32,
        batch_test=32,
        lr=1e-4,
        lr_decay=0.99,
        decay_interval=10,
        iteration=14,
        n_fingerprints=651,
    ),
    'FFV': dict(
        radius=1,
        dim=100,
        layer_hidden=12,
        layer_output=12,
        batch_train=16,
        batch_test=16,
        lr=1e-5,
        lr_decay=0.99,
        decay_interval=10,
        iteration=100,
        n_fingerprints=549,
    ),
}

task = 'regression'  # 'classification' or 'regression'
dataset = 'FFV'  # 'Tg' or 'FFV'

# Expose the selected dataset's settings as module-level names, which the
# model, trainer and tester code in this file read directly.
_selected = nips_gnn[dataset]
(radius, dim, layer_hidden, layer_output, batch_train, batch_test,
 lr, lr_decay, decay_interval, iteration) = (
    _selected[_key] for _key in (
        'radius', 'dim', 'layer_hidden', 'layer_output',
        'batch_train', 'batch_test',
        'lr', 'lr_decay', 'decay_interval', 'iteration',
    )
)

# Use CUDA when PyTorch reports it as available, otherwise run on the CPU.
_use_gpu = torch.cuda.is_available()
device = torch.device('cuda' if _use_gpu else 'cpu')
print('The code uses a GPU!' if _use_gpu else 'The code uses a CPU...')
class MolecularGraphNeuralNetwork(nn.Module):
    """Graph neural network that maps batches of molecular graphs to a property.

    Each atom-level fingerprint id is embedded, refined by a stack of
    message-passing layers (``W_fingerprint``), summed per molecule, and fed
    through an MLP (``W_output``) followed by a final linear head
    (``W_property``) with 2 outputs for classification or 1 for regression.

    The number of layers executed in the forward pass is taken from the
    layer lists owned by the instance, so a model always runs with the depth
    it was constructed with, independent of module-level settings.
    """

    def __init__(self, N_fingerprints, dim, layer_hidden, layer_output,
                 task_type=None):
        """Build the embedding, message-passing, MLP and output layers.

        Args:
            N_fingerprints: Size of the fingerprint vocabulary (embedding rows).
            dim: Width of every embedding and hidden layer.
            layer_hidden: Number of message-passing layers.
            layer_output: Number of MLP layers before the output head.
            task_type: 'classification' or 'regression'. When None, the
                module-level ``task`` setting is used.

        Raises:
            ValueError: If the resolved task is neither 'classification'
                nor 'regression'.
        """
        super().__init__()
        if task_type is None:
            task_type = task  # module-level default
        if task_type == 'classification':
            n_outputs = 2
        elif task_type == 'regression':
            n_outputs = 1
        else:
            raise ValueError(
                "task must be 'classification' or 'regression', got %r"
                % (task_type,))

        # Layers are created in a fixed order (embedding, hidden, output,
        # head) so that seeded initialisation stays reproducible.
        self.embed_fingerprint = nn.Embedding(N_fingerprints, dim)
        self.W_fingerprint = nn.ModuleList(
            [nn.Linear(dim, dim) for _ in range(layer_hidden)])
        self.W_output = nn.ModuleList(
            [nn.Linear(dim, dim) for _ in range(layer_output)])
        self.W_property = nn.Linear(dim, n_outputs)

    def pad(self, matrices, pad_value):
        """Pad the list of matrices
        with a pad_value (e.g., 0) for batch processing.
        For example, given a list of matrices [A, B, C],
        we obtain a new matrix [A00, 0B0, 00C],
        where 0 is the zero (i.e., pad value) matrix.

        The result is a float32 tensor allocated on the same device as the
        model's embedding weights.
        """
        shapes = [m.shape for m in matrices]
        n_rows = sum(s[0] for s in shapes)
        n_cols = sum(s[1] for s in shapes)
        target_device = self.embed_fingerprint.weight.device
        padded = torch.full((n_rows, n_cols), float(pad_value),
                            dtype=torch.float32, device=target_device)
        row, col = 0, 0
        for matrix, (m, n) in zip(matrices, shapes):
            # Each matrix occupies its own diagonal block.
            padded[row:row+m, col:col+n] = matrix
            row += m
            col += n
        return padded

    def update(self, matrix, vectors, layer):
        """Run one message-passing step with hidden layer number ``layer``.

        Applies the layer's linear map and ReLU to ``vectors``, then adds the
        product of ``matrix`` with those hidden vectors to the hidden vectors.
        """
        hidden_vectors = torch.relu(self.W_fingerprint[layer](vectors))
        return hidden_vectors + torch.matmul(matrix, hidden_vectors)

    def sum(self, vectors, axis):
        """Split ``vectors`` along dim 0 by ``axis`` and sum each chunk.

        ``axis`` is forwarded to ``torch.split`` as the split size(s); the
        per-chunk sums are stacked into one tensor.
        """
        sum_vectors = [torch.sum(v, 0) for v in torch.split(vectors, axis)]
        return torch.stack(sum_vectors)

    def mean(self, vectors, axis):
        """Split ``vectors`` along dim 0 by ``axis`` and average each chunk."""
        mean_vectors = [torch.mean(v, 0) for v in torch.split(vectors, axis)]
        return torch.stack(mean_vectors)

    def gnn(self, inputs):
        """Compute one vector per molecule for a batch.

        Args:
            inputs: A 3-item sequence ``(fingerprints, adjacencies,
                molecular_sizes)``; the first two are sequences of tensors
                and the third is passed to ``torch.split`` as chunk sizes.

        Returns:
            Stacked per-molecule sums of the final fingerprint vectors.
        """
        # Cat or pad each input data for batch processing.
        fingerprints, adjacencies, molecular_sizes = inputs
        fingerprints = torch.cat(fingerprints)
        adjacencies = self.pad(adjacencies, 0)

        # GNN layers (update the fingerprint vectors). Iterate over the
        # layers this instance actually owns rather than a global count.
        fingerprint_vectors = self.embed_fingerprint(fingerprints)
        for layer in range(len(self.W_fingerprint)):
            hs = self.update(adjacencies, fingerprint_vectors, layer)
            fingerprint_vectors = F.normalize(hs, 2, 1)  # L2-normalize rows.

        # Molecular vector by sum of the fingerprint vectors
        # (self.mean is the available alternative).
        return self.sum(fingerprint_vectors, molecular_sizes)

    def mlp(self, vectors):
        """Classifier or regressor based on multilayer perceptron."""
        for linear in self.W_output:
            vectors = torch.relu(linear(vectors))
        return self.W_property(vectors)

    def forward_classifier(self, data_batch, train):
        """Run the classifier on one batch.

        Args:
            data_batch: Sequence whose last item holds the label tensors and
                whose preceding items are the ``gnn`` inputs.
            train: When true, return the cross-entropy loss tensor. Otherwise
                return ``(predicted_scores, correct_labels)`` computed without
                gradients, where ``predicted_scores`` is a list holding
                output column 1 for each sample and ``correct_labels`` is a
                NumPy array.
        """
        inputs = data_batch[:-1]
        correct_labels = torch.cat(data_batch[-1])

        if train:
            molecular_vectors = self.gnn(inputs)
            predicted_scores = self.mlp(molecular_vectors)
            return F.cross_entropy(predicted_scores, correct_labels)

        with torch.no_grad():
            molecular_vectors = self.gnn(inputs)
            predicted_scores = self.mlp(molecular_vectors)
        predicted_scores = predicted_scores.detach().cpu().numpy()
        predicted_scores = [s[1] for s in predicted_scores]
        correct_labels = correct_labels.detach().cpu().numpy()
        return predicted_scores, correct_labels

    def forward_regressor(self, data_batch, train):
        """Run the regressor on one batch.

        Args:
            data_batch: Sequence whose last item holds the target tensors and
                whose preceding items are the ``gnn`` inputs.
                NOTE(review): targets are presumably one (1, 1) tensor per
                sample so that they line up with the (batch, 1) model
                output; confirm against the preprocessing code.
            train: When true, return the MSE loss tensor. Otherwise return
                ``(predicted_values, correct_values)`` as NumPy arrays,
                computed without gradients and flattened by concatenating
                their rows.
        """
        inputs = data_batch[:-1]
        correct_values = torch.cat(data_batch[-1])

        if train:
            molecular_vectors = self.gnn(inputs)
            predicted_values = self.mlp(molecular_vectors)
            return F.mse_loss(predicted_values, correct_values)

        with torch.no_grad():
            molecular_vectors = self.gnn(inputs)
            predicted_values = self.mlp(molecular_vectors)
        predicted_values = predicted_values.detach().cpu().numpy()
        correct_values = correct_values.detach().cpu().numpy()
        predicted_values = np.concatenate(predicted_values)
        correct_values = np.concatenate(correct_values)
        return predicted_values, correct_values
class Trainer(object):
    """Runs training epochs for a model with an Adam optimizer."""

    def __init__(self, model):
        """Keep ``model`` and create an Adam optimizer over its parameters.

        The learning rate comes from the module-level ``lr`` setting.
        """
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=lr)

    def train(self, dataset):
        """Train for one pass over ``dataset`` and return the summed loss.

        ``dataset`` is shuffled in place, then consumed in slices of the
        module-level ``batch_train`` size. Each slice is transposed into
        per-field tuples and handed to the forward method selected by the
        module-level ``task`` setting.
        """
        np.random.shuffle(dataset)
        epoch_loss = 0
        for start in range(0, len(dataset), batch_train):
            # Transpose a list of samples into a list of per-field tuples.
            columns = list(zip(*dataset[start:start + batch_train]))
            if task == 'classification':
                loss = self.model.forward_classifier(columns, train=True)
            elif task == 'regression':
                loss = self.model.forward_regressor(columns, train=True)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            epoch_loss += loss.item()
        return epoch_loss
class Tester(object):
    """Evaluates a model on a dataset and appends result lines to a file."""

    def __init__(self, model):
        self.model = model

    def test_classifier(self, dataset):
        """Return the ROC AUC of the model's scores over ``dataset``.

        The data is processed in slices of the module-level ``batch_test``
        size; scores and labels from all slices are concatenated before the
        AUC is computed.
        """
        scores, labels = [], []
        for start in range(0, len(dataset), batch_test):
            columns = list(zip(*dataset[start:start + batch_test]))
            batch_scores, batch_labels = self.model.forward_classifier(
                columns, train=False)
            scores.append(batch_scores)
            labels.append(batch_labels)
        return roc_auc_score(np.concatenate(labels), np.concatenate(scores))

    def test_regressor(self, dataset):
        """Return the mean absolute error of the model over ``dataset``.

        Absolute errors are summed across slices of the module-level
        ``batch_test`` size and divided by the number of samples.
        """
        n_samples = len(dataset)
        abs_error_total = 0  # running sum of absolute errors
        for start in range(0, n_samples, batch_test):
            columns = list(zip(*dataset[start:start + batch_test]))
            predicted, expected = self.model.forward_regressor(
                columns, train=False)
            abs_error_total += sum(np.abs(predicted - expected))
        return abs_error_total / n_samples

    def predict_regressor(self, dataset):
        """Run the regressor on every sample individually.

        Returns a list with one entry per sample; each entry is whatever
        ``forward_regressor(..., train=False)`` returns for that
        single-sample batch.
        """
        return [
            self.model.forward_regressor(
                list(zip(*dataset[index:index + 1])), train=False)
            for index in range(len(dataset))
        ]

    def save_result(self, result, filename):
        """Append ``result`` followed by a newline to ``filename``."""
        with open(filename, 'a') as handle:
            handle.write(result + '\n')
if __name__ == '__main__':
    # Training entry point: preprocess the selected dataset, train the GNN
    # for `iteration` epochs, log one result line per epoch, and checkpoint
    # the model's state_dict every second epoch and once more at the end.
    import os  # local import: only the script path needs filesystem helpers

    # The post-checkpoint sanity check calls `predict.predict(...)`, but the
    # file-level `import inference as predict` is commented out, so the name
    # was undefined and training crashed at the first checkpoint. Import it
    # here and skip the check when the module cannot be imported.
    # NOTE(review): module name taken from the commented-out import; confirm
    # it is importable from the directory this script is launched in.
    try:
        import inference as predict
    except ImportError:
        predict = None
        print('Module "inference" not found; '
              'skipping the post-checkpoint prediction check.')

    if task not in ('classification', 'regression'):
        raise ValueError(
            "task must be 'classification' or 'regression', got %r" % (task,))

    # `device` is already selected (and announced) at module level, so it is
    # not detected a second time here.
    print('-'*100)
    print('Preprocessing the', dataset, 'dataset.')
    print('Just a moment......')
    (dataset_train, dataset_dev, dataset_test,
     N_fingerprints) = pp.create_datasets(task, dataset, radius, device)

    print('-'*100)
    print('The preprocess has finished!')
    print('# of training data samples:', len(dataset_train))
    print('# of development data samples:', len(dataset_dev))
    print('# of test data samples:', len(dataset_test))
    print('# of unique fingerprints:', N_fingerprints)

    print('-'*100)
    print('Creating a model.')
    torch.manual_seed(1234)
    model = MolecularGraphNeuralNetwork(
        N_fingerprints, dim, layer_hidden, layer_output).to(device)
    trainer = Trainer(model)
    tester = Tester(model)
    print('# of model parameters:',
          sum(p.numel() for p in model.parameters()))

    print('-'*100)
    file_result = '../output/result' + '.txt'
    model_path = f'./NIPS_GNN/model/{dataset.lower()}_model.pt'
    dict_path = f'./NIPS_GNN/model/{dataset.lower()}_dictionaries.pkl'
    # Make sure the destinations exist before the first write.
    os.makedirs(os.path.dirname(file_result), exist_ok=True)
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # Every row below has six fields (epoch, wall-clock time, elapsed
    # seconds, loss, dev metric, test metric), so both headers have six.
    if task == 'classification':
        header = 'Epoch\tTime\tTime(sec)\tLoss_train\tAUC_dev\tAUC_test'
    else:
        header = 'Epoch\tTime\tTime(sec)\tLoss_train\tMAE_dev\tMAE_test'
    with open(file_result, 'w') as f:
        f.write(header + '\n')

    print('Start training.')
    print('The result is saved in the output directory every epoch!')

    np.random.seed(1234)
    start = timeit.default_timer()

    for epoch in range(1, iteration + 1):

        # Multiply the learning rate by lr_decay every decay_interval epochs.
        if epoch % decay_interval == 0:
            trainer.optimizer.param_groups[0]['lr'] *= lr_decay

        loss_train = trainer.train(dataset_train)

        if task == 'classification':
            prediction_dev = tester.test_classifier(dataset_dev)
            prediction_test = tester.test_classifier(dataset_test)
        else:
            prediction_dev = tester.test_regressor(dataset_dev)
            prediction_test = tester.test_regressor(dataset_test)

        elapsed = timeit.default_timer() - start
        formatted_time = datetime.now().strftime("%H:%M:%S")

        if epoch == 1:
            # Extrapolate the total run time from the first epoch.
            minutes = elapsed * iteration / 60
            hours = int(minutes / 60)
            minutes = int(minutes - 60 * hours)
            print('The training will finish in about',
                  hours, 'hours', minutes, 'minutes.')
            print('-'*100)
            print(header)

        result = '\t'.join(map(str, [epoch, formatted_time, elapsed,
                                     loss_train, prediction_dev,
                                     prediction_test]))
        tester.save_result(result, file_result)

        if epoch % 2 == 0:
            # Periodic checkpoint.
            torch.save(model.state_dict(), model_path)
            print('The trained model state_dict is saved at:', model_path)

            if predict is not None:
                # Sanity check: predict two fixed polymers from the file
                # that was just written.
                smiles_list = ["*c1cccc(OCCCCCCCCOc2cccc(N3C(=O)c4ccc(-c5cccc6c5C(=O)N(*)C6=O)cc4C3=O)c2)c1", "*Oc1ccc(C=NN=Cc2ccc(Oc3ccc(C(c4ccc(*)cc4)(C(F)(F)F)C(F)(F)F)cc3)cc2)cc1",]
                predictions = predict.predict(
                    smiles_list, model_path, dict_path)
                for i, pred in enumerate(predictions):
                    if pred is not None:
                        print(f"{smiles_list[i]}: {pred[0][0][0]}")
            print('-'*100)

    # Save the trained model's state_dict
    torch.save(model.state_dict(), model_path)
    print('The trained model state_dict is saved at:', model_path)