import logging, sys import numpy as np from tqdm import tqdm from scipy.linalg import eigh from rdkit import Chem from rdkit.Chem import MolFromSmiles, MolToSmiles, AddHs from nfp.preprocessing import features from nfp.preprocessing.features import Tokenizer import time class SmilesPreprocessor(object): """ Given a list of SMILES strings, encode these molecules as atom and connectivity feature matricies. Example: >>> preprocessor = SmilesPreprocessor(explicit_hs=False) >>> inputs = preprocessor.fit(data.smiles) """ def __init__(self, explicit_hs=True, atom_features=None, bond_features=None): """ explicit_hs : bool whether to tell RDkit to add H's to a molecule. atom_features : function A function applied to an rdkit.Atom that returns some representation (i.e., string, integer) for the Tokenizer class. bond_features : function A function applied to an rdkit Bond to return some description. """ self.atom_tokenizer = Tokenizer() self.bond_tokenizer = Tokenizer() self.explicit_hs = explicit_hs if atom_features is None: atom_features = features.atom_features_v1 if bond_features is None: bond_features = features.bond_features_v1 self.atom_features = atom_features self.bond_features = bond_features def fit(self, smiles_iterator): """ Fit an iterator of SMILES strings, creating new atom and bond tokens for unseen molecules. Returns a dictionary with 'atom' and 'connectivity' entries """ return list(self.preprocess(smiles_iterator, train=True)) def predict(self, smiles_iterator): """ Uses previously determined atom and bond tokens to convert a SMILES iterator into 'atom' and 'connectivity' matrices. Ensures that atom and bond classes commute with previously determined results. """ return list(self.preprocess(smiles_iterator, train=False)) def preprocess(self, smiles_iterator, train=True): self.atom_tokenizer.train = train self.bond_tokenizer.train = train for smiles in tqdm(smiles_iterator): yield self.construct_feature_matrices(smiles) @property def atom_classes(self): """ The number of atom types found (includes the 0 null-atom type) """ return self.atom_tokenizer.num_classes + 1 @property def bond_classes(self): """ The number of bond types found (includes the 0 null-bond type) """ return self.bond_tokenizer.num_classes + 1 def construct_feature_matrices(self, smiles): """ construct a molecule from the given smiles string and return atom and bond classes. Returns dict with entries 'n_atom' : number of atoms in the molecule 'n_bond' : number of bonds in the molecule 'atom' : (n_atom,) length list of atom classes 'bond' : (n_bond,) list of bond classes 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. """ mol = MolFromSmiles(smiles) if self.explicit_hs: mol = AddHs(mol) n_atom = len(mol.GetAtoms()) n_bond = 2 * len(mol.GetBonds()) # If its an isolated atom, add a self-link if n_bond == 0: n_bond = 1 atom_feature_matrix = np.zeros(n_atom, dtype='int') bond_feature_matrix = np.zeros(n_bond, dtype='int') connectivity = np.zeros((n_bond, 2), dtype='int') bond_index = 0 atom_seq = mol.GetAtoms() atoms = [atom_seq[i] for i in range(n_atom)] for n, atom in enumerate(atoms): # Atom Classes atom_feature_matrix[n] = self.atom_tokenizer( self.atom_features(atom)) start_index = atom.GetIdx() for bond in atom.GetBonds(): # Is the bond pointing at the target atom rev = bond.GetBeginAtomIdx() != start_index # Bond Classes bond_feature_matrix[n] = self.bond_tokenizer( self.bond_features(bond, flipped=rev)) # Connectivity if not rev: # Original direction connectivity[bond_index, 0] = bond.GetBeginAtomIdx() connectivity[bond_index, 1] = bond.GetEndAtomIdx() else: # Reversed connectivity[bond_index, 0] = bond.GetEndAtomIdx() connectivity[bond_index, 1] = bond.GetBeginAtomIdx() bond_index += 1 return { 'n_atom': n_atom, 'n_bond': n_bond, 'atom': atom_feature_matrix, 'bond': bond_feature_matrix, 'connectivity': connectivity, } class ConnectivityAPreprocessor(object): """ Given a list of SMILES strings, encode these molecules as atom and connectivity feature matricies. Example: >>> preprocessor = SmilesPreprocessor(explicit_hs=False) >>> inputs = preprocessor.fit(data.smiles) """ def __init__(self, explicit_hs=True, atom_features=None, bond_features=None): """ explicit_hs : bool whether to tell RDkit to add H's to a molecule. atom_features : function A function applied to an rdkit.Atom that returns some representation (i.e., string, integer) for the Tokenizer class. bond_features : function A function applied to an rdkit Bond to return some description. """ self.atom_tokenizer = Tokenizer() self.bond_tokenizer = Tokenizer() self.explicit_hs = explicit_hs if atom_features is None: atom_features = features.atom_features_v1 if bond_features is None: bond_features = features.bond_features_v1 self.atom_features = atom_features self.bond_features = bond_features def fit(self, smiles_iterator): """ Fit an iterator of SMILES strings, creating new atom and bond tokens for unseen molecules. Returns a dictionary with 'atom' and 'connectivity' entries """ return list(self.preprocess(smiles_iterator, train=True)) def predict(self, smiles_iterator): """ Uses previously determined atom and bond tokens to convert a SMILES iterator into 'atom' and 'connectivity' matrices. Ensures that atom and bond classes commute with previously determined results. """ return list(self.preprocess(smiles_iterator, train=False)) def preprocess(self, smiles_iterator, train=True): self.atom_tokenizer.train = train self.bond_tokenizer.train = train for smiles in tqdm(smiles_iterator): yield self.construct_feature_matrices(smiles) @property def atom_classes(self): """ The number of atom types found (includes the 0 null-atom type) """ return self.atom_tokenizer.num_classes + 1 @property def bond_classes(self): """ The number of bond types found (includes the 0 null-bond type) """ return self.bond_tokenizer.num_classes + 1 def construct_feature_matrices(self, smiles): """ construct a molecule from the given smiles string and return atom and bond classes. Returns dict with entries 'n_atom' : number of atoms in the molecule 'n_bond' : number of bonds in the molecule 'atom' : (n_atom,) length list of atom classes 'bond' : (n_bond,) list of bond classes 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. """ mol = MolFromSmiles(smiles) if self.explicit_hs: mol = AddHs(mol) n_atom = len(mol.GetAtoms()) n_bond = 2 * len(mol.GetBonds()) # If its an isolated atom, add a self-link if n_bond == 0: n_bond = 1 atom_feature_matrix = np.zeros(n_atom, dtype='int') bond_feature_matrix = np.zeros(n_bond, dtype='int') connectivity = np.zeros((n_bond, 2), dtype='int') bond_index = 0 atom_seq = mol.GetAtoms() atoms = [atom_seq[i] for i in range(n_atom)] for n, atom in enumerate(atoms): # Atom Classes atom_feature_matrix[n] = self.atom_tokenizer( self.atom_features(atom)) start_index = atom.GetIdx() for bond in atom.GetBonds(): # Is the bond pointing at the target atom rev = bond.GetBeginAtomIdx() != start_index # Bond Classes bond_feature_matrix[n] = self.bond_tokenizer( self.bond_features(bond, flipped=rev)) # Connectivity if not rev: # Original direction connectivity[bond_index, 0] = bond.GetBeginAtomIdx() connectivity[bond_index, 1] = bond.GetEndAtomIdx() else: # Reversed connectivity[bond_index, 0] = bond.GetEndAtomIdx() connectivity[bond_index, 1] = bond.GetBeginAtomIdx() bond_index += 1 return { 'n_atom': n_atom, 'n_bond': n_bond, 'atom': atom_feature_matrix, 'bond': bond_feature_matrix, 'connectivity': connectivity, } class MolPreprocessor(SmilesPreprocessor): """ I should refactor this into a base class and separate SmilesPreprocessor classes. But the idea is that we only need to redefine the `construct_feature_matrices` method to have a working preprocessor that handles 3D structures. We'll pass an iterator of mol objects instead of SMILES strings this time, though. """ def __init__(self, n_neighbors, cutoff, **kwargs): """ A preprocessor class that also returns distances between neighboring atoms. Adds edges for non-bonded atoms to include a maximum of n_neighbors around each atom """ self.n_neighbors = n_neighbors self.cutoff = cutoff super(MolPreprocessor, self).__init__(**kwargs) def construct_feature_matrices(self, mol): """ Given an rdkit mol, return atom feature matrices, bond feature matrices, and connectivity matrices. Returns dict with entries 'n_atom' : number of atoms in the molecule 'n_bond' : number of edges (likely n_atom * n_neighbors) 'atom' : (n_atom,) length list of atom classes 'bond' : (n_bond,) list of bond classes. 0 for no bond 'distance' : (n_bond,) list of bond distances 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. """ n_atom = len(mol.GetAtoms()) # n_bond is actually the number of atom-atom pairs, so this is defined # by the number of neighbors for each atom. #if there is cutoff, distance_matrix = Chem.Get3DDistanceMatrix(mol) if self.n_neighbors <= (n_atom - 1): n_bond = self.n_neighbors * n_atom else: # If there are fewer atoms than n_neighbors, all atoms will be # connected n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size if n_bond == 0: n_bond = 1 # Initialize the matrices to be filled in during the following loop. atom_feature_matrix = np.zeros(n_atom, dtype='int') bond_feature_matrix = np.zeros(n_bond, dtype='int') bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) connectivity = np.zeros((n_bond, 2), dtype='int') # Hopefully we've filtered out all problem mols by now. if mol is None: raise RuntimeError("Issue in loading mol") # Get a list of the atoms in the molecule. atom_seq = mol.GetAtoms() atoms = [atom_seq[i] for i in range(n_atom)] # Here we loop over each atom, and the inner loop iterates over each # neighbor of the current atom. bond_index = 0 # keep track of our current bond. for n, atom in enumerate(atoms): # update atom feature matrix atom_feature_matrix[n] = self.atom_tokenizer( self.atom_features(atom)) # if n_neighbors is greater than total atoms, then each atom is a # neighbor. if (self.n_neighbors + 1) > len(mol.GetAtoms()): neighbor_end_index = len(mol.GetAtoms()) else: neighbor_end_index = (self.n_neighbors + 1) distance_atom = distance_matrix[n, :] cutoff_end_index = distance_atom[distance_atom < self.cutoff].size end_index = min(neighbor_end_index, cutoff_end_index) # Loop over each of the nearest neighbors neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] if len(neighbor_inds)==0: neighbor_inds = [n] for neighbor in neighbor_inds: # update bond feature matrix bond = mol.GetBondBetweenAtoms(n, int(neighbor)) if bond is None: bond_feature_matrix[bond_index] = 0 else: rev = False if bond.GetBeginAtomIdx() == n else True bond_feature_matrix[bond_index] = self.bond_tokenizer( self.bond_features(bond, flipped=rev)) distance = distance_matrix[n, neighbor] bond_distance_matrix[bond_index] = distance # update connectivity matrix connectivity[bond_index, 0] = n connectivity[bond_index, 1] = neighbor bond_index += 1 return { 'n_atom': n_atom, 'n_bond': n_bond, 'atom': atom_feature_matrix, 'bond': bond_feature_matrix, 'distance': bond_distance_matrix, 'connectivity': connectivity, } class MolBPreprocessor(MolPreprocessor): """ This is a subclass of Molpreprocessor that preprocessor molecule with bond property target """ def __init__(self, **kwargs): """ A preprocessor class that also returns bond_target_matrix, besides the bond matrix returned by MolPreprocessor. The bond_target_matrix is then used as ref to reduce molecule to bond property """ super(MolBPreprocessor, self).__init__(**kwargs) def construct_feature_matrices(self, entry): """ Given an entry contining rdkit molecule, bond_index and for the target property, return atom feature matrices, bond feature matrices, distance matrices, connectivity matrices and bond ref matrices. returns dict with entries see MolPreproccessor 'bond_index' : ref array to the bond index """ mol, bond_index_array = entry n_atom = len(mol.GetAtoms()) n_pro = len(bond_index_array) # n_bond is actually the number of atom-atom pairs, so this is defined # by the number of neighbors for each atom. #if there is cutoff, distance_matrix = Chem.Get3DDistanceMatrix(mol) if self.n_neighbors <= (n_atom - 1): n_bond = self.n_neighbors * n_atom else: # If there are fewer atoms than n_neighbors, all atoms will be # connected n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size if n_bond == 0: n_bond = 1 # Initialize the matrices to be filled in during the following loop. atom_feature_matrix = np.zeros(n_atom, dtype='int') bond_feature_matrix = np.zeros(n_bond, dtype='int') bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) bond_index_matrix = np.full(n_bond, -1, dtype='int') connectivity = np.zeros((n_bond, 2), dtype='int') # Hopefully we've filtered out all problem mols by now. if mol is None: raise RuntimeError("Issue in loading mol") # Get a list of the atoms in the molecule. atom_seq = mol.GetAtoms() atoms = [atom_seq[i] for i in range(n_atom)] # Here we loop over each atom, and the inner loop iterates over each # neighbor of the current atom. bond_index = 0 # keep track of our current bond. for n, atom in enumerate(atoms): # update atom feature matrix atom_feature_matrix[n] = self.atom_tokenizer( self.atom_features(atom)) # if n_neighbors is greater than total atoms, then each atom is a # neighbor. if (self.n_neighbors + 1) > len(mol.GetAtoms()): neighbor_end_index = len(mol.GetAtoms()) else: neighbor_end_index = (self.n_neighbors + 1) distance_atom = distance_matrix[n, :] cutoff_end_index = distance_atom[distance_atom < self.cutoff].size end_index = min(neighbor_end_index, cutoff_end_index) # Loop over each of the nearest neighbors neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] if len(neighbor_inds)==0: neighbor_inds = [n] for neighbor in neighbor_inds: # update bond feature matrix bond = mol.GetBondBetweenAtoms(n, int(neighbor)) if bond is None: bond_feature_matrix[bond_index] = 0 else: rev = False if bond.GetBeginAtomIdx() == n else True bond_feature_matrix[bond_index] = self.bond_tokenizer( self.bond_features(bond, flipped=rev)) try: bond_index_matrix[bond_index] = bond_index_array.tolist().index(bond.GetIdx()) except: pass distance = distance_matrix[n, neighbor] bond_distance_matrix[bond_index] = distance # update connectivity matrix connectivity[bond_index, 0] = n connectivity[bond_index, 1] = neighbor bond_index += 1 return { 'n_atom': n_atom, 'n_bond': n_bond, 'n_pro': n_pro, 'atom': atom_feature_matrix, 'bond': bond_feature_matrix, 'distance': bond_distance_matrix, 'connectivity': connectivity, 'bond_index': bond_index_matrix, } class MolAPreprocessor(MolPreprocessor): """ This is a subclass of Molpreprocessor that preprocessor molecule with bond property target """ def __init__(self, **kwargs): """ A preprocessor class that also returns bond_target_matrix, besides the bond matrix returned by MolPreprocessor. The bond_target_matrix is then used as ref to reduce molecule to bond property """ super(MolAPreprocessor, self).__init__(**kwargs) def construct_feature_matrices(self, entry): """ Given an entry contining rdkit molecule, bond_index and for the target property, return atom feature matrices, bond feature matrices, distance matrices, connectivity matrices and bond ref matrices. returns dict with entries see MolPreproccessor 'bond_index' : ref array to the bond index """ mol, atom_index_array = entry n_atom = len(mol.GetAtoms()) n_pro = len(atom_index_array) # n_bond is actually the number of atom-atom pairs, so this is defined # by the number of neighbors for each atom. #if there is cutoff, distance_matrix = Chem.Get3DDistanceMatrix(mol) if self.n_neighbors <= (n_atom - 1): n_bond = self.n_neighbors * n_atom else: # If there are fewer atoms than n_neighbors, all atoms will be # connected n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size if n_bond == 0: n_bond = 1 # Initialize the matrices to be filled in during the following loop. atom_feature_matrix = np.zeros(n_atom, dtype='int') bond_feature_matrix = np.zeros(n_bond, dtype='int') bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) atom_index_matrix = np.full(n_atom, -1, dtype='int') connectivity = np.zeros((n_bond, 2), dtype='int') # Hopefully we've filtered out all problem mols by now. if mol is None: raise RuntimeError("Issue in loading mol") # Get a list of the atoms in the molecule. atom_seq = mol.GetAtoms() atoms = [atom_seq[i] for i in range(n_atom)] # Here we loop over each atom, and the inner loop iterates over each # neighbor of the current atom. bond_index = 0 # keep track of our current bond. for n, atom in enumerate(atoms): # update atom feature matrix atom_feature_matrix[n] = self.atom_tokenizer( self.atom_features(atom)) try: atom_index_matrix[n] = atom_index_array.tolist().index(atom.GetIdx()) except: pass # if n_neighbors is greater than total atoms, then each atom is a # neighbor. if (self.n_neighbors + 1) > len(mol.GetAtoms()): neighbor_end_index = len(mol.GetAtoms()) else: neighbor_end_index = (self.n_neighbors + 1) distance_atom = distance_matrix[n, :] cutoff_end_index = distance_atom[distance_atom < self.cutoff].size end_index = min(neighbor_end_index, cutoff_end_index) # Loop over each of the nearest neighbors neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] if len(neighbor_inds)==0: neighbor_inds = [n] for neighbor in neighbor_inds: # update bond feature matrix bond = mol.GetBondBetweenAtoms(n, int(neighbor)) if bond is None: bond_feature_matrix[bond_index] = 0 else: rev = False if bond.GetBeginAtomIdx() == n else True bond_feature_matrix[bond_index] = self.bond_tokenizer( self.bond_features(bond, flipped=rev)) distance = distance_matrix[n, neighbor] bond_distance_matrix[bond_index] = distance # update connectivity matrix connectivity[bond_index, 0] = n connectivity[bond_index, 1] = neighbor bond_index += 1 return { 'n_atom': n_atom, 'n_bond': n_bond, 'n_pro': n_pro, 'atom': atom_feature_matrix, 'bond': bond_feature_matrix, 'distance': bond_distance_matrix, 'connectivity': connectivity, 'atom_index': atom_index_matrix, } # TODO: rewrite this # class LaplacianSmilesPreprocessor(SmilesPreprocessor): # """ Extends the SmilesPreprocessor class to also return eigenvalues and # eigenvectors of the graph laplacian matrix. # # Example: # >>> preprocessor = SmilesPreprocessor( # >>> max_atoms=55, max_bonds=62, max_degree=4, explicit_hs=False) # >>> atom, connectivity, eigenvalues, eigenvectors = preprocessor.fit( # data.smiles) # """ # # def preprocess(self, smiles_iterator, train=True): # # self.atom_tokenizer.train = train # self.bond_tokenizer.train = train # # for smiles in tqdm(smiles_iterator): # G = self._mol_to_nx(smiles) # A = self._get_atom_feature_matrix(G) # C = self._get_connectivity_matrix(G) # W, V = self._get_laplacian_spectral_decomp(G) # yield A, C, W, V # # # def _get_laplacian_spectral_decomp(self, G): # """ Return the eigenvalues and eigenvectors of the graph G, padded to # `self.max_atoms`. # """ # # w0 = np.zeros((self.max_atoms, 1)) # v0 = np.zeros((self.max_atoms, self.max_atoms)) # # w, v = eigh(nx.laplacian_matrix(G).todense()) # # num_atoms = len(v) # # w0[:num_atoms, 0] = w # v0[:num_atoms, :num_atoms] = v # # return w0, v0 # # # def fit(self, smiles_iterator): # results = self._fit(smiles_iterator) # return {'atom': results[0], # 'connectivity': results[1], # 'w': results[2], # 'v': results[3]} # # # def predict(self, smiles_iterator): # results = self._predict(smiles_iterator) # return {'atom': results[0], # 'connectivity': results[1], # 'w': results[2], # 'v': results[3]} def get_max_atom_bond_size(smiles_iterator, explicit_hs=True): """ Convienence function to get max_atoms, max_bonds for a set of input SMILES """ max_atoms = 0 max_bonds = 0 for smiles in tqdm(smiles_iterator): mol = MolFromSmiles(smiles) if explicit_hs: mol = AddHs(mol) max_atoms = max([max_atoms, len(mol.GetAtoms())]) max_bonds = max([max_bonds, len(mol.GetBonds())]) return dict(max_atoms=max_atoms, max_bonds=max_bonds*2) def canonicalize_smiles(smiles, isomeric=True, sanitize=True): try: mol = MolFromSmiles(smiles, sanitize=sanitize) return MolToSmiles(mol, isomericSmiles=isomeric) except Exception: pass