| import os |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from tqdm import tqdm |
| import pandas as pd |
| from typing import List |
|
|
| from rdkit import Chem |
| from rdkit.Chem import AllChem |
|
|
| from transformers import PretrainedConfig |
| from transformers import PreTrainedModel |
| from transformers import AutoModel |
|
|
| from torch_geometric.nn import GCNConv |
| from torch_geometric.data import Data |
| from torch_geometric.loader import DataLoader |
| from torch_scatter import scatter |
|
|
|
|
class SmilesDataset(torch.utils.data.Dataset):
    """In-memory dataset of molecular graphs built from SMILES strings.

    Constructing the dataset does no work; graphs are only built when
    :meth:`get_data` is called.  Each molecule that converts successfully
    becomes one ``Data`` object carrying:

    * ``x``          -- 1-D float tensor with one atom-type id per atom
      (explicit hydrogens included),
    * ``pos``        -- float tensor of the embedded conformer's coordinates
      (presumably shape ``(num_atoms, 3)`` -- TODO confirm),
    * ``edge_index`` -- ``(2, 2 * num_bonds)`` long tensor; every bond is
      stored in both directions,
    * ``smiles``     -- the source SMILES string.
    """

    # Atom symbol -> integer type id stored in ``Data.x``.
    _ATOM_TYPES = {'H': 0, 'C': 1, 'N': 2, 'O': 3, 'S': 4}

    def __init__(self, smiles):
        """Remember ``smiles``; no graphs are built until ``get_data`` runs."""
        self.smiles_list = smiles
        self.data_list = []

    def __len__(self):
        """Return the number of graphs built by the last ``get_data`` call."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Return the graph stored at position ``idx``."""
        return self.data_list[idx]

    def get_data(self, smiles):
        """Build one graph per convertible SMILES string and return the list.

        Molecules that cannot be converted (unparsable SMILES, no conformer,
        unsupported element) are reported with ``print`` and skipped, so the
        result may be shorter than ``smiles``.

        Args:
            smiles: sequence of SMILES strings.

        Returns:
            The list of ``Data`` objects, which is also ``self.data_list``.
        """
        self.smiles_list = smiles
        # Start from a fresh list: without this, a second call would return
        # graphs left over from earlier calls alongside the new ones.
        self.data_list = []

        for smi in smiles:
            graph = self._build_graph(smi)
            if graph is not None:
                self.data_list.append(graph)
        return self.data_list

    def _build_graph(self, smi):
        """Convert a single SMILES string to a ``Data`` graph, or ``None`` on failure."""
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            print("无法创建Mol对象", smi)
            return None

        mol3d = Chem.AddHs(mol)
        if mol3d is None:
            print("无法创建mol3d对象", smi)
            return None

        # Embed with a fixed random seed; success is detected below by
        # checking whether a conformer was actually attached.
        AllChem.EmbedMolecule(mol3d, randomSeed=1)
        if mol3d.GetNumConformers() == 0:
            print("无法创建comfor", smi)
            return None

        num_atoms = mol3d.GetNumAtoms()
        pos = torch.tensor(mol3d.GetConformer().GetPositions(), dtype=torch.float)

        type_idx = []
        for atom in mol3d.GetAtoms():
            symbol = atom.GetSymbol()
            if symbol not in self._ATOM_TYPES:
                # Skip the molecule like every other per-molecule failure
                # instead of aborting the whole batch with a KeyError.
                print("unsupported atom type", symbol, smi)
                return None
            type_idx.append(self._ATOM_TYPES[symbol])

        # Store every bond twice (a->b and b->a) so the graph is undirected.
        row, col = [], []
        for bond in mol3d.GetBonds():
            start, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
            row += [start, end]
            col += [end, start]
        edge_index = torch.tensor([row, col], dtype=torch.long)

        # Order edges by (source, target) via their row-major linear index.
        perm = (edge_index[0] * num_atoms + edge_index[1]).argsort()
        edge_index = edge_index[:, perm]

        x = torch.tensor(type_idx).to(torch.float)
        return Data(x=x, pos=pos, edge_index=edge_index, smiles=smi)
|
|
| """ |
| MLP Layer used after graph vector representation |
| """ |
class MLPReadout(nn.Module):
    """MLP head applied to the pooled graph representation.

    Builds ``L`` hidden linear layers, where hidden layer ``k`` maps
    ``input_dim // 2**k`` features to ``input_dim // 2**(k + 1)`` features
    and is followed by ReLU, plus a final linear layer projecting
    ``input_dim // 2**L`` features to ``output_dim`` with no activation.
    """

    def __init__(self, input_dim, output_dim, L=2):
        super().__init__()
        layers = []
        for depth in range(L):
            fan_in = input_dim // 2 ** depth
            fan_out = input_dim // 2 ** (depth + 1)
            layers.append(nn.Linear(fan_in, fan_out, bias=True))
        # Output projection; the only layer without a ReLU after it.
        layers.append(nn.Linear(input_dim // 2 ** L, output_dim, bias=True))

        self.FC_layers = nn.ModuleList(layers)
        self.L = L

    def forward(self, x):
        """Run ``x`` through the hidden layers (with ReLU) and the output layer."""
        hidden = x
        for depth in range(self.L):
            hidden = F.relu(self.FC_layers[depth](hidden))
        return self.FC_layers[self.L](hidden)
|
|
class GCNNet(torch.nn.Module):
    """GCN that maps a batch of molecular graphs to one output per graph.

    Pipeline: embed integer atom types, apply ``conv1`` ``n_layers`` times
    (the same layer, hence the same weights, on every pass), apply ``conv2``
    to reduce to 32 features, mean-pool node features per graph, then run
    the ``MLPReadout`` head.
    """

    def __init__(self, input_feature=64, emb_input=20, hidden_size=64, n_layers=6, num_classes=1):
        super().__init__()

        # ``input_feature`` and ``num_classes`` are only stored here; nothing
        # else in this class reads them back.
        self.input_feature = input_feature
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Index 0 is the padding index of the embedding table.
        self.embedding = torch.nn.Embedding(emb_input, hidden_size, padding_idx=0)
        self.conv1 = GCNConv(hidden_size, hidden_size)
        self.conv2 = GCNConv(hidden_size, 32)
        self.mlp = MLPReadout(32, num_classes)

    def forward_features(self, data):
        """Compute the per-graph prediction for a batched graph ``data``.

        Reads ``data.x`` (cast to long and flattened before the embedding
        lookup), ``data.edge_index`` and ``data.batch``.  Returns the MLP
        output with its last dimension squeezed.
        """
        atom_types = data.x.long()
        edges = data.edge_index
        graph_ids = data.batch

        h = self.embedding(atom_types.reshape(-1))

        # Repeated message passing through the single shared ``conv1`` layer.
        for _ in range(self.n_layers):
            h = F.relu(self.conv1(h, edges))
        h = F.relu(self.conv2(h, edges))

        # Mean over the nodes that share a graph id.
        pooled = scatter(h, graph_ids, dim=-2, reduce='mean')
        return self.mlp(pooled).squeeze(-1)
|
|
|
|
class GCNConfig(PretrainedConfig):
    """Configuration object for the GCN model (``model_type = "gcn"``).

    Args:
        input_feature: stored on the config and passed on to the network.
        emb_input: number of rows in the atom-type embedding table.
        hidden_size: width of the embedding and of the first GCN layer.
        n_layers: how many times the first GCN layer is applied.
        num_classes: output size of the readout MLP.
        smiles: optional list of SMILES strings kept on the config.
        processor_class: name stored on the config as ``processor_class``.
        **kwargs: forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "gcn"

    def __init__(
        self,
        input_feature: int = 64,
        emb_input: int = 20,
        hidden_size: int = 64,
        n_layers: int = 6,
        num_classes: int = 1,
        smiles: List[str] = None,
        processor_class: str = "SmilesProcessor",
        **kwargs,
    ):
        # Network hyper-parameters.
        self.input_feature = input_feature
        self.emb_input = emb_input
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.num_classes = num_classes

        # Data-side settings.
        self.smiles = smiles
        self.processor_class = processor_class

        # Base-class initialisation runs last, after the custom fields are set.
        super().__init__(**kwargs)
|
|
|
|
class GCNModel(PreTrainedModel):
    """Hugging Face wrapper around ``GCNNet`` with a SMILES prediction helper.

    ``forward`` runs the locally held ``self.model``.  ``predict_smiles`` is
    different: it runs inference with the ``"Huhujingjing/custom-gcn"``
    checkpoint loaded through ``AutoModel``, not with ``self.model``.
    """

    config_class = GCNConfig

    def __init__(self, config):
        super().__init__(config)

        self.model = GCNNet(
            input_feature=config.input_feature,
            emb_input=config.emb_input,
            hidden_size=config.hidden_size,
            n_layers=config.n_layers,
            num_classes=config.num_classes,
        )
        self.process = SmilesDataset(
            smiles=config.smiles,
        )

        # State filled in by ``predict_smiles``.
        self.gcn_model = None
        self.dataset = None
        self.output = None
        self.data_loader = None
        self.pred_data = None

    def forward(self, tensor):
        """Return ``self.model.forward_features(tensor)`` for a batched graph."""
        return self.model.forward_features(tensor)

    def predict_smiles(self, smiles, device: str = 'cpu', result_dir: str = './', **kwargs):
        """Predict a value for each SMILES string and save the results as CSV.

        Args:
            smiles: sequence of SMILES strings to predict.
            device: device the inference model and batches are moved to.
            result_dir: directory that receives ``gcn.csv``; it is created
                if it does not exist yet.
            **kwargs: optional ``batch_size`` (default 1), ``shuffle``
                (default False), ``drop_last`` (default False) and
                ``num_workers`` (default 0) for the ``DataLoader``.

        Returns:
            A human-readable report string (also stored in ``self.output``).
        """
        batch_size = kwargs.pop('batch_size', 1)
        shuffle = kwargs.pop('shuffle', False)
        drop_last = kwargs.pop('drop_last', False)
        num_workers = kwargs.pop('num_workers', 0)

        # Load the Hub checkpoint once and reuse it on later calls instead of
        # re-instantiating it for every prediction.
        # NOTE(review): trust_remote_code=True executes code shipped with the
        # Hub repository -- only acceptable if that repository is trusted.
        if self.gcn_model is None:
            self.gcn_model = AutoModel.from_pretrained("Huhujingjing/custom-gcn", trust_remote_code=True)
        self.gcn_model.to(device)
        self.gcn_model.eval()

        self.dataset = self.process.get_data(smiles)
        self.output = ""
        self.output += ("predicted samples num: {}\n".format(len(self.dataset)))
        # Guard the sample preview: the dataset is empty when no SMILES
        # string could be converted, and indexing it would raise IndexError.
        if len(self.dataset) > 0:
            self.output += ("predicted samples:{}\n".format(self.dataset[0]))

        self.data_loader = DataLoader(
            self.dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            drop_last=drop_last,
            num_workers=num_workers,
        )
        self.pred_data = {
            'smiles': [],
            'pred': [],
        }

        # One no_grad scope around the whole loop instead of one per batch.
        with torch.no_grad():
            for batch in tqdm(self.data_loader):
                batch = batch.to(device)
                self.pred_data['smiles'] += batch['smiles']
                self.pred_data['pred'] += self.gcn_model(batch).cpu().tolist()

        # Always store a flat Python list.  The tensor is built from a Python
        # list and therefore lives on the CPU whatever ``device`` is, so the
        # conversion must not depend on ``device == 'cuda'``.
        self.pred_data['pred'] = torch.tensor(self.pred_data['pred']).reshape(-1).tolist()

        pred_df = pd.DataFrame(self.pred_data)
        pred_df['pred'] = pred_df['pred'].apply(lambda x: round(x, 2))
        self.output += ('-' * 40 + '\n' + 'predicted result: \n' + '{}\n'.format(pred_df))
        self.output += ('-' * 40)

        # An empty ``result_dir`` means the current directory; makedirs('')
        # would raise, so only create non-empty paths.
        if result_dir:
            os.makedirs(result_dir, exist_ok=True)
        csv_path = os.path.join(result_dir, 'gcn.csv')
        pred_df.to_csv(csv_path, index=False)
        self.output += ('\nsave predicted result to {}\n'.format(csv_path))

        return self.output
|
|
|
|
if __name__ == "__main__":
    # Export script: wrap locally trained GCN weights in the Hugging Face
    # model class and write them out in ``save_pretrained`` format.
    gcn_config = GCNConfig(
        input_feature=64,
        emb_input=20,
        hidden_size=64,
        n_layers=6,
        num_classes=1,
        smiles=["C", "CC", "CCC"],
        processor_class="SmilesProcessor",
    )

    gcnd = GCNModel(gcn_config)

    # map_location='cpu' lets a checkpoint saved from a CUDA device load on a
    # machine without a GPU; load_state_dict copies the values into the
    # model's own parameters, so the result is otherwise unchanged.
    state_dict = torch.load(r'G:\Trans_MXM\gcn_model\gcn.pt', map_location='cpu')
    gcnd.model.load_state_dict(state_dict)
    gcnd.save_pretrained("custom-gcn")
|
|
|
|