Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import numpy as np | |
| from flare.data.transforms import SpecBinner, SpecBinnerLog, SpecFormulaFeaturizer, SpecFormulaMzFeaturizer, SpecMzIntTokenizer | |
| from massspecgym.data.transforms import SpecTransform, MolTransform | |
| from flare.data.transforms import MolToGraph | |
| import flare.data.datasets as jestr_datasets | |
| import typing as T | |
| from flare.definitions import MSGYM_FORMULA_VECTOR_NORM, MSGYM_STANDARD_MH, PRECURSOR_INTENSITY | |
| import matchms | |
| import tqdm | |
class Subformula_Loader:
    """Load per-spectrum subformula annotations from JSON files.

    Each spectrum is read from ``<dir_path>/<spec_id>.json``. The JSON layout
    is selected through ``formula_source``; the loading routine is selected
    through ``spectra_view`` and exposed as ``self.load``.

    :param spectra_view: ``'SpecFormula'`` (formula/mz/intensity arrays) or
        ``'SpecFormulaMz'`` (additionally maps each m/z to its formula).
    :param dir_path: path to the folder containing one JSON file per spectrum.
    :param use_prec_mz: for MIST and MAGMA data, mark the precursor formula
        with ``PRECURSOR_INTENSITY`` and append it (with the precursor m/z)
        when it is not among the annotated fragments; for SIRIUS data, when
        False, drop fragments whose ``'peaks'`` list is empty.
    :param formula_source: ``'sirius'``, a name starting with ``'magma'``
        (ending in ``'1'`` or ``'all'``), or anything else for the MIST layout.
    :raises ValueError: if ``spectra_view`` is not one of the supported views.
    """

    def __init__(self, spectra_view, dir_path, use_prec_mz=True, formula_source='default') -> None:
        self.dir_path = dir_path
        self.use_prec_mz = use_prec_mz
        self.formula_source = formula_source

        if spectra_view == 'SpecFormula':
            self.load = self.load_subformula_data
        elif spectra_view == "SpecFormulaMz":
            self.load = self.load_subformula_dict
        else:
            # ValueError is still caught by callers handling plain Exception.
            raise ValueError("Spectra view is not supported.")

    def __call__(self, ids, form_list, prec_mz_list):
        """Load every spectrum and return ``{spec_id: data}``.

        Spectra whose loader returns ``None`` (missing or unparsable file)
        are silently left out of the result.
        """
        id_to_form_spec = {}
        print("Processing formula spectra")
        for spec_id, curr_form, curr_prec_mz in tqdm.tqdm(zip(ids, form_list, prec_mz_list), total=len(ids)):
            data = self.load(spec_id, curr_form, curr_prec_mz)
            if data is not None:
                id_to_form_spec[spec_id] = data
        return id_to_form_spec

    @staticmethod
    def _sort_by_mz(mzs, formulas, intensities):
        """Return the three sequences as NumPy arrays ordered by ascending m/z."""
        mzs = np.array(mzs)
        formulas = np.array(formulas)
        intensities = np.array(intensities)
        order = mzs.argsort()
        return mzs[order], formulas[order], intensities[order]

    def _read_json(self, spec_id):
        """Parse ``<dir_path>/<spec_id>.json`` and return its content."""
        file = os.path.join(self.dir_path, spec_id + ".json")
        with open(file) as f:
            return json.load(f)

    def load_mist_data(self, data, curr_form, curr_prec_mz):
        '''MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py

        Reads ``mz``, ``formula`` and ``ms2_inten`` from ``data['output_tbl']``.
        Returns a dict with m/z-sorted arrays, or ``None`` if anything fails.
        '''
        try:
            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            if self.use_prec_mz:
                if curr_form in formulas:
                    # Precursor already annotated: only overwrite its intensity.
                    idx = np.where(formulas == curr_form)[0][0]
                    intensities[idx] = PRECURSOR_INTENSITY
                else:
                    mzs = np.concatenate([mzs, [curr_prec_mz]])
                    formulas = np.concatenate([formulas, [curr_form]])
                    intensities = np.concatenate([intensities, [PRECURSOR_INTENSITY]])

            mzs, formulas, intensities = self._sort_by_mz(mzs, formulas, intensities)
            return {'formulas': formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}
        except Exception:
            # Best effort: a malformed record is skipped rather than aborting the run.
            return None

    def load_magma_data(self, data, curr_form, curr_prec_mz):
        """Build a formula spectrum from MAGMA-style data.

        ``data`` provides parallel sequences ``'subformulas'`` (candidate
        formulas per peak), ``'mz'`` and ``'intensities'``. When
        ``formula_source`` ends with ``'1'`` one candidate is picked at random
        per peak; when it ends with ``'all'`` every candidate is kept and the
        peak intensity is divided by the number of candidates. In both modes
        only the largest intensity (and its m/z) is kept per formula.

        :raises ValueError: if ``formula_source`` ends with neither suffix.
        """
        # NOTE(review): this reseeds NumPy's *global* RNG on every call, which
        # makes the random pick below reproducible per spectrum but also
        # affects any other code drawing from np.random. Kept unchanged to
        # preserve existing behaviour.
        np.random.seed(42)
        formula_to_intensity = {}
        formula_to_mz = {}

        # No annotation available for this spectrum.
        if data is None:
            if self.use_prec_mz:
                return {'formulas': [curr_form], 'formula_mzs': [curr_prec_mz], 'formula_intensities': [PRECURSOR_INTENSITY]}
            return {'formulas': [], 'formula_mzs': [], 'formula_intensities': []}

        def keep_strongest(formula, mz, intensity):
            # Both dicts are always written together, so they share key order.
            if formula not in formula_to_intensity or intensity > formula_to_intensity[formula]:
                formula_to_intensity[formula] = intensity
                formula_to_mz[formula] = mz

        if self.formula_source.endswith('1'):
            # Randomly choose one formula for each peak.
            for candidates, mz, intensity in zip(data['subformulas'], data['mz'], data['intensities']):
                if not candidates:
                    continue
                keep_strongest(np.random.choice(candidates), mz, intensity)
        elif self.formula_source.endswith('all'):
            # Take all formulas, splitting the peak intensity evenly among them.
            for candidates, mz, intensity in zip(data['subformulas'], data['mz'], data['intensities']):
                if not candidates:
                    continue
                share = intensity / len(candidates)
                for candidate in candidates:
                    keep_strongest(candidate, mz, share)
        else:
            raise ValueError(f"Formula source not supported: {self.formula_source}")

        mzs = list(formula_to_mz.values())
        formulas = list(formula_to_mz.keys())
        intensities = list(formula_to_intensity.values())

        # Add or mark the precursor.
        if self.use_prec_mz:
            if curr_form in formulas:
                intensities[formulas.index(curr_form)] = PRECURSOR_INTENSITY
            else:
                formulas.append(curr_form)
                intensities.append(PRECURSOR_INTENSITY)
                mzs.append(curr_prec_mz)

        mzs, formulas, intensities = self._sort_by_mz(mzs, formulas, intensities)
        return {'formulas': formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}

    def load_sirius_data(self, data):
        """Build a formula spectrum from SIRIUS-style data.

        Reads ``mz``, ``molecularFormula`` and ``relativeIntensity`` from each
        entry of ``data['fragments']``; the fragment whose formula equals
        ``data['molecularFormula']`` gets ``PRECURSOR_INTENSITY``.
        Returns ``None`` if anything fails.
        """
        try:
            fragments = data['fragments']
            mzs = np.array([entry['mz'] for entry in fragments])
            formulas = np.array([entry['molecularFormula'] for entry in fragments])
            intensities = np.array([entry['relativeIntensity'] for entry in fragments])
            intensities[formulas == data['molecularFormula']] = PRECURSOR_INTENSITY

            if not self.use_prec_mz:
                # Keep only fragments backed by at least one entry in 'peaks'
                # (presumably this removes a precursor without a measured peak
                # -- TODO confirm against the SIRIUS output format).
                has_peak = np.array([len(entry['peaks']) != 0 for entry in fragments])
                mzs = mzs[has_peak]
                formulas = formulas[has_peak]
                intensities = intensities[has_peak]

            mzs, formulas, intensities = self._sort_by_mz(mzs, formulas, intensities)
            return {'formulas': formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}
        except Exception:
            return None

    def load_subformula_data(self, spec_id: str, curr_form: str, curr_prec_mz: float):
        """Read the JSON file of ``spec_id`` and dispatch on ``formula_source``.

        Returns the loader's dict, or ``None`` when the file cannot be read or
        parsed or the format-specific loader raises (including the
        unsupported-``formula_source`` error of the MAGMA loader).
        """
        try:
            data = self._read_json(spec_id)
            if self.formula_source == 'sirius':
                return self.load_sirius_data(data)
            if self.formula_source.startswith('magma'):
                return self.load_magma_data(data, curr_form, curr_prec_mz)
            return self.load_mist_data(data, curr_form, curr_prec_mz)
        except Exception:
            return None

    def load_subformula_dict(self, spec_id: str, curr_form: T.Optional[str] = None, curr_prec_mz: T.Optional[float] = None):
        '''MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py

        Returns a dict whose ``'formulas'`` entry maps each m/z to its formula
        and whose ``'formula_mzs'`` / ``'formula_intensities'`` entries are
        arrays sorted by m/z, or ``None`` if the file cannot be loaded.

        ``curr_form`` and ``curr_prec_mz`` are accepted (and ignored) so that
        this method can be called through ``self.load`` with the same
        arguments as ``load_subformula_data``.
        '''
        try:
            data = self._read_json(spec_id)
            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            mz_to_formulas = dict(zip(mzs, formulas))

            mzs, formulas, intensities = self._sort_by_mz(mzs, formulas, intensities)
            return {'formulas': mz_to_formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}
        except Exception:
            return None
def make_tmp_subformula_spectra(row):
    """Build a one-peak formula spectrum from a metadata row.

    The single peak carries the row's ``'formula'``, its ``'precursor_mz'``
    converted to float, and an intensity of 1.0.
    """
    precursor_formula = row['formula']
    precursor_mz = float(row['precursor_mz'])
    return {
        'formulas': [precursor_formula],
        'formula_mzs': [precursor_mz],
        'formula_intensities': [1.0],
    }
def get_spec_featurizer(spectra_view: T.Union[str, list[str]],
                        params) -> T.Union[SpecTransform, T.Dict[str, SpecTransform]]:
    """Instantiate one spectrum featurizer per requested view.

    :param spectra_view: a single view name or a list of view names.
    :param params: mapping providing ``'max_mz'`` plus the view-specific
        settings read below (``'bin_width'``, ``'element_list'``,
        ``'add_intensities'``, ``'mask_precursor'``).
    :return: dict mapping each view name to its featurizer instance.
    :raises KeyError: for an unknown view name or a missing ``params`` entry.
    """
    featurizers = {
        "BinnedSpectra": SpecBinner,
        "SpecBinnerLog": SpecBinnerLog,
        "SpecFormula": SpecFormulaFeaturizer,
        "SpecFormulaMz": SpecFormulaMzFeaturizer,
        'SpecMzIntTokens': SpecMzIntTokenizer,
    }
    binned_views = ("BinnedSpectra", "SpecBinnerLog")
    formula_views = ("SpecFormula", "SpecFormulaMz")
    mz_aware_views = ("SpecFormulaMz", 'SpecMzIntTokens')

    view_names = [spectra_view] if isinstance(spectra_view, str) else spectra_view

    spectra_featurizer = {}
    for name in view_names:
        # Every featurizer receives the m/z upper bound.
        kwargs = {'max_mz': params['max_mz']}

        if name in binned_views:
            kwargs['bin_width'] = params['bin_width']
        elif name in formula_views:
            kwargs.update({
                'element_list': params['element_list'],
                'add_intensities': params['add_intensities'],
                'formula_normalize_vector': MSGYM_FORMULA_VECTOR_NORM,
            })

        # Views that encode raw m/z values also need normalisation statistics
        # and the precursor-masking switch.
        if name in mz_aware_views:
            kwargs.update({
                'mz_mean_std': MSGYM_STANDARD_MH,
                'mask_precursor': params['mask_precursor'],
            })

        spectra_featurizer[name] = featurizers[name](**kwargs)

    return spectra_featurizer
def get_mol_featurizer(molecule_view: T.Union[str, T.List[str]], params) -> T.Union[MolTransform, T.Dict[str, MolTransform]]:
    """Instantiate the molecule featurizer(s) for the requested view(s).

    :param molecule_view: a single view name or a list of view names.
    :param params: mapping providing ``'atom_feature'``, ``'bond_feature'``
        and ``'element_list'`` for the ``'MolGraph'`` view.
    :return: the featurizer itself when exactly one view is requested,
        otherwise a dict mapping each view name to its featurizer
        (empty for an empty list).
    :raises KeyError: for an unknown view name or a missing ``params`` entry.
    """
    featurizers = {'MolGraph': MolToGraph}
    mol_featurizer = {}

    if isinstance(molecule_view, str):
        molecule_view = [molecule_view]

    for view in molecule_view:
        featurizer_params = {}
        # Equality, not `in ('MolGraph')`: the latter is a substring test
        # against a plain string and would also match e.g. 'Mol' or ''.
        if view == 'MolGraph':
            featurizer_params.update({
                'atom_feature': params['atom_feature'],
                'bond_feature': params['bond_feature'],
                'element_list': params['element_list'],
            })

        featurizer = featurizers[view](**featurizer_params)
        # A single requested view is returned bare rather than wrapped in a dict.
        if len(molecule_view) == 1:
            return featurizer
        mol_featurizer[view] = featurizer

    return mol_featurizer
def get_test_ms_dataset(spectra_view: T.Union[str, T.List[str]],
                        mol_view: T.Union[str, T.List[str]],
                        spectra_featurizer: SpecTransform,
                        mol_featurizer: MolTransform,
                        params):
    """Create the retrieval dataset used for testing.

    :param spectra_view: spectrum view name or list of names.
    :param mol_view: molecule view name or list of names.
    :param spectra_featurizer: transform passed as ``spec_transform``.
    :param mol_featurizer: transform passed as ``mol_transform``.
    :param params: mapping providing ``'dataset_pth'`` and
        ``'candidates_pth'``, plus ``'subformula_dir_pth'`` and
        ``'formula_source'`` when a formula view is requested.
    :return: a ``jestr_datasets.ExpandedRetrievalDataset`` instance.
    """
    # Flatten both view arguments into a single set of names.
    collected = []
    for view_spec in (spectra_view, mol_view):
        if isinstance(view_spec, str):
            collected.append(view_spec)
        else:
            collected.extend(view_spec)
    views = frozenset(collected)

    dataset_params = {
        'spectra_view': spectra_view,
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        "candidates_pth": params['candidates_pth'],
    }

    use_formulas = "SpecFormula" in views or "SpecFormulaMz" in views
    if use_formulas:
        formula_source = params['formula_source']
        dataset_params.update({
            'subformula_dir_pth': params['subformula_dir_pth'],
            'use_magma': formula_source.startswith('magma'),
            'formula_source': formula_source,
        })

    # Consensus-spectrum, neutral-loss-spectrum and fingerprint options were
    # previously wired in at this point and are currently disabled.
    return jestr_datasets.ExpandedRetrievalDataset(use_formulas=use_formulas, **dataset_params)
def get_ms_dataset(spectra_view: str,
                   mol_view: str,
                   spectra_featurizer: SpecTransform,
                   mol_featurizer: MolTransform,
                   params):
    """Create the mass-spectrum dataset matching the spectrum view.

    A view containing ``"SpecFormula"`` selects
    ``jestr_datasets.MassSpecDataset_PeakFormulas`` (which additionally gets
    the subformula directory and formula source); any other view selects
    ``jestr_datasets.JESTR1_MassSpecDataset``. ``mol_view`` is not used here.
    """
    dataset_kwargs = {
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        'spectra_view': spectra_view,
    }

    if "SpecFormula" not in spectra_view:
        return jestr_datasets.JESTR1_MassSpecDataset(**dataset_kwargs)

    # Formula-based views need to know where the peak annotations live.
    dataset_kwargs.update({
        'subformula_dir_pth': params['subformula_dir_pth'],
        'formula_source': params['formula_source'],
    })
    return jestr_datasets.MassSpecDataset_PeakFormulas(**dataset_kwargs)
class PrepMatchMS:
    """Convert a dataset row into a ``matchms.Spectrum`` for a spectrum view.

    The conversion routine is chosen once from ``spectra_view`` and exposed
    as ``self.prepare(row)``.

    :param spectra_view: ``'SpecFormula'``, ``'SpecFormulaMz'``,
        ``'SpecBinnerLog'``, ``'BinnedSpectra'`` or ``'SpecMzIntTokens'``
        (the legacy spelling ``'SpecMzIntTokenizer'`` is accepted as well).
    :raises ValueError: if ``spectra_view`` is not one of the names above.
    """

    # 'SpecMzIntTokens' is the view key used by get_spec_featurizer in this
    # file; 'SpecMzIntTokenizer' is kept for backward compatibility.
    _MZ_INT_VIEWS = ('SpecBinnerLog', 'BinnedSpectra', 'SpecMzIntTokens', 'SpecMzIntTokenizer')

    def __init__(self, spectra_view) -> None:
        if spectra_view == 'SpecFormula':
            self.prepare = self.specFormula
        elif spectra_view == "SpecFormulaMz":
            self.prepare = self.specFormulaMz
        elif spectra_view in self._MZ_INT_VIEWS:
            self.prepare = self.specMzInt
        else:
            # ValueError is still caught by callers handling plain Exception.
            raise ValueError("Spectra view is not supported.")

    def specFormulaMz(self, row):
        """Parse comma-separated ``'mzs'``/``'intensities'`` strings and attach the row's formulas."""
        return matchms.Spectrum(
            mz=np.array([float(m) for m in row["mzs"].split(",")]),
            intensities=np.array(
                [float(i) for i in row["intensities"].split(",")]
            ),
            metadata={'precursor_mz': row['precursor_mz'], 'formulas': row['formulas']},
        )

    def specFormula(self, row):
        """Use the row's formula peaks (``'formula_mzs'``/``'formula_intensities'``) as the spectrum."""
        return matchms.Spectrum(
            mz=np.array(row['formula_mzs']),
            intensities=np.array(row['formula_intensities']),
            metadata={
                'precursor_mz': row['precursor_mz'],
                'formulas': np.array(row['formulas']),
                'precursor_formula': row['precursor_formula'],
            },
        )

    def specMzInt(self, row):
        """Pass the row's ``'mzs'`` and ``'intensities'`` through unchanged."""
        return matchms.Spectrum(
            mz=row['mzs'],
            intensities=row['intensities'],
            metadata={'precursor_mz': row['precursor_mz']},
        )