Spaces:
Sleeping
Sleeping
File size: 5,445 Bytes
78ba665 86f6f17 78ba665 86f6f17 78ba665 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import json
import logging
import os
import tempfile
from functools import partial

import numpy as np
import pandas as pd
import yaml
from massspecgym.models.base import Stage
from matchms.importing import load_from_mgf
from pytorch_lightning import Trainer

from mvp.data.data_module import TestDataModule
from mvp.data.datasets import ContrastiveDataset
from mvp.subformula_assign.utils.spectra_utils import assign_subforms
from mvp.utils.data import get_spec_featurizer, get_mol_featurizer, get_test_ms_dataset
from mvp.utils.models import get_model
# check formspec requirements
def check_formspec_requirements(spectra):
    """Return True when every spectrum has the metadata formSpec needs.

    A spectrum qualifies when its ``metadata`` mapping holds a non-``None``
    value for both ``'formula'`` and ``'adduct'``. A key that is present but
    set to ``None`` counts as missing, which matches the ``is None`` check
    that ``preprocess_spectra`` applies to the same two fields.

    Args:
        spectra: Iterable of objects exposing a dict-like ``metadata``.

    Returns:
        ``False`` as soon as one spectrum lacks a required field, otherwise
        ``True`` (including for an empty iterable).
    """
    required_keys = ('formula', 'adduct')
    return all(
        spec.metadata.get(key) is not None
        for spec in spectra
        for key in required_keys
    )
# preprocess spectra
def preprocess_spectra(mgf_path, model_choice, mass_diff_thresh=20, dataset_pth=None, subformula_dir=None):
    """Convert an MGF file into the TSV (and subformula JSON files) for inference.

    Every spectrum read by ``load_from_mgf`` becomes one TSV row with the
    columns ``identifier, formula, adduct, precursor_mz, precursor_formula,
    mzs, intensities, fold``; ``mzs`` and ``intensities`` are stored as
    comma-joined strings and ``fold`` is always ``'test'``.

    When ``model_choice`` is ``"formSpec"`` the peaks are additionally
    annotated with ``assign_subforms`` and the resulting ``output_tbl`` is
    dumped to ``<subformula_dir>/<identifier>.json``. Spectra whose
    annotation yields no ``output_tbl`` are left out of the TSV.

    Args:
        mgf_path: Path of the MGF file to read.
        model_choice: ``"formSpec"`` enables peak annotation; any other
            value only builds the TSV.
        mass_diff_thresh: Forwarded to ``assign_subforms``.
        dataset_pth: Output TSV path. Defaults to ``mvp_data.tsv`` inside
            the system temp directory.
        subformula_dir: Directory for the JSON files. Defaults to
            ``mvp_subformulae`` inside the system temp directory. It is
            created if it does not exist.

    Returns:
        ``(dataset_pth, subformula_dir)`` on success. ``(None, None)`` when
        a formSpec spectrum lacks a formula or adduct, or when any exception
        occurs while processing the spectra (the exception is logged).

    Raises:
        Whatever ``load_from_mgf`` / ``os.makedirs`` raise: loading the file
        and creating the directory happen outside the guarded section.
    """
    logger = logging.getLogger(__name__)

    tmp_dir = tempfile.gettempdir()
    if dataset_pth is None:
        dataset_pth = os.path.join(tmp_dir, "mvp_data.tsv")
    if subformula_dir is None:
        subformula_dir = os.path.join(tmp_dir, "mvp_subformulae")
    os.makedirs(subformula_dir, exist_ok=True)

    # load mgf file
    spectra = list(load_from_mgf(mgf_path))

    columns = ['identifier', 'formula', 'adduct', 'precursor_mz', 'precursor_formula', 'mzs', 'intensities', 'fold']
    rows = []
    try:
        for spec in spectra:
            metadata = spec.metadata
            identifier = metadata['title']
            formula = metadata.get('formula')
            adduct = metadata.get('adduct')
            precursor_mz = metadata['precursor_mz']
            # Placeholder column: it simply mirrors ``formula``. Reusing the
            # ``.get()`` result (instead of indexing ``metadata['formula']``)
            # keeps spectra without a formula usable by models that do not
            # require one.
            precursor_formula = formula
            mzs = spec.peaks.mz
            intensities = spec.peaks.intensities

            if model_choice == "formSpec":
                if formula is None or adduct is None:
                    logger.warning(
                        "Spectrum %r lacks a formula or adduct, which formSpec requires",
                        identifier,
                    )
                    return None, None
                # annotate peaks
                peaks = np.array(list(zip(mzs, intensities)))
                annotation = assign_subforms(formula, peaks, adduct, mass_diff_thresh=mass_diff_thresh)
                if annotation['output_tbl'] is None:
                    continue
                # save json file
                # NOTE(review): ``identifier`` comes straight from the MGF
                # title and is used as a file name unchanged - confirm titles
                # cannot contain path separators.
                json_file = os.path.join(subformula_dir, f"{identifier}.json")
                with open(json_file, 'w') as f:
                    json.dump(annotation['output_tbl'], f)

            rows.append([
                identifier,
                formula,
                adduct,
                precursor_mz,
                precursor_formula,
                ','.join(str(m) for m in mzs),
                ','.join(str(i) for i in intensities),
                'test',
            ])

        pd.DataFrame(rows, columns=columns).to_csv(dataset_pth, sep='\t', index=False)
        return dataset_pth, subformula_dir
    except Exception:
        # Callers rely on the (None, None) failure signal, so keep it, but
        # record the traceback instead of discarding it.
        logger.exception("Failed to preprocess spectra from %s", mgf_path)
        return None, None
# Parameter file and checkpoint shipped for each supported model choice.
_MODEL_ASSETS = {
    "binnedSpec": ("mvp/params_binnedSpec.yaml", "pretrained_models/msgym_binnedSpec.ckpt"),
    "formSpec": ("mvp/params_formSpec.yaml", "pretrained_models/msgym_formSpec.ckpt"),
}


def setup_config(model_choice, dataset_pth, candidates_pth, subformula_dir):
    """Load the YAML parameters for a model and point them at this run's files.

    Args:
        model_choice: ``"binnedSpec"`` or ``"formSpec"``.
        dataset_pth: Stored under ``params['dataset_pth']``.
        candidates_pth: Stored under ``params['candidates_pth']``.
        subformula_dir: Stored under ``params['subformula_dir_pth']``.

    Returns:
        The parameter dict read from the model's YAML file, extended with the
        three paths above, a freshly created temporary ``experiment_dir``,
        the model's ``checkpoint_pth`` and a ``df_test_path`` of the form
        ``<experiment_dir>/results_<model_choice>.pkl``.

    Raises:
        ValueError: If ``model_choice`` is not a supported model.
    """
    try:
        param_file, checkpoint_path = _MODEL_ASSETS[model_choice]
    except KeyError:
        supported = ", ".join(sorted(_MODEL_ASSETS))
        raise ValueError(
            f"Unknown model_choice {model_choice!r}; expected one of: {supported}"
        ) from None

    # load yaml
    with open(param_file, 'r', encoding='utf-8') as f:
        params = yaml.safe_load(f)

    experiment_dir = tempfile.mkdtemp()
    params['dataset_pth'] = dataset_pth
    params['candidates_pth'] = candidates_pth
    params['subformula_dir_pth'] = subformula_dir
    params['experiment_dir'] = experiment_dir
    params['checkpoint_pth'] = checkpoint_path
    params['df_test_path'] = os.path.join(experiment_dir, f"results_{model_choice}.pkl")
    return params
def run_inference(params):
    """Build the dataset, data module and model from ``params`` and run the test stage.

    The model is flagged as an external test run, pointed at
    ``params['df_test_path']`` and has three hyper-parameters overridden
    before ``Trainer.test`` is invoked on a single CPU device.

    Args:
        params: Configuration dict; this function reads ``spectra_view``,
            ``molecule_view``, ``spec_enc``, ``split_pth``, ``batch_size``,
            ``num_workers``, ``model``, ``df_test_path`` and
            ``experiment_dir`` and also hands the whole dict to the
            featurizer / dataset / model factories.

    Returns:
        None.
    """
    # Featurizers and dataset
    spectra_view = params['spectra_view']
    spec_feat = get_spec_featurizer(spectra_view, params)
    molecule_view = params['molecule_view']
    mol_feat = get_mol_featurizer(molecule_view, params)
    test_dataset = get_test_ms_dataset(
        spectra_view,
        molecule_view,
        spec_feat,
        mol_feat,
        params,
        external_test=True,
    )

    # Data module with a test-stage collate function
    test_collate = partial(
        ContrastiveDataset.collate_fn,
        spec_enc=params['spec_enc'],
        spectra_view=spectra_view,
        stage=Stage.TEST,
    )
    datamodule_kwargs = {
        'dataset': test_dataset,
        'collate_fn': test_collate,
        'split_pth': params['split_pth'],
        'batch_size': params['batch_size'],
        'num_workers': params['num_workers'],
    }
    test_module = TestDataModule(**datamodule_kwargs)

    # Model, configured for an external test run
    net = get_model(params['model'], params)
    net.df_test_path = params['df_test_path']
    net.external_test = True
    hparam_overrides = (
        ('use_fp', False),
        ('contr_views', [['spec_enc', 'mol_enc']]),
        ('use_cons_spec', False),
    )
    for hparam_name, hparam_value in hparam_overrides:
        net.hparams[hparam_name] = hparam_value

    # CPU-only trainer rooted in the experiment directory
    cpu_trainer = Trainer(
        accelerator='cpu',
        devices=1,
        default_root_dir=params['experiment_dir'],
    )

    # Prepare the data module, then run the test loop
    test_module.prepare_data()
    test_module.setup(stage="test")
    cpu_trainer.test(net, datamodule=test_module)
if __name__ == "__main__":
    # Test run: preprocess the bundled example MGF and run formSpec inference.
    mgf_path = "data/app/data.mgf"
    model_choice = "formSpec"
    candidates_pth = "data/app/identifier_to_candidates.json"
    mass_diff_thresh = 20

    dataset_pth, subformula_dir = preprocess_spectra(mgf_path, model_choice, mass_diff_thresh=mass_diff_thresh)
    # preprocess_spectra signals failure with (None, None); stop here rather
    # than handing None paths to the configuration and inference steps.
    if dataset_pth is None or subformula_dir is None:
        raise SystemExit(f"Preprocessing failed for {mgf_path} with model {model_choice}")

    params = setup_config(model_choice, dataset_pth, candidates_pth, subformula_dir)
    run_inference(params)