Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| """ | |
| Author: Philipp Seidl, Philipp Renz | |
| ELLIS Unit Linz, LIT AI Lab, Institute for Machine Learning | |
| Johannes Kepler University Linz | |
| Contact: seidl@ml.jku.at | |
| Molutils contains functions that aid in handling molecules or templates | |
| """ | |
| import logging | |
| import re | |
| import warnings | |
| from itertools import product, permutations | |
| from multiprocessing import Pool | |
| from tqdm.contrib.concurrent import process_map | |
| from tqdm.notebook import tqdm | |
| import swifter | |
| import rdkit.RDLogger as rkl | |
| from rdkit import Chem | |
| from rdkit.Chem import AllChem | |
| from rdkit.Chem.rdMolDescriptors import GetMorganFingerprint | |
| from rdkit.Chem.rdmolops import FastFindRings | |
| from rdkit.Chem.rdMHFPFingerprint import MHFPEncoder | |
| from scipy import sparse | |
| from sklearn.feature_extraction import DictVectorizer | |
| import warnings | |
| import rdkit.RDLogger as rkl | |
| import numpy as np | |
| log = logging.getLogger(__name__) | |
| logger = rkl.logger() | |
| def remove_attom_mapping(smiles): | |
| """ removes a number after a ':' """ | |
| return re.sub(r':\d+', '', str(smiles)) | |
| def canonicalize_smi(smi, is_smarts=False, remove_atom_mapping=True): | |
| r""" | |
| Canonicalize SMARTS from https://github.com/rxn4chemistry/rxnfp/blob/master/rxnfp/tokenization.py#L249 | |
| """ | |
| mol = Chem.MolFromSmarts(smi) | |
| if not mol: | |
| raise ValueError("Molecule not canonicalizable") | |
| if remove_atom_mapping: | |
| for atom in mol.GetAtoms(): | |
| if atom.HasProp("molAtomMapNumber"): | |
| atom.ClearProp("molAtomMapNumber") | |
| return Chem.MolToSmiles(mol) | |
| def canonicalize_template(smarts): | |
| smarts = str(smarts) | |
| # remove attom-mapping | |
| #smarts = remove_attom_mapping(smarts) | |
| # order the list of smiles + canonicalize it | |
| results = [] | |
| for part in smarts.split('>>'): | |
| a = part.split('.') | |
| a = [canonicalize_smi(x, is_smarts=True, remove_atom_mapping=True) for x in a] | |
| #a = [remove_attom_mapping(x) for x in a] | |
| a.sort() | |
| results.append( '.'.join(a) ) | |
| return '>>'.join(results) | |
| def ebv2np(ebv): | |
| """Explicit bit vector returned by rdkit to numpy array. """ | |
| return np.frombuffer(bytes(ebv.ToBitString(), 'utf-8'), 'u1') - ord('0') | |
| def smiles2morgan(smiles, radius=2): | |
| """ computes ecfp from smiles """ | |
| return GetMorganFingerprint(smiles, radius) | |
| def getFingerprint(smiles, fp_size=4096, radius=2, is_smarts=False, which='morgan', sanitize=True): | |
| """maccs+morganc+topologicaltorsion+erg+atompair+pattern+rdkc""" | |
| if isinstance(smiles, list): | |
| return np.array([getFingerprint(smi, fp_size, radius, is_smarts, which) for smi in smiles]).max(0) # max pooling if it's list of lists | |
| if is_smarts: | |
| mol = Chem.MolFromSmarts(str(smiles), mergeHs=False) | |
| #mol.UpdatePropertyCache() #Correcting valence info | |
| #FastFindRings(mol) #Providing ring info | |
| else: | |
| mol = Chem.MolFromSmiles(str(smiles), sanitize=False) | |
| if mol is None: | |
| msg = f"{smiles} couldn't be converted to a fingerprint using 0's instead" | |
| logger.warning(msg) | |
| #warnings.warn(msg) | |
| return np.zeros(fp_size).astype(np.bool) | |
| if sanitize: | |
| faild_op = Chem.SanitizeMol(mol, catchErrors=True) | |
| FastFindRings(mol) #Providing ring info | |
| mol.UpdatePropertyCache(strict=False) #Correcting valence info # important operation | |
| def mol2np(mol, which, fp_size): | |
| is_dict = False | |
| if which=='morgan': | |
| fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=fp_size, useFeatures=False, useChirality=True) | |
| elif which=='rdk': | |
| fp = Chem.RDKFingerprint(mol, fpSize=fp_size, maxPath=6) | |
| elif which=='rdkc': | |
| # https://greglandrum.github.io/rdkit-blog/similarity/reference/2021/05/26/similarity-threshold-observations1.html | |
| # -- maxPath 6 found to be better for retrieval in databases | |
| fp = AllChem.UnfoldedRDKFingerprintCountBased(mol, maxPath=6).GetNonzeroElements() | |
| is_dict = True | |
| elif which=='morganc': | |
| fp = AllChem.GetMorganFingerprint(mol, radius, useChirality=True, useBondTypes=True, useFeatures=True, useCounts=True).GetNonzeroElements() | |
| is_dict = True | |
| elif which=='topologicaltorsion': | |
| fp = AllChem.GetTopologicalTorsionFingerprint(mol).GetNonzeroElements() | |
| is_dict = True | |
| elif which=='maccs': | |
| fp = AllChem.GetMACCSKeysFingerprint(mol) | |
| elif which=='erg': | |
| v = AllChem.GetErGFingerprint(mol) | |
| fp = {idx:v[idx] for idx in np.nonzero(v)[0]} | |
| is_dict = True | |
| elif which=='atompair': | |
| fp = AllChem.GetAtomPairFingerprint(mol).GetNonzeroElements() | |
| is_dict = True | |
| elif which=='pattern': | |
| fp = Chem.PatternFingerprint(mol, fpSize=fp_size) | |
| elif which=='ecfp4': | |
| # roughly equivalent to ECFP4 | |
| fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=fp_size, useFeatures=False, useChirality=True) | |
| elif which=='layered': | |
| fp = AllChem.LayeredFingerprint(mol, fpSize=fp_size, maxPath=7) | |
| elif which=='mhfp': | |
| #TODO check if one can avoid instantiating the MHFP encoder | |
| fp = MHFPEncoder().EncodeMol(mol, radius=radius, rings=True, isomeric=False, kekulize=False, min_radius=1) | |
| fp = {f:1 for f in fp} | |
| is_dict = True | |
| elif not (type(which)==str): | |
| fp = which(mol) | |
| if is_dict: | |
| nd = np.zeros(fp_size) | |
| for k in fp: | |
| nk = k%fp_size #remainder | |
| #print(nk, k, fp_size) | |
| #3160 36322170 3730 | |
| #print(nd[nk], fp[k]) | |
| if nd[nk]!=0: | |
| #print('c',end='') | |
| nd[nk] = nd[nk]+fp[k] #pooling colisions | |
| nd[nk] = fp[k] | |
| return nd #np.log(1+nd) # discussion with segler | |
| return ebv2np(fp) | |
| """ + for folding * for concat """ | |
| cc_symb = '*' | |
| if ('+' in which) or (cc_symb in which): | |
| concat = False | |
| split_sym = '+' | |
| if cc_symb in which: | |
| concat=True | |
| split_sym = '*' | |
| np_fp = np.zeros(fp_size) | |
| remaining_fps = (which.count(split_sym)+1) | |
| fp_length_remain = fp_size | |
| for fp_type in which.split(split_sym): | |
| if concat: | |
| fpp = mol2np(mol, fp_type, fp_length_remain//remaining_fps) | |
| np_fp[(fp_size-fp_length_remain):(fp_size-fp_length_remain+len(fpp))] += fpp | |
| fp_length_remain -= len(fpp) | |
| remaining_fps -=1 | |
| else: | |
| try: | |
| fpp = mol2np(mol, fp_type, fp_size) | |
| np_fp[:len(fpp)] += fpp | |
| except: | |
| pass | |
| #print(fp_type,end='') | |
| return np.log(1 + np_fp) | |
| else: | |
| return mol2np(mol, which, fp_size) | |
| def _getFingerprint(inp): | |
| return getFingerprint(inp[0], inp[1], inp[2], inp[3], inp[4]) | |
| def disable_rdkit_logging(): | |
| """ | |
| Disables RDKit whiny logging. | |
| """ | |
| import rdkit.rdBase as rkrb | |
| import rdkit.RDLogger as rkl | |
| logger.setLevel(rkl.ERROR) | |
| rkrb.DisableLog('rdApp.error') | |
| def convert_smiles_to_fp(list_of_smiles, fp_size=2048, is_smarts=False, which='morgan', radius=2, njobs=1, verbose=False): | |
| """ | |
| list of smiles can be list of lists, than the resulting array will pe badded to the max list len | |
| which: morgan, rdk, ecfp4, or object | |
| NOTE: morgan or ecfp4 throws error for is_smarts | |
| """ | |
| inp = [(smi, fp_size, radius, is_smarts, which) for smi in list_of_smiles] | |
| #print(inp) | |
| if verbose: print(f'starting pool with {njobs} workers') | |
| if njobs>1: | |
| #with Pool(njobs) as pool: | |
| # fps = pool.map(_getFingerprint, inp) | |
| fps = process_map(_getFingerprint, inp, max_workers=njobs, chunksize=1, mininterval=0) | |
| else: | |
| fps = [getFingerprint(smi, fp_size=fp_size, radius=radius, is_smarts=is_smarts, which=which) for smi in list_of_smiles] | |
| return np.array(fps) | |
| def convert_smartes_to_fp(list_of_smarts, fp_size=2048): | |
| if isinstance(list_of_smarts, np.ndarray): | |
| list_of_smarts = list_of_smarts.tolist() | |
| if isinstance(list_of_smarts, list): | |
| if isinstance(list_of_smarts[0], list): | |
| pad = len(max(list_of_smarts, key=len)) | |
| fps = [[getTemplateFingerprint(smarts, fp_size=fp_size) for smarts in sample] | |
| + [np.zeros(fp_size, dtype=np.bool)] * (pad - len(sample)) # zero padding | |
| for sample in list_of_smarts] | |
| else: | |
| fps = [[getTemplateFingerprint(smarts, fp_size=fp_size) for smarts in list_of_smarts]] | |
| return np.asarray(fps) | |
| def get_reactants_from_smarts(smarts): | |
| """ | |
| from a (forward-)reaction given as a smart, only returns the reactants (not e.g. solvents or reagents) | |
| returns list of smiles or empty list | |
| """ | |
| from rdkit.Chem import RDConfig | |
| import sys | |
| sys.path.append(RDConfig.RDContribDir) | |
| from RxnRoleAssignment import identifyReactants | |
| try: | |
| rdk_reaction = AllChem.ReactionFromSmarts(smarts) | |
| rx_idx = identifyReactants.identifyReactants(rdk_reaction)[0][0] | |
| except ValueError: | |
| return [] | |
| # TODO what if a product is recognized as a reactanat.. is that possible?? | |
| return [Chem.MolToSmiles(rdk_reaction.GetReactants()[i]) for i in rx_idx] | |
| def smarts2rdkfp(smart, fp_size=2048): | |
| mol = Chem.MolFromSmarts(str(smart)) | |
| if mol is None: return np.zeros(fp_size).astype(np.bool) | |
| return AllChem.RDKFingerprint(mol) | |
| # fp = np.asarray(fp).astype(np.bool) # takes ages =/ | |
| def smiles2rdkfp(smiles, fp_size=2048): | |
| mol = Chem.MolFromSmiles(str(smiles)) | |
| if mol is None: return np.zeros(fp_size).astype(np.bool) | |
| return AllChem.RDKFingerprint(mol) | |
| def mol2morganfp(mol, radius=2, fp_size=2048): | |
| try: | |
| Chem.SanitizeMol(mol) # due to error --> see https://sourceforge.net/p/rdkit/mailman/message/34828604/ | |
| except: | |
| pass | |
| # print(mol) | |
| # return np.zeros(fp_size).astype(np.bool) | |
| # TODO | |
| return AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=fp_size) | |
| def smarts2morganfp(smart, fp_size=2048, radius=2): | |
| mol = Chem.MolFromSmarts(str(smart)) | |
| if mol is None: return np.zeros(fp_size).astype(np.bool) | |
| return mol2morganfp(mol) | |
| def smiles2morganfp(smiles, fp_size=2048, radius=2): | |
| mol = Chem.MolFromSmiles(str(smiles)) | |
| if mol is None: return np.zeros(fp_size).astype(np.bool) | |
| return mol2morganfp(mol) | |
| def smarts2fp(smart, which='morgan', fp_size=2048, radius=2): | |
| if which == 'rdk': | |
| return smarts2rdkfp(smart, fp_size=fp_size) | |
| else: | |
| return smarts2morganfp(smart, fp_size=fp_size, radius=radius) | |
| def smiles2fp(smiles, which='morgan', fp_size=2048, radius=2): | |
| if which == 'rdk': | |
| return smiles2rdkfp(smiles, fp_size=fp_size) | |
| else: | |
| return smiles2morganfp(smiles, fp_size=fp_size, radius=radius) | |
| class FP_featurizer(): | |
| "FP_featurizer: Fingerprint featurizer" | |
| def __init__(self, | |
| fp_types = ['MACCS','Morgan2CBF', 'Morgan6CBF', 'ErG','AtomPair','TopologicalTorsion','RDK','ECFP6'], | |
| max_features = 4096, counts=True, log_scale=True, folding=None, collision_pooling='max'): | |
| self.v = DictVectorizer(sparse=True, dtype=np.uint16) | |
| self.max_features = max_features | |
| self.idx_col = None | |
| self.counts = counts | |
| self.fp_types = [fp_types] if isinstance(fp_types, str) else fp_types | |
| self.log_scale = log_scale # from discussion with segler | |
| self.folding = None | |
| self.colision_pooling = collision_pooling | |
| def compute_fp_list(self, smiles_list, is_smarts=False): | |
| fp_list = [] | |
| for smiles in smiles_list: | |
| try: | |
| if isinstance(smiles, list): | |
| smiles = smiles[0] | |
| if is_smarts: | |
| mol = Chem.MolFromSmarts(smiles) | |
| else: | |
| mol = Chem.MolFromSmiles(smiles) #TODO small hack only applicable here!!! | |
| fp_dict = {} | |
| for fp_type in self.fp_types: | |
| fp_dict.update( fingerprintTypes[fp_type](mol) ) #returns a dict | |
| fp_list.append(fp_dict) | |
| except: | |
| fp_list.append({}) | |
| return fp_list | |
| def fit(self, x_train, is_smarts=False): | |
| fp_list = self.compute_fp_list(x_train, is_smarts=is_smarts) | |
| Xraw = self.v.fit_transform(fp_list) | |
| # compute variance of a csr_matrix E[x**2] - E[x]**2 | |
| axis = 0 | |
| Xraw_sqrd = Xraw.copy() | |
| Xraw_sqrd.data **= 2 | |
| var_col = Xraw_sqrd.mean(axis) - np.square(Xraw.mean(axis)) | |
| #idx_col = (-np.array((Xraw>0).var(axis=0)).argpartition(self.max_features)) | |
| #idx_col = np.array((Xraw>0).sum(axis=0)>=self.min_fragm_occur).flatten() | |
| self.idx_col = (-np.array(var_col)).flatten().argpartition(min(self.max_features, Xraw.shape[1]-1))[:min(self.max_features, Xraw.shape[1])] | |
| print(f'from {var_col.shape[1]} to {len(self.idx_col)}') | |
| return self.scale(Xraw[:,self.idx_col].toarray()) | |
| def transform(self, x_test, is_smarts=False): | |
| fp_list = self.compute_fp_list(x_test, is_smarts=is_smarts) | |
| X_raw = self.v.transform(fp_list) | |
| return self.scale(X_raw[:,self.idx_col].toarray()) | |
| def scale(self, X): | |
| if self.log_scale: | |
| return np.log(1 + X) | |
| return X | |
| def save(self, path='data/fpfeat.pkl'): | |
| import pickle | |
| with open(path, 'wb') as output: | |
| pickle.dump(self, output, pickle.HIGHEST_PROTOCOL) | |
| def load(self, path='data/fpfeat.pkl'): | |
| import pickle | |
| with open(path, 'rb') as input: | |
| self = pickle.load(input) | |
| def getTemplateFingerprintOnBits(smarts, fp_size=2048): | |
| rxn = AllChem.ReactionFromSmarts(str(smarts)) | |
| #construct a structural fingerprint for a ChemicalReaction by concatenating the reactant fingerprint and the product fingerprint | |
| return (AllChem.CreateStructuralFingerprintForReaction(rxn)).GetOnBits() | |
| def calc_template_fingerprint_group_mapping(template_list, fp_size, save_path=''): | |
| """ | |
| calculate the mapping from old idx to new idx for the templates | |
| returns a set with a numpy array with the mapping and the indices to take | |
| """ | |
| templ_df = pd.DataFrame() | |
| templ_df['smarts'] = template_list | |
| templ_df['templ_emb'] = templ_df['smarts'].swifter.apply(lambda smarts: str(list(getTemplateFingerprintOnBits(smarts, fp_size)))) | |
| templ_df['idx_orig'] = [ii for ii in range(len(templ_df))] | |
| grouped_templ = templ_df.groupby('templ_emb').apply(lambda x: x.index.tolist()) | |
| grouped_templ = templ_df.groupby('templ_emb') | |
| grouped_templ = grouped_templ.min().sort_values('idx_orig') | |
| grouped_templ['new_idx'] = range(len(grouped_templ)) | |
| new_templ_df = templ_df.join(grouped_templ, on='templ_emb',how='right', lsuffix='_l', rsuffix='_r').sort_values('idx_orig_l') | |
| map_orig2new = new_templ_df['new_idx'].values | |
| take_those_indices_from_orig = grouped_templ.idx_orig.values | |
| if save_path!='': | |
| suffix_maporig2new = '_maporig2new.npy' | |
| suffix_takethose = '_tfp_take_idxs.npy' | |
| np.save(f'{save_path}{suffix_maporig2new}', map_orig2new,allow_pickle=False) | |
| np.save(f'{save_path}{suffix_takethose}', take_those_indices_from_orig,allow_pickle=False) | |
| return (map_orig2new, take_those_indices_from_orig) | |
| class ECFC_featurizer(): | |
| def __init__(self, radius=6, min_fragm_occur=50, useChirality=True, useFeatures=False): | |
| self.v = DictVectorizer(sparse=True, dtype=np.uint16) | |
| self.min_fragm_occur=min_fragm_occur | |
| self.idx_col = None | |
| self.radius=radius | |
| self.useChirality = useChirality | |
| self.useFeatures = useFeatures | |
| def compute_fp_list(self, smiles_list): | |
| fp_list = [] | |
| for smiles in smiles_list: | |
| try: | |
| if isinstance(smiles, list): | |
| smiles = smiles[0] | |
| mol = Chem.MolFromSmiles(smiles) #TODO small hack only applicable here!!! | |
| fp_list.append( AllChem.GetMorganFingerprint(mol, self.radius, useChirality=self.useChirality, | |
| useFeatures=self.useFeatures).GetNonzeroElements() ) #returns a dict | |
| except: | |
| fp_list.append({}) | |
| return fp_list | |
| def fit(self, x_train): | |
| fp_list = self.compute_fp_list(x_train) | |
| Xraw = self.v.fit_transform(fp_list) | |
| idx_col = np.array((Xraw>0).sum(axis=0)>=self.min_fragm_occur).flatten() | |
| self.idx_col = idx_col | |
| return Xraw[:,self.idx_col].toarray() | |
| def transform(self, x_test): | |
| fp_list = self.compute_fp_list(x_test) | |
| X_raw = self.v.transform(fp_list) | |
| return X_raw[:,self.idx_col].toarray() | |
| def ecfp2dict(mol, radius=3): | |
| #SECFP (SMILES Extended Connectifity Fingerprint) | |
| # from mhfp.encoder import MHFPEncoder | |
| from mhfp.encoder import MHFPEncoder | |
| v = MHFPEncoder.secfp_from_mol(mol, length=4068, radius=radius, rings=True, kekulize=True, min_radius=1) | |
| return {f'ECFP{radius*2}_'+str(idx):1 for idx in np.nonzero(v)[0]} | |
| def erg2dict(mol): | |
| v = AllChem.GetErGFingerprint(mol) | |
| return {'erg'+str(idx):v[idx] for idx in np.nonzero(v)[0]} | |
| def morgan2dict(mol, radius=2, useChirality=True, useBondTypes=True, useFeatures=True, useConts=True): | |
| mdic = AllChem.GetMorganFingerprint(mol, radius=radius, useChirality=useChirality, useBondTypes=True, | |
| useFeatures=True, useCounts=True).GetNonzeroElements() | |
| return {f'm{radius}{useChirality}{useBondTypes}{useFeatures}'+str(kk):mdic[kk]for kk in mdic} | |
| def atompair2dict(mol): | |
| mdic = AllChem.GetAtomPairFingerprint(mol).GetNonzeroElements() | |
| return {f'ap'+str(kk):mdic[kk]for kk in mdic} | |
| def tt2dict(mol): | |
| mdic = AllChem.GetTopologicalTorsionFingerprint(mol).GetNonzeroElements() | |
| return {f'tt'+str(kk):mdic[kk]for kk in mdic} | |
| def rdk2dict(mol): | |
| mdic = AllChem.UnfoldedRDKFingerprintCountBased(mol).GetNonzeroElements() | |
| return {f'rdk'+str(kk):mdic[kk]for kk in mdic} | |
| def pattern2dict(mol): | |
| mdic = AllChem.PatternFingerprint(mol, fpSize=16384).GetOnBits() | |
| return {'pt'+str(kk):1 for kk in mdic} | |
| fingerprintTypes = { | |
| 'MACCS' : lambda k: {'MCCS'+str(ob):1 for ob in AllChem.GetMACCSKeysFingerprint(k).GetOnBits()}, | |
| 'Morgan2CBF' : lambda mol: morgan2dict(mol, 2, True, True, True, True), | |
| 'Morgan4CBF' : lambda mol: morgan2dict(mol, 4, True, True, True, True), | |
| 'Morgan6CBF' : lambda mol: morgan2dict(mol, 6, True, True, True, True), | |
| 'ErG' : erg2dict, | |
| 'AtomPair' : atompair2dict, | |
| 'TopologicalTorsion' : tt2dict, | |
| #'RDK' : lambda k: {'MCCS'+str(ob):1 for ob in AllChem.RDKFingerprint(k).GetOnBits()}, | |
| 'RDK' : rdk2dict, | |
| 'ECFP6' : lambda mol: ecfp2dict(mol, radius=3), | |
| 'Pattern': pattern2dict, | |
| } | |
| def smarts2appl(product_smarts, template_product_smarts, fpsize=2048, v=False, use_tqdm=False, njobs=1, nsplits=1): | |
| """This takes in a list of product smiles (misnamed in code) and a list of product sides | |
| of templates and calculates which templates are applicable to which product. | |
| This is basically a substructure search. Maybe there are faster versions but I wrote this one. | |
| Args: | |
| product_smarts: List of smiles of molecules to check. | |
| template_product_smarts: List of substructures to check | |
| fpsize: fingerprint size to use in screening | |
| v: if v then information will be printed | |
| use_tdqm: if True then a progressbar will be displayed but slows down the computation. | |
| njobs: how many parallel jobs to run in parallel. | |
| nsplits: how many splits should be made along the product_smarts list. Useful to avoid memory | |
| explosion. | |
| Returns: list of tuples (i,j) that indicates the product i has substructure j. | |
| """ | |
| if v: print("Calculating template molecules") | |
| template_mols = [Chem.MolFromSmarts(s) for s in template_product_smarts] | |
| if v: print("Calculating template fingerprints") | |
| template_ebvs = [Chem.PatternFingerprint(m, fpSize=fpsize) for m in template_mols] | |
| if v: print(f'Building template ints: [{len(template_mols)}, {fpsize}]') | |
| template_ints = [int(e.ToBitString(), base=2) for e in template_ebvs] | |
| del template_ebvs | |
| if njobs == 1 and nsplits == 1: | |
| return _smarts2appl(product_smarts, template_product_smarts, template_ints, fpsize, v, use_tqdm) | |
| elif nsplits == 1: | |
| nsplits = njobs | |
| # split products into batches | |
| product_splits = np.array_split(np.array(product_smarts), nsplits) | |
| ioffsets = [0] + list(np.cumsum([p.shape[0] for p in product_splits[:-1]])) | |
| inps = [(ps, template_product_smarts, template_ints, fpsize, v, use_tqdm, ioff, 0) for ps, ioff in zip(product_splits, ioffsets)] | |
| if v: print("Creating workers") | |
| #results = process_map(__smarts2appl, inps, max_workers=njobs, chunksize=1) | |
| with Pool(njobs) as pool: | |
| results = pool.starmap(_smarts2appl, inps) | |
| imatch = np.concatenate([r[0] for r in results]) | |
| jmatch = np.concatenate([r[1] for r in results]) | |
| return imatch, jmatch | |
| def __smarts2appl(inp): | |
| return _smarts2appl(*inp) | |
| def _smarts2appl(product_smarts, template_product_smarts, template_ints, fpsize=2048, v=False, use_tqdm=True, ioffset=0, joffset=0): | |
| """See smarts2appl for a description""" | |
| if v: print("Calculating product molecules") | |
| product_mols = [Chem.MolFromSmiles(s) for s in product_smarts] | |
| if v: print("Calculating product fingerprints") | |
| product_ebvs = [Chem.PatternFingerprint(m, fpSize=fpsize) for m in product_mols] | |
| if v: print(f'Building product ints: [{len(product_mols)}, {fpsize}]') | |
| # This loads each fingerprint into a python integer on which we can use bitwise operations. | |
| product_ints = [int(e.ToBitString(), base=2) for e in product_ebvs] | |
| del product_ebvs | |
| # product_mols = {i: m for i,m in enumerate(product_mols)} | |
| if v: print('Checking symbolically') | |
| # buffer for template molecules. This are handed over as smarts as they are slow to pickle | |
| template_mols = {} | |
| # create iterator and add progressbar if use_tqdm is True | |
| iterator = product(enumerate(product_ints), enumerate(template_ints)) | |
| if use_tqdm: | |
| nelem = len(product_ints) * len(template_ints) | |
| iterator = tqdm(iterator, total=nelem, miniters=1_000_000) | |
| imatch = [] | |
| jmatch = [] | |
| for (i, p_int), (j, t_int) in iterator: | |
| if (p_int & t_int) == t_int: # fingerprint based screen | |
| p = product_mols[i] | |
| t = template_mols.get(j, False) | |
| if not t: | |
| t = Chem.MolFromSmarts(template_product_smarts[j]) | |
| template_mols[j] = t | |
| if p.HasSubstructMatch(t): | |
| imatch.append(i) | |
| jmatch.append(j) | |
| if v: print("Finished loop") | |
| return np.array(imatch)+ioffset, np.array(jmatch)+joffset | |
| def extract_from_reaction(reaction, radius=1, verbose=False): | |
| """adapted from rdchiral package""" | |
| from rdchiral.template_extractor import mols_from_smiles_list, replace_deuterated, get_fragments_for_changed_atoms, expand_changed_atom_tags, canonicalize_transform, get_changed_atoms | |
| reactants = mols_from_smiles_list(replace_deuterated(reaction['reactants']).split('.')) | |
| products = mols_from_smiles_list(replace_deuterated(reaction['products']).split('.')) | |
| # if rdkit cant understand molecule, return | |
| if None in reactants: return {'reaction_id': reaction['_id']} | |
| if None in products: return {'reaction_id': reaction['_id']} | |
| # try to sanitize molecules | |
| try: | |
| #for i in range(len(reactants)): | |
| # reactants[i] = AllChem.RemoveHs(reactants[i]) # *might* not be safe | |
| #for i in range(len(products)): | |
| # products[i] = AllChem.RemoveHs(products[i]) # *might* not be safe | |
| #[Chem.SanitizeMol(mol) for mol in reactants + products] # redundant w/ RemoveHs | |
| for mol in reactants + products: | |
| Chem.SanitizeMol(mol, catchErrors=True) | |
| FastFindRings(mol) #Providing ring info | |
| mol.UpdatePropertyCache(strict=False) #Correcting valence info # important operation | |
| #changed | |
| #[Chem.SanitizeMol(mol, catchErrors=True) for mol in reactants + products] # redundant w/ RemoveHs | |
| #[mol.UpdatePropertyCache() for mol in reactants + products] | |
| except Exception as e: | |
| # can't sanitize -> skip | |
| print(e) | |
| print('Could not load SMILES or sanitize') | |
| print('ID: {}'.format(reaction['_id'])) | |
| return {'reaction_id': reaction['_id']} | |
| are_unmapped_product_atoms = False | |
| extra_reactant_fragment = '' | |
| for product in products: | |
| prod_atoms = product.GetAtoms() | |
| if sum([a.HasProp('molAtomMapNumber') for a in prod_atoms]) < len(prod_atoms): | |
| if verbose: print('Not all product atoms have atom mapping') | |
| if verbose: print('ID: {}'.format(reaction['_id'])) | |
| are_unmapped_product_atoms = True | |
| if are_unmapped_product_atoms: # add fragment to template | |
| for product in products: | |
| prod_atoms = product.GetAtoms() | |
| # Get unmapped atoms | |
| unmapped_ids = [ | |
| a.GetIdx() for a in prod_atoms if not a.HasProp('molAtomMapNumber') | |
| ] | |
| if len(unmapped_ids) > MAXIMUM_NUMBER_UNMAPPED_PRODUCT_ATOMS: | |
| # Skip this example - too many unmapped product atoms! | |
| return | |
| # Define new atom symbols for fragment with atom maps, generalizing fully | |
| atom_symbols = ['[{}]'.format(a.GetSymbol()) for a in prod_atoms] | |
| # And bond symbols... | |
| bond_symbols = ['~' for b in product.GetBonds()] | |
| if unmapped_ids: | |
| extra_reactant_fragment += AllChem.MolFragmentToSmiles( | |
| product, unmapped_ids, | |
| allHsExplicit = False, isomericSmiles = USE_STEREOCHEMISTRY, | |
| atomSymbols = atom_symbols, bondSymbols = bond_symbols | |
| ) + '.' | |
| if extra_reactant_fragment: | |
| extra_reactant_fragment = extra_reactant_fragment[:-1] | |
| if verbose: print(' extra reactant fragment: {}'.format(extra_reactant_fragment)) | |
| # Consolidate repeated fragments (stoichometry) | |
| extra_reactant_fragment = '.'.join(sorted(list(set(extra_reactant_fragment.split('.'))))) | |
| if None in reactants + products: | |
| print('Could not parse all molecules in reaction, skipping') | |
| print('ID: {}'.format(reaction['_id'])) | |
| return {'reaction_id': reaction['_id']} | |
| # Calculate changed atoms | |
| changed_atoms, changed_atom_tags, err = get_changed_atoms(reactants, products) | |
| if err: | |
| if verbose: | |
| print('Could not get changed atoms') | |
| print('ID: {}'.format(reaction['_id'])) | |
| return | |
| if not changed_atom_tags: | |
| if verbose: | |
| print('No atoms changed?') | |
| print('ID: {}'.format(reaction['_id'])) | |
| # print('Reaction SMILES: {}'.format(example_doc['RXN_SMILES'])) | |
| return {'reaction_id': reaction['_id']} | |
| try: | |
| # Get fragments for reactants | |
| reactant_fragments, intra_only, dimer_only = get_fragments_for_changed_atoms(reactants, changed_atom_tags, | |
| radius = radius, expansion = [], category = 'reactants') | |
| # Get fragments for products | |
| # (WITHOUT matching groups but WITH the addition of reactant fragments) | |
| product_fragments, _, _ = get_fragments_for_changed_atoms(products, changed_atom_tags, | |
| radius = radius-1, expansion = expand_changed_atom_tags(changed_atom_tags, reactant_fragments), | |
| category = 'products') | |
| except ValueError as e: | |
| if verbose: | |
| print(e) | |
| print(reaction['_id']) | |
| return {'reaction_id': reaction['_id']} | |
| # Put together and canonicalize (as best as possible) | |
| rxn_string = '{}>>{}'.format(reactant_fragments, product_fragments) | |
| rxn_canonical = canonicalize_transform(rxn_string) | |
| # Change from inter-molecular to intra-molecular | |
| rxn_canonical_split = rxn_canonical.split('>>') | |
| rxn_canonical = rxn_canonical_split[0][1:-1].replace(').(', '.') + \ | |
| '>>' + rxn_canonical_split[1][1:-1].replace(').(', '.') | |
| reactants_string = rxn_canonical.split('>>')[0] | |
| products_string = rxn_canonical.split('>>')[1] | |
| retro_canonical = products_string + '>>' + reactants_string | |
| # Load into RDKit | |
| rxn = AllChem.ReactionFromSmarts(retro_canonical) | |
| # edited | |
| #if rxn.Validate()[1] != 0: | |
| # print('Could not validate reaction successfully') | |
| # print('ID: {}'.format(reaction['_id'])) | |
| # print('retro_canonical: {}'.format(retro_canonical)) | |
| # if VERBOSE: raw_input('Pausing...') | |
| # return {'reaction_id': reaction['_id']} | |
| n_warning, n_errors = rxn.Validate() | |
| if n_errors: | |
| # resolves some errors | |
| rxn = AllChem.ReactionFromSmarts(AllChem.ReactionToSmiles(rxn)) | |
| n_warning, n_errors = rxn.Validate() | |
| template = { | |
| 'products': products_string, | |
| 'reactants': reactants_string, | |
| 'reaction_smarts': retro_canonical, | |
| 'intra_only': intra_only, | |
| 'dimer_only': dimer_only, | |
| 'reaction_id': reaction['_id'], | |
| 'necessary_reagent': extra_reactant_fragment, | |
| 'num_errors': n_errors, | |
| 'num_warnings': n_warning, | |
| } | |
| return template | |
| def extract_template(rxn_smi, radius=1): | |
| if isinstance(rxn_smi, str): | |
| reaction = { | |
| 'reactants': rxn_smi.split('>')[0], | |
| 'products': rxn_smi.split('>')[-1], | |
| 'id': rxn_smi, | |
| '_id': rxn_smi | |
| } | |
| else: | |
| reaction = rxn_smi | |
| try: | |
| res = extract_from_reaction(reaction, radius=radius) | |
| return res['reaction_smarts'] # returns a retro-template | |
| except: | |
| msg = f'failed to extract template from "{rxn_smi}"' | |
| log.warning(msg) | |
| return None | |
| def getTemplateFingerprint(smarts, fp_size=4096): | |
| """ CreateStructuralFingerprintForReaction """ | |
| if isinstance(smarts, (list,)): | |
| return np.vstack([getTemplateFingerprint(sm) for sm in smarts]) | |
| rxn = AllChem.ReactionFromSmarts(str(smarts)) | |
| if rxn is None: | |
| msg = f"{smarts} couldn't be converted to a fingerprint using 0's instead" | |
| log.warning(msg) | |
| #warnings.warn(msg) | |
| return np.zeros(fp_size).astype(np.bool) | |
| return np.array(list(AllChem.CreateStructuralFingerprintForReaction(rxn, )), dtype=np.bool) |