# File size: 7,362 Bytes (revision bd082dc)
import importlib
from multiprocessing import Pool
from pathlib import Path
from typing import Optional, Tuple, Union, List

import joblib
import numpy as np
import pandas as pd
import rdkit
import scipy
from FPSim2 import FPSim2Engine
from rdkit import Chem
from rdkit import Chem, RDLogger
from rdkit.Chem import DataStructs, Descriptors
from rdkit.DataStructs import BulkTanimotoSimilarity
from sklearn.cluster import DBSCAN
from tqdm import tqdm
# Runs at import time: turn off every RDKit log channel matching 'rdApp.*'.
RDLogger.DisableLog('rdApp.*')
class BoostWrapper:
    """Callable proxy that helps joblib deal with boost functions.

    The wrapped function is identified by its attribute name on an imported
    module and is looked up again on every access instead of being stored.
    """

    def __init__(self, method_name, module_name):
        self.module = importlib.import_module(module_name)
        self.method_name = method_name

    @property
    def method(self):
        """Resolve and return the wrapped function from ``self.module``."""
        target = getattr(self.module, self.method_name)
        return target

    def __call__(self, *args, **kwargs):
        """Forward all arguments to the wrapped function and return its result."""
        target = self.method
        return target(*args, **kwargs)
def cluster_fpsim2(distance_path, smiles_h5_path=None, dist_eps=0.15):
    """Cluster a precomputed FPSim2 distance matrix using the DBSCAN algorithm.

    Args:
        distance_path: path (``str`` or ``Path``) to an ``.npz`` file that
            ``scipy.sparse.load_npz`` can read; it is used as the precomputed
            distance matrix.
        smiles_h5_path: path to the FPSim2 fingerprint file. Defaults to
            ``all_smiles.h5`` in the same directory as ``distance_path``.
        dist_eps: DBSCAN ``eps`` (maximum distance between neighbours).

    Returns:
        Array of cluster labels, reordered with the argsort of the first
        column of the FPSim2 fingerprint table.
    """
    # Path() accepts both str and Path, so no isinstance check is needed.
    # (The original passed a stray `smiles_h5_path=None` keyword to Path.)
    distance_path = Path(distance_path)
    if smiles_h5_path is None:
        smiles_h5_path = distance_path.parent / 'all_smiles.h5'

    # NOTE(review): column 0 of `fps` presumably holds the molecule ids -- confirm.
    precomputed_indices = FPSim2Engine(smiles_h5_path).fps[:, 0]
    # maps original smiles order to FPSim2 order
    map_precomputed = np.argsort(precomputed_indices)

    precomputed_distance = scipy.sparse.load_npz(distance_path)
    # min_samples=1 makes every sample a core point, so nothing is labelled noise.
    db = DBSCAN(eps=dist_eps, min_samples=1, metric='precomputed', n_jobs=-1)
    labels = db.fit_predict(precomputed_distance)
    return labels[map_precomputed]
def tanimoto_smiles(mol1, mol2, fp='rdkit', bits=2048, radius=2):
    """Return the Tanimoto similarity between two molecules.

    Args:
        mol1: SMILES string or RDKit molecule.
        mol2: SMILES string or RDKit molecule.
        fp: fingerprint type, one of ``'rdkit'``, ``'morgan'`` or ``'maccs'``.
        bits: fingerprint length for ``'rdkit'`` and ``'morgan'``
            (ignored for ``'maccs'``).
        radius: Morgan radius (only used when ``fp == 'morgan'``).

    Returns:
        Tanimoto similarity of the two fingerprints.

    Raises:
        ValueError: if ``fp`` is not supported, or if a SMILES string cannot
            be parsed into a molecule.
    """
    # Lambdas keep the RDKit lookups lazy and bind the per-call parameters.
    _supported_fps = {
        'rdkit': lambda mol: Chem.RDKFingerprint(mol, fpSize=bits),
        # GetMorganFingerprintAsBitVect has no `fpSize` argument; its size is
        # controlled by `nBits` only.
        'morgan': lambda mol: Chem.rdMolDescriptors.GetMorganFingerprintAsBitVect(
            mol, radius, nBits=bits),
        'maccs': lambda mol: Chem.rdMolDescriptors.GetMACCSKeysFingerprint(mol),
    }
    # Validate the cheap argument first so a typo fails before any parsing.
    if fp not in _supported_fps:
        raise ValueError(f"Fingerprint {fp} is not supported, available fps {_supported_fps.keys()}")

    if isinstance(mol1, str):
        mol1 = Chem.MolFromSmiles(mol1)
    if isinstance(mol2, str):
        mol2 = Chem.MolFromSmiles(mol2)
    # MolFromSmiles returns None for unparsable input; fail with a clear message.
    if mol1 is None or mol2 is None:
        raise ValueError("Could not build a molecule from one of the inputs")

    ffp = _supported_fps[fp]
    return rdkit.DataStructs.TanimotoSimilarity(ffp(mol1), ffp(mol2))
def validate_smile(smile):
    """Return ``smile`` unchanged if RDKit can parse and sanitize it, else ``None``.

    Any exception raised while parsing or sanitizing is treated as "invalid".
    """
    try:
        Chem.SanitizeMol(Chem.MolFromSmiles(smile))
    except Exception:
        return None
    else:
        return smile
def calc_chem_desc(smiles):
    """Compute a small table of RDKit descriptors for a collection of molecules.

    Args:
        smiles: sequence (or ``pd.Series``) of SMILES strings or RDKit
            molecules. The type of the first element decides how the whole
            collection is treated.

    Returns:
        ``pd.DataFrame`` with one row per molecule and the columns ``MolWt``,
        ``MolLogP``, ``NumRotatableBonds``, ``CalcTPSA`` and ``RingCount``.
        Molecules that are ``None`` get ``NaN`` in every column.

    Raises:
        TypeError: if the first element is neither a string nor an RDKit Mol.
    """
    if isinstance(smiles, pd.Series):
        # `series[0]` is a label lookup and fails for non-default indexes
        # (e.g. after filtering); work on a plain list instead.
        smiles = smiles.tolist()

    if isinstance(smiles[0], str):
        mols = smiles_to_mols(smiles)
    elif isinstance(smiles[0], rdkit.Chem.rdchem.Mol):
        mols = smiles
    else:
        raise TypeError(f'smiles must be a string or a rdkit.Chem.rdchem.Mol: {type(smiles[0])}')

    rdkit_features = {
        'MolWt': rdkit.Chem.Descriptors.MolWt,
        'MolLogP': rdkit.Chem.Descriptors.MolLogP,
        'NumRotatableBonds': rdkit.Chem.Descriptors.NumRotatableBonds,
        'CalcTPSA': rdkit.Chem.rdMolDescriptors.CalcTPSA,
        'RingCount': rdkit.Chem.Descriptors.RingCount,
    }
    res = {
        name: np.asarray([func(m) if m is not None else np.nan for m in mols])
        for name, func in rdkit_features.items()
    }
    return pd.DataFrame(res)
def smiles_to_mols(smiles, n_jobs=8):
    """Parse SMILES strings into RDKit molecules in parallel with joblib.

    Args:
        smiles: ``list``, ``tuple``, ``np.ndarray`` or ``pd.Series`` of SMILES
            strings.
        n_jobs: number of joblib workers.

    Returns:
        List with one ``MolFromSmiles`` result per input string, in input order.

    Raises:
        TypeError: if ``smiles`` is not one of the supported containers or its
            first element is not a string.
        ValueError: if ``smiles`` is empty.
    """
    if isinstance(smiles, pd.Series):
        smiles = smiles.tolist()
    elif not isinstance(smiles, (list, tuple, np.ndarray)):
        raise TypeError(f"{type(smiles)=}")

    # Explicit raises instead of `assert`, which is stripped under `python -O`.
    if len(smiles) == 0:
        raise ValueError("smiles must not be empty")
    if not isinstance(smiles[0], str):
        raise TypeError(f"expect smiles string, got {smiles[0]}")

    # Build the wrapper once; it is the same callable for every SMILES.
    parse = BoostWrapper('MolFromSmiles', 'rdkit.Chem.rdmolfiles')
    mols = joblib.Parallel(n_jobs=n_jobs)(
        joblib.delayed(parse)(smi) for smi in smiles)
    return mols
def smiles_to_fps(smiles_or_mols, finger_type='rdkit', n_jobs=8, fp_param=None):
    """Compute fingerprints for a collection of SMILES strings or RDKit molecules.

    Args:
        smiles_or_mols: ``list``, ``tuple``, ``np.ndarray`` or ``pd.Series`` of
            SMILES strings or RDKit molecules. The type of the first element
            decides how the whole collection is treated.
        finger_type: ``'rdkit'``, ``'maccs'`` or ``'morgan'``.
        n_jobs: number of joblib workers, used for both SMILES parsing and
            fingerprint computation.
        fp_param: optional keyword arguments that override the defaults
            returned by ``_find_fingerprint_function``.

    Returns:
        List of fingerprints, one per input molecule, in input order.

    Raises:
        TypeError: if the container or its first element has an unsupported type.
        ValueError: if the input is empty.
        NotImplementedError: if ``finger_type`` is not supported.
    """
    if isinstance(smiles_or_mols, pd.Series):
        smiles_or_mols = smiles_or_mols.tolist()
    elif not isinstance(smiles_or_mols, (list, tuple, np.ndarray)):
        raise TypeError(f"{type(smiles_or_mols)=}")

    # Explicit raises instead of `assert`, which is stripped under `python -O`.
    if len(smiles_or_mols) == 0:
        raise ValueError("smiles_or_mols must not be empty")

    # Resolve the fingerprint function before parsing so that an unsupported
    # `finger_type` fails before any expensive work is done.
    fp_func, _fp_func_name, _fp_func_module, default_params = _find_fingerprint_function(finger_type)
    fp_params = {**default_params, **(fp_param or {})}

    first = smiles_or_mols[0]
    if isinstance(first, str):
        # Forward n_jobs so the caller's parallelism setting is honoured here too.
        mols = smiles_to_mols(smiles_or_mols, n_jobs=n_jobs)
    elif isinstance(first, rdkit.Chem.rdchem.Mol):
        mols = smiles_or_mols
    else:
        raise TypeError(f"variable {first} has type {type(first)}")

    if finger_type == 'morgan':
        # For morgan the looked-up function is a generator factory: build the
        # generator once and call its GetFingerprint without extra kwargs.
        fp_func = fp_func(**fp_params).GetFingerprint
        fp_params = {}

    fps = joblib.Parallel(n_jobs=n_jobs, prefer="threads")(
        joblib.delayed(fp_func)(mol, **fp_params) for mol in mols)
    return fps
def _find_fingerprint_function(finger_type: str) -> Tuple[callable, str, str, dict]:
    """Look up the RDKit fingerprint function for ``finger_type``.

    Args:
        finger_type: ``'rdkit'``, ``'maccs'`` or ``'morgan'``.

    Returns:
        Tuple ``(function, function_name, module_name, default_kwargs)``.
        For ``'morgan'`` the function is ``GetMorganGenerator`` and the kwargs
        are its construction arguments; for the other types the kwargs are
        empty.

    Raises:
        NotImplementedError: if ``finger_type`` is not supported.
    """
    kwargs = {}
    if finger_type == 'rdkit':
        fp_func_name = 'RDKFingerprint'
        fp_func_module = 'rdkit.Chem'
    elif finger_type == 'maccs':
        fp_func_name = 'GetMACCSKeysFingerprint'
        fp_func_module = 'rdkit.Chem.rdMolDescriptors'
    elif finger_type == 'morgan':
        fp_func_name = 'GetMorganGenerator'
        fp_func_module = 'rdkit.Chem.AllChem'
        # Import the submodule explicitly: nothing in this file imports
        # rdkit.Chem.rdFingerprintGenerator, so attribute access on
        # `rdkit.Chem` would depend on import side effects.
        fp_generators = importlib.import_module('rdkit.Chem.rdFingerprintGenerator')
        kwargs = dict(atomInvariantsGenerator=fp_generators.GetMorganFeatureAtomInvGen(),
                      radius=2, fpSize=2048, countSimulation=True)
    else:
        raise NotImplementedError("Use `rdkit` or `maccs` or `morgan` as fps")
    fp_func = getattr(importlib.import_module(fp_func_module), fp_func_name)
    return fp_func, fp_func_name, fp_func_module, kwargs
def randomize_smiles_rotated(smiles: str, with_order_reversal: bool = True) -> str:
    """
    Randomize a SMILES string by doing a cyclic rotation of the atomic indices.

    Adapted from https://github.com/GLambard/SMILES-X/blob/758478663030580a363a9ee61c11f6d6448e18a1/SMILESX/augm.py#L19.

    The outputs of this function can be reproduced by setting the seed with
    np.random.seed(), since all random draws go through ``np.random``.

    Raises:
        ValueError: if RDKit cannot parse ``smiles``.

    Args:
        smiles: SMILES string to randomize.
        with_order_reversal: whether to reverse the atom order with 50% chance.

    Returns:
        Randomized SMILES string.
    """
    mol = Chem.MolFromSmiles(smiles, sanitize=False)
    if mol is None:
        raise ValueError(f"Invalid SMILES: {smiles!r}")

    n_atoms = mol.GetNumAtoms()
    if n_atoms == 0:
        # Nothing to rotate; also avoids randint(0, 0), which raises.
        return Chem.MolToSmiles(mol, canonical=False)

    # np.random.randint excludes the upper bound, so use n_atoms (not
    # n_atoms - 1) to make every rotation reachable and to support
    # single-atom molecules.
    rotation_index = np.random.randint(0, n_atoms)
    reverse_order = with_order_reversal and np.random.choice([True, False])

    # Generate new atom indices order
    atoms = list(range(n_atoms))
    new_atoms_order = atoms[rotation_index:] + atoms[:rotation_index]
    if reverse_order:
        new_atoms_order.reverse()

    mol = Chem.RenumberAtoms(mol, new_atoms_order)
    return Chem.MolToSmiles(mol, canonical=False)