vladimir.manuylov
initial commit
bd082dc
import importlib
from typing import Optional, Tuple, Union, List
import numpy as np
import pandas as pd
from tqdm import tqdm
from multiprocessing import Pool
from pathlib import Path
from rdkit import Chem
from FPSim2 import FPSim2Engine
import rdkit
from rdkit import Chem, RDLogger
from rdkit.Chem import DataStructs, Descriptors
from rdkit.DataStructs import BulkTanimotoSimilarity
from sklearn.cluster import DBSCAN
import scipy
RDLogger.DisableLog('rdApp.*')
class BoostWrapper(object):
""" Help joblib to deal with boost functions """
def __init__(self, method_name, module_name):
self.method_name = method_name
self.module = importlib.import_module(module_name)
@property
def method(self):
return getattr(self.module, self.method_name)
def __call__(self, *args, **kwargs):
return self.method(*args, **kwargs)
def cluster_fpsim2(distance_path, smiles_h5_path=None, dist_eps=0.15):
""" Cluster precomputed FPSim2 distance matrix using DBSCAN algorithm """
if isinstance(distance_path, str):
distance_path = Path(distance_path, smiles_h5_path=None)
if smiles_h5_path is None:
smiles_h5_path = distance_path.parent / 'all_smiles.h5'
precomputed_indices = FPSim2Engine(smiles_h5_path).fps[:, 0]
map_precomputed = np.argsort(precomputed_indices) # maps original smiles order to FPSim2 order
precomputed_distance = scipy.sparse.load_npz(distance_path)
db = DBSCAN(eps=dist_eps, min_samples=1, metric='precomputed', n_jobs=-1)
labels = db.fit_predict(precomputed_distance)
# df_ = pd.DataFrame(data=smiles.keys(), index=list(smiles.values()), columns=['SMILES'])
# df_ = df_.sort_index()
# df_['cluster'] = labels[map_precomputed]
return labels[map_precomputed]
def tanimoto_smiles(mol1, mol2, fp='rdkit', bits=2048, radius=2):
if isinstance(mol1, str):
mol1 = Chem.MolFromSmiles(mol1)
if isinstance(mol2, str):
mol2 = Chem.MolFromSmiles(mol2)
_supported_fps = {
'rdkit': Chem.RDKFingerprint,
'morgan': Chem.rdMolDescriptors.GetMorganFingerprintAsBitVect,
'maccs': Chem.rdMolDescriptors.GetMACCSKeysFingerprint,
}
if fp not in _supported_fps:
raise ValueError(f"Fingerprint {fp} is not supported, available fps {_supported_fps.keys()}")
ffp = None
if fp == 'rdkit':
ffp = lambda x: _supported_fps[fp](x, fpSize=bits)
elif fp == 'morgan':
ffp = lambda x: _supported_fps[fp](x, fpSize=bits, radius=radius, nBits=bits)
elif fp == 'maccs':
ffp = _supported_fps[fp]
return rdkit.DataStructs.TanimotoSimilarity(ffp(mol1), ffp(mol2))
def validate_smile(smile):
try:
mol = Chem.MolFromSmiles(smile)
Chem.SanitizeMol(mol)
return smile
except Exception:
return None
def calc_chem_desc(smiles):
rdkit_features = {'MolWt': rdkit.Chem.Descriptors.MolWt,
'MolLogP': rdkit.Chem.Descriptors.MolLogP,
'NumRotatableBonds': rdkit.Chem.Descriptors.NumRotatableBonds,
'CalcTPSA': rdkit.Chem.rdMolDescriptors.CalcTPSA,
'RingCount': rdkit.Chem.Descriptors.RingCount,
}
if isinstance(smiles[0], str):
mols = smiles_to_mols(smiles)
elif isinstance(smiles[0], rdkit.Chem.rdchem.Mol):
mols = smiles
else:
raise TypeError(f'smiles must be a string or a rdkit.Chem.rdchem.Mol: {type(smiles[0])}')
res = {}
for name, func in rdkit_features.items():
res[name] = np.asarray([func(m) if m is not None else np.nan for m in mols ])
return pd.DataFrame(res)
def smiles_to_mols(smiles, n_jobs=8):
if isinstance(smiles, (list, tuple, np.ndarray)):
pass
elif isinstance(smiles, pd.Series):
smiles = smiles.tolist()
else:
raise TypeError(f"{type(smiles)=}")
assert len(smiles) > 0
assert isinstance(smiles[0], str), f"expect smiles string, got f{smiles[0]}"
mols = joblib.Parallel(n_jobs=n_jobs)(
joblib.delayed(BoostWrapper('MolFromSmiles', 'rdkit.Chem.rdmolfiles', ))(smi) for smi in smiles)
return mols
def smiles_to_fps(smiles_or_mols, finger_type='rdkit', n_jobs=8, fp_param=None):
if isinstance(smiles_or_mols, (list, tuple, np.ndarray)):
pass
elif isinstance(smiles_or_mols, pd.Series):
smiles_or_mols = smiles_or_mols.tolist()
else:
raise TypeError(f"{type(smiles_or_mols)=}")
assert len(smiles_or_mols) > 0
assert isinstance(smiles_or_mols[0],
(str, rdkit.Chem.rdchem.Mol)), f"variable {smiles_or_mols[0]} has type {type(smiles_or_mols[0])}"
if isinstance(smiles_or_mols[0], str):
mols = smiles_to_mols(smiles_or_mols)
else:
mols = smiles_or_mols
if fp_param is None:
fp_param = {}
fp_func, fp_func_name, fp_func_module, fp_params = _find_fingerprint_function(finger_type)
fp_params.update(fp_param)
if finger_type == 'morgan':
fp_func = fp_func(**fp_params).GetFingerprint
fp_params = {}
fps = joblib.Parallel(n_jobs=n_jobs, prefer="threads")(
joblib.delayed(fp_func)(mol, **fp_params) for mol in mols)
return fps
def _find_fingerprint_function(finger_type: str) -> Tuple[callable, str, str, dict]:
kwargs = {}
if finger_type == 'rdkit':
fp_func_name = 'RDKFingerprint'
fp_func_module = 'rdkit.Chem'
elif finger_type == 'maccs':
fp_func_name = 'GetMACCSKeysFingerprint'
fp_func_module = 'rdkit.Chem.rdMolDescriptors'
elif finger_type == 'morgan':
fp_func_name = 'GetMorganGenerator'
fp_func_module = 'rdkit.Chem.AllChem'
kwargs = dict(atomInvariantsGenerator=rdkit.Chem.rdFingerprintGenerator.GetMorganFeatureAtomInvGen(),
radius=2, fpSize=2048, countSimulation=True)
else:
raise NotImplementedError(f"Use `rdkit` or `maccs` or `morgan` as fps")
fp_func = getattr(importlib.import_module(fp_func_module), fp_func_name)
return fp_func, fp_func_name, fp_func_module, kwargs
def randomize_smiles_rotated(smiles: str, with_order_reversal: bool = True) -> str:
"""
Randomize a SMILES string by doing a cyclic rotation of the atomic indices.
Adapted from https://github.com/GLambard/SMILES-X/blob/758478663030580a363a9ee61c11f6d6448e18a1/SMILESX/augm.py#L19.
The outputs of this function can be reproduced by setting the seed with random.seed().
Raises:
InvalidSmiles: for invalid molecules.
Args:
smiles: SMILES string to randomize.
with_order_reversal: whether to reverse the atom order with 50% chance.
Returns:
Randomized SMILES string.
"""
mol = Chem.MolFromSmiles(smiles, sanitize=False)
n_atoms = mol.GetNumAtoms()
# Generate random values
rotation_index = np.random.randint(0, n_atoms - 1)
reverse_order = with_order_reversal and np.random.choice([True, False])
# Generate new atom indices order
atoms = list(range(n_atoms))
new_atoms_order = (
atoms[rotation_index % len(atoms) :] + atoms[: rotation_index % len(atoms)]
)
if reverse_order:
new_atoms_order.reverse()
mol = Chem.RenumberAtoms(mol, new_atoms_order)
return Chem.MolToSmiles(mol, canonical=False)