# bobbypaton: Initial CASCADE HF Space deployment (233f6d4)
import logging, sys
import numpy as np
from tqdm import tqdm
from scipy.linalg import eigh
from rdkit import Chem
from rdkit.Chem import MolFromSmiles, MolToSmiles, AddHs
from nfp.preprocessing import features
from nfp.preprocessing.features import Tokenizer
import time
class SmilesPreprocessor(object):
    """ Given a list of SMILES strings, encode these molecules as atom and
    connectivity feature matrices.

    Example:
    >>> preprocessor = SmilesPreprocessor(explicit_hs=False)
    >>> inputs = preprocessor.fit(data.smiles)
    """

    def __init__(self, explicit_hs=True, atom_features=None, bond_features=None):
        """
        explicit_hs : bool
            whether to tell RDKit to add H's to a molecule.
        atom_features : function
            A function applied to an rdkit.Atom that returns some
            representation (i.e., string, integer) for the Tokenizer class.
            Defaults to `features.atom_features_v1`.
        bond_features : function
            A function applied to an rdkit Bond to return some description.
            It is called as `bond_features(bond, flipped=<bool>)`.
            Defaults to `features.bond_features_v1`.
        """
        self.atom_tokenizer = Tokenizer()
        self.bond_tokenizer = Tokenizer()
        self.explicit_hs = explicit_hs

        if atom_features is None:
            atom_features = features.atom_features_v1
        if bond_features is None:
            bond_features = features.bond_features_v1

        self.atom_features = atom_features
        self.bond_features = bond_features

    def fit(self, smiles_iterator):
        """ Process an iterator of SMILES strings with both tokenizers in
        training mode (`train=True`).

        Returns a list with one dictionary per input, as produced by
        `construct_feature_matrices`. """
        return list(self.preprocess(smiles_iterator, train=True))

    def predict(self, smiles_iterator):
        """ Process an iterator of SMILES strings with both tokenizers'
        `train` flag set to False, i.e. re-using the tokens determined during
        a previous `fit`.

        Returns a list with one dictionary per input, as produced by
        `construct_feature_matrices`. """
        return list(self.preprocess(smiles_iterator, train=False))

    def preprocess(self, smiles_iterator, train=True):
        """ Generator yielding the feature dictionary of each input in turn.

        Because this is a generator, the tokenizers' `train` flags are only
        updated once iteration actually starts. """
        self.atom_tokenizer.train = train
        self.bond_tokenizer.train = train

        for smiles in tqdm(smiles_iterator):
            yield self.construct_feature_matrices(smiles)

    @property
    def atom_classes(self):
        """ The number of atom types found (includes the 0 null-atom type) """
        return self.atom_tokenizer.num_classes + 1

    @property
    def bond_classes(self):
        """ The number of bond types found (includes the 0 null-bond type) """
        return self.bond_tokenizer.num_classes + 1

    def construct_feature_matrices(self, smiles):
        """ Construct a molecule from the given SMILES string and return atom
        and bond classes.

        Every chemical bond is stored twice, once per direction, so that each
        row of 'connectivity' starts at the atom currently being visited.

        Returns
        dict with entries
        'n_atom' : number of atoms in the molecule
        'n_bond' : number of directed bonds (2 * number of bonds, or 1 for a
            molecule without bonds)
        'atom' : (n_atom,) length list of atom classes
        'bond' : (n_bond,) list of bond classes, aligned row-by-row with
            'connectivity'
        'connectivity' : (n_bond, 2) array of source atom, target atom pairs.
        """
        mol = MolFromSmiles(smiles)
        if self.explicit_hs:
            mol = AddHs(mol)

        n_atom = len(mol.GetAtoms())
        n_bond = 2 * len(mol.GetBonds())

        # If it's an isolated atom, add a self-link: the single all-zero
        # connectivity row points from atom 0 to atom 0 with bond class 0.
        if n_bond == 0:
            n_bond = 1

        atom_feature_matrix = np.zeros(n_atom, dtype='int')
        bond_feature_matrix = np.zeros(n_bond, dtype='int')
        connectivity = np.zeros((n_bond, 2), dtype='int')

        bond_index = 0  # next free row of the bond / connectivity arrays

        atom_seq = mol.GetAtoms()
        atoms = [atom_seq[i] for i in range(n_atom)]

        for n, atom in enumerate(atoms):
            # Atom classes
            atom_feature_matrix[n] = self.atom_tokenizer(
                self.atom_features(atom))

            start_index = atom.GetIdx()

            for bond in atom.GetBonds():
                # Is the bond stored pointing *at* the current atom?
                rev = bond.GetBeginAtomIdx() != start_index

                # Bond classes. Indexed by `bond_index` (not the atom index
                # `n`) so each entry lines up with its connectivity row.
                bond_feature_matrix[bond_index] = self.bond_tokenizer(
                    self.bond_features(bond, flipped=rev))

                # Connectivity: always (current atom, other atom)
                if not rev:  # Original direction
                    connectivity[bond_index, 0] = bond.GetBeginAtomIdx()
                    connectivity[bond_index, 1] = bond.GetEndAtomIdx()
                else:  # Reversed
                    connectivity[bond_index, 0] = bond.GetEndAtomIdx()
                    connectivity[bond_index, 1] = bond.GetBeginAtomIdx()

                bond_index += 1

        return {
            'n_atom': n_atom,
            'n_bond': n_bond,
            'atom': atom_feature_matrix,
            'bond': bond_feature_matrix,
            'connectivity': connectivity,
        }
class ConnectivityAPreprocessor(object):
    """ Given a list of SMILES strings, encode these molecules as atom and
    connectivity feature matrices.

    Example:
    >>> preprocessor = ConnectivityAPreprocessor(explicit_hs=False)
    >>> inputs = preprocessor.fit(data.smiles)
    """

    def __init__(self, explicit_hs=True, atom_features=None, bond_features=None):
        """
        explicit_hs : bool
            whether to tell RDKit to add H's to a molecule.
        atom_features : function
            A function applied to an rdkit.Atom that returns some
            representation (i.e., string, integer) for the Tokenizer class.
            Defaults to `features.atom_features_v1`.
        bond_features : function
            A function applied to an rdkit Bond to return some description.
            It is called as `bond_features(bond, flipped=<bool>)`.
            Defaults to `features.bond_features_v1`.
        """
        self.atom_tokenizer = Tokenizer()
        self.bond_tokenizer = Tokenizer()
        self.explicit_hs = explicit_hs

        if atom_features is None:
            atom_features = features.atom_features_v1
        if bond_features is None:
            bond_features = features.bond_features_v1

        self.atom_features = atom_features
        self.bond_features = bond_features

    def fit(self, smiles_iterator):
        """ Process an iterator of SMILES strings with both tokenizers in
        training mode (`train=True`).

        Returns a list with one dictionary per input, as produced by
        `construct_feature_matrices`. """
        return list(self.preprocess(smiles_iterator, train=True))

    def predict(self, smiles_iterator):
        """ Process an iterator of SMILES strings with both tokenizers'
        `train` flag set to False, i.e. re-using the tokens determined during
        a previous `fit`.

        Returns a list with one dictionary per input, as produced by
        `construct_feature_matrices`. """
        return list(self.preprocess(smiles_iterator, train=False))

    def preprocess(self, smiles_iterator, train=True):
        """ Generator yielding the feature dictionary of each input in turn.

        Because this is a generator, the tokenizers' `train` flags are only
        updated once iteration actually starts. """
        self.atom_tokenizer.train = train
        self.bond_tokenizer.train = train

        for smiles in tqdm(smiles_iterator):
            yield self.construct_feature_matrices(smiles)

    @property
    def atom_classes(self):
        """ The number of atom types found (includes the 0 null-atom type) """
        return self.atom_tokenizer.num_classes + 1

    @property
    def bond_classes(self):
        """ The number of bond types found (includes the 0 null-bond type) """
        return self.bond_tokenizer.num_classes + 1

    def construct_feature_matrices(self, smiles):
        """ Construct a molecule from the given SMILES string and return atom
        and bond classes.

        Every chemical bond is stored twice, once per direction, so that each
        row of 'connectivity' starts at the atom currently being visited.

        Returns
        dict with entries
        'n_atom' : number of atoms in the molecule
        'n_bond' : number of directed bonds (2 * number of bonds, or 1 for a
            molecule without bonds)
        'atom' : (n_atom,) length list of atom classes
        'bond' : (n_bond,) list of bond classes, aligned row-by-row with
            'connectivity'
        'connectivity' : (n_bond, 2) array of source atom, target atom pairs.
        """
        mol = MolFromSmiles(smiles)
        if self.explicit_hs:
            mol = AddHs(mol)

        n_atom = len(mol.GetAtoms())
        n_bond = 2 * len(mol.GetBonds())

        # If it's an isolated atom, add a self-link: the single all-zero
        # connectivity row points from atom 0 to atom 0 with bond class 0.
        if n_bond == 0:
            n_bond = 1

        atom_feature_matrix = np.zeros(n_atom, dtype='int')
        bond_feature_matrix = np.zeros(n_bond, dtype='int')
        connectivity = np.zeros((n_bond, 2), dtype='int')

        bond_index = 0  # next free row of the bond / connectivity arrays

        atom_seq = mol.GetAtoms()
        atoms = [atom_seq[i] for i in range(n_atom)]

        for n, atom in enumerate(atoms):
            # Atom classes
            atom_feature_matrix[n] = self.atom_tokenizer(
                self.atom_features(atom))

            start_index = atom.GetIdx()

            for bond in atom.GetBonds():
                # Is the bond stored pointing *at* the current atom?
                rev = bond.GetBeginAtomIdx() != start_index

                # Bond classes. Indexed by `bond_index` (not the atom index
                # `n`) so each entry lines up with its connectivity row.
                bond_feature_matrix[bond_index] = self.bond_tokenizer(
                    self.bond_features(bond, flipped=rev))

                # Connectivity: always (current atom, other atom)
                if not rev:  # Original direction
                    connectivity[bond_index, 0] = bond.GetBeginAtomIdx()
                    connectivity[bond_index, 1] = bond.GetEndAtomIdx()
                else:  # Reversed
                    connectivity[bond_index, 0] = bond.GetEndAtomIdx()
                    connectivity[bond_index, 1] = bond.GetBeginAtomIdx()

                bond_index += 1

        return {
            'n_atom': n_atom,
            'n_bond': n_bond,
            'atom': atom_feature_matrix,
            'bond': bond_feature_matrix,
            'connectivity': connectivity,
        }
class MolPreprocessor(SmilesPreprocessor):
    """ I should refactor this into a base class and separate
    SmilesPreprocessor classes. But the idea is that we only need to redefine
    the `construct_feature_matrices` method to have a working preprocessor that
    handles 3D structures.

    We'll pass an iterator of mol objects instead of SMILES strings this time,
    though.
    """

    def __init__(self, n_neighbors, cutoff, **kwargs):
        """ A preprocessor class that also returns distances between
        neighboring atoms. Adds edges for non-bonded atoms to include a maximum
        of n_neighbors around each atom.

        n_neighbors : int
            maximum number of neighbors (edges) recorded per atom.
        cutoff : float
            only atoms closer than this distance are considered neighbors.
        **kwargs
            forwarded unchanged to SmilesPreprocessor.__init__.
        """
        self.n_neighbors = n_neighbors
        self.cutoff = cutoff
        super(MolPreprocessor, self).__init__(**kwargs)

    def construct_feature_matrices(self, mol):
        """ Given an rdkit mol, return atom feature matrices, bond feature
        matrices, and connectivity matrices.

        Returns
        dict with entries
        'n_atom' : number of atoms in the molecule
        'n_bond' : number of edges (likely n_atom * n_neighbors)
        'atom' : (n_atom,) length list of atom classes
        'bond' : (n_bond,) list of bond classes. 0 for no bond
        'distance' : (n_bond,) list of bond distances
        'connectivity' : (n_bond, 2) array of source atom, target atom pairs.

        Raises
        RuntimeError if `mol` is None.
        """
        # Hopefully we've filtered out all problem mols by now. This has to
        # be checked before the first attribute access on `mol`, otherwise a
        # None input fails with an AttributeError instead.
        if mol is None:
            raise RuntimeError("Issue in loading mol")

        n_atom = len(mol.GetAtoms())
        distance_matrix = Chem.Get3DDistanceMatrix(mol)

        # n_bond is actually the number of atom-atom pairs, so this is defined
        # by the number of neighbors for each atom.
        if self.n_neighbors <= (n_atom - 1):
            # Upper bound: the cutoff can leave an atom with fewer than
            # n_neighbors edges, in which case trailing rows keep their
            # initial zero values.
            n_bond = self.n_neighbors * n_atom
        else:
            # If there are fewer atoms than n_neighbors, all atoms within the
            # cutoff will be connected.
            n_bond = distance_matrix[
                (distance_matrix < self.cutoff) & (distance_matrix != 0)].size

        if n_bond == 0:
            n_bond = 1

        # Initialize the matrices to be filled in during the following loop.
        atom_feature_matrix = np.zeros(n_atom, dtype='int')
        bond_feature_matrix = np.zeros(n_bond, dtype='int')
        bond_distance_matrix = np.zeros(n_bond, dtype=np.float32)
        connectivity = np.zeros((n_bond, 2), dtype='int')

        # Get a list of the atoms in the molecule.
        atom_seq = mol.GetAtoms()
        atoms = [atom_seq[i] for i in range(n_atom)]

        # If n_neighbors is greater than the total number of atoms, then each
        # atom is a neighbor. The bound is the same for every atom, so it is
        # computed once.
        neighbor_end_index = min(self.n_neighbors + 1, n_atom)

        # Here we loop over each atom, and the inner loop iterates over each
        # neighbor of the current atom.
        bond_index = 0  # keep track of our current bond.
        for n, atom in enumerate(atoms):
            # update atom feature matrix
            atom_feature_matrix[n] = self.atom_tokenizer(
                self.atom_features(atom))

            distance_atom = distance_matrix[n, :]
            cutoff_end_index = distance_atom[distance_atom < self.cutoff].size
            end_index = min(neighbor_end_index, cutoff_end_index)

            # Nearest neighbors first. Position 0 of the sort is skipped;
            # presumably that is the atom itself at distance 0.
            neighbor_inds = distance_atom.argsort()[1:end_index]
            if len(neighbor_inds) == 0:
                # No neighbor qualifies: fall back to a self-link.
                neighbor_inds = [n]

            for neighbor in neighbor_inds:
                # update bond feature matrix
                bond = mol.GetBondBetweenAtoms(n, int(neighbor))
                if bond is None:
                    bond_feature_matrix[bond_index] = 0
                else:
                    rev = bond.GetBeginAtomIdx() != n
                    bond_feature_matrix[bond_index] = self.bond_tokenizer(
                        self.bond_features(bond, flipped=rev))

                bond_distance_matrix[bond_index] = distance_matrix[n, neighbor]

                # update connectivity matrix
                connectivity[bond_index, 0] = n
                connectivity[bond_index, 1] = neighbor

                bond_index += 1

        return {
            'n_atom': n_atom,
            'n_bond': n_bond,
            'atom': atom_feature_matrix,
            'bond': bond_feature_matrix,
            'distance': bond_distance_matrix,
            'connectivity': connectivity,
        }
class MolBPreprocessor(MolPreprocessor):
    """
    This is a subclass of MolPreprocessor that preprocesses molecules with a
    bond property target.
    """

    def __init__(self, **kwargs):
        """
        A preprocessor class that also returns a 'bond_index' array, besides
        the bond matrix returned by MolPreprocessor. The 'bond_index' array is
        then used as a reference to reduce the molecule to bond properties.

        **kwargs
            forwarded unchanged to MolPreprocessor.__init__ (which requires
            `n_neighbors` and `cutoff`).
        """
        super(MolBPreprocessor, self).__init__(**kwargs)

    def construct_feature_matrices(self, entry):
        """
        Given an entry containing an rdkit molecule and the array of bond
        indices that carry the target property, return atom feature matrices,
        bond feature matrices, distance matrices, connectivity matrices and
        bond reference matrices.

        entry : (mol, bond_index_array) pair. `bond_index_array` may be any
            sequence accepted by `np.asarray`.

        Returns
        dict with entries
        see MolPreprocessor
        'n_pro' : len(bond_index_array)
        'bond_index' : (n_bond,) array; for each edge, the position of its
            rdkit bond index inside `bond_index_array`, or -1 when the edge is
            not a bond or the bond is not listed.

        Raises
        RuntimeError if the molecule is None.
        """
        mol, bond_index_array = entry

        # Hopefully we've filtered out all problem mols by now. This has to
        # be checked before the first attribute access on `mol`.
        if mol is None:
            raise RuntimeError("Issue in loading mol")

        n_atom = len(mol.GetAtoms())
        n_pro = len(bond_index_array)

        # Convert the targets to a plain list once instead of once per edge.
        target_bonds = np.asarray(bond_index_array).tolist()

        distance_matrix = Chem.Get3DDistanceMatrix(mol)

        # n_bond is actually the number of atom-atom pairs, so this is defined
        # by the number of neighbors for each atom.
        if self.n_neighbors <= (n_atom - 1):
            # Upper bound: the cutoff can leave an atom with fewer than
            # n_neighbors edges, in which case trailing rows keep their
            # initial values.
            n_bond = self.n_neighbors * n_atom
        else:
            # If there are fewer atoms than n_neighbors, all atoms within the
            # cutoff will be connected.
            n_bond = distance_matrix[
                (distance_matrix < self.cutoff) & (distance_matrix != 0)].size

        if n_bond == 0:
            n_bond = 1

        # Initialize the matrices to be filled in during the following loop.
        atom_feature_matrix = np.zeros(n_atom, dtype='int')
        bond_feature_matrix = np.zeros(n_bond, dtype='int')
        bond_distance_matrix = np.zeros(n_bond, dtype=np.float32)
        bond_index_matrix = np.full(n_bond, -1, dtype='int')
        connectivity = np.zeros((n_bond, 2), dtype='int')

        # Get a list of the atoms in the molecule.
        atom_seq = mol.GetAtoms()
        atoms = [atom_seq[i] for i in range(n_atom)]

        # If n_neighbors is greater than the total number of atoms, then each
        # atom is a neighbor. Same bound for every atom, so computed once.
        neighbor_end_index = min(self.n_neighbors + 1, n_atom)

        # Here we loop over each atom, and the inner loop iterates over each
        # neighbor of the current atom.
        bond_index = 0  # keep track of our current bond.
        for n, atom in enumerate(atoms):
            # update atom feature matrix
            atom_feature_matrix[n] = self.atom_tokenizer(
                self.atom_features(atom))

            distance_atom = distance_matrix[n, :]
            cutoff_end_index = distance_atom[distance_atom < self.cutoff].size
            end_index = min(neighbor_end_index, cutoff_end_index)

            # Nearest neighbors first. Position 0 of the sort is skipped;
            # presumably that is the atom itself at distance 0.
            neighbor_inds = distance_atom.argsort()[1:end_index]
            if len(neighbor_inds) == 0:
                # No neighbor qualifies: fall back to a self-link.
                neighbor_inds = [n]

            for neighbor in neighbor_inds:
                # update bond feature matrix
                bond = mol.GetBondBetweenAtoms(n, int(neighbor))
                if bond is None:
                    bond_feature_matrix[bond_index] = 0
                else:
                    rev = bond.GetBeginAtomIdx() != n
                    bond_feature_matrix[bond_index] = self.bond_tokenizer(
                        self.bond_features(bond, flipped=rev))
                    try:
                        bond_index_matrix[bond_index] = target_bonds.index(
                            bond.GetIdx())
                    except ValueError:
                        # This bond is not one of the targets; keep -1.
                        pass

                bond_distance_matrix[bond_index] = distance_matrix[n, neighbor]

                # update connectivity matrix
                connectivity[bond_index, 0] = n
                connectivity[bond_index, 1] = neighbor

                bond_index += 1

        return {
            'n_atom': n_atom,
            'n_bond': n_bond,
            'n_pro': n_pro,
            'atom': atom_feature_matrix,
            'bond': bond_feature_matrix,
            'distance': bond_distance_matrix,
            'connectivity': connectivity,
            'bond_index': bond_index_matrix,
        }
class MolAPreprocessor(MolPreprocessor):
    """
    This is a subclass of MolPreprocessor that preprocesses molecules with an
    atom property target.
    """

    def __init__(self, **kwargs):
        """
        A preprocessor class that also returns an 'atom_index' array, besides
        the matrices returned by MolPreprocessor. The 'atom_index' array is
        then used as a reference to reduce the molecule to atom properties.

        **kwargs
            forwarded unchanged to MolPreprocessor.__init__ (which requires
            `n_neighbors` and `cutoff`).
        """
        super(MolAPreprocessor, self).__init__(**kwargs)

    def construct_feature_matrices(self, entry):
        """
        Given an entry containing an rdkit molecule and the array of atom
        indices that carry the target property, return atom feature matrices,
        bond feature matrices, distance matrices, connectivity matrices and
        atom reference matrices.

        entry : (mol, atom_index_array) pair. `atom_index_array` may be any
            sequence accepted by `np.asarray`.

        Returns
        dict with entries
        see MolPreprocessor
        'n_pro' : len(atom_index_array)
        'atom_index' : (n_atom,) array; for each atom, the position of its
            rdkit atom index inside `atom_index_array`, or -1 when the atom is
            not listed.

        Raises
        RuntimeError if the molecule is None.
        """
        mol, atom_index_array = entry

        # Hopefully we've filtered out all problem mols by now. This has to
        # be checked before the first attribute access on `mol`.
        if mol is None:
            raise RuntimeError("Issue in loading mol")

        n_atom = len(mol.GetAtoms())
        n_pro = len(atom_index_array)

        # Convert the targets to a plain list once instead of once per atom.
        target_atoms = np.asarray(atom_index_array).tolist()

        distance_matrix = Chem.Get3DDistanceMatrix(mol)

        # n_bond is actually the number of atom-atom pairs, so this is defined
        # by the number of neighbors for each atom.
        if self.n_neighbors <= (n_atom - 1):
            # Upper bound: the cutoff can leave an atom with fewer than
            # n_neighbors edges, in which case trailing rows keep their
            # initial zero values.
            n_bond = self.n_neighbors * n_atom
        else:
            # If there are fewer atoms than n_neighbors, all atoms within the
            # cutoff will be connected.
            n_bond = distance_matrix[
                (distance_matrix < self.cutoff) & (distance_matrix != 0)].size

        if n_bond == 0:
            n_bond = 1

        # Initialize the matrices to be filled in during the following loop.
        atom_feature_matrix = np.zeros(n_atom, dtype='int')
        bond_feature_matrix = np.zeros(n_bond, dtype='int')
        bond_distance_matrix = np.zeros(n_bond, dtype=np.float32)
        atom_index_matrix = np.full(n_atom, -1, dtype='int')
        connectivity = np.zeros((n_bond, 2), dtype='int')

        # Get a list of the atoms in the molecule.
        atom_seq = mol.GetAtoms()
        atoms = [atom_seq[i] for i in range(n_atom)]

        # If n_neighbors is greater than the total number of atoms, then each
        # atom is a neighbor. Same bound for every atom, so computed once.
        neighbor_end_index = min(self.n_neighbors + 1, n_atom)

        # Here we loop over each atom, and the inner loop iterates over each
        # neighbor of the current atom.
        bond_index = 0  # keep track of our current bond.
        for n, atom in enumerate(atoms):
            # update atom feature matrix
            atom_feature_matrix[n] = self.atom_tokenizer(
                self.atom_features(atom))
            try:
                atom_index_matrix[n] = target_atoms.index(atom.GetIdx())
            except ValueError:
                # This atom is not one of the targets; keep -1.
                pass

            distance_atom = distance_matrix[n, :]
            cutoff_end_index = distance_atom[distance_atom < self.cutoff].size
            end_index = min(neighbor_end_index, cutoff_end_index)

            # Nearest neighbors first. Position 0 of the sort is skipped;
            # presumably that is the atom itself at distance 0.
            neighbor_inds = distance_atom.argsort()[1:end_index]
            if len(neighbor_inds) == 0:
                # No neighbor qualifies: fall back to a self-link.
                neighbor_inds = [n]

            for neighbor in neighbor_inds:
                # update bond feature matrix
                bond = mol.GetBondBetweenAtoms(n, int(neighbor))
                if bond is None:
                    bond_feature_matrix[bond_index] = 0
                else:
                    rev = bond.GetBeginAtomIdx() != n
                    bond_feature_matrix[bond_index] = self.bond_tokenizer(
                        self.bond_features(bond, flipped=rev))

                bond_distance_matrix[bond_index] = distance_matrix[n, neighbor]

                # update connectivity matrix
                connectivity[bond_index, 0] = n
                connectivity[bond_index, 1] = neighbor

                bond_index += 1

        return {
            'n_atom': n_atom,
            'n_bond': n_bond,
            'n_pro': n_pro,
            'atom': atom_feature_matrix,
            'bond': bond_feature_matrix,
            'distance': bond_distance_matrix,
            'connectivity': connectivity,
            'atom_index': atom_index_matrix,
        }
# TODO: rewrite this
# class LaplacianSmilesPreprocessor(SmilesPreprocessor):
# """ Extends the SmilesPreprocessor class to also return eigenvalues and
# eigenvectors of the graph laplacian matrix.
#
# Example:
# >>> preprocessor = SmilesPreprocessor(
# >>> max_atoms=55, max_bonds=62, max_degree=4, explicit_hs=False)
# >>> atom, connectivity, eigenvalues, eigenvectors = preprocessor.fit(
# data.smiles)
# """
#
# def preprocess(self, smiles_iterator, train=True):
#
# self.atom_tokenizer.train = train
# self.bond_tokenizer.train = train
#
# for smiles in tqdm(smiles_iterator):
# G = self._mol_to_nx(smiles)
# A = self._get_atom_feature_matrix(G)
# C = self._get_connectivity_matrix(G)
# W, V = self._get_laplacian_spectral_decomp(G)
# yield A, C, W, V
#
#
# def _get_laplacian_spectral_decomp(self, G):
# """ Return the eigenvalues and eigenvectors of the graph G, padded to
# `self.max_atoms`.
# """
#
# w0 = np.zeros((self.max_atoms, 1))
# v0 = np.zeros((self.max_atoms, self.max_atoms))
#
# w, v = eigh(nx.laplacian_matrix(G).todense())
#
# num_atoms = len(v)
#
# w0[:num_atoms, 0] = w
# v0[:num_atoms, :num_atoms] = v
#
# return w0, v0
#
#
# def fit(self, smiles_iterator):
# results = self._fit(smiles_iterator)
# return {'atom': results[0],
# 'connectivity': results[1],
# 'w': results[2],
# 'v': results[3]}
#
#
# def predict(self, smiles_iterator):
# results = self._predict(smiles_iterator)
# return {'atom': results[0],
# 'connectivity': results[1],
# 'w': results[2],
# 'v': results[3]}
def get_max_atom_bond_size(smiles_iterator, explicit_hs=True):
    """ Convenience function scanning a set of input SMILES for the largest
    atom count and the largest bond count.

    Returns a dict with 'max_atoms' and 'max_bonds', where 'max_bonds' is
    twice the largest bond count found. When `explicit_hs` is true, hydrogens
    are added to each molecule before counting. """
    largest_atom_count = 0
    largest_bond_count = 0

    for smi in tqdm(smiles_iterator):
        molecule = MolFromSmiles(smi)
        if explicit_hs:
            molecule = AddHs(molecule)

        atom_count = len(molecule.GetAtoms())
        bond_count = len(molecule.GetBonds())

        if atom_count > largest_atom_count:
            largest_atom_count = atom_count
        if bond_count > largest_bond_count:
            largest_bond_count = bond_count

    return {
        'max_atoms': largest_atom_count,
        'max_bonds': 2 * largest_bond_count,
    }
def canonicalize_smiles(smiles, isomeric=True, sanitize=True):
    """ Round-trip a SMILES string through RDKit (MolFromSmiles followed by
    MolToSmiles) and return the result.

    `sanitize` is passed to MolFromSmiles and `isomeric` to MolToSmiles as
    `isomericSmiles`. This is best-effort: if either call raises, the
    exception is swallowed and None is returned. """
    try:
        parsed = MolFromSmiles(smiles, sanitize=sanitize)
        canonical = MolToSmiles(parsed, isomericSmiles=isomeric)
    except Exception:
        return None
    return canonical