import os
import json
import numpy as np
from flare.data.transforms import (
    SpecBinner,
    SpecBinnerLog,
    SpecFormulaFeaturizer,
    SpecFormulaMzFeaturizer,
    SpecMzIntTokenizer,
)
from massspecgym.data.transforms import SpecTransform, MolTransform
from flare.data.transforms import MolToGraph
import flare.data.datasets as jestr_datasets
import typing as T
from flare.definitions import MSGYM_FORMULA_VECTOR_NORM, MSGYM_STANDARD_MH, PRECURSOR_INTENSITY
import matchms
import tqdm


class Subformula_Loader:
    """Load per-spectrum subformula annotations from ``<dir_path>/<spec_id>.json``.

    :param spectra_view: ``'SpecFormula'`` selects :meth:`load_subformula_data`,
        ``'SpecFormulaMz'`` selects :meth:`load_subformula_dict`; any other
        value raises ``Exception``.
    :param dir_path: path to folder containing either MIST or SIRIUS formulas,
        automatically parses the file type as needed
    :param use_prec_mz: add precursor m/z when fragment precursor peak is not
        present or remove precursor peak when there is no fragment precursor
        peak
    :param formula_source: ``'sirius'`` -> SIRIUS parser, anything starting
        with ``'magma'`` -> MAGMa parser, everything else -> MIST parser.
    """

    def __init__(self, spectra_view, dir_path, use_prec_mz=True, formula_source='default') -> None:
        self.dir_path = dir_path
        self.use_prec_mz = use_prec_mz
        self.formula_source = formula_source

        # Bind the loader once so __call__ does not branch per spectrum.
        if spectra_view == 'SpecFormula':
            self.load = self.load_subformula_data
        elif spectra_view == "SpecFormulaMz":
            self.load = self.load_subformula_dict
        else:
            raise Exception("Spectra view is not supported.")

    def __call__(self, ids, form_list, prec_mz_list):
        """Load every spectrum in ``ids`` and return ``{id: formula-spectrum dict}``.

        ``ids``, ``form_list`` and ``prec_mz_list`` are iterated in lockstep.
        Spectra whose loader returns ``None`` are left out of the result.
        """
        id_to_form_spec = {}
        print("Processing formula spectra")
        for spec_id, curr_form, curr_prec_mz in tqdm.tqdm(zip(ids, form_list, prec_mz_list), total=len(ids)):
            data = self.load(spec_id, curr_form, curr_prec_mz)
            if data is not None:
                id_to_form_spec[spec_id] = data
        return id_to_form_spec

    @staticmethod
    def _sort_by_mz(mzs, formulas, intensities):
        """Return the formula-spectrum dict with all three arrays ordered by ascending m/z."""
        mzs = np.asarray(mzs)
        formulas = np.asarray(formulas)
        intensities = np.asarray(intensities)
        order = mzs.argsort()
        return {
            'formulas': formulas[order],
            'formula_mzs': mzs[order],
            'formula_intensities': intensities[order],
        }

    def load_mist_data(self, data, curr_form, curr_prec_mz):
        '''Parse a MIST subformula record.

        MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py

        Reads ``data['output_tbl']`` columns ``mz``, ``formula`` and
        ``ms2_inten``. When ``use_prec_mz`` is set, the precursor formula is
        either appended (with ``curr_prec_mz``) or, if already present, its
        first occurrence gets intensity ``PRECURSOR_INTENSITY``.

        Returns the m/z-sorted dict, or ``None`` if anything goes wrong while
        parsing (best effort).
        '''
        try:
            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            if self.use_prec_mz:
                if curr_form in formulas:
                    idx = np.where(formulas == curr_form)[0][0]
                    intensities[idx] = PRECURSOR_INTENSITY
                else:
                    mzs = np.concatenate([mzs, [curr_prec_mz]])
                    formulas = np.concatenate([formulas, [curr_form]])
                    intensities = np.concatenate([intensities, [PRECURSOR_INTENSITY]])

            return self._sort_by_mz(mzs, formulas, intensities)
        except Exception:
            # Best effort: a malformed record is skipped by the caller.
            # `Exception` (not bare `except`) lets Ctrl-C / SystemExit through.
            return None

    def load_magma_data(self, data, curr_form, curr_prec_mz):
        """Parse a MAGMa record with keys ``subformulas``, ``mz``, ``intensities``.

        Each peak carries a list of candidate formulas. Depending on the
        suffix of ``formula_source``:

        * ``...1``   - pick one candidate per peak at random (fixed seed 42),
        * ``...all`` - keep every candidate, splitting the peak intensity
          evenly between them.

        In both modes only the highest-intensity occurrence of a formula is
        kept. Raises ``Exception`` for any other ``formula_source``.

        If ``data`` is ``None`` the result holds just the precursor (when
        ``use_prec_mz``) or empty lists.
        """
        # data is None
        if data is None:
            if self.use_prec_mz:
                return {'formulas': [curr_form], 'formula_mzs': [curr_prec_mz], 'formula_intensities': [PRECURSOR_INTENSITY]}
            return {'formulas': [], 'formula_mzs': [], 'formula_intensities': []}

        formula_to_intensity = {}
        formula_to_mz = {}

        def keep_largest(formula, mz, intensity):
            # Insert, or overwrite only when this occurrence is more intense.
            if formula not in formula_to_intensity or intensity > formula_to_intensity[formula]:
                formula_to_intensity[formula] = intensity
                formula_to_mz[formula] = mz

        peaks = zip(data['subformulas'], data['mz'], data['intensities'])

        # randomly choose 1 formula for each peak, keep largest intensity for each formula
        if self.formula_source.endswith('1'):
            # A private RandomState(42) draws exactly what
            # `np.random.seed(42); np.random.choice(...)` would, without
            # resetting the process-wide NumPy RNG on every call.
            rng = np.random.RandomState(42)
            for candidates, mz, intensity in peaks:
                if not candidates:
                    continue
                keep_largest(rng.choice(candidates), mz, intensity)
        # take all formulas, divide intensity by number of formulas, keep largest intensity for each formula
        elif self.formula_source.endswith('all'):
            for candidates, mz, intensity in peaks:
                if not candidates:
                    continue
                share = intensity / len(candidates)
                for formula in candidates:
                    keep_largest(formula, mz, share)
        else:
            raise Exception(f"Formula source not supported: {self.formula_source}")

        # Both dicts gain keys at the same moment, so their orders line up.
        mzs = list(formula_to_mz.values())
        formulas = list(formula_to_mz.keys())
        intensities = list(formula_to_intensity.values())

        # add precursor mz
        if self.use_prec_mz:
            if curr_form in formulas:
                intensities[formulas.index(curr_form)] = PRECURSOR_INTENSITY
            else:
                formulas.append(curr_form)
                intensities.append(PRECURSOR_INTENSITY)
                mzs.append(curr_prec_mz)

        return self._sort_by_mz(mzs, formulas, intensities)

    def load_sirius_data(self, data):
        """Parse a SIRIUS record.

        Reads ``mz``, ``molecularFormula`` and ``relativeIntensity`` from each
        entry of ``data['fragments']``. Fragments whose formula equals
        ``data['molecularFormula']`` get intensity ``PRECURSOR_INTENSITY``.
        When ``use_prec_mz`` is false, fragments with an empty ``peaks`` list
        are dropped.

        Returns the m/z-sorted dict, or ``None`` on any parsing error.
        """
        try:
            fragments = data['fragments']
            mzs = np.array([entry['mz'] for entry in fragments])
            formulas = np.array([entry['molecularFormula'] for entry in fragments])
            intensities = np.array([entry['relativeIntensity'] for entry in fragments])
            intensities[formulas == data['molecularFormula']] = PRECURSOR_INTENSITY

            if not self.use_prec_mz:
                # removing precursor formula
                has_peaks = np.array([len(entry['peaks']) != 0 for entry in fragments])
                mzs = mzs[has_peaks]
                formulas = formulas[has_peaks]
                intensities = intensities[has_peaks]

            return self._sort_by_mz(mzs, formulas, intensities)
        except Exception:
            # Best effort: unreadable records are skipped by the caller.
            return None

    def load_subformula_data(self, spec_id: str, curr_form: str, curr_prec_mz: float):
        """Read ``<dir_path>/<spec_id>.json`` and dispatch on ``formula_source``.

        Returns the parser's dict, or ``None`` if the file cannot be read or
        parsed, or if the selected parser raises.
        """
        try:
            file = os.path.join(self.dir_path, spec_id + ".json")
            with open(file) as f:
                data = json.load(f)

            if self.formula_source == 'sirius':
                return self.load_sirius_data(data)
            if self.formula_source.startswith('magma'):
                return self.load_magma_data(data, curr_form, curr_prec_mz)
            return self.load_mist_data(data, curr_form, curr_prec_mz)
        except Exception:
            # Best effort: missing/corrupt files yield no entry for this id.
            return None

    def load_subformula_dict(
        self,
        spec_id: str,
        curr_form: T.Optional[str] = None,
        curr_prec_mz: T.Optional[float] = None,
    ):
        '''Read a MIST record and return an m/z -> formula mapping plus sorted peaks.

        MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py

        ``curr_form`` and ``curr_prec_mz`` are accepted only so this method can
        be called with the same three arguments ``__call__`` passes to every
        loader; they are not used here.

        Returns ``{'formulas': {mz: formula}, 'formula_mzs': ...,
        'formula_intensities': ...}`` with the two arrays sorted by m/z, or
        ``None`` on any read/parse error.
        '''
        try:
            file = os.path.join(self.dir_path, spec_id + ".json")
            with open(file) as f:
                data = json.load(f)

            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            # On duplicate m/z values the last formula wins.
            mz_to_formulas = {mz: f for mz, f in zip(mzs, formulas)}

            ind = mzs.argsort()
            return {
                'formulas': mz_to_formulas,
                'formula_mzs': mzs[ind],
                'formula_intensities': intensities[ind],
            }
        except Exception:
            # Best effort: missing/corrupt files yield no entry for this id.
            return None


def make_tmp_subformula_spectra(row):
    """Build a one-peak placeholder formula spectrum from ``row``.

    Uses ``row['formula']`` as the only formula, ``float(row['precursor_mz'])``
    as its m/z, and intensity ``1.0``.
    """
    return {
        'formulas': [row['formula']],
        'formula_mzs': [float(row['precursor_mz'])],
        'formula_intensities': [1.0],
    }


def get_spec_featurizer(spectra_view: T.Union[str, list[str]],
                        params) -> T.Union[SpecTransform, T.Dict[str, SpecTransform]]:
    """Instantiate one spectrum featurizer per requested view.

    :param spectra_view: a view name or a list of view names; each must be one
        of ``BinnedSpectra``, ``SpecBinnerLog``, ``SpecFormula``,
        ``SpecFormulaMz`` or ``SpecMzIntTokens`` (otherwise ``KeyError``).
    :param params: mapping providing ``max_mz`` and, depending on the view,
        ``bin_width``, ``element_list``, ``add_intensities``,
        ``mask_precursor``.
    :return: dict mapping view name -> featurizer instance (a dict is returned
        even for a single view).
    """
    featurizers = {
        "BinnedSpectra": SpecBinner,
        "SpecBinnerLog": SpecBinnerLog,
        "SpecFormula": SpecFormulaFeaturizer,
        "SpecFormulaMz": SpecFormulaMzFeaturizer,
        'SpecMzIntTokens': SpecMzIntTokenizer,
    }

    spectra_featurizer = {}

    if isinstance(spectra_view, str):
        spectra_view = [spectra_view]

    for view in spectra_view:
        featurizer_params = {'max_mz': params['max_mz']}
        if view in ["BinnedSpectra", "SpecBinnerLog"]:
            featurizer_params.update({'bin_width': params['bin_width']})
        elif view in ["SpecFormula", "SpecFormulaMz"]:
            featurizer_params.update({
                'element_list': params['element_list'],
                'add_intensities': params['add_intensities'],
                'formula_normalize_vector': MSGYM_FORMULA_VECTOR_NORM,
            })

        # Views that work on raw m/z values additionally need the m/z
        # standardisation constants and the precursor-masking switch.
        if view in ("SpecFormulaMz", 'SpecMzIntTokens'):
            featurizer_params.update({
                'mz_mean_std': MSGYM_STANDARD_MH,
                'mask_precursor': params['mask_precursor'],
            })

        # featurizer_params.update({'mask_precursor': params['mask_precursor']})
        spectra_featurizer[view] = featurizers[view](**featurizer_params)

    return spectra_featurizer


def get_mol_featurizer(molecule_view: T.Union[str, T.List[str]], params) -> MolTransform:
    """Instantiate the molecule featurizer(s) for ``molecule_view``.

    :param molecule_view: a view name or list of view names; only
        ``'MolGraph'`` is registered (other names raise ``KeyError``).
    :param params: mapping providing ``atom_feature``, ``bond_feature`` and
        ``element_list`` for the ``MolGraph`` view.
    :return: the featurizer itself when exactly one view is requested,
        otherwise a dict mapping view name -> featurizer.
    """
    featurizes = {'MolGraph': MolToGraph}

    mol_featurizer = {}
    if isinstance(molecule_view, str):
        molecule_view = [molecule_view]

    for view in molecule_view:
        featurizer_params = {}
        # Exact comparison: the former `view in ('MolGraph')` was a substring
        # test against the string 'MolGraph', not a tuple membership test.
        if view == 'MolGraph':
            featurizer_params.update({
                'atom_feature': params['atom_feature'],
                'bond_feature': params['bond_feature'],
                'element_list': params['element_list'],
            })

        if len(molecule_view) == 1:
            return featurizes[view](**featurizer_params)
        mol_featurizer[view] = featurizes[view](**featurizer_params)

    return mol_featurizer


def get_test_ms_dataset(spectra_view: T.Union[str, T.List[str]],
                        mol_view: T.Union[str, T.List[str]],
                        spectra_featurizer: SpecTransform,
                        mol_featurizer: MolTransform,
                        params):
    """Build the ``ExpandedRetrievalDataset`` used for testing.

    ``params`` must provide ``dataset_pth`` and ``candidates_pth``; when any
    requested view is ``SpecFormula`` or ``SpecFormulaMz`` it must also provide
    ``subformula_dir_pth`` and ``formula_source``, and the dataset is created
    with ``use_formulas=True``.
    """
    use_formulas = False

    # Flatten both view arguments (each a str or a list of str) into one set.
    views = []
    for v in [spectra_view, mol_view]:
        if isinstance(v, str):
            views.append(v)
        else:
            views.extend(v)
    views = frozenset(views)

    dataset_params = {
        'spectra_view': spectra_view,
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        "candidates_pth": params['candidates_pth'],
    }

    if "SpecFormula" in views or "SpecFormulaMz" in views:
        dataset_params.update({
            'subformula_dir_pth': params['subformula_dir_pth'],
            'use_magma': params['formula_source'].startswith('magma'),
            'formula_source': params['formula_source'],
        })
        use_formulas = True

    # if params['use_cons_spec']:
    #     dataset_params.update({'cons_spec_dir_pth': params['cons_spec_dir_pth']})
    # if 'use_NL_spec' in params and params['use_NL_spec']:
    #     dataset_params.update({'NL_spec_dir_pth': params['NL_spec_dir_pth']})
    # if params['pred_fp'] or params['use_fp']:
    #     dataset_params.update({'fp_dir_pth': '', 'fp_size': params['fp_size'], 'fp_radius': params['fp_radius']})

    return jestr_datasets.ExpandedRetrievalDataset(use_formulas=use_formulas, **dataset_params)


def get_ms_dataset(spectra_view: str,
                   mol_view: str,
                   spectra_featurizer: SpecTransform,
                   mol_featurizer: MolTransform,
                   params):
    """Build the training dataset for ``spectra_view``.

    Returns ``MassSpecDataset_PeakFormulas`` when ``"SpecFormula" in
    spectra_view`` (for a string argument this is a substring test, so it also
    matches ``"SpecFormulaMz"``), otherwise ``JESTR1_MassSpecDataset``.
    ``mol_view`` is not used by this function.
    """
    # set up dataset_parameters
    dataset_params = {
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        'spectra_view': spectra_view,
    }

    use_formulas = False
    if "SpecFormula" in spectra_view:
        dataset_params.update({
            'subformula_dir_pth': params['subformula_dir_pth'],
            'formula_source': params['formula_source'],
        })
        use_formulas = True

    # select dataset
    if use_formulas:
        return jestr_datasets.MassSpecDataset_PeakFormulas(**dataset_params)
    return jestr_datasets.JESTR1_MassSpecDataset(**dataset_params)


class PrepMatchMS:
    """Convert a data row into a ``matchms.Spectrum`` for a given spectra view.

    After construction, ``self.prepare(row)`` is the converter matching
    ``spectra_view``. Unsupported views raise ``Exception``.
    """

    def __init__(self, spectra_view) -> None:
        if spectra_view == 'SpecFormula':
            self.prepare = self.specFormula
        elif spectra_view == "SpecFormulaMz":
            self.prepare = self.specFormulaMz
        # 'SpecMzIntTokens' is the view name registered in get_spec_featurizer;
        # 'SpecMzIntTokenizer' is kept for backward compatibility.
        elif spectra_view in ('SpecBinnerLog', 'BinnedSpectra', 'SpecMzIntTokens', 'SpecMzIntTokenizer'):
            self.prepare = self.specMzInt
        else:
            raise Exception("Spectra view is not supported.")

    def specFormulaMz(self, row):
        """Build a spectrum from comma-separated ``row['mzs']`` / ``row['intensities']`` strings."""
        return matchms.Spectrum(
            mz=np.array([float(m) for m in row["mzs"].split(",")]),
            intensities=np.array([float(i) for i in row["intensities"].split(",")]),
            metadata={'precursor_mz': row['precursor_mz'],
                      'formulas': row['formulas']},
        )

    def specFormula(self, row):
        """Build a spectrum from the ``formula_mzs`` / ``formula_intensities`` sequences of ``row``."""
        return matchms.Spectrum(
            mz=np.array(row['formula_mzs']),
            intensities=np.array(row['formula_intensities']),
            metadata={'precursor_mz': row['precursor_mz'],
                      'formulas': np.array(row['formulas']),
                      'precursor_formula': row['precursor_formula']},
        )

    def specMzInt(self, row):
        """Build a spectrum directly from ``row['mzs']`` and ``row['intensities']`` (passed through as-is)."""
        return matchms.Spectrum(
            mz=row['mzs'],
            intensities=row['intensities'],
            metadata={'precursor_mz': row['precursor_mz']},
        )