Spaces:
Sleeping
Sleeping
| import importlib | |
| from typing import Optional, Tuple, Union, List | |
| import numpy as np | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from multiprocessing import Pool | |
| from pathlib import Path | |
| from rdkit import Chem | |
| from FPSim2 import FPSim2Engine | |
| import rdkit | |
| from rdkit import Chem, RDLogger | |
| from rdkit.Chem import DataStructs, Descriptors | |
| from rdkit.DataStructs import BulkTanimotoSimilarity | |
| from sklearn.cluster import DBSCAN | |
| import scipy | |
| RDLogger.DisableLog('rdApp.*') | |
| class BoostWrapper(object): | |
| """ Help joblib to deal with boost functions """ | |
| def __init__(self, method_name, module_name): | |
| self.method_name = method_name | |
| self.module = importlib.import_module(module_name) | |
| def method(self): | |
| return getattr(self.module, self.method_name) | |
| def __call__(self, *args, **kwargs): | |
| return self.method(*args, **kwargs) | |
| def cluster_fpsim2(distance_path, smiles_h5_path=None, dist_eps=0.15): | |
| """ Cluster precomputed FPSim2 distance matrix using DBSCAN algorithm """ | |
| if isinstance(distance_path, str): | |
| distance_path = Path(distance_path, smiles_h5_path=None) | |
| if smiles_h5_path is None: | |
| smiles_h5_path = distance_path.parent / 'all_smiles.h5' | |
| precomputed_indices = FPSim2Engine(smiles_h5_path).fps[:, 0] | |
| map_precomputed = np.argsort(precomputed_indices) # maps original smiles order to FPSim2 order | |
| precomputed_distance = scipy.sparse.load_npz(distance_path) | |
| db = DBSCAN(eps=dist_eps, min_samples=1, metric='precomputed', n_jobs=-1) | |
| labels = db.fit_predict(precomputed_distance) | |
| # df_ = pd.DataFrame(data=smiles.keys(), index=list(smiles.values()), columns=['SMILES']) | |
| # df_ = df_.sort_index() | |
| # df_['cluster'] = labels[map_precomputed] | |
| return labels[map_precomputed] | |
| def tanimoto_smiles(mol1, mol2, fp='rdkit', bits=2048, radius=2): | |
| if isinstance(mol1, str): | |
| mol1 = Chem.MolFromSmiles(mol1) | |
| if isinstance(mol2, str): | |
| mol2 = Chem.MolFromSmiles(mol2) | |
| _supported_fps = { | |
| 'rdkit': Chem.RDKFingerprint, | |
| 'morgan': Chem.rdMolDescriptors.GetMorganFingerprintAsBitVect, | |
| 'maccs': Chem.rdMolDescriptors.GetMACCSKeysFingerprint, | |
| } | |
| if fp not in _supported_fps: | |
| raise ValueError(f"Fingerprint {fp} is not supported, available fps {_supported_fps.keys()}") | |
| ffp = None | |
| if fp == 'rdkit': | |
| ffp = lambda x: _supported_fps[fp](x, fpSize=bits) | |
| elif fp == 'morgan': | |
| ffp = lambda x: _supported_fps[fp](x, fpSize=bits, radius=radius, nBits=bits) | |
| elif fp == 'maccs': | |
| ffp = _supported_fps[fp] | |
| return rdkit.DataStructs.TanimotoSimilarity(ffp(mol1), ffp(mol2)) | |
| def validate_smile(smile): | |
| try: | |
| mol = Chem.MolFromSmiles(smile) | |
| Chem.SanitizeMol(mol) | |
| return smile | |
| except Exception: | |
| return None | |
| def calc_chem_desc(smiles): | |
| rdkit_features = {'MolWt': rdkit.Chem.Descriptors.MolWt, | |
| 'MolLogP': rdkit.Chem.Descriptors.MolLogP, | |
| 'NumRotatableBonds': rdkit.Chem.Descriptors.NumRotatableBonds, | |
| 'CalcTPSA': rdkit.Chem.rdMolDescriptors.CalcTPSA, | |
| 'RingCount': rdkit.Chem.Descriptors.RingCount, | |
| } | |
| if isinstance(smiles[0], str): | |
| mols = smiles_to_mols(smiles) | |
| elif isinstance(smiles[0], rdkit.Chem.rdchem.Mol): | |
| mols = smiles | |
| else: | |
| raise TypeError(f'smiles must be a string or a rdkit.Chem.rdchem.Mol: {type(smiles[0])}') | |
| res = {} | |
| for name, func in rdkit_features.items(): | |
| res[name] = np.asarray([func(m) if m is not None else np.nan for m in mols ]) | |
| return pd.DataFrame(res) | |
| def smiles_to_mols(smiles, n_jobs=8): | |
| if isinstance(smiles, (list, tuple, np.ndarray)): | |
| pass | |
| elif isinstance(smiles, pd.Series): | |
| smiles = smiles.tolist() | |
| else: | |
| raise TypeError(f"{type(smiles)=}") | |
| assert len(smiles) > 0 | |
| assert isinstance(smiles[0], str), f"expect smiles string, got f{smiles[0]}" | |
| mols = joblib.Parallel(n_jobs=n_jobs)( | |
| joblib.delayed(BoostWrapper('MolFromSmiles', 'rdkit.Chem.rdmolfiles', ))(smi) for smi in smiles) | |
| return mols | |
| def smiles_to_fps(smiles_or_mols, finger_type='rdkit', n_jobs=8, fp_param=None): | |
| if isinstance(smiles_or_mols, (list, tuple, np.ndarray)): | |
| pass | |
| elif isinstance(smiles_or_mols, pd.Series): | |
| smiles_or_mols = smiles_or_mols.tolist() | |
| else: | |
| raise TypeError(f"{type(smiles_or_mols)=}") | |
| assert len(smiles_or_mols) > 0 | |
| assert isinstance(smiles_or_mols[0], | |
| (str, rdkit.Chem.rdchem.Mol)), f"variable {smiles_or_mols[0]} has type {type(smiles_or_mols[0])}" | |
| if isinstance(smiles_or_mols[0], str): | |
| mols = smiles_to_mols(smiles_or_mols) | |
| else: | |
| mols = smiles_or_mols | |
| if fp_param is None: | |
| fp_param = {} | |
| fp_func, fp_func_name, fp_func_module, fp_params = _find_fingerprint_function(finger_type) | |
| fp_params.update(fp_param) | |
| if finger_type == 'morgan': | |
| fp_func = fp_func(**fp_params).GetFingerprint | |
| fp_params = {} | |
| fps = joblib.Parallel(n_jobs=n_jobs, prefer="threads")( | |
| joblib.delayed(fp_func)(mol, **fp_params) for mol in mols) | |
| return fps | |
| def _find_fingerprint_function(finger_type: str) -> Tuple[callable, str, str, dict]: | |
| kwargs = {} | |
| if finger_type == 'rdkit': | |
| fp_func_name = 'RDKFingerprint' | |
| fp_func_module = 'rdkit.Chem' | |
| elif finger_type == 'maccs': | |
| fp_func_name = 'GetMACCSKeysFingerprint' | |
| fp_func_module = 'rdkit.Chem.rdMolDescriptors' | |
| elif finger_type == 'morgan': | |
| fp_func_name = 'GetMorganGenerator' | |
| fp_func_module = 'rdkit.Chem.AllChem' | |
| kwargs = dict(atomInvariantsGenerator=rdkit.Chem.rdFingerprintGenerator.GetMorganFeatureAtomInvGen(), | |
| radius=2, fpSize=2048, countSimulation=True) | |
| else: | |
| raise NotImplementedError(f"Use `rdkit` or `maccs` or `morgan` as fps") | |
| fp_func = getattr(importlib.import_module(fp_func_module), fp_func_name) | |
| return fp_func, fp_func_name, fp_func_module, kwargs | |
| def randomize_smiles_rotated(smiles: str, with_order_reversal: bool = True) -> str: | |
| """ | |
| Randomize a SMILES string by doing a cyclic rotation of the atomic indices. | |
| Adapted from https://github.com/GLambard/SMILES-X/blob/758478663030580a363a9ee61c11f6d6448e18a1/SMILESX/augm.py#L19. | |
| The outputs of this function can be reproduced by setting the seed with random.seed(). | |
| Raises: | |
| InvalidSmiles: for invalid molecules. | |
| Args: | |
| smiles: SMILES string to randomize. | |
| with_order_reversal: whether to reverse the atom order with 50% chance. | |
| Returns: | |
| Randomized SMILES string. | |
| """ | |
| mol = Chem.MolFromSmiles(smiles, sanitize=False) | |
| n_atoms = mol.GetNumAtoms() | |
| # Generate random values | |
| rotation_index = np.random.randint(0, n_atoms - 1) | |
| reverse_order = with_order_reversal and np.random.choice([True, False]) | |
| # Generate new atom indices order | |
| atoms = list(range(n_atoms)) | |
| new_atoms_order = ( | |
| atoms[rotation_index % len(atoms) :] + atoms[: rotation_index % len(atoms)] | |
| ) | |
| if reverse_order: | |
| new_atoms_order.reverse() | |
| mol = Chem.RenumberAtoms(mol, new_atoms_order) | |
| return Chem.MolToSmiles(mol, canonical=False) |