File size: 8,303 Bytes
d9df210
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import os
import json
import numpy as np

from mvp.data.transforms import SpecBinner, SpecBinnerLog, SpecFormulaFeaturizer
from massspecgym.data.transforms import SpecTransform, MolTransform
from mvp.data.transforms import MolToGraph
import mvp.data.datasets as jestr_datasets
import typing as T
from mvp.definitions import MSGYM_FORMULA_VECTOR_NORM
import matchms

class Subformula_Loader:
    def __init__(self, spectra_view, dir_path) -> None:

        self.dir_path = dir_path
        if spectra_view == 'SpecFormula':
            self.load = self.load_subformula_data
        elif spectra_view == "SpecFormulaMz":
            self.load = self.load_subformula_dict
        else:
            raise Exception("Spectra view is not supported.")

    def __call__(self, ids):
        id_to_form_spec = {}
        for id in ids:
            data = self.load(id)
            if data:
                id_to_form_spec[id] = data
        
        return id_to_form_spec
    
    def load_subformula_data(self, spec_id: str):
        '''MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py 
        '''
        try:
            file = os.path.join(self.dir_path, spec_id+".json")
            with open(file) as f:
                data = json.load(f)
            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            # sort by mzs
            ind = mzs.argsort()
            mzs = mzs[ind]
            formulas = formulas[ind]
            intensities = intensities[ind]
            return {'formulas': formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}
        except:
            return None

    def load_subformula_dict(self, spec_id: str):
        '''MIST subformula format:https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py 
        '''
        try:
            file = os.path.join(self.dir_path, spec_id+".json")
            with open(file) as f:
                data = json.load(f)
            mzs = np.array(data['output_tbl']['mz'])
            formulas = np.array(data['output_tbl']['formula'])
            intensities = np.array(data['output_tbl']['ms2_inten'])

            mz_to_formulas = {mz:f for mz, f in zip(mzs, formulas)}
            for mz, f in zip(mzs, formulas):
                mz_to_formulas[mz] = f
            
            ind = mzs.argsort()
            mzs = mzs[ind]
            formulas = formulas[ind]
            intensities = intensities[ind]
            return {'formulas': mz_to_formulas, 'formula_mzs': mzs, 'formula_intensities': intensities}
        except:
            return None

def make_tmp_subformula_spectra(row):
        return {'formulas':[row['formula']], 'formula_mzs':[float(row['precursor_mz'])], 'formula_intensities':[1.0]}

def get_spec_featurizer(spectra_view: T.Union[str, list[str]],
                         params) -> T.Union[SpecTransform, T.Dict[str, SpecTransform]]:
    
    featurizers = {"BinnedSpectra": SpecBinner,
        "SpecBinnerLog": SpecBinnerLog,
        "SpecFormula": SpecFormulaFeaturizer}

    spectra_featurizer = {}

    if isinstance(spectra_view, str):
        spectra_view = [spectra_view]

    for view in spectra_view:
        featurizer_params = {'max_mz': params['max_mz']}
        if view in ["BinnedSpectra", "SpecBinnerLog"]:
            featurizer_params.update({'bin_width': params['bin_width']})
        elif view in ["SpecFormula"]:
            featurizer_params.update({'element_list': params['element_list'], 'add_intensities': params['add_intensities'], 'formula_normalize_vector': MSGYM_FORMULA_VECTOR_NORM})
        
        spectra_featurizer[view] = featurizers[view](**featurizer_params)

    return spectra_featurizer

def get_mol_featurizer(molecule_view: T.Union[str, T.List[str]], params) -> MolTransform:
    featurizes = {'MolGraph':MolToGraph}
    mol_featurizer = {}

    if isinstance(molecule_view, str):
        molecule_view = [molecule_view]
    for view in molecule_view:
        featurizer_params = {}
        if view in ('MolGraph'):
            featurizer_params.update({'atom_feature': params['atom_feature'], 'bond_feature': params['bond_feature'], 'element_list': params['element_list']})
        
        if len(molecule_view) == 1:
            return featurizes[view](**featurizer_params)

        mol_featurizer[view] = featurizes[view](**featurizer_params)
    
    return mol_featurizer

def get_test_ms_dataset(spectra_view: T.Union[str, T.List[str]],
                 mol_view: T.Union[str, T.List[str]],
                 spectra_featurizer: SpecTransform,
                 mol_featurizer: MolTransform,
                 params,
                external_test: bool = False,):
    
    use_formulas = False
    
    views = []
    for v in [spectra_view, mol_view]:
        if isinstance(v, str):
            views.append(v)
        else: views.extend(v)
    views = frozenset(views)

    dataset_params = {'spectra_view': spectra_view, 'pth': params['dataset_pth'], 'spec_transform': spectra_featurizer, 'mol_transform': mol_featurizer, "candidates_pth": params['candidates_pth']}
    if "SpecFormula" in views or "SpecFormulaMz" in views:
        dataset_params.update({'subformula_dir_pth': params['subformula_dir_pth']})
        use_formulas = True 
    
    if params['use_cons_spec']:
        dataset_params.update({'cons_spec_dir_pth': params['cons_spec_dir_pth']})

    if params['pred_fp'] or params['use_fp']:
        dataset_params.update({'fp_dir_pth': '', 'fp_size': params['fp_size'], 'fp_radius': params['fp_radius']})

    return jestr_datasets.ExpandedRetrievalDataset(use_formulas=use_formulas, external_test=external_test, **dataset_params)
    
def get_ms_dataset(spectra_view: str,
                 mol_view: str,
                 spectra_featurizer: SpecTransform,
                 mol_featurizer: MolTransform,
                 params):
    

    # set up dataset_parameters
    dataset_params = {'pth': params['dataset_pth'], 'spec_transform': spectra_featurizer, 'mol_transform': mol_featurizer, 'spectra_view': spectra_view}
    use_formulas = False
    if "SpecFormula" in spectra_view:
        dataset_params.update({'subformula_dir_pth': params['subformula_dir_pth']})
        use_formulas = True

    if params['pred_fp'] or params['use_fp']:
        dataset_params.update({'fp_dir_pth': params['fp_dir_pth']})
        
    if params['use_cons_spec']:
        dataset_params.update({'cons_spec_dir_pth': params['cons_spec_dir_pth']})

    # select dataset
    if params['aug_cands']:
        return jestr_datasets.MassSpecDataset_Candidates(**dataset_params)
    elif use_formulas:
        return jestr_datasets.MassSpecDataset_PeakFormulas(**dataset_params)
    
    return jestr_datasets.JESTR1_MassSpecDataset(**dataset_params)

class PrepMatchMS:
    def __init__(self, spectra_view) -> None:

        if spectra_view == 'SpecFormula':
            self.prepare = self.specFormula
        elif spectra_view == "SpecFormulaMz":
            self.prepare = self.specFormulaMz
        elif spectra_view in ('SpecBinnerLog', 'BinnedSpectra', 'SpecMzIntTokenizer'):
            self.prepare = self.specMzInt
        else:
            raise Exception("Spectra view is not supported.")
        
    def specFormulaMz(self, row):
        
        return matchms.Spectrum(
            mz = np.array([float(m) for m in row["mzs"].split(",")]),
            intensities = np.array(
                        [float(i) for i in row["intensities"].split(",")]
                    ),
            metadata = {'precursor_mz': row['precursor_mz'], 'formulas': row['formulas']}
        )
    
    def specFormula(self, row):

        return matchms.Spectrum(
            mz = np.array(row['formula_mzs']),
            intensities = np.array(row['formula_intensities']),
            metadata = {'precursor_mz': row['precursor_mz'], 'formulas': np.array(row['formulas']), 'precursor_formula': row['precursor_formula']}
        )
    
    def specMzInt(self, row):
        return matchms.Spectrum(
            mz = row['mzs'],
            intensities = row['intensities'],
            metadata = {'precursor_mz': row['precursor_mz']}
        )