from __future__ import annotations import sqlite3 import gradio as gr from infer import ModelInference from model import ModelCLR from matchms.importing import load_from_mgf,load_from_msp import matchms.filtering as msfilters import numpy as np from rdkit.Chem import Draw from rdkit.Chem.Descriptors import ExactMolWt from rdkit import Chem import torch import yaml import pickle import subprocess import pandas as pd from sklearn.metrics.pairwise import cosine_similarity import tempfile import shutil import os import matplotlib.pyplot as plt import gradio as gr from typing import Iterable from gradio.themes.base import Base from gradio.themes.utils import colors, fonts, sizes from huggingface_hub import hf_hub_download import time import concurrent.futures class Seafoam(Base): def __init__( self, *, primary_hue: colors.Color | str = colors.emerald, secondary_hue: colors.Color | str = colors.blue, neutral_hue: colors.Color | str = colors.blue, spacing_size: sizes.Size | str = sizes.spacing_md, radius_size: sizes.Size | str = sizes.radius_md, text_size: sizes.Size | str = sizes.text_lg, font: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("Quicksand"), "ui-sans-serif", "sans-serif", ), font_mono: fonts.Font | str | Iterable[fonts.Font | str] = ( fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace", ), ): super().__init__( primary_hue=primary_hue, secondary_hue=secondary_hue, neutral_hue=neutral_hue, spacing_size=spacing_size, radius_size=radius_size, text_size=text_size, font=font, font_mono=font_mono, ) super().set( #body_background_fill="repeating-linear-gradient(45deg, *primary_200, *primary_200 10px, *primary_50 10px, *primary_50 20px)", body_background_fill_dark="repeating-linear-gradient(45deg, *primary_800, *primary_800 10px, *primary_900 10px, *primary_900 20px)", button_primary_background_fill="linear-gradient(90deg, *primary_300, *secondary_400)", button_primary_background_fill_hover="linear-gradient(90deg, *primary_200, 
*secondary_300)", button_primary_text_color="white", button_primary_background_fill_dark="linear-gradient(90deg, *primary_600, *secondary_800)", slider_color="*secondary_300", slider_color_dark="*secondary_600", block_title_text_weight="600", block_border_width="3px", block_shadow="*shadow_drop_lg", button_large_padding="17px", body_text_color="#000000", ) seafoam = Seafoam() custom_css = """ """ def spectrum_processing(s): """This is how one would typically design a desired pre- and post- processing pipeline.""" s = msfilters.normalize_intensities(s) s = msfilters.select_by_mz(s, mz_from=0, mz_to=1500) return s def draw_mass_spectrum(peak_data_path): ms2 = list(load_from_msp(peak_data_path.name))[0] ms2 = spectrum_processing(ms2) Mz = np.array(ms2.mz) Intens = np.array(ms2.intensities) plt.figure(figsize=(6,3)) for i in range(len(Mz)): plt.axvline(x=Mz[i], ymin=0, ymax=Intens[i],c='red') plt.xlabel("m/z") plt.ylabel("Intensity") plt.title("Mass Spectrum") return plt def search_structure_from_mass(structureDB,mass, ppm): structures=pd.DataFrame() mmin = mass - mass*ppm/10**6 mmax = mass + mass*ppm/10**6 structures = structureDB[(structureDB['MonoisotopicMass'] >= mmin) & (structureDB['MonoisotopicMass'] <= mmax)] return structures conn = None def initialize_db(): global conn if conn is None: dataset_repo = "Tingxie/CSU-MS2-DB" db_filename = "csu_ms2_db.db" token = os.getenv("HF_TOKEN") print("Starting large file download and DB connection...") db_path = hf_hub_download(repo_id=dataset_repo, filename=db_filename, repo_type="dataset", token=token) conn = sqlite3.connect(db_path, check_same_thread=False) print("DB initialization complete.") return conn #dataset_repo = "Tingxie/CSU-MS2-DB" #db_filename = "csu_ms2_db.db" #token = os.getenv("HF_TOKEN") #db_path = hf_hub_download(repo_id=dataset_repo, filename=db_filename, repo_type="dataset", token=token) #conn = sqlite3.connect(db_path, check_same_thread=False) device='cpu' 
pretrain_model_path_low = 'model/low_energy/checkpoints/model.pth'
pretrain_model_path_median = 'model/median_energy/checkpoints/model.pth'
pretrain_model_path_high = 'model/high_energy/checkpoints/model.pth'
config_path = "model/low_energy/checkpoints/config.yaml"
# NOTE(review): FullLoader is kept as in the original; switch to
# yaml.safe_load if the config is known to contain plain data only.
with open(config_path, "r") as _config_file:
    config = yaml.load(_config_file, Loader=yaml.FullLoader)


def _load_encoder(checkpoint_path):
    """Build a ModelCLR from the shared config, load its weights, set eval mode."""
    model = ModelCLR(**config["model_config"]).to(device)
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    return model


model_low = _load_encoder(pretrain_model_path_low)
model_median = _load_encoder(pretrain_model_path_median)
model_high = _load_encoder(pretrain_model_path_high)

# Number of peaks every spectrum is truncated / zero-padded to before encoding.
MAX_PEAKS = 300

# Subtracted from the precursor m/z before mass matching; "[M+H]+" is the only
# adduct offered in the UI.
ADDUCT_MASS_SHIFT = 1.008

# Dropdown label -> SQLite table name. Also serves as the whitelist of tables
# that may be interpolated into SQL.
LIBRARY_TABLES = {
    "CSU_MS2_DB": 'ConSSDB',
    "BloodExp: blood exposome database": 'BloodexpDB',
    "ChEBI: products of nature or synthetic products database": 'ChebiDB',
    "ChemFOnt: Biochemical database including primary metabolites, secondary metabolites, natural products, etc": 'ChemfontDB',
    "ContaminantDB: Contaminant data from different online references and databases on contaminants": 'ContaminantdbDB',
    "DrugBank: drug biochemical and pharmacological information database": 'DrugbankDB',
    "ECMDB: database of small molecule metabolites found in or produced by Escherichia coli": 'EcmdbDB',
    "Exposome-Explorer: biomarkers of exposure to environmental risk factors for diseases": 'ExposomeDB',
    "Foodb: food constituent database": 'FoodbDB',
    "HMDB: human metabolome database": 'HmdbDB',
    "KEGG: a collection of small molecules, biopolymers, and other chemical substances": 'KeggDB',
    "KNApSAcK: integrated metabolite-plant species database": 'KnapsackDB',
    "MCDB: small molecule metabolites found in cow milk": 'MilkDB',
    "MiMeDB: taxonomic, microbiological, and body-site location data on most known human microbes": 'MimedbDB',
    "NANPDB: database of natural products isolated from native organisms of Northern Africa": 'NanpdbDB',
    "NPAtlas: natural products atlas database": 'NpatlasDB',
    "Phenol-Explorer: Polyphenols": 'PhenolDB',
    "PMHub: plant metabolite database": 'PmhubDB',
    "PMN: plant metabolite database": 'PmnDB',
    "SMPDB: small molecule pathway database": 'SmpdbDB',
    "STOFF-IDENT: database of water relevant substances": 'StoffDB',
    "T3DB: toxic exposome database": 'T3dbDB',
    "TCMSP: traditional chinese medicine systems pharmacology database": 'TcmspDB',
    "YMDB: yeast metabolome database": 'YmdbDB',
}

# Scratch directory used by generate_file(); created lazily on first use.
tmpdir = None


def generate_file(file_obj):
    """Copy an uploaded file into the module's scratch directory.

    Args:
        file_obj: object whose ``.name`` attribute is the source file path.

    Returns:
        Path of the copy inside the scratch directory.
    """
    global tmpdir
    if tmpdir is None:
        # The directory was never created anywhere, so the first call used to
        # fail with NameError.
        tmpdir = tempfile.mkdtemp()
    shutil.copy(file_obj.name, tmpdir)
    return os.path.join(tmpdir, os.path.basename(file_obj.name))


def _encode_spectrum(model, spec_mzs, spec_intens, num_peaks):
    """Run one encoder over a single-spectrum batch; return its unit-norm embedding."""
    features, mask = model.ms_encoder(spec_mzs, spec_intens, num_peaks)
    features = model.spec_esa(features, mask)
    features = model.spec_proj(features)
    features = features / features.norm(dim=-1, keepdim=True)
    return np.array(features.tolist()[0])


def MS2Embedding(spectra):
    """Embed one spectrum with the low-, median- and high-energy encoders.

    m/z values are rounded to 4 decimals. Spectra with more than
    ``MAX_PEAKS`` peaks keep only their last ``MAX_PEAKS`` peaks; shorter
    ones are zero-padded at the end to that length.

    Args:
        spectra: a single spectrum exposing ``.mz`` and ``.intensities``.

    Returns:
        Tuple ``(low, median, high)`` of 1-D numpy arrays, each L2-normalized.
    """
    mzs = np.around(np.asarray(spectra.mz), decimals=4)
    intens = np.asarray(spectra.intensities)
    peak_count = len(mzs)
    if peak_count > MAX_PEAKS:
        mzs = mzs[-MAX_PEAKS:]
        intens = intens[-MAX_PEAKS:]
        peak_count = MAX_PEAKS
    else:
        padding = (0, MAX_PEAKS - peak_count)
        mzs = np.pad(mzs, padding, mode='constant', constant_values=0)
        intens = np.pad(intens, padding, mode='constant', constant_values=0)

    # Batch of one spectrum: shape (1, MAX_PEAKS).
    spec_mzs = torch.tensor(mzs[np.newaxis, :]).float()
    spec_intens = torch.tensor(intens[np.newaxis, :]).float()
    num_peaks = torch.LongTensor([peak_count])

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        return (
            _encode_spectrum(model_low, spec_mzs, spec_intens, num_peaks),
            _encode_spectrum(model_median, spec_mzs, spec_intens, num_peaks),
            _encode_spectrum(model_high, spec_mzs, spec_intens, num_peaks),
        )


def calculate_cosine_similarity(vector1, vector2):
    """Return the cosine similarity of two 1-D vectors as a scalar."""
    return cosine_similarity(vector1.reshape(1, -1), vector2.reshape(1, -1))[0][0]


def _energy_weights(collision_energy):
    """Return the (low, median, high) similarity weights for a collision energy."""
    if collision_energy <= 15:
        return 0.4, 0.3, 0.3
    if collision_energy <= 25:
        return 0.3, 0.4, 0.3
    return 0.2, 0.3, 0.5


def _resolve_table(table_name):
    """Map a dropdown label (or a known raw table name) to a whitelisted table."""
    table = LIBRARY_TABLES.get(table_name)
    if table is not None:
        return table
    if table_name in LIBRARY_TABLES.values():
        return table_name
    raise ValueError(f"Unknown structure library: {table_name!r}")


def _blob_to_tensor(blob):
    """Decode one pickled embedding column into a float32 tensor."""
    # NOTE(review): pickle.loads executes arbitrary code if the database file
    # is not trusted; acceptable only while the DB comes from a trusted repo.
    return torch.tensor(np.array(pickle.loads(blob), dtype=np.float64)).float()


def retrieve_similarity_scores(conn_obj, table_name, target_mass, collision_energy,
                               ms2_embedding_low, ms2_embedding_median, ms2_embedding_high,
                               ppm=20):
    """Score every mass-matched library structure against a query spectrum.

    Args:
        conn_obj: open ``sqlite3.Connection`` to the structure database.
        table_name: dropdown label from ``LIBRARY_TABLES`` or a raw table name
            listed there.
        target_mass: precursor m/z; ``ADDUCT_MASS_SHIFT`` is subtracted before
            matching against ``MonoisotopicMass``.
        collision_energy: selects the weighting of the three similarities
            (<=15, <=25, otherwise).
        ms2_embedding_low / ms2_embedding_median / ms2_embedding_high: 1-D
            float tensors for the query spectrum.
        ppm: mass tolerance in parts per million (default 20).

    Returns:
        Up to ten ``((smiles,), weighted_similarity)`` pairs, best first.

    Raises:
        ValueError: if ``table_name`` is not a known library.
    """
    table = _resolve_table(table_name)
    target_mass = target_mass - ADDUCT_MASS_SHIFT
    tolerance = target_mass * ppm / 1000000
    w_low, w_median, w_high = _energy_weights(collision_energy)

    # `table` comes from the whitelist above, so interpolating it is safe;
    # all values are bound as parameters. Embeddings are fetched together with
    # the SMILES instead of issuing one extra query per candidate.
    query = f"""
        SELECT SMILES, low_energy_embedding, median_energy_embedding, high_energy_embedding
        FROM {table}
        WHERE MonoisotopicMass >= ? - ? AND MonoisotopicMass <= ? + ?
    """
    weighted_similarity_scores = []
    cur = conn_obj.cursor()
    try:
        cur.execute(query, (target_mass, tolerance, target_mass, tolerance))
        for smiles, low_blob, median_blob, high_blob in cur:
            if smiles is None or low_blob is None or median_blob is None or high_blob is None:
                # Incomplete row: skip it rather than abort the whole search.
                continue
            low_similarity = (ms2_embedding_low @ _blob_to_tensor(low_blob).t()).item()
            median_similarity = (ms2_embedding_median @ _blob_to_tensor(median_blob).t()).item()
            high_similarity = (ms2_embedding_high @ _blob_to_tensor(high_blob).t()).item()
            weighted_similarity = w_low * low_similarity + w_median * median_similarity + w_high * high_similarity
            # Keep the one-element row-tuple shape existing callers index into.
            weighted_similarity_scores.append(((smiles,), weighted_similarity))
    finally:
        cur.close()

    weighted_similarity_scores.sort(key=lambda item: item[1], reverse=True)
    return weighted_similarity_scores[:10]


def get_topK_result(library, ms_feature, smiles_feature, topK):
    """Return the ``topK`` library entries most similar to one spectrum embedding.

    Args:
        library: sequence of candidates, aligned with rows of ``smiles_feature``.
        ms_feature: 1-D spectrum embedding tensor.
        smiles_feature: 2-D tensor of candidate embeddings (one row each).
        topK: number of hits wanted; capped at ``len(library)``.

    Returns:
        ``(indices, scores, candidates)`` lists ordered by descending score.
    """
    topK = min(topK, len(library))
    with torch.no_grad():
        similarities = (ms_feature.unsqueeze(0) @ smiles_feature.t()).cpu()
        top_scores, top_indices = similarities.topk(topK, dim=1, largest=True, sorted=True)
    indices = top_indices.tolist()[0]
    scores = top_scores.tolist()[0]
    candidates = [library[i] for i in indices]
    return indices, scores, candidates


def _parse_number(raw, label):
    """Convert a textbox value to a finite float or raise a user-facing error."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        raise gr.Error(f"{label} must be a number.") from None
    if not np.isfinite(value):
        raise gr.Error(f"{label} must be a finite number.")
    return value


def _load_query_spectrum(spectrum_path):
    """Read and preprocess the first spectrum of the uploaded .msp file."""
    if spectrum_path is None:
        raise gr.Error("Please upload an MS/MS spectrum file in .msp format.")
    spectra = list(load_from_msp(spectrum_path.name))
    if not spectra:
        raise gr.Error("No spectrum could be read from the uploaded .msp file.")
    return spectrum_processing(spectra[0])


def _query_embeddings(ms2):
    """Embed a spectrum and return the three embeddings as float32 tensors."""
    return tuple(torch.tensor(embedding).float() for embedding in MS2Embedding(ms2))


def _render_gallery(smiles_list, scores):
    """Build ``(image, caption)`` gallery items for ranked SMILES."""
    bw_draw_options = Draw.MolDrawOptions()
    bw_draw_options.useBWAtomPalette()
    gallery_items = []
    for smi, score in zip(smiles_list, scores):
        try:
            image = Draw.MolToImage(Chem.MolFromSmiles(smi), options=bw_draw_options)
        except Exception:
            # Best effort: keep the hit listed even when it cannot be drawn.
            # NOTE(review): 'NAN' is the original placeholder — confirm that
            # gr.Gallery accepts a non-path string here.
            image = 'NAN'
        gallery_items.append((image, 'SMILES: ' + smi + ' ' + 'Score: ' + str(score)))
    return gallery_items


def rank_lib(conn_obj, database_name, spectrum_path, instrument_type, adduct, parent_Mass, collision_energy):
    """Rank structures of a built-in library against the uploaded spectrum.

    Args:
        conn_obj: database connection held in the UI state; when ``None`` the
            shared connection from ``initialize_db()`` is used instead.
        database_name: label chosen in the library dropdown.
        spectrum_path: uploaded .msp file (``.name`` is the path).
        instrument_type, adduct: UI selections; not used in the computation.
        parent_Mass: precursor m/z as text or number.
        collision_energy: collision energy as text or number.

    Returns:
        List of ``(image, caption)`` tuples for the results gallery.

    Raises:
        gr.Error: on missing or malformed user input.
    """
    if database_name is None:
        raise gr.Error("Please choose a structure library.")
    ms2 = _load_query_spectrum(spectrum_path)
    collision_energy = _parse_number(collision_energy, "Collision energy")
    parent_Mass = _parse_number(parent_Mass, "Parent ion mass")
    if conn_obj is None:
        conn_obj = initialize_db()

    ms2_embedding_low, ms2_embedding_median, ms2_embedding_high = _query_embeddings(ms2)
    top_10_smiles = retrieve_similarity_scores(
        conn_obj, database_name, parent_Mass, collision_energy,
        ms2_embedding_low, ms2_embedding_median, ms2_embedding_high,
    )
    smis = [hit[0][0] for hit in top_10_smiles]
    scores = [hit[1] for hit in top_10_smiles]
    return _render_gallery(smis, scores)


def rank_user_lib(candidate_file, spectrum_path, instrument_type, adduct, parent_Mass, collision_energy):
    """Rank user-supplied candidate structures against the uploaded spectrum.

    Candidates are first narrowed by monoisotopic mass (10 ppm, then 30 ppm,
    then no filter if nothing matches) and then scored with the three
    energy-specific SMILES encoders.

    Args:
        candidate_file: uploaded CSV with a ``SMILES`` column (``.name`` is
            the path).
        spectrum_path: uploaded .msp file (``.name`` is the path).
        instrument_type, adduct: UI selections; not used in the computation.
        parent_Mass: precursor m/z as text or number.
        collision_energy: collision energy as text or number.

    Returns:
        List of ``(image, caption)`` tuples for the results gallery.

    Raises:
        gr.Error: on missing or malformed user input.
    """
    if candidate_file is None:
        raise gr.Error("Please upload a candidate structure file in csv format.")
    collision_energy = _parse_number(collision_energy, "Collision energy")
    parent_Mass = _parse_number(parent_Mass, "Parent ion mass")
    ms2 = _load_query_spectrum(spectrum_path)

    users_candidate = pd.read_csv(candidate_file.name)
    if 'SMILES' not in users_candidate.columns:
        raise gr.Error("The candidate file must contain a 'SMILES' column.")

    # Compute masses in one pass; entries RDKit cannot parse are skipped
    # instead of crashing ExactMolWt.
    records = []
    for smi in users_candidate['SMILES']:
        mol = Chem.MolFromSmiles(smi) if isinstance(smi, str) else None
        if mol is not None:
            records.append((smi, ExactMolWt(mol)))
    if not records:
        raise gr.Error("The candidate file contains no valid SMILES.")
    candidate = pd.DataFrame(records, columns=['SMILES', 'MonoisotopicMass'])

    query_ms = parent_Mass - ADDUCT_MASS_SHIFT
    sub_reference_library = search_structure_from_mass(candidate, query_ms, 10)
    if len(sub_reference_library) == 0:
        sub_reference_library = search_structure_from_mass(candidate, query_ms, 30)
    if len(sub_reference_library) == 0:
        sub_reference_library = candidate.copy()
    candidate_smiles = list(sub_reference_library['SMILES'])

    ms2_embedding_low, ms2_embedding_median, ms2_embedding_high = _query_embeddings(ms2)

    model_inference_low = ModelInference(config_path=config_path, pretrain_model_path=pretrain_model_path_low, device="cpu")
    model_inference_median = ModelInference(config_path=config_path, pretrain_model_path=pretrain_model_path_median, device="cpu")
    model_inference_high = ModelInference(config_path=config_path, pretrain_model_path=pretrain_model_path_high, device="cpu")

    # Encode candidates in batches of 64 SMILES.
    contexts = [candidate_smiles[start:start + 64] for start in range(0, len(candidate_smiles), 64)]
    result_low = torch.cat([model_inference_low.smiles_encode(batch).cpu() for batch in contexts], 0)
    result_median = torch.cat([model_inference_median.smiles_encode(batch).cpu() for batch in contexts], 0)
    result_high = torch.cat([model_inference_high.smiles_encode(batch).cpu() for batch in contexts], 0)

    low_similarity = (ms2_embedding_low @ result_low.t()).numpy()
    median_similarity = (ms2_embedding_median @ result_median.t()).numpy()
    high_similarity = (ms2_embedding_high @ result_high.t()).numpy()

    w_low, w_median, w_high = _energy_weights(collision_energy)
    weighted_similarity = w_low * low_similarity + w_median * median_similarity + w_high * high_similarity

    weighted_similarity_scores = list(zip(candidate_smiles, weighted_similarity))
    weighted_similarity_scores.sort(key=lambda item: item[1], reverse=True)
    top_10_smiles = weighted_similarity_scores[:10]
    smis = [hit[0] for hit in top_10_smiles]
    scores = [hit[1] for hit in top_10_smiles]
    return _render_gallery(smis, scores)


# NOTE(review): the component nesting below is a best-effort reading of the
# statement order — confirm the layout against the deployed app.
with gr.Blocks(theme=seafoam) as demo:
    gr.HTML(custom_css)
    # NOTE(review): any heading markup around the title is not visible in the
    # source; only the text itself is reproduced — confirm intended styling.
    gr.Markdown('\n🔍 CSU-MS2 web server\n')
    db_conn_state = gr.State(None)

    with gr.Row():
        with gr.Column():
            peak_data = gr.File(file_count="single", label="Upload MS/MS spectrum file in .msp format", elem_classes=".file-upload-height")
            spectrum_output = gr.Plot(label="Mass Spectrum", elem_id="custom_plot")
            peak_data.change(fn=draw_mass_spectrum, inputs=[peak_data], outputs=[spectrum_output])
            with gr.Row():
                instru = gr.Dropdown(["HCD"], label="Instrument Type")
                ionmode = gr.Dropdown(["[M+H]+"], label="Adduct Type")
                par_ion_mass = gr.Textbox(label="Parent Ion Mass", placeholder="e.g., 180.00")
                collision_e = gr.Textbox(label="collision energy", placeholder="e.g., 40")
            gr.Examples(
                examples=[
                    ["example_spectrum_searched_csu-ms2-db.msp", "HCD", "[M+H]+", "336.1735", "40"]
                ],
                inputs=[peak_data, instru, ionmode, par_ion_mass, collision_e],
                outputs=[spectrum_output],
                label="Upload Example Spectrum"
            )

    with gr.Tab(label="📶 Struture library", elem_id='custom_tab'):
        with gr.Column():
            # Choices come from the same mapping the query code validates against.
            dataset = gr.Dropdown(list(LIBRARY_TABLES), label="Choose a structure library")
        with gr.Column():
            lib_button = gr.Button("Cross-Modal Retrieval")
            lib_output = gr.Gallery(height='auto', columns=4, elem_classes="gallery-height", label='Cross-modal retrieval results')

    with gr.Tab("📁 Upload structure file"):
        with gr.Row():
            with gr.Column():
                # Label names the column the code actually reads ('SMILES').
                user_dataset = gr.File(file_count="single", label="Upload the candidate structure file in csv format, columns=['SMILES']", elem_classes="file-upload-height2")
                gr.Examples(
                    examples=[
                        ["user-defined structure file for spectrum1.csv"]
                    ],
                    inputs=[user_dataset],
                    label="Upload Example structural file"
                )
            with gr.Column():
                user_button = gr.Button("Cross-Modal Retrieval")
                user_output = gr.Gallery(height='auto', columns=4, elem_classes="gallery-height", label='Cross-modal retrieval results')

    demo.load(fn=initialize_db, inputs=None, outputs=db_conn_state, queue=True, show_progress="full")
    lib_button.click(rank_lib, inputs=[db_conn_state, dataset, peak_data, instru, ionmode, par_ion_mass, collision_e], outputs=lib_output)
    user_button.click(rank_user_lib, inputs=[user_dataset, peak_data, instru, ionmode, par_ion_mass, collision_e], outputs=user_output)

demo.launch(share=True)