| import matplotlib |
| from itertools import chain |
| import os |
| os.environ['MPLCONFIGDIR'] = os.getcwd() + "/configs/" |
| import re |
| import hnswlib |
| import string |
| import random |
| import shutil |
| import pickle |
| import numpy as np |
| import pandas as pd |
| from itertools import chain |
| from matplotlib.figure import Figure |
| import matchms.filtering as msfilters |
| from hnswlib import Index |
| from rdkit import Chem |
| from rdkit.Chem import Draw, rdFMCS |
| from molmass import Formula |
| from matchms.Spectrum import Spectrum |
| from matchms.importing import load_from_mgf,load_from_msp |
| from gensim.models import Word2Vec |
| from core.identification import identify_unknown, match_spectrum |
| import matplotlib.pyplot as plt |
| from matplotlib.figure import Figure |
| import gradio as gr |
| from matchms.plotting import plot_spectrum |
| from matchms.importing import load_from_msp,load_from_mgf |
| from matplotlib import pyplot as plt |
| import matplotlib |
| import numpy as np |
| import pandas as pd |
| from rdkit import Chem |
| from rdkit.Chem import Draw |
| import os |
| from zipfile import ZipFile |
| import time |
| import pickle |
| from matchms import filtering as msfilters |
| from rdkit import Chem |
| from rdkit.Chem import Draw, rdFMCS |
| from molmass import Formula |
| from matchms.plotting import plot_spectra_mirror |
| from zipfile import ZipFile |
| import hashlib |
| import zipfile |
| matplotlib.use('Agg') |
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Default resource locations (relative to the current working directory).
# ---------------------------------------------------------------------------
default_index_positive = 'data/references_index_positive_spec2vec.bin'
default_index_negative = 'data/references_index_negative_spec2vec.bin'
default_reference_positive = 'data/references_spectrums_positive.pickle'
default_reference_negative = 'data/references_spectrums_negative.pickle'

# ---------------------------------------------------------------------------
# Everything below runs once, at import time, and populates the module-level
# objects that the identify_* / match_* wrappers read.
# ---------------------------------------------------------------------------
print('Start Loading database')
default_database = pd.read_csv(
    'data/DeepMassStructureDB-v1.0.csv',
    low_memory=False,
)

print('Start Loading Word2Vec')
deepmass_positive = Word2Vec.load("model/Ms2Vec_allGNPSpositive.hdf5")
deepmass_negative = Word2Vec.load("model/Ms2Vec_allGNPSnegative.hdf5")

# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# these reference files must come from a trusted source.
print('Start Loading negative reference')
with open(default_reference_negative, 'rb') as file:
    reference_negative = pickle.load(file)

print('Start Loading positive reference')
with open(default_reference_positive, 'rb') as file:
    reference_positive = pickle.load(file)

print('Start Loading hnsw index')
# The index dimensionality (300) is hard-coded here.
index_negative = Index(space='l2', dim=300)
index_negative.load_index(default_index_negative)

index_positive = Index(space='l2', dim=300)
index_positive.load_index(default_index_positive)

# Precursor m/z of every reference spectrum, in reference order.
precursors_positive = np.array(
    [ref.get('precursor_mz') for ref in reference_positive]
)
precursors_negative = np.array(
    [ref.get('precursor_mz') for ref in reference_negative]
)

print('Finish!!!')
print('-' * 100)
|
|
def identify_pos(spectrum):
    """Run ``identify_unknown`` on ``spectrum`` with the positive-mode resources.

    The module-level positive HNSW index, positive Word2Vec model, positive
    reference spectra and the structure database are passed through
    unchanged; the return value is whatever ``identify_unknown`` returns.
    """
    positive_resources = (index_positive, deepmass_positive, reference_positive)
    return identify_unknown(spectrum, *positive_resources, default_database)
def identify_neg(spectrum):
    """Run ``identify_unknown`` on ``spectrum`` with the negative-mode resources.

    The module-level negative HNSW index, negative Word2Vec model, negative
    reference spectra and the structure database are passed through
    unchanged; the return value is whatever ``identify_unknown`` returns.
    """
    negative_resources = (index_negative, deepmass_negative, reference_negative)
    return identify_unknown(spectrum, *negative_resources, default_database)
|
|
def match_pos(spectrum):
    """Run ``match_spectrum`` on ``spectrum`` against the positive references.

    Passes the module-level array of positive precursor m/z values and the
    positive reference spectra; returns whatever ``match_spectrum`` returns.
    """
    return match_spectrum(
        spectrum,
        precursors_positive,
        reference_positive,
    )
|
|
def match_neg(spectrum):
    """Run ``match_spectrum`` on ``spectrum`` against the negative references.

    Passes the module-level array of negative precursor m/z values and the
    negative reference spectra; returns whatever ``match_spectrum`` returns.
    """
    return match_spectrum(
        spectrum,
        precursors_negative,
        reference_negative,
    )
|
|
|
|
|
|
def _scale_to_unit_max(intensities):
    """Return ``intensities`` as a float array scaled so its maximum is 1.

    Empty arrays and arrays whose maximum is not strictly positive are
    returned unscaled, so the caller never divides by zero and never calls
    ``np.max`` on an empty array.
    """
    values = np.asarray(intensities, dtype=float)
    if values.size == 0:
        return values
    peak = np.max(values)
    if not peak > 0:
        return values
    return values / peak


def plot_2_spectrum(spectrum, reference, loss=False):
    """Draw a mirror plot of ``spectrum`` against ``reference``.

    The query is drawn upwards in red and the reference downwards in blue,
    both scaled so that their largest intensity is 1.

    Args:
        spectrum: Query spectrum exposing ``peaks.mz`` / ``peaks.intensities``.
        reference: Reference spectrum with the same attributes.
        loss: When true, plot neutral losses (computed through
            ``msfilters.add_parent_mass`` and ``msfilters.add_losses``)
            instead of the fragment peaks.

    Returns:
        A ``matplotlib.figure.Figure``, or ``None`` when ``loss`` is true and
        the losses could not be computed for either spectrum.
    """
    if loss:
        try:
            spectrum = msfilters.add_parent_mass(spectrum)
            spectrum = msfilters.add_losses(spectrum, loss_mz_from=10.0, loss_mz_to=2000.0)
            reference = msfilters.add_parent_mass(reference)
            reference = msfilters.add_losses(reference, loss_mz_from=10.0, loss_mz_to=2000.0)
            mz, abunds = spectrum.losses.mz, spectrum.losses.intensities
            mz1, abunds1 = reference.losses.mz, reference.losses.intensities
        except Exception:
            # Best effort: a spectrum without usable losses simply gets no
            # loss plot. ``Exception`` (not a bare ``except``) keeps
            # KeyboardInterrupt / SystemExit propagating.
            print('Cannot Plot Losses')
            return None
    else:
        mz, abunds = spectrum.peaks.mz, spectrum.peaks.intensities
        mz1, abunds1 = reference.peaks.mz, reference.peaks.intensities

    # Out-of-place scaling: works for integer arrays and never mutates the
    # arrays handed out by the spectrum objects.
    abunds = _scale_to_unit_max(abunds)
    abunds1 = _scale_to_unit_max(abunds1)

    fig = Figure(figsize=(2, 1), dpi=300)
    fig.subplots_adjust(top=0.95, bottom=0.3, left=0.18, right=0.95)

    axes = fig.add_subplot(111)
    axes.tick_params(width=0.8, labelsize=3)
    for side in ('bottom', 'left', 'right', 'top'):
        axes.spines[side].set_linewidth(0.5)
    axes.vlines(mz, ymin=0, ymax=abunds, color='r', lw=0.5)
    axes.vlines(mz1, ymin=0, ymax=-abunds1, color='b', lw=0.5)
    axes.axhline(y=0, color='black', lw=0.5)
    axes.set_xlabel('m/z', fontsize=3.5)
    axes.set_ylabel('abundance', fontsize=3.5)
    return fig
|
|
def show_ref_spectrum(cur_spectrum, evt: gr.SelectData):
    """Build the loss and peak mirror plots for the clicked reference row.

    Args:
        cur_spectrum: Spectrum whose ``metadata['reference']`` holds the
            candidate reference spectra.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        ``(loss_figure, peak_figure)`` as produced by ``plot_2_spectrum``.
    """
    selected_row = evt.index[0]
    reference = cur_spectrum.metadata['reference'][selected_row]
    loss_figure = plot_2_spectrum(cur_spectrum, reference, loss=True)
    peak_figure = plot_2_spectrum(cur_spectrum, reference, loss=False)
    return loss_figure, peak_figure
|
|
def _mol_from_smiles(smiles):
    """Parse ``smiles`` with RDKit; return ``None`` for missing/invalid input."""
    if not isinstance(smiles, str) or not smiles:
        return None
    # MolFromSmiles itself returns None when the string cannot be parsed.
    return Chem.MolFromSmiles(smiles)


def plot_2_mol(smi_anno, smi_ref, hightlight=True):
    """Render the annotated and reference molecules as images.

    Args:
        smi_anno: SMILES of the annotated (candidate) structure.
        smi_ref: SMILES of the reference structure.
        hightlight: When true, atoms belonging to the maximum common
            substructure of both molecules are highlighted. (The parameter
            name keeps its original spelling for compatibility.)

    Returns:
        ``(anno_img, ref_img)``. An entry is ``None`` when the corresponding
        SMILES is missing or cannot be parsed, instead of raising.
    """
    mol_anno = _mol_from_smiles(smi_anno)
    mol_ref = _mol_from_smiles(smi_ref)

    allsubs_anno = ()
    allsubs_ref = ()
    # The common substructure only exists when both molecules parsed.
    if hightlight and mol_anno is not None and mol_ref is not None:
        mcs = rdFMCS.FindMCS(
            [mol_anno, mol_ref],
            bondCompare=rdFMCS.BondCompare.CompareOrderExact,
            matchValences=True,
            ringMatchesRingOnly=True,
        )
        # An empty result has no SMARTS worth matching.
        mcs_mol = Chem.MolFromSmarts(mcs.smartsString) if mcs.numAtoms > 0 else None
        if mcs_mol is not None:
            allsubs_anno = tuple(chain.from_iterable(mol_anno.GetSubstructMatches(mcs_mol)))
            allsubs_ref = tuple(chain.from_iterable(mol_ref.GetSubstructMatches(mcs_mol)))

    ref_img = None
    if mol_ref is not None:
        ref_img = Draw.MolToImage(mol_ref, highlightAtoms=allsubs_ref, wedgeBonds=False)
    anno_img = None
    if mol_anno is not None:
        anno_img = Draw.MolToImage(mol_anno, highlightAtoms=allsubs_anno, wedgeBonds=False)
    return anno_img, ref_img
|
|
def show_mol(structure_state, cur_spectrum, evt: gr.SelectData):
    """Render the selected structure next to the clicked reference structure.

    Args:
        structure_state: SMILES of the currently selected annotated
            structure (anything that is not a non-empty string is treated as
            "nothing selected").
        cur_spectrum: Spectrum whose ``metadata['reference']`` holds the
            candidate reference spectra.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        ``(anno_img, ref_img)`` from ``plot_2_mol``, or ``(None, None)`` when
        no structure is selected or the reference carries no SMILES.
    """
    line_num = evt.index[0]
    # References are not guaranteed to carry a 'smiles' entry.
    ref_smi = cur_spectrum.metadata['reference'][line_num].metadata.get('smiles')
    have_reference = isinstance(ref_smi, str) and bool(ref_smi)
    have_selection = isinstance(structure_state, str) and bool(structure_state)
    if not (have_reference and have_selection):
        return None, None
    return plot_2_mol(structure_state, ref_smi)
|
|
def show_info(cur_spectrum, evt: gr.SelectData):
    """Return the clicked reference's metadata as a two-column table.

    Args:
        cur_spectrum: Spectrum whose ``metadata['reference']`` holds the
            candidate reference spectra.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        A DataFrame with one row per metadata entry and the columns
        ``key`` and ``value``.
    """
    selected_row = evt.index[0]
    reference_metadata = cur_spectrum.metadata['reference'][selected_row].metadata
    table = pd.DataFrame.from_dict(reference_metadata, orient='index', columns=['value'])
    # Move the metadata names out of the index into a regular 'key' column.
    return table.reset_index().rename(columns={'index': 'key'})
|
|
def show_ref_spectrums(spectrum_state, structure_obj, evt: gr.SelectData):
    """List the reference spectra of the current spectrum and report the
    SMILES of the clicked structure row.

    Args:
        spectrum_state: Current spectrum; ``metadata['reference']`` (if
            present) is the list of reference spectra to tabulate.
        structure_obj: Structure table containing a ``CanonicalSMILES``
            column.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        ``(reference_table, smi_anno)`` where ``reference_table`` has the
        columns ``name, adduct, smiles, parent_mass, database`` (missing
        metadata becomes ``''``; a missing name falls back to the SMILES) and
        ``smi_anno`` is the SMILES of the clicked row.
    """
    line_num = evt.index[0]
    # evt.index is a position in the displayed table, so look it up by position.
    smi_anno = structure_obj['CanonicalSMILES'].iloc[line_num]

    rows = []
    for ref in spectrum_state.metadata.get('reference', []):
        meta = ref.metadata
        smiles = meta.get('smiles', '')
        rows.append([
            meta.get('compound_name', smiles),
            meta.get('adduct', ''),
            smiles,
            meta.get('parent_mass', ''),
            meta.get('database', ''),
        ])
    reference_table = pd.DataFrame(
        rows, columns=['name', 'adduct', 'smiles', 'parent_mass', 'database']
    )
    return reference_table, smi_anno
|
|
|
|
|
|
def show_formula(res_state, evt: gr.SelectData):
    """Select a spectrum from the navigator and list its candidate formulas.

    Args:
        res_state: Table with an ``Identified Spectrum`` column holding the
            annotated spectrum objects.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        ``(cur_spectrum, formula_df)`` where ``formula_df`` has a single
        ``Formula`` column with the sorted unique ``MolecularFormula`` values
        of the spectrum's annotation. The table is empty when the spectrum
        has no ``annotation`` metadata.
    """
    line_num = evt.index[0]
    # evt.index is a position in the displayed table, so look it up by position.
    cur_spectrum = res_state['Identified Spectrum'].iloc[line_num]

    annotation = cur_spectrum.metadata.get('annotation')
    if annotation is None:
        # Spectra without an annotation are tolerated elsewhere in this file
        # (see the CSV export), so show an empty table rather than fail.
        formula_list = []
    else:
        formula_list = np.unique(annotation['MolecularFormula'])

    formula_df = pd.DataFrame({'Formula': formula_list})
    return cur_spectrum, formula_df
|
|
def show_structure(spectrum_state, evt: gr.SelectData):
    """Return the candidate structures for the clicked formula row.

    The formula table shown to the user is built with
    ``np.unique(annotation['MolecularFormula'])``, so the clicked row number
    must be resolved against that same sorted, de-duplicated list. Indexing
    the raw annotation column (as before) picked the wrong formula whenever a
    formula occurred more than once or the column was not already sorted.

    Args:
        spectrum_state: Current spectrum; ``metadata['annotation']`` is a
            DataFrame with a ``MolecularFormula`` column.
        evt: Selection event; ``evt.index[0]`` is used as the row number.

    Returns:
        ``(select_formula, structural_table)`` where ``structural_table``
        holds the annotation rows whose formula equals ``select_formula``,
        re-indexed from 0.
    """
    line_num = evt.index[0]
    annotation = spectrum_state.metadata['annotation']

    displayed_formulas = np.unique(annotation['MolecularFormula'])
    select_formula = displayed_formulas[line_num]

    matches_formula = annotation['MolecularFormula'] == select_formula
    structural_table = annotation.loc[matches_formula, :].reset_index(drop=True)
    return select_formula, structural_table
|
|
|
|
def load_files(file_list):
    """Load spectra from the uploaded files.

    Files ending in ``.msp`` (case-insensitive) are read with
    ``load_from_msp``; everything else is read with ``load_from_mgf`` as
    before. Spectra without a ``compound_name`` metadata entry are skipped.

    Args:
        file_list: Iterable of file paths, or ``None`` / empty when the
            upload widget has been cleared.

    Returns:
        ``(spectrums_df, name_list)``: ``spectrums_df`` has the columns
        ``title`` and ``spectrum``; ``name_list`` is the one-column ``title``
        view of it. Both are empty when no file is given.
    """
    spectrum_list = []
    # The upload widget reports None once its files are removed.
    for fileName in file_list or []:
        extension = os.path.splitext(str(fileName))[1].lower()
        loader = load_from_msp if extension == '.msp' else load_from_mgf
        spectrum_list.extend(
            s for s in loader(fileName) if 'compound_name' in s.metadata
        )

    titles = [s.metadata['compound_name'] for s in spectrum_list]
    spectrums_df = pd.DataFrame({'title': titles, 'spectrum': spectrum_list})
    name_list = spectrums_df[['title']]
    return spectrums_df, name_list
|
|
def id_spectrum_list(spectrum_list, progress=None, is_deepmass=True):
    """Annotate every spectrum, dispatching on its ion mode.

    Spectra whose ``ionmode`` metadata equals ``'negative'`` go to the
    negative-mode function; all others (including spectra without an
    ``ionmode``) go to the positive-mode function.

    Args:
        spectrum_list: Iterable of spectra exposing a ``metadata`` dict.
        progress: Optional object with a ``tqdm(iterable)`` method used to
            report progress. When ``None`` the spectra are iterated directly
            (previously the default value raised ``AttributeError``).
        is_deepmass: Use ``identify_pos`` / ``identify_neg`` when true,
            ``match_pos`` / ``match_neg`` otherwise.

    Returns:
        List of results, one per input spectrum, in input order.
    """
    spectra = spectrum_list if progress is None else progress.tqdm(spectrum_list)

    res = []
    for s in spectra:
        is_negative = s.metadata.get('ionmode') == 'negative'
        if is_deepmass:
            annotate = identify_neg if is_negative else identify_pos
        else:
            annotate = match_neg if is_negative else match_pos
        res.append(annotate(s))
    return res
|
|
def _run_identification(state_df, progress, is_deepmass):
    """Annotate all loaded spectra and derive the initial UI selections.

    Shared implementation of ``deepms_click_fn`` and ``click_matchms_fn``.

    Returns:
        ``(state_df, spectrum_state, formula_state, structure_state,
        formula_df)``. The three selection states are ``[]`` (the value the
        UI states start with) whenever there is nothing to select.
    """
    empty_formula_df = pd.DataFrame({'Formula': []})

    # Before any file is loaded the state is still its initial list; with no
    # spectra there is nothing to identify and no "first" result to select.
    if not isinstance(state_df, pd.DataFrame) or state_df.empty:
        return state_df, [], [], [], empty_formula_df

    res = id_spectrum_list(state_df['spectrum'], progress, is_deepmass=is_deepmass)
    state_df['Identified Spectrum'] = res

    # The first spectrum becomes the initial selection.
    spectrum_state = res[0]
    annotation = spectrum_state.metadata.get('annotation')
    if annotation is None or len(annotation) == 0:
        return state_df, spectrum_state, [], [], empty_formula_df

    formula_df = pd.DataFrame({
        'Formula': np.unique(annotation['MolecularFormula'])
    })

    # Positional lookups: the annotation index is not guaranteed to start at 0.
    formula_state = annotation['MolecularFormula'].iloc[0]
    structural_table = annotation.loc[annotation['MolecularFormula'] == formula_state, :]
    structure_state = structural_table['CanonicalSMILES'].iloc[0]

    return state_df, spectrum_state, formula_state, structure_state, formula_df


def deepms_click_fn(state_df, progress=gr.Progress()):
    """Handler for the "Run DeepMS" button.

    Args:
        state_df: DataFrame with the columns ``title`` and ``spectrum``.
        progress: Gradio progress tracker used while annotating.

    Returns:
        A tuple updating the following states, in order:
        ``res_state`` (``state_df`` with an added ``Identified Spectrum``
        column holding the annotated spectrum objects), ``spectrum_state``
        (the selected spectrum), ``formula_state`` (the selected formula),
        ``structure_state`` (the selected structure), followed by the
        formula table.
    """
    return _run_identification(state_df, progress, is_deepmass=True)


def click_matchms_fn(state_df, progress=gr.Progress()):
    """Handler for the "Run MatchMS" button.

    Same inputs and outputs as ``deepms_click_fn``, but the spectra are
    annotated with the ``match_*`` functions instead of ``identify_*``.
    """
    return _run_identification(state_df, progress, is_deepmass=False)
|
|
|
|
def _safe_csv_stem(name, used_stems):
    """Turn ``name`` into a unique, filesystem-safe file stem.

    Characters other than word characters, ``.`` and ``-`` become ``_`` so a
    compound name can neither contain path separators nor escape the target
    directory; a numeric suffix is appended when the stem was already used.
    """
    stem = re.sub(r'[^\w.\-]+', '_', str(name)).strip('._') or 'spectrum'
    candidate = stem
    counter = 1
    while candidate in used_stems:
        counter += 1
        candidate = f'{stem}_{counter}'
    used_stems.add(candidate)
    return candidate


def save_identification_csv(res_state):
    """Write one CSV per identified spectrum plus a zip of all of them.

    Files are written to ``./temp`` (created when missing). Each CSV is named
    after the spectrum's ``compound_name`` (sanitised and de-duplicated); the
    zip is named after the MD5 of the list of CSV paths and stores the CSVs
    under their bare file names.

    Args:
        res_state: Table with an ``Identified Spectrum`` column.

    Returns:
        A ``gr.File`` update listing the zip first and then every CSV, or a
        hidden ``gr.File`` when nothing has been identified yet.
    """
    if not isinstance(res_state, pd.DataFrame) or 'Identified Spectrum' not in res_state:
        # "Save" pressed before any identification run: nothing to export.
        return gr.File(visible=False)

    dir_path = './temp'
    os.makedirs(dir_path, exist_ok=True)

    file_list = []
    used_stems = set()
    for s in res_state['Identified Spectrum']:
        name = s.metadata.get('compound_name', 'spectrum')
        annotation = s.metadata.get('annotation')
        if annotation is None:
            annotation = pd.DataFrame(columns=['Title', 'MolecularFormula', 'CanonicalSMILES', 'InChIKey'])
        path = os.path.join(dir_path, f'{_safe_csv_stem(name, used_stems)}.csv')
        annotation.to_csv(path)
        file_list.append(path)

    # MD5 is only used to derive a stable archive name, not for security.
    md5_obj = hashlib.md5()
    md5_obj.update(str(file_list).encode('utf-8'))
    md5_name = md5_obj.hexdigest()
    zip_path = os.path.join(dir_path, f'{md5_name}.zip')
    with ZipFile(zip_path, 'w') as zip_obj:
        for f in file_list:
            zip_obj.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_DEFLATED)

    file_list.insert(0, zip_path)
    return gr.File(file_list, visible=True)
| |
# NOTE(review): the nesting of rows / columns / tabs below was reconstructed
# from a copy of this file that had lost its indentation — confirm the layout.
with gr.Blocks(title='DeepMS 2') as demo:

    # ---- Session state ---------------------------------------------------
    res_state = gr.State([])        # table of loaded / identified spectra
    spectrum_state = gr.State([])   # currently selected spectrum
    formula_state = gr.State([])    # currently selected formula
    structure_state = gr.State([])  # currently selected structure (SMILES)

    # ---- Upload / download and action buttons ----------------------------
    with gr.Row(elem_classes=['first_row']):
        file_obj = gr.File(file_count='multiple', type='filepath', height=100)
        download = gr.File(visible=False, interactive=False)
    with gr.Row(elem_classes=['first_row']):
        run_save_btn = gr.Button('Save')
        run_deepms_btn = gr.Button('Run DeepMS')
        run_matchms_btn = gr.Button('Run MatchMS')

    # ---- Navigator and formula finder ------------------------------------
    with gr.Row(elem_classes=['secend_row']):
        with gr.Column(scale=1):
            nav_obj = gr.DataFrame(
                headers=["name"],
                elem_classes=['scroll'],
                interactive=False,
                label='Navigator',
            )
        with gr.Column(scale=1):
            formula_obj = gr.DataFrame(
                headers=["Formula"],
                elem_classes=['scroll'],
                interactive=False,
                label='Formula Finder',
            )

    # ---- Structure finder ------------------------------------------------
    with gr.Row():
        structure_obj = gr.DataFrame(
            headers=["Title", "MolecularFormula", "CanonicalSMILES", "InChIKey", "DeepMass Score"],
            interactive=False,
            elem_classes=['scroll'],
            label='Structure Finder',
        )

    # ---- Reference spectra -----------------------------------------------
    with gr.Row():
        ref_spectrums = gr.DataFrame(
            label='Reference Spectrums',
            headers=['name', 'adduct', 'smiles', 'parent_mass', 'database'],
            interactive=False,
            height=300,
            column_widths='20%',
        )

    # ---- Detail tabs -----------------------------------------------------
    with gr.Row():
        with gr.Tab(label='Spectrum'):
            with gr.Row():
                spectrum_plot_fig = gr.Plot(label='Spectrum')
                spectrum_loss_plot_fig = gr.Plot(label='Loss')
        with gr.Tab(label='Structure'):
            with gr.Row():
                ann_structure_fig = gr.Image(label='Annotated Structure', height=200, width=200)
                ref_structure_fig = gr.Image(label='Reference Structure', height=200, width=200)
        with gr.Tab(label='Information'):
            information_obj = gr.DataFrame(interactive=False)

    # ---- Event wiring ----------------------------------------------------
    # Uploading files fills the result table and the navigator.
    file_obj.change(
        load_files,
        inputs=file_obj,
        outputs=[res_state, nav_obj],
    )

    # Navigator row -> formula table -> structure table -> reference table.
    nav_obj.select(
        fn=show_formula,
        inputs=[res_state],
        outputs=[spectrum_state, formula_obj],
    )
    formula_obj.select(
        fn=show_structure,
        inputs=[spectrum_state],
        outputs=[formula_state, structure_obj],
    )
    structure_obj.select(
        fn=show_ref_spectrums,
        inputs=[spectrum_state, structure_obj],
        outputs=[ref_spectrums, structure_state],
    )

    # Both run buttons update the same set of states.
    run_deepms_btn.click(
        fn=deepms_click_fn,
        inputs=[res_state],
        outputs=[res_state, spectrum_state, formula_state, structure_state, formula_obj],
    )
    run_matchms_btn.click(
        fn=click_matchms_fn,
        inputs=[res_state],
        outputs=[res_state, spectrum_state, formula_state, structure_state, formula_obj],
    )

    # Selecting a reference row refreshes the plots, the structures and the
    # metadata table (three independent listeners on the same event).
    ref_spectrums.select(
        fn=show_ref_spectrum,
        inputs=[spectrum_state],
        outputs=[spectrum_loss_plot_fig, spectrum_plot_fig],
    )
    ref_spectrums.select(
        fn=show_mol,
        inputs=[structure_state, spectrum_state],
        outputs=[ann_structure_fig, ref_structure_fig],
    )
    ref_spectrums.select(
        fn=show_info,
        inputs=[spectrum_state],
        outputs=[information_obj],
    )

    run_save_btn.click(
        fn=save_identification_csv,
        inputs=[res_state],
        outputs=[download],
    )
| |
if __name__ == '__main__':
    # Only start the web UI when this file is executed as a script.
    print('Starting Webui!!!!')
    # NOTE(review): launch() presumably blocks until the server stops, in
    # which case the message below is printed on shutdown — confirm.
    demo.launch()
    print('Started Webui!!!!')
|
|
|
|
|
|
|
|